php队列实现
PHP 队列实现方法
PHP 队列实现可以通过多种方式完成,以下是几种常见的方法:
使用数据库实现队列
创建数据库表存储队列任务,包含字段如 id, payload, status, created_at 等。生产者将任务插入数据库,消费者定期查询并处理未完成的任务。
// 生产者代码示例
$payload = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com']]);
$stmt = $pdo->prepare("INSERT INTO queue (payload, status) VALUES (?, 'pending')");
$stmt->execute([$payload]);
// 消费者代码示例
$stmt = $pdo->prepare("SELECT * FROM queue WHERE status = 'pending' LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch();
if ($task) {
// 处理任务
$pdo->prepare("UPDATE queue SET status = 'processing' WHERE id = ?")->execute([$task['id']]);
// 任务完成后更新状态
$pdo->prepare("UPDATE queue SET status = 'completed' WHERE id = ?")->execute([$task['id']]);
}
使用 Redis 实现队列
Redis 的列表数据结构非常适合实现队列,提供原子操作保证可靠性。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 生产者
$redis->rPush('queue', json_encode(['task' => 'process_image', 'path' => '/uploads/image.jpg']));
// 消费者
while (true) {
$task = $redis->blPop('queue', 30); // 阻塞式获取
if ($task) {
$data = json_decode($task[1], true);
// 处理任务
}
}
使用专业队列系统
对于高要求的场景,可以使用专业的队列系统如 RabbitMQ 或 Beanstalkd。
// RabbitMQ 示例
$connection = new AMQPConnection(['host' => 'localhost']);
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
// 生产者
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'task_queue');
// 消费者
$callback = function ($msg) {
echo "Received: ", $msg->body, "\n";
$msg->ack();
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用 Laravel 队列系统
Laravel 提供了内置的队列系统,支持多种驱动如数据库、Redis、Amazon SQS 等。
// 创建任务
php artisan make:job ProcessPodcast
// 定义任务
class ProcessPodcast implements ShouldQueue
{
public function handle()
{
// 处理任务逻辑
}
}
// 分发任务
ProcessPodcast::dispatch($podcast);
队列实现注意事项
- 确保任务处理的幂等性,防止重复处理导致问题
- 考虑失败任务的重试机制和死信队列处理
- 监控队列长度和处理速度,防止任务积压
- 根据业务需求选择合适的持久化方式
- 分布式环境下注意并发控制和锁机制
以上方法可以根据项目规模、性能需求和运维能力选择适合的实现方式。对于小型项目,数据库队列足够使用;高并发场景建议使用 Redis 或专业队列系统。







