php实现延迟队列
延迟队列的实现方式
PHP中实现延迟队列可以通过多种方式,以下是几种常见的方法:
使用Redis的Sorted Set
Redis的Sorted Set(有序集合)可以用于实现延迟队列。将任务的执行时间作为分数(score),任务内容作为成员(member)。通过定时轮询Sorted Set中分数小于当前时间的成员来执行任务。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加延迟任务
$delayTime = time() + 3600; // 1小时后执行
$redis->zAdd('delayed_queue', $delayTime, json_encode(['task' => 'send_email', 'data' => ['email' => 'user@example.com']]));
// 消费延迟任务
while (true) {
$now = time();
$tasks = $redis->zRangeByScore('delayed_queue', 0, $now);
foreach ($tasks as $task) {
$taskData = json_decode($task, true);
// 执行任务逻辑
echo "Processing task: " . print_r($taskData, true) . "\n";
$redis->zRem('delayed_queue', $task);
}
sleep(1); // 避免频繁轮询
}
使用数据库表
通过数据库表记录任务的执行时间,定时查询并执行到期的任务。
// 创建延迟任务表
CREATE TABLE delayed_tasks (
id INT AUTO_INCREMENT PRIMARY KEY,
task_type VARCHAR(50) NOT NULL,
task_data TEXT NOT NULL,
execute_at TIMESTAMP NOT NULL,
status ENUM('pending', 'processing', 'completed') DEFAULT 'pending'
);
// 添加延迟任务
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO delayed_tasks (task_type, task_data, execute_at) VALUES (?, ?, ?)");
$stmt->execute(['send_email', json_encode(['email' => 'user@example.com']), date('Y-m-d H:i:s', time() + 3600)]);
// 消费延迟任务
while (true) {
$now = date('Y-m-d H:i:s');
$stmt = $pdo->prepare("SELECT * FROM delayed_tasks WHERE execute_at <= ? AND status = 'pending' LIMIT 10");
$stmt->execute([$now]);
$tasks = $stmt->fetchAll(PDO::FETCH_ASSOC);
foreach ($tasks as $task) {
// 更新任务状态为处理中
$updateStmt = $pdo->prepare("UPDATE delayed_tasks SET status = 'processing' WHERE id = ?");
$updateStmt->execute([$task['id']]);
// 执行任务逻辑
echo "Processing task: " . print_r($task, true) . "\n";
// 更新任务状态为已完成
$updateStmt = $pdo->prepare("UPDATE delayed_tasks SET status = 'completed' WHERE id = ?");
$updateStmt->execute([$task['id']]);
}
sleep(1);
}
使用消息队列服务
使用专业的消息队列服务如RabbitMQ、Kafka等,支持延迟队列功能。
以RabbitMQ为例,安装rabbitmq_delayed_message_exchange插件后,可以实现延迟队列。
// 使用php-amqplib库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明延迟交换机
$args = new \PhpAmqpLib\Wire\AMQPTable([
'x-delayed-type' => 'direct'
]);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
// 声明队列
$channel->queue_declare('delayed_queue', false, true, false, false);
$channel->queue_bind('delayed_queue', 'delayed_exchange', 'delayed_routing_key');
// 发送延迟消息
$message = new AMQPMessage(json_encode(['task' => 'send_email', 'data' => ['email' => 'user@example.com']]), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => [
'x-delay' => 3600000 // 延迟1小时(毫秒)
]
]);
$channel->basic_publish($message, 'delayed_exchange', 'delayed_routing_key');
// 消费消息
$callback = function ($msg) {
echo "Processing task: " . $msg->body . "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
使用定时任务(Cron Job)
结合Cron Job和队列系统,定时触发任务执行。
// 在Cron Job中设置每分钟执行一次
* * * * * php /path/to/process_delayed_tasks.php
// process_delayed_tasks.php
$tasks = getDelayedTasksFromDatabase(); // 获取到期的任务
foreach ($tasks as $task) {
processTask($task); // 处理任务
}
选择建议
- 轻量级需求:Redis的Sorted Set或数据库表实现简单,适合小规模应用。
- 高性能需求:RabbitMQ等消息队列服务提供更好的性能和可靠性。
- 大规模分布式系统:考虑使用专业的延迟队列服务如Kafka、AWS SQS等。







