当前位置:首页 > Java

JAVA 如何使用kafka

2026-03-04 04:16:40Java

使用 Java 操作 Kafka 的基本步骤

添加依赖

在 Maven 或 Gradle 项目中添加 Kafka 客户端依赖。以下是 Maven 的配置示例:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

生产者配置与发送消息

创建 Kafka 生产者并发送消息到指定主题:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);
producer.close();

消费者配置与接收消息

创建 Kafka 消费者并订阅主题消费消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
    }
}

高级配置

可根据需求调整生产者或消费者的参数:

  • 生产者:acks(确认机制)、retries(重试次数)、batch.size(批量发送大小)
  • 消费者:auto.offset.reset(偏移量重置策略)、enable.auto.commit(自动提交偏移量)

错误处理

建议为 send() 方法添加回调以处理发送失败的情况:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to partition %d, offset %d%n", 
            metadata.partition(), metadata.offset());
    }
});

资源清理

务必在使用完成后关闭生产者和消费者以释放资源:

producer.close(); // 生产者关闭
consumer.close(); // 消费者关闭

JAVA 如何使用kafka

分享给朋友:

相关文章

react如何使用redux

react如何使用redux

使用 Redux 在 React 中的应用 Redux 是一个状态管理库,通常与 React 结合使用以管理全局状态。以下是具体实现步骤: 安装依赖 确保项目中已安装 redux 和 react-…

react如何使用webview

react如何使用webview

使用 React 实现 WebView 的方法 在 React 中,可以通过不同的方式实现 WebView 功能,具体取决于开发环境(如 React Native 或 Web 应用)。以下是常见的实现…

react激光如何使用

react激光如何使用

安装React激光库 在项目中安装React激光库,通常通过npm或yarn完成。确保项目已初始化并具备React环境。 npm install react-laser-beam # 或 yar…

react 如何使用axios

react 如何使用axios

安装 axios 在 React 项目中安装 axios 依赖包: npm install axios # 或 yarn add axios 引入 axios 在需要发送 HTTP 请求的组件或文…

react如何使用图片

react如何使用图片

在React中使用图片的方法 静态图片导入 在React组件中可以直接通过import导入图片文件,这种方式适用于已知的静态资源。导入后,图片会被处理为模块,可通过src属性引用。 import R…

react如何使用sass

react如何使用sass

安装 Sass 依赖 在 React 项目中安装 sass 作为开发依赖: npm install sass --save-dev 或使用 Yarn: yarn add sass --dev…