java如何操作kafka
使用Java操作Kafka
添加依赖
在Maven项目中,需添加Kafka客户端依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
生产者示例
创建生产者发送消息:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.close();
}
}
消费者示例
创建消费者接收消息:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
}
配置参数说明
- bootstrap.servers: Kafka集群地址列表。
- key.serializer/deserializer: 键的序列化/反序列化类。
- value.serializer/deserializer: 值的序列化/反序列化类。
- group.id: 消费者组ID,同一组内的消费者共享消息。
高级功能
- 事务支持: 通过配置
enable.idempotence和transactional.id实现精确一次语义。 - 分区分配策略: 消费者可通过
partition.assignment.strategy配置分配策略(如Range或RoundRobin)。 - 偏移量管理: 手动提交偏移量需设置
enable.auto.commit为false并调用commitSync()或commitAsync()。
注意事项
- 生产者需处理
send()方法的回调异常。 - 消费者避免长时间阻塞
poll(),需合理设置超时时间。 - 资源释放需显式调用
close()方法。






