php 实现消息队列
PHP 实现消息队列的方法
使用 Redis 实现消息队列
Redis 的 List 结构可以用于实现简单的消息队列。通过 LPUSH 和 RPOP 命令可以实现先进先出的队列。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 生产者:推送消息到队列
$redis->lPush('message_queue', 'Message 1');
$redis->lPush('message_queue', 'Message 2');
// 消费者:从队列中获取消息
$message = $redis->rPop('message_queue');
while ($message) {
echo "Processing: " . $message . "\n";
$message = $redis->rPop('message_queue');
}
使用 RabbitMQ 实现消息队列
RabbitMQ 是一个功能强大的消息队列系统,支持多种消息协议。PHP 可以通过 AMQP 扩展或库与 RabbitMQ 交互。
安装 AMQP 扩展:
pecl install amqp
生产者代码:
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('messages');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName('message_queue');
$queue->declareQueue();
$queue->bind('messages', 'message_key');
$exchange->publish('Message content', 'message_key');
消费者代码:
$connection = new AMQPConnection([...]);
$connection->connect();
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('message_queue');
$queue->declareQueue();
$queue->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
echo "Processing: " . $envelope->getBody() . "\n";
$queue->ack($envelope->getDeliveryTag());
});
使用数据库实现消息队列
MySQL 或 PostgreSQL 等关系型数据库也可以实现简单的消息队列。
创建消息表:
CREATE TABLE message_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
message TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE
);
生产者代码:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO message_queue (message) VALUES (?)");
$stmt->execute(['Message content']);
消费者代码:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("SELECT * FROM message_queue WHERE processed = FALSE ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$pdo->beginTransaction();
$stmt->execute();
$message = $stmt->fetch(PDO::FETCH_ASSOC);
if ($message) {
echo "Processing: " . $message['message'] . "\n";
$update = $pdo->prepare("UPDATE message_queue SET processed = TRUE WHERE id = ?");
$update->execute([$message['id']]);
}
$pdo->commit();
使用 Beanstalkd 实现消息队列
Beanstalkd 是一个轻量级、快速的消息队列服务。
安装 Beanstalkd:
sudo apt-get install beanstalkd
生产者代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
$pheanstalk->useTube('test')->put('Message content');
消费者代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
$job = $pheanstalk->watch('test')->ignore('default')->reserve();
echo "Processing: " . $job->getData() . "\n";
$pheanstalk->delete($job);
使用 Laravel 队列系统
Laravel 提供了内置的队列系统,支持多种队列驱动(database, redis, beanstalkd, sqs 等)。
创建任务:
php artisan make:job ProcessMessage
任务类:
namespace App\Jobs;
class ProcessMessage implements ShouldQueue
{
public function __construct(public string $message) {}
public function handle()
{
echo "Processing: " . $this->message;
}
}
分发任务:
ProcessMessage::dispatch('Message content');
运行队列处理器:
php artisan queue:work
选择合适的消息队列方案
- 简单需求:Redis 或数据库队列足够
- 高性能需求:RabbitMQ 或 Beanstalkd
- Laravel 项目:使用内置队列系统
- 云环境:AWS SQS 或其他云服务队列
每种方案都有其优缺点,应根据项目需求、规模和技术栈选择最适合的方案。







