PHP实现请求队列
实现请求队列的基本思路
使用PHP实现请求队列的核心在于将需要处理的请求存储到队列中,再由后台进程或定时任务逐个处理。常见的实现方式包括数据库驱动队列、Redis队列以及专业的消息队列系统(如RabbitMQ)。
数据库驱动队列
创建一个数据库表存储队列任务,包含任务ID、状态、创建时间和处理时间等字段。
CREATE TABLE request_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
payload TEXT NOT NULL,
status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL
);
插入新任务到队列:
$pdo->prepare("INSERT INTO request_queue (payload, status) VALUES (?, 'pending')")
->execute([json_encode($requestData)]);
处理队列任务的脚本:

$stmt = $pdo->prepare("SELECT * FROM request_queue WHERE status = 'pending' LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch();
if ($task) {
$pdo->prepare("UPDATE request_queue SET status = 'processing' WHERE id = ?")
->execute([$task['id']]);
try {
// 处理任务逻辑
processRequest(json_decode($task['payload'], true));
$pdo->prepare("UPDATE request_queue SET status = 'completed', processed_at = NOW() WHERE id = ?")
->execute([$task['id']]);
} catch (Exception $e) {
$pdo->prepare("UPDATE request_queue SET status = 'failed' WHERE id = ?")
->execute([$task['id']]);
}
}
Redis队列实现
利用Redis的List数据结构实现简单的队列:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 入队
$redis->rPush('request_queue', json_encode($requestData));
// 出队处理
while ($payload = $redis->lPop('request_queue')) {
$data = json_decode($payload, true);
processRequest($data);
}
对于更复杂的场景,可以使用Redis的Sorted Set实现延迟队列:

// 添加延迟任务(5分钟后执行)
$redis->zAdd('delayed_queue', time() + 300, json_encode($requestData));
// 处理到期任务
$now = time();
$tasks = $redis->zRangeByScore('delayed_queue', 0, $now);
foreach ($tasks as $task) {
processRequest(json_decode($task, true));
$redis->zRem('delayed_queue', $task);
}
使用专业队列系统
对于高吞吐量场景,建议使用专业的消息队列系统如RabbitMQ:
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('request_queue');
$queue->declareQueue();
// 生产者
$exchange = new AMQPExchange($channel);
$exchange->publish(json_encode($requestData), 'request_queue');
// 消费者
$queue->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
$data = json_decode($envelope->getBody(), true);
processRequest($data);
$queue->ack($envelope->getDeliveryTag());
});
队列处理方式
定时任务处理:使用crontab定期运行处理脚本
* * * * * php /path/to/queue_processor.php
长驻进程处理:使用supervisor管理常驻进程
[program:queue_worker]
command=php /path/to/queue_worker.php
autostart=true
autorestart=true
user=www-data
numprocs=4
注意事项
数据库队列需要处理并发问题,可以使用SELECT FOR UPDATE或乐观锁 Redis队列在持久化方面需要配置适当的策略 专业队列系统通常提供更完善的功能如消息确认、重试机制等 队列处理器应该实现适当的错误处理和日志记录 对于高优先级任务可以考虑实现多优先级队列






