首页 > 编程知识 正文

spring集成kafka的原理,springcloud服务间调用

时间:2023-05-06 10:46:33 阅读:131999 作者:2251

一、准备工作预说明:如果你运行有问题,请检查Kafka版本和SpringBoot版本与我的文章是否一致。 本文的环境已经过测试。

Kafka服务器的版本为kafka_2.11-1.1.0(Scala ),即1.1.0SpringBoot版本:1.5.10.RELEASE

事先启动zk、kafka,创建主题

[ root @ basic Kafka _ 2.11-1.1.0 ] # xndy/Kafka-topics.sh-- create-- zookeeper localhost 33602181 -复制身份

advertised.listeners=plaintext ://192.168.239.12833609092 maven在dependencygroupidorg.spring framework.Kafka 项目结构要更好地体现实际开发需求,一般生产者调用一个接口的服务处理逻辑,然后向kafka扔数据,一个消费者不断监测这个Topic,处理数据,所以这里把生产者作为一个接口

三.具体实现代码SpringBoot配置文件application.yml

spring : Kafka : bootstrap-servers :192.168.239.12833609092 producer : key-serializer 3: org.Apache.ka che.Kafka.com mon.serialization.stringserializerconsumer 3360 group-id 3360 te sumer o-commit-interval :1000 key-deder erial ization.stringdeserializervalue-string mon.serialization.string deserializer生产者包cn.saytime.web; importorg.spring framework.beans.factory.annotation.auto wired; importorg.spring framework.Kafka.core.Kafka template; importorg.spring framework.web.xnd yd.annotation.request mapping; importorg.spring帧web.xnd yd.annotation.rest controller; /**kafka生产者*/@ rest控制器@ request mapping (Kafka ) ) publicclasstestkafkaproducercontroller ) @ autowiredprivatation 返回' success '; }消费者这里的消费者拦截这个主题,有信息就执行,不需要进行while (真)

package cn.saytime.kafka; importorg.Apache.Kafka.clients.consumer.consumer record; importorg.spring framework.Kafka.annotation.Kafka listener; importorg.spring framework.stereotype.com ponent; /** * kafka消费者测试*/@ componentpublicclasstestconsumer { @ Kafka监听器(topics=' test _ topic ' ) public void listen } consumer record (throws exception (system.out.printf (' topic=% s,offset=%d,value=%s n ',record.topic ),record )

package cn.saytime; importorg.spring framework.boot.spring应用程序; importorg.spring framework.boot.auto configure.springbootapplication; @ springbootapplicationpublicclasstestapplication { publicstaticvoidmain (string [ ] args ) spring application.run (te statition ) msg-hello

控制台输出:

topic=test_topic,offset=19,value=hello表示消费者多次运行并退出,将再次调用接口:

3358 localhost :8080/Kafka/send? msg=kafka

因为topic=test_topic、offset=20、value=kafka,所以在这里可以看到消费者实际上没有停止poll Topic数据。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。