php实现rabbitmq
PHP 实现 RabbitMQ 的基本方法
RabbitMQ 是一个开源的消息代理软件,用于实现消息队列系统。在 PHP 中,可以通过 php-amqplib 库来实现 RabbitMQ 的基本功能。
安装 php-amqplib 库:
composer require php-amqplib/php-amqplib
生产者(Producer)示例代码
生产者负责将消息发送到 RabbitMQ 队列中。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('hello', false, false, false, false);
// 创建消息
$msg = new AMQPMessage('Hello World!');
// 发送消息
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
// 关闭连接
$channel->close();
$connection->close();
?>
消费者(Consumer)示例代码
消费者负责从 RabbitMQ 队列中接收并处理消息。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
// 定义回调函数处理消息
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
// 消费消息
$channel->basic_consume('hello', '', false, true, false, false, $callback);
// 持续监听消息
while ($channel->is_consuming()) {
$channel->wait();
}
// 关闭连接
$channel->close();
$connection->close();
?>
消息持久化
为了确保消息在 RabbitMQ 重启后不丢失,可以将队列和消息设置为持久化。

生产者修改部分:
// 声明持久化队列
$channel->queue_declare('hello', false, true, false, false);
// 创建持久化消息
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
工作队列(Work Queue)
多个消费者共同处理队列中的消息,实现负载均衡。
生产者修改部分:

// 发送多条消息
for ($i = 0; $i < 10; $i++) {
$msg = new AMQPMessage("Message $i");
$channel->basic_publish($msg, '', 'hello');
}
消费者修改部分:
// 设置公平分发
$channel->basic_qos(null, 1, null);
// 处理消息时模拟耗时任务
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(1); // 模拟耗时任务
echo " [x] Done\n";
$msg->ack(); // 手动确认消息
};
交换机(Exchange)示例
RabbitMQ 支持多种交换机类型,如直连(direct)、主题(topic)、扇出(fanout)等。
扇出交换机示例(广播消息):
// 生产者
$channel->exchange_declare('logs', 'fanout', false, false, false);
$msg = new AMQPMessage('Broadcast message');
$channel->basic_publish($msg, 'logs');
// 消费者
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
错误处理与重连
在实际应用中,需要处理连接断开的情况。
try {
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// ... 其他代码
} catch (Exception $e) {
echo "Connection failed: " . $e->getMessage();
// 实现重连逻辑
}
通过以上方法,可以在 PHP 中实现 RabbitMQ 的基本功能,包括消息的生产、消费、持久化、工作队列和交换机等高级特性。






