php 消息队列 实现
PHP 消息队列实现方法
使用数据库作为队列
数据库表结构可以设计为包含任务ID、任务数据、状态等字段。通过插入记录实现入队,通过查询和更新状态实现出队。
CREATE TABLE queue (
id INT AUTO_INCREMENT PRIMARY KEY,
data TEXT,
status ENUM('pending', 'processing', 'completed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
入队操作示例:
$stmt = $pdo->prepare("INSERT INTO queue (data) VALUES (?)");
$stmt->execute([json_encode($taskData)]);
出队操作示例:
$pdo->beginTransaction();
$stmt = $pdo->prepare("SELECT * FROM queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch();
if ($task) {
$update = $pdo->prepare("UPDATE queue SET status = 'processing' WHERE id = ?");
$update->execute([$task['id']]);
$pdo->commit();
// 处理任务
}
使用Redis实现队列
Redis的列表数据结构非常适合实现队列。LPUSH用于入队,RPOP用于出队。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 入队
$redis->lPush('task_queue', json_encode($taskData));
// 出队
while (true) {
$task = $redis->rPop('task_queue');
if ($task) {
// 处理任务
}
sleep(1); // 避免空轮询
}
使用专业消息队列系统
对于生产环境,建议使用专业的消息队列系统如RabbitMQ或Beanstalkd。
RabbitMQ示例:
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$queue = new AMQPQueue($channel);
$queue->setName('tasks');
$queue->declareQueue();
// 生产者
$exchange->publish(json_encode($taskData), 'tasks');
// 消费者
$queue->consume(function(AMQPEnvelope $message) {
// 处理消息
return false; // 返回false表示确认消息
});
使用PHP的PCNTL扩展处理队列
对于需要并行处理的任务,可以结合PCNTL扩展创建多进程消费者。
$queue = new Redis(); // 或其他队列实现
$workers = 4;
for ($i = 0; $i < $workers; $i++) {
$pid = pcntl_fork();
if ($pid == -1) {
die("无法创建子进程");
} elseif ($pid) {
// 父进程
continue;
} else {
// 子进程
while (true) {
$task = $queue->rPop('task_queue');
if ($task) {
// 处理任务
}
sleep(1);
}
exit; // 子进程结束
}
}
// 父进程等待子进程
while (pcntl_waitpid(0, $status) != -1);
使用Laravel队列系统
Laravel提供了完整的队列解决方案,支持数据库、Redis、Beanstalkd等多种驱动。
创建任务:
php artisan make:job ProcessTask
定义任务:
class ProcessTask implements ShouldQueue
{
public function __construct(public array $data) {}
public function handle()
{
// 处理任务
}
}
分发任务:
ProcessTask::dispatch($data)->onQueue('tasks');
启动工作者:

php artisan queue:work --queue=tasks
每种实现方式都有其适用场景,数据库队列适合简单应用,Redis适合高性能需求,专业队列系统适合复杂场景,Laravel队列则提供了开箱即用的解决方案。






