实现延迟队列php
延迟队列的实现方法
使用Redis的有序集合(Sorted Set)
Redis的有序集合可以通过设置分数(score)为未来时间戳来实现延迟队列。将任务添加到有序集合时,分数设置为执行时间的时间戳。

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加延迟任务
$delayTime = time() + 10; // 10秒后执行
$redis->zAdd('delayed_queue', $delayTime, 'task_data');
// 消费延迟任务
while (true) {
$now = time();
$tasks = $redis->zRangeByScore('delayed_queue', 0, $now);
foreach ($tasks as $task) {
// 处理任务
processTask($task);
// 从队列中移除已处理的任务
$redis->zRem('delayed_queue', $task);
}
sleep(1); // 避免频繁轮询
}
使用数据库表
创建一个数据库表来存储延迟任务,包含执行时间和状态字段。通过定时任务查询到达执行时间的任务并处理。

CREATE TABLE delayed_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
task_data TEXT,
execute_time TIMESTAMP,
status ENUM('pending', 'processed') DEFAULT 'pending'
);
// 添加延迟任务
$executeTime = date('Y-m-d H:i:s', time() + 10); // 10秒后执行
$db->query("INSERT INTO delayed_queue (task_data, execute_time) VALUES ('task_data', '$executeTime')");
// 消费延迟任务
$tasks = $db->query("SELECT * FROM delayed_queue WHERE execute_time <= NOW() AND status = 'pending'");
foreach ($tasks as $task) {
processTask($task['task_data']);
$db->query("UPDATE delayed_queue SET status = 'processed' WHERE id = " . $task['id']);
}
使用消息队列(如RabbitMQ)
RabbitMQ支持延迟队列插件(rabbitmq-delayed-message-exchange),可以设置消息的延迟时间。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明延迟交换机
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, [
'x-delayed-type' => ['S', 'direct']
]);
// 添加延迟任务
$message = new AMQPMessage('task_data', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => ['x-delay' => 10000] // 10秒延迟
]);
$channel->basic_publish($message, 'delayed_exchange', 'delayed_routing_key');
// 消费延迟任务
$channel->basic_consume('delayed_queue', '', false, true, false, false, function($msg) {
processTask($msg->body);
});
使用定时任务(Cron)
通过Cron定时运行脚本,检查并处理到达执行时间的任务。这种方法适合对延迟精度要求不高的场景。
// 添加延迟任务
$executeTime = time() + 10;
file_put_contents('/path/to/tasks/' . $executeTime . '.task', 'task_data');
// 消费延迟任务(由Cron每分钟执行)
$now = time();
foreach (glob('/path/to/tasks/*.task') as $file) {
$executeTime = basename($file, '.task');
if ($executeTime <= $now) {
$taskData = file_get_contents($file);
processTask($taskData);
unlink($file);
}
}
注意事项
- Redis方案性能高但不持久化,需考虑数据丢失风险。
- 数据库方案简单但性能较低,适合低频任务。
- RabbitMQ方案功能完善但需安装插件,适合复杂场景。
- 定时任务方案实现简单但精度低,适合非实时任务。






