springboot基于Redis发布订阅集群下WebSocket的解决方案

一、背景

单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。

假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。

为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。

二、解决方案(springboot 基于 Redis发布订阅)

1、依赖

<!-- redis -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。

import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 * @program:
 * @description: redis消息订阅-业务处理
 * @author: zhang yi
 * @create: 2021-01-25 16:46
 */
@Component
public class Demo implements MessageListener {
  Logger logger = LoggerFactory.getLogger(this.getClass());

  @Override
  public void onMessage(Message message, byte[] pattern) {
    logger.info("消息订阅成功---------");
    logger.info("内容:"+message.getBody());
    logger.info("交换机:"+message.getChannel());
  }
}

3、创建PubSubConfig配置类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * @program:
 * @description: redis发布订阅配置
 * @author: zhang yi
 * @create: 2021-01-25 16:49
 */
@Configuration
@EnableCaching
public class PubSubConfig {
  Logger logger = LoggerFactory.getLogger(this.getClass());

  //如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory,
  //              MessageListenerAdapter listenerAdapter,
  //              MessageListenerAdapter listenerAdapter2)
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                      MessageListenerAdapter listenerAdapter) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 可以添加多个 messageListener,配置不同的交换机
    container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
    //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
    return container;
  }

  /**
   * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
   * @param demo 第一步的业务处理类
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(Demo demo) {
    logger.info("----------------消息监听器加载成功----------------");
    // onMessage 就是方法名,基于反射调用
    return new MessageListenerAdapter(demo, "onMessage");
  }

  /**
   * 多个交换机就多写一个
   * @param subCheckOrder
   * @return
   */
  //@Bean
  //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
  //  logger.info("----------------消息监听器加载成功----------------");
  //  return new MessageListenerAdapter(subCheckOrder, "onMessage");
  //}

  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

4、消息发布

@Autowired
private RedisTemplate<String, Object> redisTemplate;

redisTemplate.convertAndSend("channel:demo", "我是内容");

三、具体用法

  • socket连接成功。
  • socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。
(0)

相关推荐

  • springboot websocket集群(stomp协议)连接时候传递参数

    最近在公司项目中接到个需求.就是后台跟前端浏览器要保持长连接,后台主动往前台推数据. 网上查了下,websocket stomp协议处理这个很简单.尤其是跟springboot 集成. 但是由于开始是单机玩的,很顺利. 但是后面部署到生产搞集群的话,就会出问题了. 假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到.但是B节点由于没有跟浏览器A建立连接.B节点发的消息浏览器就收不到了. 网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的. 还有很多思路都

  • springboot基于Redis发布订阅集群下WebSocket的解决方案

    一.背景 单机节点下,WebSocket连接成功后,可以直接发送消息.而多节点下,连接时通过nginx会代理到不同节点. 假设一开始用户连接了node1的socket服务.触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点.这就导致了消息发送失败. 为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送. 二.解决方案(s

  • SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

    目录 Redis命令行下使用发布订阅 publish 发布 subscribe 订阅 SpringBoot中使用Redis的发布订阅功能 发布者 订阅者 消息监听容器 注意:redis的发布订阅模式不可以将消息进行持久化,订阅者发生网络断开.宕机等可能导致错过消息. Redis命令行下使用发布订阅 publish 发布 发布者通过以下命令可以往指定channel发布message redis> publish channel message subscribe 订阅 订阅者通过以下命令可以订阅一

  • Redis集群下过期key监听的实现代码

    1. 前言 在使用redis集群时,发现过期key始终监听不到.网上也没有现成的解决方案.于是想,既然不能监听集群,那我可以建立多个redis连接,分别对每个redis的key过期进行监听.以上做法可能不尽人意,目前也没找到好的解决方案,如果有好的想法,请留言告知哦!不多说,直接贴我自己的代码! 2. 代码实现 关于Redis集群配置代码此处不贴,直接贴配置监听类代码! redis.host1: 10.113.56.68 redis.port1: 7030 redis.host2: 10.113

  • C# 基于消息发布订阅模型的示例(下)

    一 背景 在上面的一篇文章中我们介绍了一个完整地基于消息发布和订阅的模型,这篇文章我将介绍一种简单的基于消息的发布和订阅模型,在这个模型中我们将通过构建一个Publisher类来完成对特定的事件和事件订阅进行封装,这个是一个更加轻量级别的方式,使用这个的主要目的是降低类之间彼此的耦合程度,从而方便代码的扩展和访问,最终使代码结构更加合理. 我们首先来看看具体的Publisher类的构成,后面我们将会对这个类做一个详细的讲解和分析. using System; using System.Colle

  • Redis三种集群模式详解

    目录 三种集群模式 一.主从复制 1.reids主从模式 2.redis复制原理 3.redis主从复制原理 4.redis主从复制优缺点 二.Sentinel 哨兵模式 1.Sentinel系统 2.Sentinel故障转移 2.1.Sentinel 哨兵监控过程 2.2.Sentinel 哨兵故障转移 3.Sentinel 哨兵优缺点 三.cluster 模式 1.reids cluster 2.Redis Cluster 数据分片原理 3.Redis Cluster 复制原理 4.redi

  • 通过Docker部署Redis 6.x集群的方法

    系统环境: Redis 版本:6.0.8 Docker 版本:19.03.12 系统版本:CoreOS 7.8 内核版本:5.8.5-1.el7.elrepo.x86_64 一.什么是 Redis 集群模式 在 Redis 3.0 版本后正式推出 Redis 集群模式,该模式是 Redis 的分布式的解决方案,是一个提供在多个 Redis 节点间共享数据的程序集,且 Redis 集群是去中心化的,它的每个 Master 节点都可以进行读写数据,每个节点都拥有平等的关系,每个节点都保持各自的数据和

  • redis搭建哨兵集群的实现步骤

    目录 redis安装部署 redis集群架构 配置redis主从 测试主从 搭建redis哨兵集群 哨兵集群详解: 哨兵集群原理 哨兵集群 redis安装部署 环境说明: redis使用的是6.2.6版本 主机 IP 系统 master 192.168.129.136 redhat8 slave 192.168.182.135 redhat8 slave2 192.168.182.134 redhat8 准备工作 // 三台主机都要做 [root@master opt] wget https:/

  • 基于Docker搭建iServer集群

    目录 前言 一.安装Docker 二.下载 iServer 镜像 三.启动iServer 四.发布服务 五.搭建集群 前言 Linux容器虚拟技术(LXC,Linux Container)是一种轻量级的虚拟化手段,它利用内核虚拟化技术提供轻量级的虚拟化,来隔离进程和资源.Docker扩展了LXC,提供了更高级别的API,并简化了应用的打包和部署,为终端用户创建彼此独立的私有环境,可有效节约开发者和系统管理员的环境部署时间. 一.安装Docker 参考博客 https://www.runoob.c

  • Docker Compose+Nestjs构建Dapr Redis发布订阅分布式应用

    目录 Dapr(分布式应用程序运行时)介绍 实战 Dapr 的 Redis 发布/订阅应用 1. 创建项目 2. 创建 Dapr Placement 服务 3. 创建 Redis Publish 服务 4. 创建 Dapr Pub-Sub 组件 5. 创建 Redis Dapr Sidecar 6. 创建 NestJS Server 7. 为 NestJS 订阅服务器创建 Dockerfile 8. 将 NestJS 订阅服务添加到 docker-compose 文件 9. 创建 Dapr 订阅

  • nodejs redis 发布订阅机制封装实现方法及实例代码

     nodejs redis 发布订阅机制封装 最近项目使用redis,对publish 和 subscribe的使用进行了了解,并进行了封装. var config = require('../config/config'); var log = require("./loghelp"); var redis = require("redis"); function initialclient(param) { var option={ host: config.r

随机推荐