springboot websocket集群(stomp协议)连接时候传递参数

最近在公司项目中接到个需求。就是后台跟前端浏览器要保持长连接,后台主动往前台推数据。

网上查了下,websocket stomp协议处理这个很简单。尤其是跟springboot 集成。

但是由于开始是单机玩的,很顺利。

但是后面部署到生产搞集群的话,就会出问题了。

假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到。但是B节点由于没有跟浏览器A建立连接。B节点发的消息浏览器就收不到了。

网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的。

还有很多思路都是通过session获取信息的。但是这都不是我需要的。我需要的是从前台传递参数,连接的时候每个节点保存下。然后通过SimpleUserRegistry.getUser获取。

话不多说,直接上代码。

<script type="text/javascript" src="${request.contextPath}/scripts/sockjs.min.js"></script>
<script type="text/javascript" src="${request.contextPath}/scripts/stomp.min.js"></script>
var WEB_SOCKET = {

  topic : "",
  url : "",
  stompClient : null,

  connect : function(url, topic, callback,userid) {
   this.url = url;
   this.topic = topic;
   var socket = new SockJS(url); //连接SockJS的endpoint名称为"endpointOyzc"
   WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
   WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//连接WebSocket服务端
    // console.log('Connected:' + frame);
    //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息
    WEB_SOCKET.stompClient.subscribe(topic, callback);
   });
  }
};

这是响应的前端代码。只需要引入两个js。调用new SockJS(url) 就代表跟服务器建立连接了。

@Configuration

//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

 @Autowired
 private GetHeaderParamInterceptor getHeaderParamInterceptor;

 @Override
 //注册STOMP协议的节点(endpoint),并映射指定的url
 public void registerStompEndpoints(StompEndpointRegistry registry) {
  //注册一个STOMP的endpoint,并指定使用SockJS协议
  registry.addEndpoint("/endpointOyzc")
    .setAllowedOrigins("*")
    .withSockJS();
  /* registry.addEndpoint("/endpointOyzc")
    .setAllowedOrigins("*")
    .setHandshakeHandler(xlHandshakeHandler)
    .withSockJS();*/
 }

 @Override
 //配置消息代理(Message Broker)
 public void configureMessageBroker(MessageBrokerRegistry registry) {
  //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
  registry.enableSimpleBroker("/topic", "/user");
  // 全局使用的消息前缀(客户端订阅路径上会体现出来)
  //registry.setApplicationDestinationPrefixes("/app");
  //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
  registry.setUserDestinationPrefix("/user");
 }

 /**
  * 采用自定义拦截器,获取connect时候传递的参数
  *
  * @param registration
  */
 @Override
 public void configureClientInboundChannel(ChannelRegistration registration) {
  registration.interceptors(getHeaderParamInterceptor);
 }
}

注:上面的endpointOyzc就是前端的url。后面注册端点,前台链接。

然后注意下configureClientInboundChannel这个方法,这个方法里面注入拦截器就是为了链接时候接收参数的。

/**
 * @author : hao
 * @description : websocket建立链接的时候获取headeri里认证的参数拦截器。
 * @time : 2019/7/3 20:42
 */
@Component
public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {

 @Override
 public Message<?> preSend(Message<?> message, MessageChannel channel) {
  StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  if (StompCommand.CONNECT.equals(accessor.getCommand())) {
   Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
   if (raw instanceof Map) {
    Object name = ((Map) raw).get("userid");
    if (name instanceof LinkedList) {
     // 设置当前访问的认证用户
     accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString()));
    }
   }
  }
  return message;
 }
}
/**
 * @author : hao
 * @description : 自定义的java.security.Principal
 * @time : 2019/7/3 20:42
 */
public class JqxxPrincipal implements Principal {

 private String loginName;

 public JqxxPrincipal(String loginName) {
  this.loginName = loginName;
 }

 @Override
 public String getName() {
  return loginName;
 }
}

这样就存入的前台传的参数。

后台发消息的时候怎么发呢?

/**
 * @author : hao
 * @description : websocket发送代理,负责发送消息
 * @time : 2019/7/4 11:01
 */
@Component
@Slf4j
public class WebsocketSendProxy<T> {
 @Autowired
 private SimpMessagingTemplate template;

 @Autowired
 private SimpUserRegistry userRegistry;

 @Resource(name = "redisServiceImpl")
 private RedisService redisService;

 @Value("spring.redis.message.topic-name")
 private String topicName;

 public void sendMsg(RedisWebsocketMsg<T> redisWebsocketMsg) {
  SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
  log.info("发送消息前获取接收方为{},根据Registry获取本节点上这个用户{}", redisWebsocketMsg.getReceiver(), simpUser);
  if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
   //2. 获取WebSocket客户端的订阅地址
   WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
   if (channelEnum != null) {
    //3. 给WebSocket客户端发送消息
    template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
   }
  } else {
   //给其他订阅了主题的节点发消息,因为本节点没有
   redisService.convertAndSend(topicName, redisWebsocketMsg);
  }

 }
}

可以发现上面代码利用了redis监听模型,也就是redis模型的消息队列

/**
 * @author : hao
 * @description : redis消息监听实现类,接收处理类
 * @time : 2019/7/3 14:00
 */
@Component
@Slf4j
public class MessageReceiver {

 @Autowired
 private SimpMessagingTemplate messagingTemplate;

 @Autowired
 private SimpUserRegistry userRegistry;

 /**
  * 处理WebSocket消息
  */
 public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
  log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
  //1. 取出用户名并判断是否连接到当前应用节点的WebSocket
  SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

  if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
   //2. 获取WebSocket客户端的订阅地址
   WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
   if (channelEnum != null) {
    //3. 给WebSocket客户端发送消息
    messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
   }
  }
 }
}

redis消息模型只贴部分代码就好了

/**
  * 消息监听器
  */
 @Bean
 MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
  //消息接收者以及对应的默认处理方法
  MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
  //消息的反序列化方式
  messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

  return messageListenerAdapter;
 }

 /**
  * message listener container
  */
 @Bean
 RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
   , MessageListenerAdapter messageListenerAdapter){
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  //添加消息监听器
  container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

  return container;
 }

上面的思路大体如下:客户端简历链接时候,传过来userid保存起来。发消息的时候 通过userRegistry获取,能获取到就证明是跟本节点建立的链接,直接用本节点发消息就好了。

如果不是就利用redis消息队列,把消息推出去。每个节点去判断获取看下是不是本节点的userid。这样就实现了集群的部署。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • springboot websocket简单入门示例

    之前做的需求都是客户端请求服务器响应,新需求是服务器主动推送信息到客户端.百度之后有流.长轮询.websoket等方式进行.但是目前更加推崇且合理的显然是websocket. 从springboot官网翻译了一些资料,再加上百度简单实现了springboot使用websocekt与客户端的双工通信. 1.首先搭建一个简单的springboot环境 <!-- Inherit defaults from Spring Boot --> <parent> <groupId>o

  • 详解spring boot实现websocket

    前言 QQ这类即时通讯工具多数是以桌面应用的方式存在.在没有websocket出现之前,如果开发一个网页版的即时通讯应用,则需要定时刷新页面或定时调用ajax请求,这无疑会加大服务器的负载和增加了客户端的流量.而websocket的出现,则完美的解决了这些问题. spring boot对websocket进行了封装,这对实现一个websocket网页即时通讯应用来说,变得非常简单.  一.准备工作 pom.xml引入 <dependency> <groupId>org.spring

  • 关于Spring Boot WebSocket整合以及nginx配置详解

    前言 本文主要给大家介绍了关于Spring Boot WebSocket整合及nginx配置的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 一:Spring Boot WebSocket整合 创建一个maven项目,加入如下依赖 <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId>

  • 通过实例讲解springboot整合WebSocket

    一.背景 我们都知道 http 协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息.想要实现浏览器的主动推送有两种主流实现方式: 轮询:缺点很多,但是实现简单 websocket:在浏览器和服务器之间建立 tcp 连接,实现全双工通信 springboot 使用 websocket 有两种方式,一种是实现简单的 websocket,另外一种是实现STOMP协议.这一篇实现简单的 websocket,STOMP 下一篇在讲. 注意:如下都是针对使用 springboot

  • 使用 Spring Boot 实现 WebSocket实时通信

    在开发 Web 应用程序时,我们有时需要将服务端事件推送到连接的客户端.但 HTTP 并不能做到.客户端打开与服务端的连接并请求数据,但服务端不能打开与客户端的连接并推送数据. 为了解决这个限制,我们可以建立了一个轮询模式,网页会间隔地轮询服务器以获取新事件.但这种模式不太理想,因为它增加了 HTTP 开销,速度也只能达到与轮询的速率一样快,并且给服务器增加了不必要的负载. 幸运的是,HTML5 WebSocket 出现了.WebSocket 协议允许浏览器与 Web 服务器之间进行低开销的交互

  • SpringBoot使用WebSocket的方法实例详解

    WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议. WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输. 在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道.两者之间就直接可以数据互相传送. java怎么写 配置Be

  • Spring Boot 开发私有即时通信系统(WebSocket)

    1/ 概述 利用Spring Boot作为基础框架,Spring Security作为安全框架,WebSocket作为通信框架,实现点对点聊天和群聊天. 2/ 所需依赖 Spring Boot 版本 1.5.3,使用MongoDB存储数据(非必须),Maven依赖如下: <properties> <java.version>1.8</java.version> <thymeleaf.version>3.0.0.RELEASE</thymeleaf.ve

  • 详解spring boot Websocket使用笔记

    本文只作为个人笔记,大部分代码是引用其他人的文章的. 在springboot项目中使用websocket做推送,虽然挺简单的,但初学也踩过几个坑,特此记录. 使用websocket有两种方式:1是使用sockjs,2是使用h5的标准.使用Html5标准自然更方便简单,所以记录的是配合h5的使用方法. 1.pom 核心是@ServerEndpoint这个注解.这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,只要在pom文件中引入j

  • Spring Boot实现STOMP协议的WebSocket的方法步骤

    1.概述 我们之前讨论过Java Generics的基础知识.在本文中,我们将了解Java中的通用构造函数. 泛型构造函数是至少需要有一个泛型类型参数的构造函数.我们将看到泛型构造函数并不都是在泛型类中出现的,而且并非所有泛型类中的构造函数都必须是泛型. 2.非泛型类 首先,先写一个简单的类:Entry,它不是泛型类: public class Entry { private String data; private int rank; } 在这个类中,我们将添加两个构造函数:一个带有两个参数的

  • 详解在Spring Boot框架下使用WebSocket实现消息推送

    spring Boot的学习持续进行中.前面两篇博客我们介绍了如何使用Spring Boot容器搭建Web项目以及怎样为我们的Project添加HTTPS的支持,在这两篇文章的基础上,我们今天来看看如何在Spring Boot中使用WebSocket. 什么是WebSocket WebSocket为浏览器和服务器之间提供了双工异步通信功能,也就是说我们可以利用浏览器给服务器发送消息,服务器也可以给浏览器发送消息,目前主流浏览器的主流版本对WebSocket的支持都算是比较好的,但是在实际开发中使

随机推荐