消息队列 实现 php
消息队列的基本概念
消息队列是一种异步通信机制,允许应用程序通过发送和接收消息来解耦处理流程。常用于削峰填谷、异步任务处理、系统解耦等场景。
PHP 实现消息队列的常见方式
使用 Redis 实现
Redis 的 LPUSH 和 BRPOP 命令可以模拟简单的队列:
// 生产者
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->lPush('task_queue', json_encode(['task_id' => 123, 'data' => 'example']));
// 消费者
while (true) {
$task = $redis->brPop('task_queue', 0);
$data = json_decode($task[1], true);
// 处理任务逻辑
}
使用 RabbitMQ
RabbitMQ 是专业的消息队列中间件,需安装 php-amqplib 库:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 生产者
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'task_queue');
// 消费者
$callback = function ($msg) {
echo 'Received: ', $msg->body, "\n";
};
$channel->basic_consume('task_queue', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
使用数据库表模拟队列
通过数据库表实现简单的队列(适合低并发场景):
// 创建任务表
CREATE TABLE `task_queue` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`payload` TEXT NOT NULL,
`status` ENUM('pending', 'processed') DEFAULT 'pending',
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
// 生产者
$pdo = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$stmt = $pdo->prepare("INSERT INTO task_queue (payload) VALUES (?)");
$stmt->execute([json_encode(['task' => 'data'])]);
// 消费者
$stmt = $pdo->prepare("SELECT * FROM task_queue WHERE status = 'pending' LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
// 处理任务
$update = $pdo->prepare("UPDATE task_queue SET status = 'processed' WHERE id = ?");
$update->execute([$task['id']]);
}
使用 Beanstalkd
Beanstalkd 是轻量级消息队列服务,需安装 pheanstalk 库:
require_once 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
// 生产者
$queue = new Pheanstalk('127.0.0.1');
$queue->useTube('test')->put('task data');
// 消费者
$job = $queue->watch('test')->ignore('default')->reserve();
echo $job->getData();
$queue->delete($job);
选择建议
- Redis:适合轻量级、高吞吐场景,但无持久化保障(需配置 RDB/AOF)。
- RabbitMQ:功能全面,支持复杂路由和持久化,适合企业级应用。
- 数据库:简单易用,但性能较差,适合低频任务。
- Beanstalkd:轻量高效,但功能较单一,适合简单任务队列。
根据实际需求选择合适的技术方案,并注意处理消息确认、重试和死信队列等边界情况。







