JAVA 如何使用kafka
使用 Java 操作 Kafka 的基本步骤
添加依赖
在 Maven 或 Gradle 项目中添加 Kafka 客户端依赖。以下是 Maven 的配置示例:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
生产者配置与发送消息
创建 Kafka 生产者并发送消息到指定主题:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);
producer.close();
消费者配置与接收消息
创建 Kafka 消费者并订阅主题消费消息:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
高级配置
可根据需求调整生产者或消费者的参数:
- 生产者:
acks(确认机制)、retries(重试次数)、batch.size(批量发送大小) - 消费者:
auto.offset.reset(偏移量重置策略)、enable.auto.commit(自动提交偏移量)
错误处理
建议为 send() 方法添加回调以处理发送失败的情况:
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
}
});
资源清理
务必在使用完成后关闭生产者和消费者以释放资源:
producer.close(); // 生产者关闭
consumer.close(); // 消费者关闭






