当前位置:首页 > PHP

php实现rabbitmq

2026-02-28 12:16:11PHP

PHP 实现 RabbitMQ 的基本方法

RabbitMQ 是一个开源的消息代理软件,用于实现消息队列系统。在 PHP 中,可以通过 php-amqplib 库来实现 RabbitMQ 的基本功能。

安装 php-amqplib 库:

composer require php-amqplib/php-amqplib

生产者(Producer)示例代码

生产者负责将消息发送到 RabbitMQ 队列中。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

// 创建消息
$msg = new AMQPMessage('Hello World!');

// 发送消息
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

// 关闭连接
$channel->close();
$connection->close();
?>

消费者(Consumer)示例代码

消费者负责从 RabbitMQ 队列中接收并处理消息。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定义回调函数处理消息
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 消费消息
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 持续监听消息
while ($channel->is_consuming()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();
?>

消息持久化

为了确保消息在 RabbitMQ 重启后不丢失,可以将队列和消息设置为持久化。

php实现rabbitmq

生产者修改部分:

// 声明持久化队列
$channel->queue_declare('hello', false, true, false, false);

// 创建持久化消息
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

工作队列(Work Queue)

多个消费者共同处理队列中的消息,实现负载均衡。

生产者修改部分:

php实现rabbitmq

// 发送多条消息
for ($i = 0; $i < 10; $i++) {
    $msg = new AMQPMessage("Message $i");
    $channel->basic_publish($msg, '', 'hello');
}

消费者修改部分:

// 设置公平分发
$channel->basic_qos(null, 1, null);

// 处理消息时模拟耗时任务
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(1); // 模拟耗时任务
    echo " [x] Done\n";
    $msg->ack(); // 手动确认消息
};

交换机(Exchange)示例

RabbitMQ 支持多种交换机类型,如直连(direct)、主题(topic)、扇出(fanout)等。

扇出交换机示例(广播消息):

// 生产者
$channel->exchange_declare('logs', 'fanout', false, false, false);
$msg = new AMQPMessage('Broadcast message');
$channel->basic_publish($msg, 'logs');

// 消费者
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

错误处理与重连

在实际应用中,需要处理连接断开的情况。

try {
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    // ... 其他代码
} catch (Exception $e) {
    echo "Connection failed: " . $e->getMessage();
    // 实现重连逻辑
}

通过以上方法,可以在 PHP 中实现 RabbitMQ 的基本功能,包括消息的生产、消费、持久化、工作队列和交换机等高级特性。

标签: phprabbitmq
分享给朋友:

相关文章

php实现文件上传

php实现文件上传

文件上传的基本实现 在PHP中实现文件上传需要使用$_FILES超全局数组处理上传的文件数据。表单必须设置enctype="multipart/form-data"属性,并采用POST方法提交。…

php实现websocket

php实现websocket

PHP 实现 WebSocket WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。PHP 可以通过多种方式实现 WebSocket 功能,以下是几种常见的方法: 使用 Ratc…

php 实现登录

php 实现登录

PHP 登录功能实现 使用 PHP 实现登录功能通常需要结合数据库验证和会话管理。以下是一个基本的登录实现流程: 数据库准备 创建用户表存储用户名和密码(建议使用哈希加密): CREATE TAB…

php 实现文件下载

php 实现文件下载

实现文件下载的基本方法 使用 PHP 实现文件下载的核心是通过设置 HTTP 头部信息,强制浏览器将文件作为附件下载而非直接显示。以下是一个基础实现示例: $file_path = '/path/t…

php 实现单点登录

php 实现单点登录

PHP 实现单点登录的方法 单点登录(SSO)允许用户通过一次登录访问多个相互信任的系统。以下是基于 PHP 的实现方案: 基于共享 Session 的方案 在同一个主域名下的子域名间可以通过共享…

php 无限分类的实现

php 无限分类的实现

无限分类的实现方法 无限分类通常用于构建树形结构数据,如商品分类、多级菜单等。以下是几种常见的实现方式: 邻接列表模型(Adjacency List) 邻接列表是最简单的实现方式,通过在每个节点中存…