php 消息队列实现
PHP 消息队列实现方法
使用 Redis 实现消息队列
Redis 的 List 数据结构非常适合实现简单的消息队列。通过 LPUSH 和 RPOP 命令可以实现先进先出的队列。
安装 Redis 扩展:
pecl install redis
生产者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->lPush('message_queue', json_encode(['data' => 'test message']));
消费者代码示例:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while(true) {
$message = $redis->rPop('message_queue');
if ($message) {
$data = json_decode($message, true);
// 处理消息
}
sleep(1); // 避免CPU过度占用
}
使用 RabbitMQ 实现消息队列
RabbitMQ 是专业的消息队列系统,提供更可靠的消息传递机制。
安装 PHP AMQP 扩展:
pecl install amqp
生产者代码示例:
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange_name');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName('queue_name');
$queue->declareQueue();
$queue->bind('exchange_name', 'routing_key');
$exchange->publish('message content', 'routing_key');
消费者代码示例:
$connection = new AMQPConnection([...]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('queue_name');
$queue->declareQueue();
while(true) {
$message = $queue->get();
if ($message) {
// 处理消息
$queue->ack($message->getDeliveryTag());
}
sleep(1);
}
使用数据库实现简单队列
对于小型应用,可以使用数据库表作为队列存储。
创建消息表:
CREATE TABLE message_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
message TEXT NOT NULL,
status TINYINT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
生产者代码:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO message_queue (message) VALUES (?)");
$stmt->execute([json_encode(['data' => 'test'])]);
消费者代码:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
while(true) {
$pdo->beginTransaction();
$stmt = $pdo->prepare("SELECT * FROM message_queue WHERE status = 0 ORDER BY id ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$message = $stmt->fetch(PDO::FETCH_ASSOC);
if ($message) {
// 处理消息
$update = $pdo->prepare("UPDATE message_queue SET status = 1 WHERE id = ?");
$update->execute([$message['id']]);
$pdo->commit();
} else {
$pdo->rollBack();
}
sleep(1);
}
使用 Laravel 队列系统
Laravel 提供了内置的队列系统,支持多种驱动如数据库、Redis、Amazon SQS 等。
创建任务:
php artisan make:job ProcessMessage
定义任务逻辑:
class ProcessMessage implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $message;
public function __construct($message)
{
$this->message = $message;
}
public function handle()
{
// 处理消息逻辑
}
}
分发任务:
ProcessMessage::dispatch('message content');
启动队列处理器:
php artisan queue:work
使用 Beanstalkd 实现队列
Beanstalkd 是一个轻量级消息队列服务。
安装 Pheanstalk 库:
composer require pda/pheanstalk
生产者代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
$pheanstalk->useTube('testtube')->put('message data');
消费者代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
while(true) {
$job = $pheanstalk->watch('testtube')->reserve();
$message = $job->getData();
// 处理消息
$pheanstalk->delete($job);
sleep(1);
}
消息队列实现注意事项
确保消息处理的幂等性,避免重复处理导致的问题
对于关键业务消息,实现消息持久化和重试机制
考虑消息积压时的处理策略,如增加消费者数量
监控队列长度和处理延迟,及时发现性能问题
根据业务需求选择合适的队列系统,简单场景可用Redis,复杂场景建议使用RabbitMQ或Kafka







