java如何监听rabbit
监听 RabbitMQ 消息的方法
在 Java 中监听 RabbitMQ 消息通常使用 amqp-client 库,以下是具体实现步骤。
添加依赖
在 Maven 项目中添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
创建连接工厂
配置 RabbitMQ 连接参数:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
建立连接和通道
创建连接和通道对象:
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
声明队列
确保队列存在,如果不存在则创建:

channel.queueDeclare("queue_name", false, false, false, null);
创建消费者
实现 DefaultConsumer 或使用 Lambda 表达式:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
开始消费消息
绑定消费者到队列:
channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> {});
完整示例代码
import com.rabbitmq.client.*;
public class RabbitMQListener {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_queue", false, false, false, null);
System.out.println("Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
channel.basicConsume("test_queue", true, deliverCallback, consumerTag -> {});
}
}
高级配置选项
消息确认机制
关闭自动确认,手动确认消息:

channel.basicConsume("queue_name", false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
设置 QoS
限制未确认消息的数量:
channel.basicQos(1);
绑定交换机
将队列绑定到交换机:
channel.exchangeDeclare("exchange_name", "direct");
channel.queueBind("queue_name", "exchange_name", "routing_key");
异常处理
确保资源正确关闭:
try {
// RabbitMQ 操作代码
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}






