php实现rabbitmq
PHP 实现 RabbitMQ 的基本方法
安装 PHP 的 AMQP 扩展 在 PHP 中使用 RabbitMQ 需要安装 AMQP 扩展。可以通过 pecl 安装:
pecl install amqp
安装完成后,在 php.ini 中添加 extension=amqp.so 并重启 PHP 服务。
使用 Composer 安装 PHP AMQP 库 如果需要更高级的功能,可以使用 PHP 的 AMQP 库:
composer require php-amqplib/php-amqplib
建立 RabbitMQ 连接
创建连接对象 使用 AMQPStreamConnection 类创建与 RabbitMQ 的连接:
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
'localhost', // host
5672, // port
'guest', // username
'guest' // password
);
创建信道 从连接中创建信道:
$channel = $connection->channel();
发送消息到队列
声明队列 在发送消息前需要先声明队列:
$channel->queue_declare(
'hello', // 队列名称
false, // passive
false, // durable
false, // exclusive
false // auto_delete
);
发布消息 使用 basic_publish 方法发送消息:
$msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
关闭连接 完成消息发送后关闭连接:
$channel->close();
$connection->close();
接收队列消息
声明队列 接收方也需要声明相同的队列:
$channel->queue_declare('hello', false, false, false, false);
定义回调函数 创建一个回调函数来处理接收到的消息:

$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
消费消息 使用 basic_consume 方法开始消费消息:
$channel->basic_consume('hello', '', false, true, false, false, $callback);
保持监听 创建一个循环来保持监听消息:
while ($channel->is_consuming()) {
$channel->wait();
}
关闭连接 完成消息接收后关闭连接:
$channel->close();
$connection->close();
处理消息确认
手动消息确认 为了确保消息被正确处理,可以启用手动确认模式:
$channel->basic_consume(
'hello',
'',
false,
false, // 关闭自动确认
false,
false,
$callback
);
在回调中确认消息 在回调函数中处理完消息后手动确认:
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
使用交换机
声明交换机 可以声明不同类型的交换机(direct, topic, fanout):

$channel->exchange_declare('logs', 'fanout', false, false, false);
绑定队列到交换机 将队列绑定到交换机:
$channel->queue_bind($queue_name, 'logs');
通过交换机发布消息 发布消息到交换机而不是直接到队列:
$msg = new \PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
消息持久化
持久化队列 声明持久化队列:
$channel->queue_declare('task_queue', false, true, false, false);
持久化消息 设置消息为持久化:
$msg = new \PhpAmqpLib\Message\AMQPMessage(
$data,
array('delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
错误处理
连接异常处理 捕获连接可能出现的异常:
try {
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
} catch (\Exception $e) {
echo "Connection failed: " . $e->getMessage();
}
信道异常处理 处理信道操作中的异常:
try {
$channel->queue_declare('hello', false, false, false, false);
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
echo "Channel error: " . $e->getMessage();
}






