php redis如何实现队列
PHP Redis 实现队列的方法
Redis 是一个高性能的键值存储系统,常用于实现队列功能。PHP 可以通过 phpredis 或 predis 扩展与 Redis 交互。以下是几种常见的 Redis 队列实现方式。
使用 List 结构实现队列
Redis 的 List 结构支持 LPUSH 和 RPOP 操作,适合实现 FIFO(先进先出)队列。

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 生产者:将任务推入队列
$redis->lPush('task_queue', json_encode(['task_id' => 1, 'data' => 'example']));
// 消费者:从队列中取出任务
$task = $redis->rPop('task_queue');
if ($task) {
$taskData = json_decode($task, true);
// 处理任务
}
使用阻塞式队列
Redis 的 BRPOP 命令可以阻塞等待队列中的任务,避免消费者轮询。
while (true) {
// 阻塞式获取任务,超时时间为 5 秒
$task = $redis->brPop('task_queue', 5);
if ($task) {
$taskData = json_decode($task[1], true);
// 处理任务
}
}
使用 Pub/Sub 实现消息队列
Redis 的 Pub/Sub 模式适合广播消息,但不保证消息的持久化。

// 订阅者
$redis->subscribe(['task_channel'], function ($redis, $channel, $message) {
$taskData = json_decode($message, true);
// 处理任务
});
// 发布者
$redis->publish('task_channel', json_encode(['task_id' => 1, 'data' => 'example']));
使用 Sorted Set 实现延迟队列
Sorted Set 可以按分数排序,适合实现延迟任务。
// 生产者:添加延迟任务(10 秒后执行)
$redis->zAdd('delayed_queue', time() + 10, json_encode(['task_id' => 1, 'data' => 'example']));
// 消费者:检查并执行到期任务
$tasks = $redis->zRangeByScore('delayed_queue', 0, time());
foreach ($tasks as $task) {
$taskData = json_decode($task, true);
// 处理任务
$redis->zRem('delayed_queue', $task);
}
使用 Redis Stream 实现高级队列
Redis 5.0 引入的 Stream 结构更适合消息队列场景,支持消息分组和确认机制。
// 生产者:添加消息到 Stream
$redis->xAdd('task_stream', '*', ['task_id' => 1, 'data' => 'example']);
// 消费者:读取消息
$messages = $redis->xRead(['task_stream' => '0'], 1);
foreach ($messages as $stream => $entries) {
foreach ($entries as $id => $data) {
// 处理任务
$redis->xAck('task_stream', 'consumer_group', $id);
}
}
注意事项
- 使用 Redis 队列时需考虑持久化配置,避免重启导致数据丢失。
- 高并发场景下需处理竞争条件,如使用
LPUSH和RPOP的组合。 - 对于关键任务,建议结合数据库记录任务状态,确保可靠性。
以上方法可根据具体需求选择,List 和 Stream 是最常用的 Redis 队列实现方式。






