php 队列实现
PHP 队列实现方法
PHP 队列可以通过多种方式实现,以下是几种常见的方法:
使用数据库实现队列
创建数据库表存储队列任务:
CREATE TABLE queue (
id INT AUTO_INCREMENT PRIMARY KEY,
task VARCHAR(255) NOT NULL,
status ENUM('pending', 'processing', 'completed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL
);
添加任务到队列:
function enqueue($task) {
$pdo = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO queue (task) VALUES (?)");
$stmt->execute([$task]);
}
处理队列任务:
function processQueue() {
$pdo = new PDO('mysql:host=localhost;dbname=your_db', 'username', 'password');
// 开始事务
$pdo->beginTransaction();
// 获取待处理任务
$stmt = $pdo->query("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
// 更新任务状态为处理中
$update = $pdo->prepare("UPDATE queue SET status = 'processing', processed_at = NOW() WHERE id = ?");
$update->execute([$task['id']]);
// 提交事务
$pdo->commit();
// 执行任务
try {
// 执行任务逻辑
// ...
// 标记任务为完成
$complete = $pdo->prepare("UPDATE queue SET status = 'completed' WHERE id = ?");
$complete->execute([$task['id']]);
} catch (Exception $e) {
// 处理失败逻辑
// ...
}
} else {
$pdo->commit();
}
}
使用 Redis 实现队列
利用 Redis 的列表数据结构实现简单队列:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 入队
function enqueue($task) {
global $redis;
$redis->rPush('queue', json_encode($task));
}
// 出队
function dequeue() {
global $redis;
$task = $redis->lPop('queue');
return $task ? json_decode($task, true) : null;
}
// 处理队列
function processQueue() {
while ($task = dequeue()) {
try {
// 处理任务逻辑
// ...
} catch (Exception $e) {
// 处理失败逻辑
// ...
}
}
}
使用专业队列系统
对于生产环境,建议使用专业的队列系统如:
-
Beanstalkd:
- 轻量级消息队列系统
- 支持优先级、延迟、超时等特性
-
RabbitMQ:
- 功能丰富的消息代理
- 支持多种消息协议
-
AWS SQS:
- 托管的消息队列服务
- 无需维护基础设施
Laravel 队列系统
如果使用 Laravel 框架,内置队列系统提供多种驱动:

// 创建任务
php artisan make:job ProcessPodcast
// 分发任务
ProcessPodcast::dispatch($podcast);
// 配置队列驱动
// .env 文件中设置
QUEUE_CONNECTION=database // 或 redis, beanstalkd, sqs 等
// 启动队列处理器
php artisan queue:work
队列最佳实践
- 实现幂等性处理,防止重复执行
- 添加重试机制处理临时失败
- 监控队列长度和处理延迟
- 考虑使用优先级队列处理重要任务
- 实现死信队列处理无法完成的任务
以上方法可根据项目需求和规模选择合适的实现方式,从简单的数据库队列到专业的消息队列系统各有优缺点。






