SpringCloud之消息总线Spring Cloud Bus实例代码

一、简介

在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。

二、消息代理

消息代理(Message Broker)是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是在企业应用中,我们经常需要使用消息代理的场景:

  1. 将消息路由到一个或多个目的地。
  2. 消息转化为其他的表现方式。
  3. 执行消息的聚集、消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。
  4. 调用Web服务来检索数据。
  5. 响应事件或错误。
  6. 使用发布-订阅模式来提供内容或基千主题的消息路由。

目前已经有非常多的开源产品可以供大家使用, 比如:

  1. ActiveMQKafka
  2. RabbitMQ
  3. RocketMQ
  4. 等......

三、SpringCloud+RabbitMQ

(1)RabbitMQ简介、安装不赘述。

(2)pom.xml

<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency> 

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies> 

(3)application.yml

spring:
 application:
 name: rabbitmq-hello
 rabbitmq:
 host: ***.***.***.***
 port: 5672
 username: guest
 password: guest 

(4)发送者Sender

@Component
public class Sender { 

 private static final Logger log = LoggerFactory.getLogger(Sender.class);
 @Autowired
 private AmqpTemplate amqpTemplate; 

 public void send() {
 String context = "hello " + new Date();
 log.info("Sender : " + context);
 this.amqpTemplate.convertAndSend("hello", context);
 }
} 

(5)接受者Receiver

@Component
@RabbitListener(queues = "hello")
public class Receiver { 

 private static final Logger log = LoggerFactory.getLogger(Receiver.class); 

 @RabbitHandler
 public void process(String hello) {
 log.info("Receiver : " + hello);
 }
} 

(6)创建RabbitMQ的配置类 RabbitConfig

@Configuration
public class RabbitConfig { 

 @Bean
 public Queue helloQueue(){
 return new Queue("hello");
 }
} 

(7)创建单元测试类, 用来调用消息生产

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SpringcloudbusrabbitmqApplication.class)
public class HelloApplicationTests { 

 @Autowired
 private Sender sender; 

 @Test
 public void hello() throws Exception {
 sender.send();
 }
} 

(8)测试,执行HelloApplicationTests

(9)访问host:15672

四、改造Config-Client(整合springcloud bus)

(1)pom.xml

<dependencies>
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-config</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-eureka</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-bus-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-actuator</artifactId>
 </dependency> 

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies> 

(2)bootstrap.properties

spring.application.name=configspace
spring.cloud.config.label=master
spring.cloud.config.profile=dev
spring.cloud.config.uri= http://localhost:5588/
eureka.client.serviceUrl.defaultZone=http://localhost:5555/eureka/ 

server.port=5589 

spring.rabbitmq.host=118.89.237.88
spring.rabbitmq.port= 5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest 

management.security.enabled=false 

(3)其他不用改变

五、测试

(1)测试准备

一个服务注册中心,EUREKASERVER,端口为5555;

一个分布式配置中心,ConfigServer,端口为5588;

二个分布式配置,ConfigClient,端口为5589、5590;(2)访问http://localhost:5589/from

(3)访问http://localhost:5590/from

RabbitMQ:

(4)去仓库修改password的值

from=git-dev-v1.0 by springcloud config-server
username=springcloud
password=1234567890 

(5)POST请求http://localhost:5589/bus/refresh或者http://localhost:5590/bus/refresh

成功请求后config-client会重新读取配置文件

(6)再次访问

  1. 如果POST请求的是:http://localhost:5589/bus/refresh,请访问http://localhost:5590/from
  2. 如果访问出现401,则配置需要加上management.security.enabled=false

如果POST请求的是:http://localhost:5590/bus/refresh,请访问http://localhost:5589/from

另/bus/refresh接口可以指定服务,即使用“username”参数,比如 “/bus/refresh?destination=username:**”即刷新服务名为username的所有服务,不管ip地址。

(7)架构

(8)架构调整

既然SpringCloud Bus的/bus/refresh接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也可以相应做出一些调整。在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例会不同千集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。比如, 需要对服务实例进行迁移,那么我们不得不修改Web Hook中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。

因此, 我们将之前的架构做了 一些调整, 如下图所示:

主要做了以下这些改动:

  1. 在ConfigServer中也引入SpringCloud Bus,将配置服务端也加入到消息总线中来。
  2. /bus/refresh请求不再发送到具体服务实例上,而是发送给Config Server,并通过des巨nation参数来指定需要更新配置的服务或实例。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringCloud Bus 消息总线的具体使用

    什么是消息总线 1. 概念 在微服务架构中,通常会使用轻量级的消息代理来构建一个共用的消息主题来连接各个微服务实例, 它广播的消息会被所有在注册中心的微服务实例监听和消费,也称消息总线 2. SpringCloud Bus SpringCloud中也有对应的解决方案,SpringCloud Bus 将分布式的节点用轻量的消息代理连接起来, 可以很容易搭建消息总线,配合SpringCloud config 实现微服务应用配置信息的动态更新. 3. 其他 消息代理属于中间件.设计代理的目的就是为了能

  • SpringCloud Bus消息总线的实现

    好了现在我们接着上一篇的随笔,继续来讲.上一篇我们讲到,我们如果要去更新所有微服务的配置,在不重启的情况下去更新配置,只能依靠spring cloud config了,但是,是我们要一个服务一个服务的发送post请求, 我们能受的了吗?这比之前的没配置中心好多了,那么我们如何继续避免挨个挨个的向服务发送Post请求来告知服务,你的配置信息改变了,需要及时修改内存中的配置信息. 这时候我们就不要忘记消息队列的发布订阅模型.让所有为服务来订阅这个事件,当这个事件发生改变了,就可以通知所有微服务去更新

  • SpringCloud之消息总线Spring Cloud Bus实例代码

    一.简介 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线. 二.消息代理 消息代理(Message Broker)是一种消息验证.传输.路由的架构模式.它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程.消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息, 并根据设定好的消息处理流来转发给正确的应

  • SpringCloud之服务注册与发现Spring Cloud Eureka实例代码

    一.Spring Cloud简介 Spring Cloud是一个基千SpringBoot实现的微服务架构开发 工具.它为微服务架构中涉及的 配置管理.服务治理. 断路器. 智能路由.微代理. 控制总线. 全局锁. 决策竞选.分布式会话和集群状态管理等操作提供了一种简单的开发方式. Spring Cloud包含了多个子项目(针对分布式系统中涉及的多个不同开源产品,还可能会新增),如下所述. Spring Cloud Config: 配置管理工具.Spring Cloud Netflix: 核心组件

  • 基于kafka实现Spring Cloud Bus消息总线

    目录 一.什么是消息总线 二.整合消息总线实现配置自动刷新 2.1 面向客户端基本架构 2.2 面向服务端的架构 三.利用kafka实现消息总线 3.1 Spring Boot 整合kafka 3.2 实现动态 刷新 3.3 指定刷新范围 一.什么是消息总线 相信大多数读者之前都使用过各种各样的消息队列,例如RabbitMQ.kafka等等,消息总线和他的概念差不多,在微服务系统的架构中,我们通常会使用轻量级的消息代理来 构建一个共用的消息主题让系统中所有的微服务都连接上来,由于该主题中产生的消

  • 解析Spring Cloud Bus消息总线

    概念 我们使用配置中心时,当配置中心的配置发生了变化,我们就要发送一个post请求给客户端,让它重新去拉取新的的配置.当客户端有很多时,并且还是使用同一份配置文件,这样当配置中心的配置发生改变,我们就得逐个发送post请求通知,这样无疑是很浪费人力物力的. Bus消息总线组件就帮我们解决了这个问题.他的工作流程是这样的,当配置中心的配置发生了变化时,我们给其中一个客户端发送post请求,然后client将请求的信息发送到rabbitmq队列中,然后消息队列将消息发送给别的队列. 使用 准备工作

  • Spring Cloud OpenFeign实例介绍使用方法

    目录 一. OpenFeign概述 二. 使用步骤 2.1 feign接口模块 2.1.1依赖配置 2.1.2编写FeignClient的接口, 并加@FeignCleint 注解 2.2 消费端使用fegin接口 2.2.1在消费者端添加feign接口依赖 2.2.2在消费者端配置文件中添加 feign.client.url 2.2.3在消费者端启动类中添加@EnableFeignClients 2.2.4在消费者端使用fegin接口 2.3 测试 一. OpenFeign概述 OpenFei

  • Java Spring Cloud Bus 实现配置实时更新详解

    目录 背景 实现原理 ConfigServer改造 1. pom.xml增加以下依赖 2. 配置文件中配置暴露接口 Service改造 1. pom.xml增加以下依赖 2. 通过@RefreshScope声明配置刷新时需要重新注入 测试 总结 背景 使用Spring Cloud Config Server,启动Service时会从配置中心取配置文件,并注入到应用中,如果在Service运行过程中想更新配置,需要使用Spring Cloud Bus配合实现实时更新. 实现原理 需要借助Rabbi

  • Python实现微信消息防撤回功能的实例代码

    微信(WeChat)是腾讯公司于2011年1月21日推出的一款社交软件,8年时间微信做到日活10亿,日消息量450亿.在此期间微信也推出了不少的功能如:"摇一摇"."漂流瓶"."朋友圈"."附近的人"."公众平台"."小程序"等等,涵盖了我们生活的方方面面,微信正在慢慢践行着他们的口号:微信,是一个生活方式 一.背景介绍 产品的更新迭代必然会伴随着功能的推出和下线,今天我们要讲的便是微信

  • Spring缓存机制实例代码

    Spring的缓存机制非常灵活,可以对容器中任意Bean或者Bean的方法进行缓存,因此这种缓存机制可以在JavaEE应用的任何层次上进行缓存. Spring缓存底层也是需要借助其他缓存工具来实现,例如EhCache(Hibernate缓存工具),上层则以统一API编程. 要使用Spring缓存,需要以下三步 1.向Spring配置文件导入context:命名空间 2.在Spring配置文件启用缓存,具体是添加 <cache:annotation-driven cache-manager="

  • Spring Cloud Gateway 远程代码执行漏洞(CVE-2022-22947)的过程解析

    目录 1.漏洞描述 2.影响版本 3.漏洞环境搭建 4.漏洞复现 5.修复方案 1.漏洞描述 Spring Cloud Gateway 是基于 Spring Framework 和 Spring Boot 构建的 API 网关,它旨在为微服务架构提供一种简单.有效.统一的 API 路由管理方式. Spring官方博客发布了一篇关于Spring Cloud Gateway的CVE报告,据公告描述,当启用和暴露 Gateway Actuator 端点时,使用 Spring Cloud Gateway

  • Spring Cloud Feign实例讲解学习

    前面博文搭建了一个Eureka+Ribbon+Hystrix的框架,虽然可以基本满足服务之间的调用,但是代码看起来实在丑陋,每次客户端都要写一个restTemplate,为了让调用更美观,可读性更强,现在我们开始学习使用Feign. Feign包含了Ribbon和Hystrix,这个在实战中才慢慢体会到它的意义,所谓的包含并不是Feign的jar包包含有Ribbon和Hystrix的jar包这种物理上的包含,而是Feign的功能包含了其他两者的功能这种逻辑上的包含.简言之:Feign能干Ribb

随机推荐