当前位置:首页 > PHP

php实现rabbitmq

2026-01-29 15:00:53PHP

PHP 实现 RabbitMQ 的基本步骤

安装 RabbitMQ 和 PHP 扩展

确保 RabbitMQ 服务已安装并运行。在 PHP 中,使用 php-amqplib 库来操作 RabbitMQ。通过 Composer 安装:

composer require php-amqplib/php-amqplib

连接到 RabbitMQ

创建一个连接和通道,用于后续的消息发送和接收:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

发送消息到队列

声明一个队列并发送消息:

$channel->queue_declare('hello', false, false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

接收消息

声明队列并设置消费者回调函数:

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

关闭连接

完成操作后关闭通道和连接:

php实现rabbitmq

$channel->close();
$connection->close();

实现工作队列(Worker Queue)

生产者

发送耗时任务消息到队列:

$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
$msg = new PhpAmqpLib\Message\AMQPMessage(
    $data,
    ['delivery_mode' => PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');

消费者

处理任务并确认消息:

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

实现发布/订阅模式

生产者

发送消息到交换机:

php实现rabbitmq

$channel->exchange_declare('logs', 'fanout', false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');

消费者

绑定队列到交换机并接收消息:

$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

实现路由(Routing)

生产者

发送带路由键的消息:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);

消费者

绑定队列到特定路由键:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severities = array_slice($argv, 1);
foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

注意事项

  • 确保 RabbitMQ 服务正常运行。
  • 处理消息时考虑消息持久化和手动确认。
  • 根据业务需求选择合适的交换机和队列模式。
  • 错误处理和日志记录是生产环境中的重要部分。

标签: phprabbitmq
分享给朋友:

相关文章

php实现分页

php实现分页

PHP实现分页的基本方法 使用LIMIT和OFFSET进行数据库查询分页。通过计算当前页码和每页显示数量,动态生成SQL语句。 $page = isset($_GET['page']) ? (int…

php实现静态

php实现静态

静态化实现方法 PHP实现静态化主要通过生成HTML文件或缓存机制来减少服务器动态解析的开销。以下是几种常见方法: 生成静态HTML文件 通过file_put_contents()将动态内容保存为H…

php 实现队列

php 实现队列

PHP 实现队列的方法 PHP 可以通过多种方式实现队列功能,以下是几种常见的实现方法: 使用数组实现队列 PHP 数组可以通过 array_push 和 array_shift 函数模拟队列的先进…

php 实现无限分类

php 实现无限分类

无限分类的实现方法 无限分类通常用于构建多层级结构的数据,如商品分类、菜单管理等。以下是几种常见的实现方式: 邻接列表模型(Adjacency List) 这是最简单的实现方式,每个分类记录存储其父…

php实现注册登录

php实现注册登录

PHP 注册功能实现 创建数据库表(以 MySQL 为例) CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, usern…

php实现域名

php实现域名

PHP实现域名操作的方法 获取当前域名 使用$_SERVER['HTTP_HOST']可以获取当前访问的域名: $currentDomain = $_SERVER['HTTP_HOST']; ech…