当前位置:首页 > PHP

php实现rabbitmq

2026-02-14 13:05:06PHP

PHP 实现 RabbitMQ 的基本方法

安装 PHP 的 AMQP 扩展 在 PHP 中使用 RabbitMQ 需要安装 AMQP 扩展。可以通过 pecl 安装:

pecl install amqp

安装完成后,在 php.ini 中添加 extension=amqp.so 并重启 PHP 服务。

使用 Composer 安装 PHP AMQP 库 如果需要更高级的功能,可以使用 PHP 的 AMQP 库:

composer require php-amqplib/php-amqplib

建立 RabbitMQ 连接

创建连接对象 使用 AMQPStreamConnection 类创建与 RabbitMQ 的连接:

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    'localhost',  // host
    5672,         // port
    'guest',      // username
    'guest'       // password
);

创建信道 从连接中创建信道:

$channel = $connection->channel();

发送消息到队列

声明队列 在发送消息前需要先声明队列:

$channel->queue_declare(
    'hello', // 队列名称
    false,   // passive
    false,   // durable
    false,   // exclusive
    false    // auto_delete
);

发布消息 使用 basic_publish 方法发送消息:

$msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

关闭连接 完成消息发送后关闭连接:

$channel->close();
$connection->close();

接收队列消息

声明队列 接收方也需要声明相同的队列:

$channel->queue_declare('hello', false, false, false, false);

定义回调函数 创建一个回调函数来处理接收到的消息:

php实现rabbitmq

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

消费消息 使用 basic_consume 方法开始消费消息:

$channel->basic_consume('hello', '', false, true, false, false, $callback);

保持监听 创建一个循环来保持监听消息:

while ($channel->is_consuming()) {
    $channel->wait();
}

关闭连接 完成消息接收后关闭连接:

$channel->close();
$connection->close();

处理消息确认

手动消息确认 为了确保消息被正确处理,可以启用手动确认模式:

$channel->basic_consume(
    'hello',
    '',
    false,
    false,  // 关闭自动确认
    false,
    false,
    $callback
);

在回调中确认消息 在回调函数中处理完消息后手动确认:

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

使用交换机

声明交换机 可以声明不同类型的交换机(direct, topic, fanout):

php实现rabbitmq

$channel->exchange_declare('logs', 'fanout', false, false, false);

绑定队列到交换机 将队列绑定到交换机:

$channel->queue_bind($queue_name, 'logs');

通过交换机发布消息 发布消息到交换机而不是直接到队列:

$msg = new \PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');

消息持久化

持久化队列 声明持久化队列:

$channel->queue_declare('task_queue', false, true, false, false);

持久化消息 设置消息为持久化:

$msg = new \PhpAmqpLib\Message\AMQPMessage(
    $data,
    array('delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

错误处理

连接异常处理 捕获连接可能出现的异常:

try {
    $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
} catch (\Exception $e) {
    echo "Connection failed: " . $e->getMessage();
}

信道异常处理 处理信道操作中的异常:

try {
    $channel->queue_declare('hello', false, false, false, false);
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
    echo "Channel error: " . $e->getMessage();
}

标签: phprabbitmq
分享给朋友:

相关文章

php实现购物车

php实现购物车

PHP实现购物车功能 购物车功能是电商网站的核心模块之一,通常涉及商品添加、删除、数量修改和结算等操作。以下是使用PHP实现购物车功能的几种方法。 使用Session存储购物车数据 Session…

php实现文件下载

php实现文件下载

PHP实现文件下载的方法 通过PHP实现文件下载功能,可以采用多种方式,以下是几种常见的实现方法: 使用header()函数强制下载 设置HTTP头信息,强制浏览器下载文件而非直接打开:…

php实现搜索功能

php实现搜索功能

实现基本的搜索功能 在PHP中实现搜索功能通常涉及数据库查询。以下是一个简单的实现方式,假设使用MySQL数据库: <?php // 连接数据库 $conn = new mysqli('lo…

php实现验证码

php实现验证码

PHP实现验证码的方法 使用GD库生成验证码 GD库是PHP中处理图像的扩展,可用于生成验证码图片。 <?php session_start(); $width = 120; $height…

git php实现

git php实现

Git 与 PHP 的集成实现 使用 PHP 执行 Git 命令 通过 PHP 的 exec()、shell_exec() 或 passthru() 函数可以直接调用 Git 命令,适用于服务器端操作…

php实现队列

php实现队列

PHP实现队列的方法 队列是一种先进先出(FIFO)的数据结构,PHP可以通过多种方式实现队列功能。 使用数组模拟队列 PHP数组可以模拟队列的基本操作,通过array_push和array_shi…