当前位置:首页 > Java

java如何操作kafka

2026-03-25 10:41:53Java

使用Java操作Kafka

添加依赖

在Maven项目中,需添加Kafka客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>

生产者示例

创建生产者发送消息:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });
        producer.close();
    }
}

消费者示例

创建消费者接收消息:

java如何操作kafka

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}

配置参数说明

  • bootstrap.servers: Kafka集群地址列表。
  • key.serializer/deserializer: 键的序列化/反序列化类。
  • value.serializer/deserializer: 值的序列化/反序列化类。
  • group.id: 消费者组ID,同一组内的消费者共享消息。

高级功能

  • 事务支持: 通过配置enable.idempotencetransactional.id实现精确一次语义。
  • 分区分配策略: 消费者可通过partition.assignment.strategy配置分配策略(如Range或RoundRobin)。
  • 偏移量管理: 手动提交偏移量需设置enable.auto.commitfalse并调用commitSync()commitAsync()

注意事项

  • 生产者需处理send()方法的回调异常。
  • 消费者避免长时间阻塞poll(),需合理设置超时时间。
  • 资源释放需显式调用close()方法。

标签: 操作java
分享给朋友:

相关文章

java如何输出

java如何输出

输出到控制台 使用 System.out.println() 方法输出内容到控制台,适用于调试或简单信息展示。 示例代码: System.out.println("Hello, World!")…

如何删除java

如何删除java

卸载 Java 的步骤 Windows 系统: 打开控制面板,选择“程序和功能”或“卸载程序”,在列表中找到 Java 相关条目(如“Java Runtime Environment”或“Java D…

如何学好java

如何学好java

理解基础概念 掌握Java的核心概念是学习的基础。包括数据类型、变量、运算符、控制流(如循环和条件语句)、数组等。理解面向对象编程(OOP)的四大特性:封装、继承、多态和抽象。 实践编程练习 通过实…

java版本如何查看

java版本如何查看

查看Java版本的命令行方法 在命令行或终端中运行以下命令可以查看当前安装的Java版本: java -version 输出示例: java version "1.8.0_301" Java(TM…

java面试官如何面试别人

java面试官如何面试别人

考察基础知识 从Java核心概念入手,包括面向对象特性(封装、继承、多态)、集合框架(ArrayList vs LinkedList、HashMap实现原理)、异常处理机制、多线程(线程池、锁机制)等…

java下载功能vue实现

java下载功能vue实现

Java 后端文件下载功能实现 后端需提供文件下载的接口,通常使用 HttpServletResponse 实现。以下是一个简单的 Java Spring Boot 示例: @GetMapping(…