首页 > 编程知识 正文

java开kafka消费端(java实现kafka消费者)

时间:2023-12-03 11:56:27 阅读:311696 作者:DFJW

本文目录一览:

  • 1、用Kafka和Java搭建的项目,Kafka管理中心在什么情况下会重复发送消息?消费端的程序接收到消息,进入方法
  • 2、java工程kafka传递自定义对象,消费端获取到的是null
  • 3、kafka消费者java版本读取不到消息怎么办
  • 4、java客户端使用kafka时什么情况下使用kafka client和spring kafka?
  • 5、使用java实现kafka consumer时报错

用Kafka和Java搭建的项目,Kafka管理中心在什么情况下会重复发送消息?消费端的程序接收到消息,进入方法

非手动提交offset

消费者只要读取到数据,就会修改offset,不需要方法体执行完

手动提交

需要手动提交代码执行完毕

针对你的问题,情况有很多种可能。

你是否开启手动提交offset

你的消费者,有几个?是否是同一个组?

java工程kafka传递自定义对象,消费端获取到的是null

3. 启服务

3.1 启zookeeper

启zk两种式第种使用kafka自带zk

bin/zookeeper-server-start.sh config/zookeeper.properties

另种使用其zookeeper位于本机位于其址种情况需要修改config面sercer.properties面zookeeper址

例zookeeper.connect=10.202.4.179:2181

3.2 启 kafka

bin/kafka-server-start.sh config/server.properties

4.创建topic

bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test

创建名testtopic副本区

通list命令查看刚刚创建topic

bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181

5.启producer并发送消息启producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启发送消息

test

hello boy

按Ctrl+C退发送消息

6.启consumer

bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning

启consumerconsole看producer发送消息

启两终端发送消息接受消息

都行查看zookeeper进程kafkatopic步步排查原吧

kafka消费者java版本读取不到消息怎么办

Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

java客户端使用kafka时什么情况下使用kafka client和spring kafka?

spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作

所以你使用spring的情况下,可以用spring-kafka,当然直接用kafka client也行

使用java实现kafka consumer时报错

public static void consumer(){

        Properties props = new Properties();  

        props.put("zk.connect", "hadoop-2:2181");  

        props.put("zk.connectiontimeout.ms", "1000000");  

        props.put("groupid", "fans_group");  

          

        // Create the connection to the cluster  

        ConsumerConfig consumerConfig = new ConsumerConfig(props);  

        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

          

        MapString, Integer map = new HashMapString, Integer();

        map.put("fans", 1);

        

        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  

        MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);  

        ListKafkaStreamMessage streams = topicMessageStreams.get("fans");  

          

        // create list of 4 threads to consume from each of the partitions   

        ExecutorService executor = Executors.newFixedThreadPool(1);  

        long startTime = System.currentTimeMillis();

        // consume the messages in the threads  

        for(final KafkaStreamMessage stream: streams) {  

          executor.submit(new Runnable() {  

            public void run() {  

                 ConsumerIteratorMessage it = stream.iterator();

                  while (it.hasNext()){

                      log.debug(byteBufferToString(it.next().message().payload()));

                  }

              } 

            

          }); 

          log.debug("use time="+(System.currentTimeMillis()-startTime));

        }  

    }

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。