SpringBoot集成Kafka
本文介绍了SpringBoot是如何与Kafka集成的,并简单地创建了Demo以测试发送和消耗功能
前言
的版本。
springboot:2.3.4 .发行版
spring-kafka : 2.5.6.RELEASE
kafka : 2.5.1
zookeeper : 3.4.14
本Demo使用的是SpringBoot较高版本的SpringBoot 2.3.4.RELEASE,是与版本相关的
spring boot 2.3 usersshoulduse 2.5.x (引导从属管理willusethecorrectversion )。
spring和kafka的版本关系
构建Kafka和Zookeeper环境
构建并启动kafka和zookeeper环境
创建Demo项目并部署spring-kafka
2.1 pom文件
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka
com.google.code.gson
格森
2.2配置APP应用程序. yml
spring:
kafka:
bootstrap-servers :192.168.25.633609092 # bootstrap-servers :连接到Kafka的地址。 请用逗号分隔多个地址
消费者:
组-id:my组
启用自动提交:真
自动提交间隔: 100ms
properties:
session.timeout.ms: 15000
key-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer
value-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer
自动偏移重置:电子仓库
producer:
如果设置为大于retries: 0 #,客户端将重新发送发送失败的记录
batch-size: 16384 #如果将多个记录发送到同一分区,则Producer会尝试将记录分组到更少的请求中。 这有助于提高客户端和服务器端的性能。 此配置控制批次的默认大小(以字节为单位)。 16384是默认配置
缓冲存储器:33554432 #用于缓冲等待producer发送到服务器的记录的字节总数。 33554432是默认配置
key-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer #关键字的序列化类
值序列化类3360 org.Apache.Kafka.com mon.serialization.string serializer #值的序列化类
2.3消息主体消息定义
//*
* @author johnny
* @create 2020-09-23上午9:21点
*/
@Data
公共类消息{
私有长id;
私有字符串msg;
私密日期发送时间;
}
2.4 Kafka发送器的定义
主要利用KafkaTemplate发送消息,将消息封装为消息,转换为Json字符串并发送到Kafka
@Component
@Slf4j
公共类Kafka发送器{
私密性认证模板;
//用构造函数方式注入kafkaTemplate
publickafkasender (kafkatemplatekafkatemplate ) {
this.Kafka template=Kafka template;
}
隐私gson gson=new GSO nbuilder ().create );
公共语音发送(字符串msg ) {
消息消息=new消息(;
消息. setid (system.current time millis ();
消息. set msg (msg;
消息. set send time (新日期) );
log.info (【消息: { }】、GSON.toJSON )、message );
向topic=hello2发送的消息
kafkatemplate.send('Hello2',GSON.toJSON ) ) message );
}
}
2.5 Kafka消费者定义
在注释中为监听的方法设置监听程序,并指定要监听的主题
kafka的消息收件人作为ConsumerRecord对象封装并返回,其中的value属性是实际消息。
@Component
@Slf4j
公共类Kafka consumer {
@ Kafka监听器(topics=({ ' hello2' } ) () ) ) ) ) ) ) 652 )
公共语音列表(consumer record? 记录
选项. of nullable (record.value ) )
. if present (消息- {
log.info(【record={}】、record );
log.info (【消息={ }】、消息);
);
}
}
3 .测试效果
提供用于调用Kafka发送器发送消息的Http接口
3.1提供http测试接口
@ rest控制器
@Slf4j
公共类测试控制器{
@Autowired
私密密钥发送器密钥发送器;
@ get mapping (发送消息/{ msg } ) )。
公共语音发送消息(@ path variable ) (msg ) String msg ) )。
kafkasender.send(msg;
}
}
3.2启动项目
监听8080端口
KafkaMessageListenerContainer上的消费者组=my group上有拦截hello2- 0主题的消费者
3.3调用http接口
SpringBoot集成Kafka到此结束。
以上是Kafka和SpringBoot的整合步骤的详细内容。 有关SpringBoot与Kafka集成的详细信息,请参阅脚本房屋中的其他相关文章。