php实现rabbitmq
PHP 实现 RabbitMQ 的基本步骤
安装 RabbitMQ 和 PHP 扩展
确保 RabbitMQ 服务已安装并运行。在 PHP 中,使用 php-amqplib 库来操作 RabbitMQ。通过 Composer 安装:
composer require php-amqplib/php-amqplib
连接到 RabbitMQ
创建一个连接和通道,用于后续的消息发送和接收:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
发送消息到队列
声明一个队列并发送消息:
$channel->queue_declare('hello', false, false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
接收消息
声明队列并设置消费者回调函数:
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
关闭连接
完成操作后关闭通道和连接:

$channel->close();
$connection->close();
实现工作队列(Worker Queue)
生产者
发送耗时任务消息到队列:
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
$msg = new PhpAmqpLib\Message\AMQPMessage(
$data,
['delivery_mode' => PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
消费者
处理任务并确认消息:
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
实现发布/订阅模式
生产者
发送消息到交换机:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
消费者
绑定队列到交换机并接收消息:
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
实现路由(Routing)
生产者
发送带路由键的消息:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$msg = new PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
消费者
绑定队列到特定路由键:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severities = array_slice($argv, 1);
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
注意事项
- 确保 RabbitMQ 服务正常运行。
- 处理消息时考虑消息持久化和手动确认。
- 根据业务需求选择合适的交换机和队列模式。
- 错误处理和日志记录是生产环境中的重要部分。






