详解Spring Cloud微服务架构下的WebSocket解决方案

WebSocket在现代浏览器中的应用已经算是比较普遍了,在某些业务场景下,要求必须能够在服务器端推送消息至客户端。在没有WebSocket的年代,我们使用过dwr,在那个时候dwr真实一个非常棒的方案。但是在WebSocket兴起之后,我们更愿意使用标准实现来解决问题、

首先交代一下,本篇文章不讲解WebSocket的配置,主要讲的是针对在微服务架构集群模式下解决方案的选择。

微服务架构大家应该都不陌生了,在微服务架构下,服务是分布式的,而且为了保证业务的可用性,每个服务都是以集群的形式存在。在集群模式下,要保证集群的每一个节点的访问得到相同的结果就需要做到数据一致性,如缓存、session等。

微服务集群缓存通常使用分布式缓存redis解决,session一致性也通常会通过redis解决,但是现在更流行的是无状态的Http,即无session化,最常见的解决方案就是OAuth。

WebSocket有所不同,它是与服务端建立一个长连接,在集群模式下,显然不可能把前端与服务集群中的每一个节点建立连接,一个可行的思路是像解决http session的共享一样,通过redis来实现websocket的session共享,但是websocket session的数量是远多于http session的数量的(因为每打开一个页面都会建立一个websocket连接),所以随着用户量的增长,共享的数据量太大,很容易造成瓶颈。

另一个思路是,websocket总归会与集群中某个节点建立连接,那么,只要找到连接所在的节点,就可以向服务端推送消息了,那么要解决的问题就是如何找到一个websocket连接所在的节点。要找到连接在哪个节点上,我们需要一个唯一的标识符用于寻找连接,然而在基于stomp的发布-订阅模式下,一个消息的推送可能是面向若干个连接的,可能分布在集群中的每一个节点上,这样去寻找连接的代价也很高。既然这样,我们不妨换种思路,每一个websocket消息,我们在集群的每个节点上都进行推送,订阅了该消息的连接,不管有一个还是一万个,最终肯定都能收到这个消息。基于这个思路,我们做了一些技术选型:

  • RabbitMQ
  • Spring Cloud Stream

首先说RabbitMQ,高级消息队列,可以实现消息广播(当然kafka一样可以做到,这里只介绍一种),另一项技术是Spring Cloud Stream,stream是一个用于构建高度可扩展事件驱动型微服务的框架,并且它可以跟RabbitMQ、Kafka以及其他多种消息服务集成,使用了stream,要把rabbitmq换成kafka只不过是改改配置的事情。接下来重点介绍使用方法:

引入依赖

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置Binder

binder是stream中的重要概念,是用于配置用于stream发布和订阅事件的消息中间件。先看一段配置:

spring:
 cloud:
  stream:
   binders:
    defaultRabbit:
     type: rabbit
     environment:
      spring:
       rabbitmq:
        host: localhost
        username: username
        password: password
        virtual-host: /

配置中的 defaultRabbit 是binder的名称,一会会在其他配置中引用,type指定了消息中间件的类型,environment是对消息中间件的配置,这里的配置结构和spring.rabbitmq命名空间下的配置项一模一样的,可以参照着进行配置(这样配置的作用是可以把stream的rabbitmq配置和项目中其他地方使用的rabbitmq区分开,如果这里不配置environment,binder会沿用spring.rabbitmq命名空间下的配置),比如你的项目中的rabbitmq的配置是这样的:

spring:
 rabbitmq:
  host: localhost
  username: username
  password: password
  virtual-host: /

那上门的binder的environment配置完全可以去掉。

消息流与binder的绑定

微服务要接收挥着发布事件消息,根据spring cloud stream的名字,顾名思义,需要使用流,所以需要在配置中声明两个事件流,一个输入流,一个输出流:

spring:
 cloud:
  stream:
   bindings:
    websocketMessageIn:
     destination: websocketMessage
     binder: defaultRabbit
    websocketMessageOut:
     destination: websocketMessage
     binder: defaultRabbit

这里我们看到,事件流引用了binder,表示这两个流使用rabbitmq这个中间件(看到这里想必大家已经明白了,在一个项目中完全可以同时使用rabbit和kafka作为事件流的消息中间件)。

websocketMessageIn,websocketMessageOut是事件流的名字(可以自己随便起),destination指定了两个事件流的destination是同一个,这决定了写入和读取是指向同一个地方(不一定是同一个消息队列)。

事件流声明

事件流使用接口进行定义:

/**
 * websocket消息事件流接口
 * Created by 吴昊 on 18-11-8.
 *
 * @author 吴昊
 * @since 1.4.3
 */
interface WebSocketMessageStream {
 companion object {
  const val INPUT: String = "webSocketMessageIn"
  const val OUTPUT: String = "webSocketMessageOut"
 }

 /**
  * 输入
  */
 @Input(INPUT)
 fun input(): SubscribableChannel

 /**
  * 输出
  */
 @Output(OUTPUT)
 fun output(): MessageChannel
}

声明事件流接口,这里面定义了两个常量,分别对应配置中的两个流名称,通过调用input()方法获取输入流,通过调用output()获取输出流。

该接口的实现由spring cloud stream完成,不需要自己实现。

使用事件流

声明一个bean:

@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……

这里的@EnableBinding 注解指明了事件流接口类,只有添加了这个注解(要能被Spring识别到,可以加在入口类上,也可以加在@Configuration注解的类上),该接口才会被实现,并且加入到Spring的容器中(可以注入)。

上面WebSocketMessageService的内容如下:

@Autowired
 private lateinit var stream: WebSocketMessageStream
 @Autowired
 private lateinit var template: SimpMessagingTemplate

 @StreamListener(WebSocketMessageStream.INPUT)
 fun messageReceived(message: WebSocketMessage) {
  template.convertAndSend(message.destination, message.body)
 }

 fun send(destination: String, body: Any) {
  stream.output().send(
    MutableMessage(WebSocketMessage(destination, body))
  )
 }

接收消息

@StreamListener 注解指明了要监听的事件流,方法接收的参数即事件的消息内容(使用jackson反序列化),这里的messageReceived方法直接将接收到的消息直接用websocket发送给前端

发送消息

同样,发送也很简单,将消息直接发送到输入流中,上面的send方法即是将原本应该用SimpMessagingTemplate发送给websocket的消息发送到spring cloud stream的事件流中。这样做以后,项目中所有需要向前端推送webSocket消息的操作都应该调用send方法来进行。

讲到这里大家可能还有点糊涂,也有一些疑问,为什么这样每个微服务节点就能收到事件消息了?或者单个节点接收事件消息和多个节点接收的配置是怎么控制的。各位不要着急,待我慢慢道来,接下来就要结合rabbit的知识来讲解 了:

首先看一下rabbit的消息队列:

从图中看到,存在多个以webSocketMessage开头的队列,这是每一个微服务节点创建了一个消息队列,再来看exchange:

exchange绑定的消息队列

这里的exchange名称和上面消息队列的名称前缀均是webSocketMessage, 这个都是 由前面的binding配置中的destination指定的,和destination名称保持一致

当应用向输入流中写入事件时,使用destination作为key(即webSocketMessage),将消息写入名为webSocketMessage的exchange,由于exchange绑定的消息队列前缀均为webSocketMessage且routing key都是#,所以exchange会将消息路由到每一个webSocketMessage开头的消息队列上(这里涉及到rabbitmq的知识点,如过不懂请自行查阅资料),这样每一个微服务都能接收到相同的消息。

我们再来看前面提出的问题,这样的配置可以把消息推送到每一个微服务节点,那么如果需要一个消息只被一个节点接收,该怎么配置呢?很简单,一个配置项就可以搞定:

spring:
 cloud:
  stream:
   bindings:
    websocketMessageIn:
     group: test
     destination: websocketMessage
     binder: defaultRabbit

可以看到,相比前面的配置,仅仅多了一个group的配置,这样配置之后,rabbitmq会生成一个名为websocketMessage.test的消息队列(前面讲到的每个微服务建立的消息队列是自动删除的,即微服务断开连接后消息队列就被删除,而这个消息队列是持久化的,也就是即使所有的微服务节点全部断开连接也不会被删除),所有的微服务节点监听这一个队列,当队列中有消息时,只会被一个节点消费。

要讲的内容到此结束,spring cloud stream的配置远不止这些,但是这些配置已足够完成我所需要做的事情,其他的配置请参考spring cloud stream官方文档:

http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/Fishtown.RC2/single/spring-cloud-stream.html

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • spring cloud gateway 限流的实现与原理

    在高并发的系统中,往往需要在系统中做限流,一方面是为了防止大量的请求使服务器过载,导致服务不可用,另一方面是为了防止网络攻击. 常见的限流方式,比如Hystrix适用线程池隔离,超过线程池的负载,走熔断的逻辑.在一般应用服务器中,比如tomcat容器也是通过限制它的线程数来控制并发的:也有通过时间窗口的平均速度来控制流量.常见的限流纬度有比如通过Ip来限流.通过uri来限流.通过用户访问频次来限流. 一般限流都是在网关这一层做,比如Nginx.Openresty.kong.zuul.Spring

  • 详解Spring Cloud Gateway 数据库存储路由信息的扩展方案

    动态路由背景 ​ 无论你在使用Zuul还是Spring Cloud Gateway 的时候,官方文档提供的方案总是基于配置文件配置的方式 例如: # zuul 的配置形式 routes: pig-auth: path: /auth/** serviceId: pig-auth stripPrefix: true # gateway 的配置形式 routes: - id: pigx-auth uri: lb://pigx-auth predicates: - Path=/auth/** filte

  • SpringCloud Zuul在何种情况下使用Hystrix及问题小结

    首先,引入spring-cloud-starter-zuul之后会间接引入: hystrix依赖已经引入,那么何种情况下使用hystrix呢? 在Zuul的自动配置类ZuulServerAutoConfiguration和ZuulProxyAutoConfiguration中总共会向Spring容器注入3个Zuul的RouteFilter,分别是 •SimpleHostRoutingFilter 简单路由,通过HttpClient向预定的URL发送请求 生效条件: RequestContext.

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • 详解SpringCloud Finchley Gateway 统一异常处理

    SpringCloud Finchley Gateway 统一异常处理 全文搜索[@@]搜索重点内容标记 1 . 问题:使用SpringCloud Gateway时,会出现各种系统级异常,默认返回HTML. 2 . Finchley版本的Gateway,使用WebFlux形式作为底层框架,而不是Servlet容器,所以常规的异常处理无法使用 翻阅源码,默认是使用DefaultErrorWebExceptionHandler这个类实现结构如下: 可以实现参考DefaultErrorWebExcep

  • Spring Cloud CLI简单介绍

    1.简介 在本文中,我们将介绍Spring Boot Cloud CLI(或简称Cloud CLI).该工具为Spring Boot CLI提供了一组命令行增强功能,有助于进一步抽象和简化Spring Cloud部署. CLI于2016年底推出,允许使用命令行..yml配置文件和Groovy脚本快速自动配置和部署标准Spring Cloud服务. 2.安装 Spring Boot Cloud CLI 1.3.x需要Spring Boot CLI 1.5.x,因此请务必从Maven Central

  • 使用Servlet处理一个上传的文件

    Servlet中可以使用post请求上传文件,使用getReader()和getInputStream()自己处理,也可以使用getPart()或getParts()封装了一些功能的方法处理,getParts()可以同时上传多个文件.接下来使用四个Demo来练习一下使用方法 一.使用getReader()和getInputStream() Demo1 <!-- 这是HTML代码块,窗体网页上显示的是一个选择文件的input框和一个upload的button --> <!DOCTYPE h

  • Servlet+MyBatis项目转Spring Cloud微服务,多数据源配置修改建议

    一.项目需求 在开发过程中,由于技术的不断迭代,为了提高开发效率,需要对原有项目的架构做出相应的调整. 二.存在的问题 为了不影响项目进度,架构调整初期只是把项目做了简单的maven管理,引入springboot并未做spring cloud微服务处理.但随着项目的进一步开发,急需拆分现有业务,做微服务处理.因此架构上的短板日益突出.spring cloud config 无法完全应用,每次项目部署需要修改大量配置文件.严重影响开发效率,因此便萌生了对项目架构再次调整的决心. 三.调整建议 为了

  • Spring Cloud Ribbon的踩坑记录与原理详析

    简介 Spring Cloud Ribbon 是一个基于Http和TCP的客服端负载均衡工具,它是基于Netflix Ribbon实现的.它不像服务注册中心.配置中心.API网关那样独立部署,但是它几乎存在于每个微服务的基础设施中.包括前面的提供的声明式服务调用也是基于该Ribbon实现的.理解Ribbon对于我们使用Spring Cloud来讲非常的重要,因为负载均衡是对系统的高可用.网络压力的缓解和处理能力扩容的重要手段之一.在上节的例子中,我们采用了声明式的方式来实现负载均衡.实际上,内部

  • 详解Spring Cloud Netflix Zuul中的速率限制

    Spring Cloud Netflix Zuul是一个包含Netflix Zuul的 开源网关.它为Spring Boot应用程序添加了一些特定功能.不幸的是,开箱即用不提供速率限制. 除了Spring Cloud Netflix Zuul依赖项之外,我们还需要将Spring Cloud Zuul RateLimit 添加到我们的应用程序的pom.xml中: <dependency> <groupId>org.springframework.cloud</groupId&g

随机推荐