当前位置:首页 > PHP

php队列功能实现

2026-02-15 11:49:20PHP

PHP 队列功能实现方法

使用数据库实现队列

创建一个数据库表来存储队列任务,例如:

CREATE TABLE queue (
    id INT AUTO_INCREMENT PRIMARY KEY,
    task_name VARCHAR(255) NOT NULL,
    payload TEXT,
    status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

添加任务到队列:

function addToQueue($taskName, $data) {
    $db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
    $stmt = $db->prepare("INSERT INTO queue (task_name, payload) VALUES (?, ?)");
    $stmt->execute([$taskName, json_encode($data)]);
    return $db->lastInsertId();
}

处理队列任务:

function processQueue() {
    $db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
    $db->beginTransaction();

    $stmt = $db->prepare("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
    $stmt->execute();
    $task = $stmt->fetch(PDO::FETCH_ASSOC);

    if ($task) {
        $update = $db->prepare("UPDATE queue SET status = 'processing' WHERE id = ?");
        $update->execute([$task['id']]);
        $db->commit();

        try {
            $payload = json_decode($task['payload'], true);
            // 执行任务逻辑
            // ...

            $update = $db->prepare("UPDATE queue SET status = 'completed' WHERE id = ?");
            $update->execute([$task['id']]);
        } catch (Exception $e) {
            $update = $db->prepare("UPDATE queue SET status = 'failed' WHERE id = ?");
            $update->execute([$task['id']]);
        }
    } else {
        $db->rollBack();
    }
}

使用Redis实现队列

利用Redis的列表数据结构实现简单队列:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 添加任务
function addToRedisQueue($queueName, $data) {
    global $redis;
    return $redis->rPush($queueName, json_encode($data));
}

// 处理任务
function processRedisQueue($queueName) {
    global $redis;
    while ($task = $redis->lPop($queueName)) {
        $data = json_decode($task, true);
        // 执行任务逻辑
        // ...
    }
}

使用专业队列系统

集成Laravel队列系统(即使不在Laravel项目中):

php队列功能实现

安装依赖:

composer require illuminate/queue

配置代码示例:

require 'vendor/autoload.php';

use Illuminate\Container\Container;
use Illuminate\Queue\Capsule\Manager as Queue;

$queue = new Queue(new Container);

$queue->addConnection([
    'driver' => 'redis',
    'host' => '127.0.0.1',
    'port' => 6379,
    'queue' => 'default',
]);

// 注册任务类
class ProcessPodcast {
    public function fire($job, $data) {
        // 处理任务
        $job->delete();
    }
}

// 添加任务到队列
$queue->push('ProcessPodcast@fire', ['payload' => 'data']);

// 启动队列worker
// 命令行执行:php worker.php

使用消息队列服务

集成RabbitMQ:

php队列功能实现

安装PHP AMQP扩展或库:

pecl install amqp
composer require php-amqplib/php-amqplib

示例代码:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 生产者
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);

$data = ['task' => 'process_data'];
$msg = new AMQPMessage(
    json_encode($data),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();
$connection->close();

// 消费者
$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    // 处理任务
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

定时任务处理

使用crontab定期运行队列处理器:

* * * * * php /path/to/queue/processor.php

processor.php内容:

// 包含队列处理逻辑
// 每次运行处理一定数量的任务后退出

以上方法可根据项目需求和规模选择适合的实现方式,从简单的数据库队列到专业的消息队列系统,各有优缺点。

标签: 队列功能
分享给朋友:

相关文章

vue 实现 功能

vue 实现 功能

Vue.js 是一个流行的前端框架,用于构建用户界面和单页应用程序。以下是一些常见的 Vue 功能实现方法: 数据绑定 Vue 的核心特性之一是数据绑定,可以通过 v-model 指令实现双向数据绑…

vue功能实现难点

vue功能实现难点

Vue 功能实现的常见难点 响应式系统深度监听 Vue 的响应式系统基于 Object.defineProperty(Vue 2)或 Proxy(Vue 3),但对嵌套对象或数组的深层属性变化需要额外…

php实现评论功能

php实现评论功能

数据库设计 创建评论表(示例结构): CREATE TABLE `comments` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id`…

vue的功能实现

vue的功能实现

Vue 核心功能实现 Vue.js 是一个渐进式 JavaScript 框架,其核心功能主要包括数据绑定、组件化、虚拟 DOM 和响应式系统。以下从不同层面解析其实现原理: 响应式系统 Vue 2.…

vue模板功能实现

vue模板功能实现

Vue 模板功能实现 Vue 的模板功能是 Vue.js 的核心特性之一,允许开发者通过声明式的方式将数据绑定到 DOM。以下是 Vue 模板功能的主要实现方式和相关技术细节。 插值语法 Vue 使…

vue 实现拍照功能

vue 实现拍照功能

使用HTML5的getUserMedia API实现拍照 在Vue中实现拍照功能可以通过HTML5的getUserMedia API访问摄像头,结合canvas元素捕获图像。 安装依赖(如需处理图像…