当前位置:首页 > PHP

php实现延迟队列

2026-02-15 12:00:12PHP

延迟队列的实现方式

PHP中实现延迟队列可以通过多种方式,以下是几种常见的方法:

使用Redis的Sorted Set

Redis的Sorted Set(有序集合)可以用于实现延迟队列。将任务的执行时间作为分数(score),任务内容作为成员(member)。通过定时轮询Sorted Set中分数小于当前时间的成员来执行任务。

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 添加延迟任务
$delayTime = time() + 3600; // 1小时后执行
$redis->zAdd('delayed_queue', $delayTime, json_encode(['task' => 'send_email', 'data' => ['email' => 'user@example.com']]));

// 消费延迟任务
while (true) {
    $now = time();
    $tasks = $redis->zRangeByScore('delayed_queue', 0, $now);
    foreach ($tasks as $task) {
        $taskData = json_decode($task, true);
        // 执行任务逻辑
        echo "Processing task: " . print_r($taskData, true) . "\n";
        $redis->zRem('delayed_queue', $task);
    }
    sleep(1); // 避免频繁轮询
}

使用数据库表

通过数据库表记录任务的执行时间,定时查询并执行到期的任务。

// 创建延迟任务表
CREATE TABLE delayed_tasks (
    id INT AUTO_INCREMENT PRIMARY KEY,
    task_type VARCHAR(50) NOT NULL,
    task_data TEXT NOT NULL,
    execute_at TIMESTAMP NOT NULL,
    status ENUM('pending', 'processing', 'completed') DEFAULT 'pending'
);

// 添加延迟任务
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO delayed_tasks (task_type, task_data, execute_at) VALUES (?, ?, ?)");
$stmt->execute(['send_email', json_encode(['email' => 'user@example.com']), date('Y-m-d H:i:s', time() + 3600)]);

// 消费延迟任务
while (true) {
    $now = date('Y-m-d H:i:s');
    $stmt = $pdo->prepare("SELECT * FROM delayed_tasks WHERE execute_at <= ? AND status = 'pending' LIMIT 10");
    $stmt->execute([$now]);
    $tasks = $stmt->fetchAll(PDO::FETCH_ASSOC);
    foreach ($tasks as $task) {
        // 更新任务状态为处理中
        $updateStmt = $pdo->prepare("UPDATE delayed_tasks SET status = 'processing' WHERE id = ?");
        $updateStmt->execute([$task['id']]);

        // 执行任务逻辑
        echo "Processing task: " . print_r($task, true) . "\n";

        // 更新任务状态为已完成
        $updateStmt = $pdo->prepare("UPDATE delayed_tasks SET status = 'completed' WHERE id = ?");
        $updateStmt->execute([$task['id']]);
    }
    sleep(1);
}

使用消息队列服务

使用专业的消息队列服务如RabbitMQ、Kafka等,支持延迟队列功能。

以RabbitMQ为例,安装rabbitmq_delayed_message_exchange插件后,可以实现延迟队列。

// 使用php-amqplib库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明延迟交换机
$args = new \PhpAmqpLib\Wire\AMQPTable([
    'x-delayed-type' => 'direct'
]);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);

// 声明队列
$channel->queue_declare('delayed_queue', false, true, false, false);
$channel->queue_bind('delayed_queue', 'delayed_exchange', 'delayed_routing_key');

// 发送延迟消息
$message = new AMQPMessage(json_encode(['task' => 'send_email', 'data' => ['email' => 'user@example.com']]), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'headers' => [
        'x-delay' => 3600000 // 延迟1小时(毫秒)
    ]
]);
$channel->basic_publish($message, 'delayed_exchange', 'delayed_routing_key');

// 消费消息
$callback = function ($msg) {
    echo "Processing task: " . $msg->body . "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

使用定时任务(Cron Job)

结合Cron Job和队列系统,定时触发任务执行。

// 在Cron Job中设置每分钟执行一次
* * * * * php /path/to/process_delayed_tasks.php

// process_delayed_tasks.php
$tasks = getDelayedTasksFromDatabase(); // 获取到期的任务
foreach ($tasks as $task) {
    processTask($task); // 处理任务
}

选择建议

  • 轻量级需求:Redis的Sorted Set或数据库表实现简单,适合小规模应用。
  • 高性能需求:RabbitMQ等消息队列服务提供更好的性能和可靠性。
  • 大规模分布式系统:考虑使用专业的延迟队列服务如Kafka、AWS SQS等。

php实现延迟队列

标签: 队列php
分享给朋友:

相关文章

vue实现队列

vue实现队列

Vue 实现队列功能 在 Vue 中实现队列功能,可以利用数组的 push 和 shift 方法模拟队列的先进先出(FIFO)特性。以下是一个简单的实现示例: <template>…

php实现定时任务

php实现定时任务

PHP 实现定时任务的几种方法 在 PHP 中实现定时任务可以通过多种方式完成,具体选择取决于项目需求和服务器环境。以下是常见的实现方法: 使用 Cron 任务 Cron 是 Linux 系统中常…

php实现websocket

php实现websocket

PHP 实现 WebSocket WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。PHP 可以通过多种方式实现 WebSocket 功能,以下是几种常见的方法: 使用 Rat…

php实现注册

php实现注册

注册功能实现步骤 数据库准备 创建用户表,包含字段如id、username、password、email等。示例SQL: CREATE TABLE users ( id INT AUTO_I…

php实现排序

php实现排序

PHP 数组排序方法 PHP 提供了多种内置函数用于数组排序,适用于不同的排序需求。以下是常见的排序方法: 按值升序排序 sort() 函数对数组按值升序排序,重置键名(索引数组适用): $num…

php 实现购物车

php 实现购物车

PHP 实现购物车功能 购物车功能是电子商务网站的核心部分,可以通过 PHP 和 Session 或数据库来实现。以下是两种常见的实现方式: 使用 Session 存储购物车数据 Session…