当前位置:首页 > Java

JAVA 如何使用kafka

2026-03-04 04:16:40Java

使用 Java 操作 Kafka 的基本步骤

添加依赖

在 Maven 或 Gradle 项目中添加 Kafka 客户端依赖。以下是 Maven 的配置示例:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

生产者配置与发送消息

创建 Kafka 生产者并发送消息到指定主题:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);
producer.close();

消费者配置与接收消息

创建 Kafka 消费者并订阅主题消费消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
    }
}

高级配置

可根据需求调整生产者或消费者的参数:

  • 生产者:acks(确认机制)、retries(重试次数)、batch.size(批量发送大小)
  • 消费者:auto.offset.reset(偏移量重置策略)、enable.auto.commit(自动提交偏移量)

错误处理

建议为 send() 方法添加回调以处理发送失败的情况:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to partition %d, offset %d%n", 
            metadata.partition(), metadata.offset());
    }
});

资源清理

务必在使用完成后关闭生产者和消费者以释放资源:

JAVA 如何使用kafka

producer.close(); // 生产者关闭
consumer.close(); // 消费者关闭

分享给朋友:

相关文章

hashrouter如何使用react

hashrouter如何使用react

使用 HashRouter 在 React 中的方法 安装 react-router-dom 确保项目中已安装 react-router-dom,若未安装,可通过以下命令安装: npm inst…

react如何使用webview

react如何使用webview

使用 React 实现 WebView 的方法 在 React 中,可以通过不同的方式实现 WebView 功能,具体取决于开发环境(如 React Native 或 Web 应用)。以下是常见的实现…

react如何使用echarts

react如何使用echarts

安装 ECharts 和 React 适配库 在 React 项目中使用 ECharts 需要安装 ECharts 核心库及其 React 适配库。通过 npm 或 yarn 安装: npm ins…

react如何使用axios

react如何使用axios

安装 Axios 在 React 项目中安装 Axios,可以通过 npm 或 yarn 进行安装: npm install axios 或 yarn add axios 导入 Axios 在需要…

react如何使用dispatch

react如何使用dispatch

使用 useDispatch 钩子 在函数组件中,通过 react-redux 提供的 useDispatch 钩子获取 dispatch 函数。引入 useDispatch 后直接调用即可:…

react如何使用sass

react如何使用sass

安装 Sass 依赖 在 React 项目中安装 sass 作为开发依赖: npm install sass --save-dev 或使用 Yarn: yarn add sass --dev 创建…