php使用redis实现队列
PHP 使用 Redis 实现队列
Redis 提供了多种数据结构,如列表(List)、发布/订阅(Pub/Sub)和流(Stream),可以用于实现队列功能。以下是几种常见的实现方式。
使用 Redis 列表(List)实现队列
Redis 的列表数据结构支持在头部和尾部插入、删除元素,非常适合实现先进先出(FIFO)的队列。
入队操作(生产者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 将任务推入队列尾部
$redis->rpush('task_queue', json_encode(['task_id' => 123, 'data' => 'example']));
出队操作(消费者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 从队列头部取出任务(阻塞模式,超时时间为 0 表示无限等待)
$task = $redis->blpop('task_queue', 0);
$taskData = json_decode($task[1], true);
使用 Redis 发布/订阅(Pub/Sub)实现队列
发布/订阅模式适合广播消息的场景,但需要注意它不支持消息持久化。
发布者(生产者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 发布消息到频道
$redis->publish('task_channel', json_encode(['task_id' => 123, 'data' => 'example']));
订阅者(消费者)
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 订阅频道并处理消息
$redis->subscribe(['task_channel'], function ($redis, $channel, $message) {
$taskData = json_decode($message, true);
// 处理任务逻辑
});
使用 Redis 流(Stream)实现队列
Redis 5.0 引入的流数据结构更适合实现消息队列,支持消息持久化和消费者组。
生产者
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加消息到流
$redis->xAdd('task_stream', '*', ['task_id' => 123, 'data' => 'example']);
消费者
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 从流中读取消息(阻塞模式)
$messages = $redis->xRead(['task_stream' => '0'], 1, 0);
foreach ($messages as $stream => $entries) {
foreach ($entries as $entry) {
$taskData = $entry;
// 处理任务逻辑
}
}
使用消费者组(Consumer Group)
消费者组允许多个消费者协同处理消息,适合分布式场景。
创建消费者组
$redis->xGroup('CREATE', 'task_stream', 'task_group', '0', true);
消费者组消费
$messages = $redis->xReadGroup('task_group', 'consumer1', ['task_stream' => '>'], 1, 0);
foreach ($messages as $stream => $entries) {
foreach ($entries as $entry) {
$taskData = $entry;
// 处理任务逻辑
// 确认消息已处理
$redis->xAck('task_stream', 'task_group', [$entry['id']]);
}
}
注意事项
- 消息确认机制:使用
xAck确认消息处理完成,避免重复消费。 - 错误处理:捕获 Redis 异常并实现重试逻辑。
- 性能优化:批量处理消息以减少 Redis 操作次数。
以上方法可以根据实际需求选择,列表适合简单队列,流和消费者组适合复杂场景。







