首页 > 编程知识 正文

springcloud服务间调用,not authorized to access group

时间:2023-05-04 19:10:00 阅读:130566 作者:2618

SpringBoot集成Kafka

本文介绍了SpringBoot是如何与Kafka集成的,并简单地创建了Demo以测试发送和消耗功能

前言

的版本。

springboot:2.3.4 .发行版

spring-kafka : 2.5.6.RELEASE

kafka : 2.5.1

zookeeper : 3.4.14

本Demo使用的是SpringBoot较高版本的SpringBoot 2.3.4.RELEASE,是与版本相关的

spring boot 2.3 usersshoulduse 2.5.x (引导从属管理willusethecorrectversion )。

spring和kafka的版本关系

构建Kafka和Zookeeper环境

构建并启动kafka和zookeeper环境

创建Demo项目并部署spring-kafka

2.1 pom文件

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

com.google.code.gson

格森

2.2配置APP应用程序. yml

spring:

kafka:

bootstrap-servers :192.168.25.633609092 # bootstrap-servers :连接到Kafka的地址。 请用逗号分隔多个地址

消费者:

组-id:my组

启用自动提交:真

自动提交间隔: 100ms

properties:

session.timeout.ms: 15000

key-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer

value-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer

自动偏移重置:电子仓库

producer:

如果设置为大于retries: 0 #,客户端将重新发送发送失败的记录

batch-size: 16384 #如果将多个记录发送到同一分区,则Producer会尝试将记录分组到更少的请求中。 这有助于提高客户端和服务器端的性能。 此配置控制批次的默认大小(以字节为单位)。 16384是默认配置

缓冲存储器:33554432 #用于缓冲等待producer发送到服务器的记录的字节总数。 33554432是默认配置

key-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer #关键字的序列化类

值序列化类3360 org.Apache.Kafka.com mon.serialization.string serializer #值的序列化类

2.3消息主体消息定义

//*

* @author johnny

* @create 2020-09-23上午9:21点

*/

@Data

公共类消息{

私有长id;

私有字符串msg;

私密日期发送时间;

}

2.4 Kafka发送器的定义

主要利用KafkaTemplate发送消息,将消息封装为消息,转换为Json字符串并发送到Kafka

@Component

@Slf4j

公共类Kafka发送器{

私密性认证模板;

//用构造函数方式注入kafkaTemplate

publickafkasender (kafkatemplatekafkatemplate ) {

this.Kafka template=Kafka template;

}

隐私gson gson=new GSO nbuilder ().create );

公共语音发送(字符串msg ) {

消息消息=new消息(;

消息. setid (system.current time millis ();

消息. set msg (msg;

消息. set send time (新日期) );

log.info (【消息: { }】、GSON.toJSON )、message );

向topic=hello2发送的消息

kafkatemplate.send('Hello2',GSON.toJSON ) ) message );

}

}

2.5 Kafka消费者定义

在注释中为监听的方法设置监听程序,并指定要监听的主题

kafka的消息收件人作为ConsumerRecord对象封装并返回,其中的value属性是实际消息。

@Component

@Slf4j

公共类Kafka consumer {

@ Kafka监听器(topics=({ ' hello2' } ) () ) ) ) ) ) ) 652 )

公共语音列表(consumer record? 记录

选项. of nullable (record.value ) )

. if present (消息- {

log.info(【record={}】、record );

log.info (【消息={ }】、消息);

);

}

}

3 .测试效果

提供用于调用Kafka发送器发送消息的Http接口

3.1提供http测试接口

@ rest控制器

@Slf4j

公共类测试控制器{

@Autowired

私密密钥发送器密钥发送器;

@ get mapping (发送消息/{ msg } ) )。

公共语音发送消息(@ path variable ) (msg ) String msg ) )。

kafkasender.send(msg;

}

}

3.2启动项目

监听8080端口

KafkaMessageListenerContainer上的消费者组=my group上有拦截hello2- 0主题的消费者

3.3调用http接口

SpringBoot集成Kafka到此结束。

以上是Kafka和SpringBoot的整合步骤的详细内容。 有关SpringBoot与Kafka集成的详细信息,请参阅脚本房屋中的其他相关文章。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。