php实现redis消息队列
安装 Redis 扩展
确保 PHP 已安装 Redis 扩展。可以通过以下命令安装:
pecl install redis
在 php.ini 中添加 extension=redis.so 并重启 PHP 服务。
连接 Redis 服务器
使用 Redis 类连接到 Redis 服务器:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
如果需要认证,添加 $redis->auth('password')。
生产者推送消息
使用 lPush 或 rPush 将消息推送到队列:

$queueName = 'message_queue';
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com']]);
$redis->lPush($queueName, $message);
消费者获取消息
使用 brPop 或 blPop 阻塞式获取消息(避免轮询):
while (true) {
$message = $redis->brPop($queueName, 0); // 0 表示无限等待
if ($message) {
$task = json_decode($message[1], true);
// 处理任务逻辑
echo "Processing: " . $task['task'] . "\n";
}
}
错误处理与重试
添加异常捕获和重试机制:
try {
$redis->lPush($queueName, $message);
} catch (RedisException $e) {
error_log("Redis error: " . $e->getMessage());
// 重试或记录失败
}
队列监控
使用 llen 查看队列长度:

$length = $redis->llen($queueName);
echo "Queue length: $length\n";
多消费者负载均衡
启动多个消费者进程或使用 Supervisor 管理进程:
[program:redis_worker]
command=php /path/to/worker.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
消息确认机制
实现 ACK 机制以确保消息处理完成:
$processingQueue = 'processing_queue';
$redis->rPush($processingQueue, $message);
// 处理完成后删除
$redis->lRem($processingQueue, $message, 1);
使用 Pub/Sub 模式
对于广播场景,使用发布订阅模式:
$redis->publish('channel_name', $message);
// 订阅端
$redis->subscribe(['channel_name'], function ($redis, $channel, $message) {
echo "Received: $message\n";
});






