首页 > 编程知识 正文

spring集成kafka的原理,M bus

时间:2023-05-06 12:43:03 阅读:8939 作者:3698

如果客户端需要刷新才能获取最新的配置信息,则可以在每次使用webhook机制发送代码提交请求时更新客户端。 当客户端越来越多时,必须为每个客户端运行,这种情况不太适合。 使用Spring Cloud Bus可以完美解决这个问题。

Spring bus的一个核心思想是通过分布式启动器扩展spring boot APP应用程序,也可以用于在多个APP应用程序之间建立通信信道。 目前实现的唯一方法是使用amqpmessageagent作为通道。 根据通道设置,相同的特性设置位于更多通道的文档中。 其实本质上是利用MQ的广播机制在分散的系统中传播信息,现在常用的是Kafka和RabbitMQ。

整个客户端配置文件更新过程如下:

发送代码将开机自检请求发送到bus/refreshServer端接收请求,发送到Spring Cloud BusSpring Cloud bus接收消息,并向其他客户端接收通知。 有关实现RocketMQ以获取请求服务器端获取最新配置的所有客户端的最新配置,请参见《Spring Boot中使用RabbitMQ》文章,其中具体介绍了Spring Cloud Bus的配置,以及Spring Cloud Bus和spring bus

RabbitMQ实现具体尝试整个配置过程,如下所示:

准备工作:虽然在这里不做新的应用,但是在上一章中,需要使用我们已经实现的关于Spring Cloud Config的几个工序。 如果读者还不知道这一点,建议先读第四章的内容。 config-repo :在包含名为“Didi space”的多环境配置文件的Git仓库中定义的目录。 配置文件包含from参数。 config-server-Eureka :已配置git仓库并在eureka的服务器端注册。 配置客户端- eureka :在eureka上发现配置服务器的客户端。 一个名为didispace的APP应用程序,用于访问配置服务器以获取配置信息。 此APP应用程序提供了/from接口,它从config-repo/Didi space-dev.properties检索并返回from属性。 扩展config-client-eureka APP应用程序以修改pom.xml并添加spring-cloud-starter-bus-amqp模块(spring-boot-starter-actom ) 1

2

3

4

从属关系

groupid org.spring framework.cloud/groupid

artifactidspring-cloud-starter-bus-amqp/artifact id

/从属

将RabbitMQ连接和用户信息添加到配置式1

2

3

4

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbit MQ.username=spring cloud

spring.rabbit MQ.password=123456

启动config-server-eureka并启动两个config-client-eureka (分别为不同的端口,例如7002、7003 )时,config-client-eureka的控制Refresh],methods=[ post ] } ' ontopublicvoidorg.spring framework.cloud.bus.endpoinnt

到目前为止,您可以使用Spring Cloud Bus实时更新总线上的属性配置。 如果您先访问两个config-client-eureka的/from请求,则会返回当前config-repo/Didi space-dev.properties的from属性。 然后修改config-repo/Didi space-dev.properties中的from属性值,并将开机自检请求发送到/bus/refresh之一。 最后,分别访问启动的两个config-client-eureka的/from请求。 在这种情况下,这两个请求都返回最新的config-repo/Didi space-dev.properties的from属性。 原理分析我们使用Spring Cloud Bus和Spring Cloud Config的集成,以RabbitMQ为消息代理,实现了APP应用配置的动态更新。

p>整个方案的架构如上图所示,其中包含了Git仓库、Config Server、以及微服务“Service A”的三个实例,这三个实例中都引入了Spring Cloud Bus,所以他们都连接到了RabbitMQ的消息总线上。

当我们将系统启动起来之后,“Service A”的三个实例会请求Config Server以获取配置信息,Config Server根据应用配置的规则从Git仓库中获取配置信息并返回。

此时,若我们需要修改“Service A”的属性。首先,通过Git管理工具去仓库中修改对应的属性值,但是这个修改并不会触发“Service A”实例的属性更新。我们向“Service A”的实例3发送POST请求,访问/bus/refresh接口。此时,“Service A”的实例3就会将刷新请求发送到消息总线中,该消息事件会被“Service A”的实例1和实例2从总线中获取到,并重新从Config Server中获取他们的配置信息,从而实现配置信息的动态更新。

而从Git仓库中配置的修改到发起/bus/refresh的POST请求这一步可以通过Git仓库的Web Hook来自动触发。由于所有连接到消息总线上的应用都会接受到更新请求,所以在Web Hook中就不需要维护所有节点内容来进行更新,从而解决了通过Web Hook来逐个进行刷新的问题。

指定刷新范围

上面的例子中,我们通过向服务实例请求Spring Cloud Bus的/bus/refresh接口,从而触发总线上其他服务实例的/refresh。但是有些特殊场景下(比如:灰度发布),我们希望可以刷新微服务中某个具体实例的配置。

Spring Cloud Bus对这种场景也有很好的支持:/bus/refresh接口还提供了destination参数,用来定位具体要刷新的应用程序。比如,我们可以请求/bus/refresh?destination=customers:9000,此时总线上的各应用实例会根据destination属性的值来判断是否为自己的实例名,若符合才进行配置刷新,若不符合就忽略该消息。

destination参数除了可以定位具体的实例之外,还可以用来定位具体的服务。定位服务的原理是通过使用Spring的PathMatecher(路径匹配)来实现,比如:/bus/refresh?destination=customers:**,该请求会触发customers服务的所有实例进行刷新。

架构优化

既然Spring Cloud Bus的/bus/refresh接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也相应的可以做出一些调整。在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例就会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作,比如:我们需要对服务实例进行迁移,那么我们不得不修改Web Hook中的配置等。所以我们要尽可能的让服务集群中的各个节点是对等的。

因此,我们将之前的架构做了一些调整,如下图所示:

 

 

我们主要做了这些改动:

在Config Server中也引入Spring Cloud Bus,将配置服务端也加入到消息总线中来。/bus/refresh请求不在发送到具体服务实例上,而是发送给Config Server,并通过destination参数来指定需要更新配置的服务或实例。

通过上面的改动,我们的服务实例就不需要再承担触发配置更新的职责。同时,对于Git的触发等配置都只需要针对Config Server即可,从而简化了集群上的一些维护工作。

本文完整示例:

开源中国:http://git.oschina.net/didispace/SpringCloud-Learning/tree/master/Chapter1-1-7GitHub:https://github.com/dyc87112/SpringCloud-Learning/tree/master/Chapter1-1-7

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。