当前位置:首页 > Java

JAVA 如何使用kafka

2026-03-04 04:16:40Java

使用 Java 操作 Kafka 的基本步骤

添加依赖

在 Maven 或 Gradle 项目中添加 Kafka 客户端依赖。以下是 Maven 的配置示例:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

生产者配置与发送消息

创建 Kafka 生产者并发送消息到指定主题:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record);
producer.close();

消费者配置与接收消息

创建 Kafka 消费者并订阅主题消费消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
    }
}

高级配置

可根据需求调整生产者或消费者的参数:

  • 生产者:acks(确认机制)、retries(重试次数)、batch.size(批量发送大小)
  • 消费者:auto.offset.reset(偏移量重置策略)、enable.auto.commit(自动提交偏移量)

错误处理

建议为 send() 方法添加回调以处理发送失败的情况:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to partition %d, offset %d%n", 
            metadata.partition(), metadata.offset());
    }
});

资源清理

务必在使用完成后关闭生产者和消费者以释放资源:

JAVA 如何使用kafka

producer.close(); // 生产者关闭
consumer.close(); // 消费者关闭

分享给朋友:

相关文章

uniapp中如何使用iconfont

uniapp中如何使用iconfont

使用 Iconfont 在 Uniapp 中的步骤 下载 Iconfont 资源 访问 Iconfont 官网,选择需要的图标并添加到项目。下载时选择 Font class 格式,解压后会得到 .tt…

react refs 如何使用

react refs 如何使用

React Refs 的基本概念 Refs 是 React 提供的一种访问 DOM 节点或 React 组件实例的方式。通常在 React 的数据流中,父子组件通过 props 进行通信,但在某些情况…

react如何使用插槽

react如何使用插槽

使用props.children实现基础插槽 React中插槽的核心是通过props.children传递子组件。父组件在标签内部放置的内容会自动成为children属性: // 父组件 <C…

内网如何使用react框架

内网如何使用react框架

内网使用React框架的部署与开发方法 在无外网连接的内网环境中使用React框架,需提前准备依赖包并配置本地开发环境。以下是具体实施方案: 离线安装Node.js和npm 从官网下载Node.js…

react如何使用其他字体

react如何使用其他字体

使用自定义字体 在React项目中使用自定义字体可以通过CSS的@font-face规则实现。将字体文件(如.woff、.ttf等格式)放入项目的public或src目录,通过CSS引入并定义字体族名…

如何使用react的Table组件

如何使用react的Table组件

安装依赖 确保项目已安装Ant Design库,它是React中常用的UI组件库,包含Table组件。通过npm或yarn安装: npm install antd # 或 yarn add an…