php自动排队实现
PHP 自动排队实现方法
数据库驱动队列
使用数据库表作为队列存储,适合小型或中等规模应用。创建一个包含任务状态、创建时间等字段的表。
CREATE TABLE queue (
id INT AUTO_INCREMENT PRIMARY KEY,
task_data TEXT,
status ENUM('pending', 'processing', 'completed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL
);
PHP 代码实现入队和出队操作:
// 入队
function enqueue($taskData) {
$db = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
$stmt = $db->prepare("INSERT INTO queue (task_data) VALUES (?)");
$stmt->execute([json_encode($taskData)]);
}
// 出队
function dequeue() {
$db = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
$db->beginTransaction();
$stmt = $db->query("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE");
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
$update = $db->prepare("UPDATE queue SET status = 'processing', processed_at = NOW() WHERE id = ?");
$update->execute([$task['id']]);
$db->commit();
return json_decode($task['task_data'], true);
}
$db->rollBack();
return null;
}
Redis 队列实现
Redis 的列表数据结构非常适合实现高效队列系统,性能优于数据库方案。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 入队
function redisEnqueue($queueName, $taskData) {
global $redis;
$redis->rPush($queueName, json_encode($taskData));
}
// 出队
function redisDequeue($queueName) {
global $redis;
$task = $redis->lPop($queueName);
return $task ? json_decode($task, true) : null;
}
Laravel 队列系统
Laravel 提供了开箱即用的队列系统,支持数据库、Redis、Amazon SQS 等多种驱动。
定义任务类:
php artisan make:job ProcessTask
在生成的 Job 类中实现 handle 方法:
class ProcessTask implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
// 任务处理逻辑
}
}
分发任务:
ProcessTask::dispatch($data)->onQueue('high');
启动队列处理器:
php artisan queue:work --queue=high,default
消息队列系统集成
对于大规模应用,可以集成专业消息队列系统如 RabbitMQ 或 Apache Kafka。
RabbitMQ 示例:
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);
// 发布消息
$msg = new AMQPMessage(json_encode($taskData), ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'task_queue');
// 消费消息
$callback = function ($msg) {
$taskData = json_decode($msg->body, true);
// 处理任务
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
定时任务处理
对于不需要实时处理的队列,可以使用 cron 定时触发队列处理。
创建处理脚本:
// process_queue.php
while ($task = dequeue()) {
processTask($task);
}
设置 cron 任务:
* * * * * /usr/bin/php /path/to/process_queue.php
队列监控与管理
实现队列监控功能,可以跟踪队列长度、处理速度等指标。
Redis 队列监控示例:

function getQueueStats($queueName) {
global $redis;
return [
'pending' => $redis->lLen($queueName),
'processing' => $redis->get("queue:$queueName:processing"),
'failed' => $redis->get("queue:$queueName:failed")
];
}
对于生产环境,建议使用专门的队列监控工具如 Laravel Horizon 或 RabbitMQ 管理界面。






