当前位置:首页 > PHP

php实现rabbitmq

2026-02-14 13:05:06PHP

PHP 实现 RabbitMQ 的基本方法

安装 PHP 的 AMQP 扩展 在 PHP 中使用 RabbitMQ 需要安装 AMQP 扩展。可以通过 pecl 安装:

pecl install amqp

安装完成后,在 php.ini 中添加 extension=amqp.so 并重启 PHP 服务。

使用 Composer 安装 PHP AMQP 库 如果需要更高级的功能,可以使用 PHP 的 AMQP 库:

composer require php-amqplib/php-amqplib

建立 RabbitMQ 连接

创建连接对象 使用 AMQPStreamConnection 类创建与 RabbitMQ 的连接:

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    'localhost',  // host
    5672,         // port
    'guest',      // username
    'guest'       // password
);

创建信道 从连接中创建信道:

$channel = $connection->channel();

发送消息到队列

声明队列 在发送消息前需要先声明队列:

$channel->queue_declare(
    'hello', // 队列名称
    false,   // passive
    false,   // durable
    false,   // exclusive
    false    // auto_delete
);

发布消息 使用 basic_publish 方法发送消息:

$msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

关闭连接 完成消息发送后关闭连接:

$channel->close();
$connection->close();

接收队列消息

声明队列 接收方也需要声明相同的队列:

$channel->queue_declare('hello', false, false, false, false);

定义回调函数 创建一个回调函数来处理接收到的消息:

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

消费消息 使用 basic_consume 方法开始消费消息:

$channel->basic_consume('hello', '', false, true, false, false, $callback);

保持监听 创建一个循环来保持监听消息:

while ($channel->is_consuming()) {
    $channel->wait();
}

关闭连接 完成消息接收后关闭连接:

$channel->close();
$connection->close();

处理消息确认

手动消息确认 为了确保消息被正确处理,可以启用手动确认模式:

$channel->basic_consume(
    'hello',
    '',
    false,
    false,  // 关闭自动确认
    false,
    false,
    $callback
);

在回调中确认消息 在回调函数中处理完消息后手动确认:

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

使用交换机

声明交换机 可以声明不同类型的交换机(direct, topic, fanout):

$channel->exchange_declare('logs', 'fanout', false, false, false);

绑定队列到交换机 将队列绑定到交换机:

$channel->queue_bind($queue_name, 'logs');

通过交换机发布消息 发布消息到交换机而不是直接到队列:

$msg = new \PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');

消息持久化

持久化队列 声明持久化队列:

$channel->queue_declare('task_queue', false, true, false, false);

持久化消息 设置消息为持久化:

$msg = new \PhpAmqpLib\Message\AMQPMessage(
    $data,
    array('delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

错误处理

连接异常处理 捕获连接可能出现的异常:

try {
    $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
} catch (\Exception $e) {
    echo "Connection failed: " . $e->getMessage();
}

信道异常处理 处理信道操作中的异常:

php实现rabbitmq

try {
    $channel->queue_declare('hello', false, false, false, false);
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
    echo "Channel error: " . $e->getMessage();
}

标签: phprabbitmq
分享给朋友:

相关文章

php 实现自动收货

php 实现自动收货

PHP 实现自动收货功能 自动收货功能通常用于电商系统,在订单超过一定时间未手动确认收货时自动完成订单。以下是实现方法: 数据库设计 需要订单表包含以下关键字段: CREATE TABLE ord…

php实现登录

php实现登录

PHP 登录功能实现 数据库连接与用户表创建 创建一个数据库连接文件 db.php,用于管理数据库连接。MySQL 示例代码如下: <?php $host = 'localhost'; $db…

php实现跳转

php实现跳转

PHP 实现页面跳转的方法 在 PHP 中实现页面跳转有多种方式,以下是常见的几种方法及其适用场景: 使用 header() 函数 通过 header() 函数发送 HTTP 头部信息实现跳转。注意…

php 队列的实现

php 队列的实现

PHP 队列的实现方法 使用数据库实现队列 创建一个数据表存储队列任务,包含任务ID、状态、创建时间等字段。通过SQL语句实现任务的入队和出队操作。 // 入队操作 INSERT INTO queu…

php mvc实现

php mvc实现

PHP MVC 实现步骤 模型(Model)层 模型负责数据处理和业务逻辑,通常与数据库交互。创建一个基类模型,其他具体模型继承它。 class Model { protected $db;…

php 无限分类的实现

php 无限分类的实现

无限分类的实现方法 无限分类通常用于构建树形结构数据,如商品分类、多级菜单等。以下是几种常见的实现方式: 邻接列表模型(Adjacency List) 邻接列表是最简单的实现方式,通过在每个节点中存…