php 实现消息队列
消息队列的基本概念
消息队列是一种异步通信机制,允许应用程序通过发送和接收消息进行解耦。在PHP中,消息队列常用于处理耗时任务、异步通知等场景。
使用Redis实现消息队列
Redis的List数据结构适合实现简单的消息队列。生产者通过LPUSH命令将消息推入队列,消费者通过BRPOP命令阻塞等待消息。

// 生产者代码
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->lPush('message_queue', json_encode(['task' => 'process_data', 'data' => $data]));
// 消费者代码
while (true) {
$message = $redis->brPop('message_queue', 0);
$task = json_decode($message[1], true);
// 处理任务逻辑
}
使用RabbitMQ实现消息队列
RabbitMQ是专业的消息队列系统,提供更完善的消息确认、持久化等功能。需要安装php-amqplib扩展。
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 生产者
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'task_queue');
// 消费者
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
使用数据库实现消息队列
MySQL等关系型数据库也可以作为简单的消息队列存储方案。创建消息表存储待处理消息,消费者定期轮询。

CREATE TABLE message_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
queue_name VARCHAR(50) NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TINYINT DEFAULT 0
);
// 生产者
$pdo->prepare("INSERT INTO message_queue (queue_name, message) VALUES (?, ?)")
->execute(['email_queue', json_encode($emailData)]);
// 消费者
$stmt = $pdo->prepare("SELECT * FROM message_queue WHERE status = 0 AND queue_name = ? ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$stmt->execute(['email_queue']);
$message = $stmt->fetch();
if ($message) {
// 处理消息
$pdo->prepare("UPDATE message_queue SET status = 1 WHERE id = ?")->execute([$message['id']]);
}
使用Beanstalkd实现消息队列
Beanstalkd是轻量级消息队列服务,适合PHP环境。需要安装pheanstalk扩展。
require_once 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
// 生产者
$pheanstalk = Pheanstalk::create('127.0.0.1');
$pheanstalk->useTube('test')->put(json_encode(['data' => 'value']));
// 消费者
$job = $pheanstalk->watch('test')->ignore('default')->reserve();
$data = json_decode($job->getData(), true);
// 处理任务
$pheanstalk->delete($job);
消息队列的最佳实践
消息处理应实现幂等性,避免重复处理导致问题。重要消息需要持久化存储,防止服务重启导致消息丢失。对于高并发场景,建议使用专业的消息队列系统如RabbitMQ或Kafka。
消费者进程建议使用supervisor等工具管理,确保进程异常退出后能自动重启。消息处理失败时应实现重试机制,超过重试次数后转入死信队列人工处理。






