php redis实现消息队列
使用 PHP 和 Redis 实现消息队列
Redis 是一个高性能的键值存储系统,支持多种数据结构,如列表(List)、发布/订阅(Pub/Sub)等,非常适合实现消息队列。以下是几种常见的实现方式。
基于 Redis List 实现消息队列
Redis 的 List 数据结构支持 LPUSH(左插入)和 BRPOP(阻塞右弹出),可以轻松实现生产者-消费者模式。
生产者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 将消息推入队列
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com']]);
$redis->lPush('message_queue', $message);
消费者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 阻塞式弹出消息(超时时间 0 表示无限等待)
while (true) {
$message = $redis->brPop('message_queue', 0);
$data = json_decode($message[1], true);
// 处理消息逻辑
if ($data['task'] === 'send_email') {
echo "Sending email to: " . $data['data']['to'] . "\n";
}
}
基于 Redis Pub/Sub 实现消息队列
Redis 的发布/订阅模式适用于广播消息的场景,但消息不会持久化,消费者离线时会丢失消息。
发布者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 发布消息到频道
$message = json_encode(['event' => 'user_created', 'data' => ['id' => 123]]);
$redis->publish('notifications', $message);
订阅者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 订阅频道
$redis->subscribe(['notifications'], function ($redis, $channel, $message) {
$data = json_decode($message, true);
if ($data['event'] === 'user_created') {
echo "New user created: " . $data['data']['id'] . "\n";
}
});
使用 Redis Stream 实现消息队列(Redis 5.0+)
Redis Stream 是更高级的消息队列实现,支持消息持久化、消费者组和多消费者。
生产者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加消息到 Stream
$message = ['task' => 'process_order', 'order_id' => 456];
$redis->xAdd('order_stream', '*', $message);
消费者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 读取 Stream 中的消息
while (true) {
$messages = $redis->xRead(['order_stream' => '$'], 1, 1000);
if (!empty($messages)) {
foreach ($messages['order_stream'] as $id => $message) {
echo "Processing order: " . $message['order_id'] . "\n";
// 确认消息处理完成(可选)
$redis->xDel('order_stream', $id);
}
}
}
注意事项
- 消息确认机制:Redis List 和 Stream 支持手动确认消息,而 Pub/Sub 不支持。
- 持久化:Stream 和 List 可以持久化消息,Pub/Sub 消息在消费者离线时会丢失。
- 性能:List 适合简单队列,Stream 适合复杂场景(如消费者组)。
- 错误处理:在生产环境中需添加重试机制和异常处理。
根据具体需求选择合适的实现方式。







