php队列的实现
PHP 队列的实现方法
使用数据库实现队列
创建一个数据库表存储队列任务,包含字段如id、status、payload、created_at等。生产者将任务插入表,消费者通过轮询或定时任务处理。

// 生产者示例
$db->insert('queue', [
'payload' => json_encode(['task' => 'send_email', 'data' => $emailData]),
'status' => 'pending'
]);
// 消费者示例
$task = $db->query("SELECT * FROM queue WHERE status = 'pending' ORDER BY id ASC LIMIT 1");
if ($task) {
$db->update('queue', ['status' => 'processing'], ['id' => $task['id']]);
// 处理任务逻辑
$db->update('queue', ['status' => 'completed'], ['id' => $task['id']]);
}
使用Redis实现队列
Redis的LIST数据结构适合实现队列,使用LPUSH/RPUSH添加任务,RPOP/LPOP获取任务。

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 生产者
$redis->lPush('task_queue', json_encode(['type' => 'process_image', 'path' => '/tmp/image.jpg']));
// 消费者
while (true) {
$task = $redis->rPop('task_queue');
if ($task) {
$data = json_decode($task, true);
// 处理任务逻辑
}
sleep(1); // 避免空轮询
}
使用消息队列系统
对于高并发场景,可采用专业消息队列如RabbitMQ、Beanstalkd或Kafka。以Beanstalkd为例:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
// 生产者
$pheanstalk->useTube('test')->put(json_encode(['action' => 'log_cleanup']));
// 消费者
$job = $pheanstalk->watch('test')->ignore('default')->reserve();
$data = json_decode($job->getData(), true);
// 处理任务后删除
$pheanstalk->delete($job);
使用Laravel队列系统
Laravel提供了开箱即用的队列系统,支持数据库、Redis、Amazon SQS等驱动:
// 创建任务类
php artisan make:job ProcessPodcast
// 分发任务
ProcessPodcast::dispatch($podcast)->onQueue('processing');
// 启动队列处理器
php artisan queue:work --queue=processing
实现注意事项
- 确保任务处理的幂等性,防止重复执行导致问题
- 对于关键任务实现重试机制和失败处理
- 监控队列长度和消费速度,避免任务积压
- 多进程消费时注意并发控制和资源竞争
选择具体实现方案时需考虑应用规模、性能要求和运维成本。简单场景可用数据库或Redis,复杂分布式系统建议采用专业消息队列中间件。






