php队列功能实现
PHP 队列功能实现方法
使用数据库实现队列
创建一个数据库表来存储队列任务,例如:
CREATE TABLE queue (
id INT AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(255) NOT NULL,
payload TEXT,
status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
添加任务到队列:
function addToQueue($taskName, $data) {
$db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
$stmt = $db->prepare("INSERT INTO queue (task_name, payload) VALUES (?, ?)");
$stmt->execute([$taskName, json_encode($data)]);
return $db->lastInsertId();
}
处理队列任务:
function processQueue() {
$db = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
$db->beginTransaction();
$stmt = $db->prepare("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
$update = $db->prepare("UPDATE queue SET status = 'processing' WHERE id = ?");
$update->execute([$task['id']]);
$db->commit();
try {
$payload = json_decode($task['payload'], true);
// 执行任务逻辑
// ...
$update = $db->prepare("UPDATE queue SET status = 'completed' WHERE id = ?");
$update->execute([$task['id']]);
} catch (Exception $e) {
$update = $db->prepare("UPDATE queue SET status = 'failed' WHERE id = ?");
$update->execute([$task['id']]);
}
} else {
$db->rollBack();
}
}
使用Redis实现队列
利用Redis的列表数据结构实现简单队列:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加任务
function addToRedisQueue($queueName, $data) {
global $redis;
return $redis->rPush($queueName, json_encode($data));
}
// 处理任务
function processRedisQueue($queueName) {
global $redis;
while ($task = $redis->lPop($queueName)) {
$data = json_decode($task, true);
// 执行任务逻辑
// ...
}
}
使用专业队列系统
集成Laravel队列系统(即使不在Laravel项目中):

安装依赖:
composer require illuminate/queue
配置代码示例:
require 'vendor/autoload.php';
use Illuminate\Container\Container;
use Illuminate\Queue\Capsule\Manager as Queue;
$queue = new Queue(new Container);
$queue->addConnection([
'driver' => 'redis',
'host' => '127.0.0.1',
'port' => 6379,
'queue' => 'default',
]);
// 注册任务类
class ProcessPodcast {
public function fire($job, $data) {
// 处理任务
$job->delete();
}
}
// 添加任务到队列
$queue->push('ProcessPodcast@fire', ['payload' => 'data']);
// 启动队列worker
// 命令行执行:php worker.php
使用消息队列服务
集成RabbitMQ:

安装PHP AMQP扩展或库:
pecl install amqp
composer require php-amqplib/php-amqplib
示例代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 生产者
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = ['task' => 'process_data'];
$msg = new AMQPMessage(
json_encode($data),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();
$connection->close();
// 消费者
$callback = function ($msg) {
$data = json_decode($msg->body, true);
// 处理任务
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
定时任务处理
使用crontab定期运行队列处理器:
* * * * * php /path/to/queue/processor.php
processor.php内容:
// 包含队列处理逻辑
// 每次运行处理一定数量的任务后退出
以上方法可根据项目需求和规模选择适合的实现方式,从简单的数据库队列到专业的消息队列系统,各有优缺点。






