当前位置:首页 > PHP

php队列功能实现

2026-02-15 11:49:20PHP

PHP 队列功能实现方法

使用数据库实现队列

创建一个数据库表来存储队列任务,例如:

CREATE TABLE queue (
    id INT AUTO_INCREMENT PRIMARY KEY,
    task_name VARCHAR(255) NOT NULL,
    payload TEXT,
    status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

添加任务到队列:

function addToQueue($taskName, $data) {
    $db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
    $stmt = $db->prepare("INSERT INTO queue (task_name, payload) VALUES (?, ?)");
    $stmt->execute([$taskName, json_encode($data)]);
    return $db->lastInsertId();
}

处理队列任务:

function processQueue() {
    $db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
    $db->beginTransaction();

    $stmt = $db->prepare("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
    $stmt->execute();
    $task = $stmt->fetch(PDO::FETCH_ASSOC);

    if ($task) {
        $update = $db->prepare("UPDATE queue SET status = 'processing' WHERE id = ?");
        $update->execute([$task['id']]);
        $db->commit();

        try {
            $payload = json_decode($task['payload'], true);
            // 执行任务逻辑
            // ...

            $update = $db->prepare("UPDATE queue SET status = 'completed' WHERE id = ?");
            $update->execute([$task['id']]);
        } catch (Exception $e) {
            $update = $db->prepare("UPDATE queue SET status = 'failed' WHERE id = ?");
            $update->execute([$task['id']]);
        }
    } else {
        $db->rollBack();
    }
}

使用Redis实现队列

利用Redis的列表数据结构实现简单队列:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 添加任务
function addToRedisQueue($queueName, $data) {
    global $redis;
    return $redis->rPush($queueName, json_encode($data));
}

// 处理任务
function processRedisQueue($queueName) {
    global $redis;
    while ($task = $redis->lPop($queueName)) {
        $data = json_decode($task, true);
        // 执行任务逻辑
        // ...
    }
}

使用专业队列系统

集成Laravel队列系统(即使不在Laravel项目中):

php队列功能实现

安装依赖:

composer require illuminate/queue

配置代码示例:

require 'vendor/autoload.php';

use Illuminate\Container\Container;
use Illuminate\Queue\Capsule\Manager as Queue;

$queue = new Queue(new Container);

$queue->addConnection([
    'driver' => 'redis',
    'host' => '127.0.0.1',
    'port' => 6379,
    'queue' => 'default',
]);

// 注册任务类
class ProcessPodcast {
    public function fire($job, $data) {
        // 处理任务
        $job->delete();
    }
}

// 添加任务到队列
$queue->push('ProcessPodcast@fire', ['payload' => 'data']);

// 启动队列worker
// 命令行执行:php worker.php

使用消息队列服务

集成RabbitMQ:

php队列功能实现

安装PHP AMQP扩展或库:

pecl install amqp
composer require php-amqplib/php-amqplib

示例代码:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 生产者
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);

$data = ['task' => 'process_data'];
$msg = new AMQPMessage(
    json_encode($data),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();
$connection->close();

// 消费者
$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    // 处理任务
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

定时任务处理

使用crontab定期运行队列处理器:

* * * * * php /path/to/queue/processor.php

processor.php内容:

// 包含队列处理逻辑
// 每次运行处理一定数量的任务后退出

以上方法可根据项目需求和规模选择适合的实现方式,从简单的数据库队列到专业的消息队列系统,各有优缺点。

标签: 队列功能
分享给朋友:

相关文章

vue实现聊天功能

vue实现聊天功能

Vue实现聊天功能的方法 使用WebSocket实现实时通信 安装WebSocket客户端库如socket.io-client,在Vue组件中建立连接。创建一个消息列表和数据输入框,通过WebSock…

vue实现队列

vue实现队列

Vue 实现队列功能 在 Vue 中实现队列功能,可以利用数组的 push 和 shift 方法模拟队列的先进先出(FIFO)特性。以下是一个简单的实现示例: <template>…

vue实现轨道功能

vue实现轨道功能

Vue 实现轨道功能的方法 轨道功能通常指在界面中实现元素沿特定路径移动的效果,例如进度条、时间轴或动画轨迹。以下是几种常见的实现方式: 使用 CSS 动画与 Vue 结合 通过 Vue 动态绑定样…

vue实现粘贴功能

vue实现粘贴功能

实现粘贴功能的基本方法 在Vue中实现粘贴功能通常涉及监听paste事件,并处理剪贴板数据。以下是一个基础实现示例: <template> <div @paste="handl…

vue各种功能实现

vue各种功能实现

Vue 基础功能实现 数据绑定与响应式更新 使用 v-model 实现表单元素与数据的双向绑定,数据变更自动更新视图。 示例: <input v-model="message">…

vue图片实现功能

vue图片实现功能

图片上传功能实现 使用 <input type="file"> 结合 Vue 的 v-on:change 事件监听文件选择 <template> <input ty…