首页 > 编程知识 正文

kafka官网,flume和kafka区别

时间:2023-05-06 06:47:10 阅读:207228 作者:4184

kafka报java.io.EOFException错误解决方案

在项目中使用kafka来获取第三方推送的生产端数据进行消费,在项目运行过程中发现consumer.poll(1000)方法一直报java.io.EOFException错误,如下所示:

java.io.EOFExceptionat org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) ~[kafka-clients-2.1.0.jar:?]at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) ~[kafka-clients-2.1.0.jar:?]at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) ~[kafka-clients-2.1.0.jar:?]at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) ~[kafka-clients-2.1.0.jar:?]at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.common.network.Selector.poll(Selector.java:467) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-2.1.0.jar:?]at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0.jar:?]at com.stream.task.hr.HRKafkaConsumer.hrEmpSync(HRKafkaConsumer.java:102) [FMP-TASK-V1.0.0.jar:?]at com.stream.task.hr.HrEmpSyncTask.execute(HrEmpSyncTask.java:43) [FMP-TASK-V1.0.0.jar:?]at com.stream.task.job.SchedulerJob$TaskRunnable.run(SchedulerJob.java:68) [FMP-TASK-V1.0.0.jar:?]at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_171]at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_171]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_171]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_171]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

看网上说是kafka版本问题,我这边看了下项目kafka使用的是kafka-clients-0.10.0.0.jar,
依赖是

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency>

使用这个kafka-clients依赖会有一个问题,那就是代码中ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);会进入死循环且报java.io.EOFException错误,于是试了好几个kafka-clients版本都不行,最后注释掉kafka-clients依赖换个kafka依赖:

<!--<dependency>--><!--<groupId>org.apache.kafka</groupId>--><!--<artifactId>kafka-clients</artifactId>--><!--<version>0.10.0.0</version>--><!--</dependency>--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>2.1.0</version></dependency>

代码中对应修改一下这个poll()方法,改为ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));再次启动项目就不会无限的循环报错了。

至于kafka怎么使用论坛有好多帖子,就不一一赘述了。这次只是记录本项目中的报错以及修复过程,积累一些经验供以后参考。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。