当前位置:首页 > PHP

php实现rabbitmq

2026-01-29 15:00:53PHP

PHP 实现 RabbitMQ 的基本步骤

安装 RabbitMQ 和 PHP 扩展

确保 RabbitMQ 服务已安装并运行。在 PHP 中,使用 php-amqplib 库来操作 RabbitMQ。通过 Composer 安装:

composer require php-amqplib/php-amqplib

连接到 RabbitMQ

创建一个连接和通道,用于后续的消息发送和接收:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

发送消息到队列

声明一个队列并发送消息:

$channel->queue_declare('hello', false, false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

接收消息

声明队列并设置消费者回调函数:

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

关闭连接

完成操作后关闭通道和连接:

php实现rabbitmq

$channel->close();
$connection->close();

实现工作队列(Worker Queue)

生产者

发送耗时任务消息到队列:

$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
$msg = new PhpAmqpLib\Message\AMQPMessage(
    $data,
    ['delivery_mode' => PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');

消费者

处理任务并确认消息:

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

实现发布/订阅模式

生产者

发送消息到交换机:

php实现rabbitmq

$channel->exchange_declare('logs', 'fanout', false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');

消费者

绑定队列到交换机并接收消息:

$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

实现路由(Routing)

生产者

发送带路由键的消息:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);

消费者

绑定队列到特定路由键:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severities = array_slice($argv, 1);
foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

注意事项

  • 确保 RabbitMQ 服务正常运行。
  • 处理消息时考虑消息持久化和手动确认。
  • 根据业务需求选择合适的交换机和队列模式。
  • 错误处理和日志记录是生产环境中的重要部分。

标签: phprabbitmq
分享给朋友:

相关文章

php实现打印功能

php实现打印功能

PHP实现打印功能的方法 在PHP中实现打印功能可以通过多种方式完成,包括直接输出到浏览器、生成PDF文件、调用打印机接口等。以下是几种常见的方法: 直接输出HTML内容 PHP可以通过echo或p…

php实现分页

php实现分页

PHP实现分页的基本方法 使用原生PHP实现分页功能需要结合数据库查询和HTML展示。以下是核心实现步骤: 数据库查询与分页逻辑 // 连接数据库 $conn = new mysqli('loca…

php实现文件上传

php实现文件上传

文件上传的基本实现 在PHP中实现文件上传需要使用$_FILES超全局数组处理上传的文件数据。表单必须设置enctype="multipart/form-data"属性,并采用POST方法提交。…

php实现搜索功能

php实现搜索功能

实现基本的搜索功能 在PHP中实现搜索功能通常涉及数据库查询。以下是一个简单的实现方式,假设使用MySQL数据库: <?php // 连接数据库 $conn = new mysqli('loc…

php实现递归

php实现递归

递归的基本概念 递归是一种函数调用自身的技术,适用于解决可以分解为相同子问题的问题。在PHP中,递归需要明确的终止条件(基线条件),否则会导致无限循环。 递归实现阶乘 阶乘是经典的递归案例,n! =…

php长连接实现

php长连接实现

PHP 长连接实现方法 长连接(Keep-Alive)允许客户端与服务器在一次TCP连接中发送和接收多个HTTP请求/响应,减少重复建立连接的开销。以下是PHP中实现长连接的几种方式: 配置HTTP…