websocket+redis动态订阅和动态取消订阅的实现示例

目录
  • 原理
  • redis订阅监听类
  • webSocket订阅推送类
  • 项目地址
  • Update20220415

原理

websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。

订阅频道消息格式:

{
    "cmd":"subscribe",
    "topic":[
        "topic_name"
    ]
}

模糊订阅格式

{
    "cmd":"psubscribe",
    "topic":[
        "topic_name"
    ]
}

取消订阅格式

{
    "cmd":"unsubscribe",
    "topic":[
        "topic_name"
    ]
}

两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。

redis订阅监听类

package com.curtain.core;

import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Arrays;

/**
 * @Author Curtain
 * @Date 2021/6/7 14:27
 * @Description
 */
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
    private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
    private Jedis jedis;

    //订阅
    public void subscribe(String... channels) {
        jedis = jedisPool.getResource();
        try {
            jedis.subscribe(this, channels);
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            subscribe(channels);
        }
    }

    //模糊订阅
    public void psubscribe(String... channels) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.psubscribe(this, channels);
        } catch (ArithmeticException e) {//取消订阅故意造成的异常
            if (jedis != null)
                jedis.close();
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            psubscribe(channels);
        }
    }

    public void unsubscribeAndClose(String... channels){
        unsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    public void punsubscribeAndClose(String... channels){
        punsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, pattern);
        WebSocketServer.publish(message, channel);

    }

    @Override
    public void onMessage(String channel, String message) {
        log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info("unsubscribe redis channel:" + channel);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info("punsubscribe redis channel:" + pattern);
    }
}

1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。

webSocket订阅推送类

这个类会有两个ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>类型类变量,分别存储订阅和模糊订阅的信息。

外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。

还有个ConcurrentHashMap<String, RedisPubSub>类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。

信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。

package com.curtain.core;

import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author Curtain
 * @Date 2021/5/14 16:49
 * @Description
 */
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放psub的事件
     **/
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放topic(pattern)-对应的RedisPubsub
     */
    private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    private String sessionId = "";
    //要注入的对象
    private static TaskExecuteService executeService;
    private static WebsocketProperties properties;

    private Cancelable cancelable;

    @Autowired
    public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
        WebSocketServer.executeService = taskExecuteService;
    }

    @Autowired
    public void setWebsocketProperties(WebsocketProperties properties) {
        WebSocketServer.properties = properties;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.sessionId = session.getId();
        //构造推送数据
        Map pubHeader = new HashMap();
        pubHeader.put("name", "connect_status");
        pubHeader.put("type", "create");
        pubHeader.put("from", "pubsub");
        pubHeader.put("time", new Date().getTime() / 1000);
        Map pubPayload = new HashMap();
        pubPayload.put("status", "success");
        Map pubMap = new HashMap();
        pubMap.put("header", pubHeader);
        pubMap.put("payload", pubPayload);
        sendMessage(JSON.toJSONString(pubMap));
        cancelable = executeService.runPeriodly(() -> {
            try {
                if (cancelable != null && !session.isOpen()) {
                    log.info("断开连接,停止发送ping");
                    cancelable.cancel();
                } else {
                    String data = "ping";
                    ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                    session.getBasicRemote().sendPing(payload);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, properties.getPeriod());

    }

    @OnMessage
    public void onMessage(String message) {
        synchronized (session) {
            Map msgMap = (Map) JSON.parse(message);
            String cmd = (String) msgMap.get("cmd");
            //订阅消息
            if ("subscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地记录订阅信息
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================subscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);
                    if (webSocketMap.containsKey(topic)) {//有人订阅过了
                        webSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前还没人订阅过,所以需要订阅redis频道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        webSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.subscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);
                    log();
                    log.info("============================subscribe-end============================");
                }
            }
            //psubscribe
            if ("psubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地记录订阅信息
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================psubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始模糊订阅:" + topic);
                    if (pWebSocketMap.containsKey(topic)) {//有人订阅过了
                        pWebSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前还没人订阅过,所以需要订阅redis频道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        pWebSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.psubscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);
                    log();
                    log.info("============================psubscribe-end============================");
                }
            }
            //取消订阅
            if ("unsubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //删除本地对应的订阅信息
                for (String topic : topics) {
                    log.info("============================unsubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);
                    if (webSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                            webSocketMap.remove(topic);
                            redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    if (pWebSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                            pWebSocketMap.remove(topic);
                            redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);
                    log();
                    log.info("============================unsubscribe-end============================");
                }
            }
        }
    }

    @OnMessage
    public void onPong(PongMessage pongMessage) {
        try {
            log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        synchronized (session) {
            log.info("============================onclose-start============================");
            //删除订阅
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //删除模糊订阅
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("sessionId:" + this.sessionId + ",断开连接:");
            //debug
            log();
            log.info("============================onclose-end============================");
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        synchronized (session) {
            log.info("============================onError-start============================");
            log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage());
            error.printStackTrace();
            log.info("关闭错误用户对应的连接");
            //删除订阅
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //删除模糊订阅
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("完成错误用户对应的连接关闭");
            //debug
            log();
            log.info("============================onError-end============================");
        }
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) {
        synchronized (session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void publish(String msg, String topic) {
        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
        map = pWebSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
    }

    private void log() {
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息开始>>>>>>>>>>");
        Iterator iterator1 = webSocketMap.keySet().iterator();
        while (iterator1.hasNext()) {
            String topic = (String) iterator1.next();
            log.info("topic:" + topic);
            Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
            while (iterator2.hasNext()) {
                String session = (String) iterator2.next();
                log.info("订阅" + topic + "的sessionId:" + session);
            }
        }
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息结束>>>>>>>>>>");
    }
}

项目地址

上面介绍了核心代码,下面是完整代码地址

https://github.com/Curtain-Wang/websocket-redis-subscribe.git

Update20220415

参考评论区老哥的建议,将redis订阅监听类里面的subscribe和psubscribe方法调整如下:

    //订阅
    @Override
    public void subscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.subscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    //模糊订阅
    @Override
    public void psubscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.psubscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }

到此这篇关于websocket+redis动态订阅和动态取消订阅的实现示例的文章就介绍到这了,更多相关websocket redis动态订阅 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot基于Redis发布订阅集群下WebSocket的解决方案

    一.背景 单机节点下,WebSocket连接成功后,可以直接发送消息.而多节点下,连接时通过nginx会代理到不同节点. 假设一开始用户连接了node1的socket服务.触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点.这就导致了消息发送失败. 为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送. 二.解决方案(s

  • 使用 Redis 缓存实现点赞和取消点赞的示例代码

    点赞功能是很多平台都会提供的一个功能,那么,我们要如何实现点赞和取消点赞呢? 这篇文章总结了我在项目中实现点赞的方法. 缓存 vs 数据库? 首先我们要考虑的是数据要放到哪里,很多时候我们都会把数据放到数据库(如 MySQL),由于关系型数据库的稳定性,大部分场景下我们也都会使用关系数据库来存储数据. 不过,在一些特殊的场景下,传统的关系型数据库很可能无法满足我们的需求.比如,在访问量较大的情况下,数据库很可能会宕机或者访问速度非常慢.这对用户来说是不能容忍的.因此就有了非关系型数据库,如 Re

  • websocket+redis动态订阅和动态取消订阅的实现示例

    目录 原理 redis订阅监听类 webSocket订阅推送类 项目地址 Update20220415 原理 websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道. 订阅频道消息格式: { "cmd":"subscribe", "topic":[ "topic_name" ] } 模糊订阅格式 { "cmd":"psubsc

  • SpringBoot+Redis实现消息的发布与订阅的示例代码

    目录 1.什么是redis的发布与订阅 2.Redis发布订阅 3.命令行实现功能 订阅主题 模式匹配订阅 发布消息 取消订阅 测试 4.SpringBoot实现功能 Springboot整合Redis 配置消息监听 测试 1.什么是redis的发布与订阅 在官网的文档介绍中有一行介绍:Redis是一个快速稳定的发布/订阅消息系统. 2.Redis发布订阅 机制 Redis提供了发布与订阅的功能,可以用于消息的传输,Redis的发布订阅机制包括三部分,发布者.订阅者和Channel(主题或者队列

  • go+redis实现消息队列发布与订阅的详细过程

    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息. 1.生产者随机发布消息,用rpush发布.2.消费者用lpop订阅消费,一旦没有消息,随机休眠.redis做消息队列的缺点:没有持久化.一旦消息没有人消费,积累到一定程度后就会丢失 package main import ( "fmt

  • RxSwift学习之Observable的新建、订阅及取消订阅

    前言 我们在前一篇基础之上,本文将会介绍 RxSwift 中的 Observables 部分. 在 RxSwift 中 Observable 也被称为 Observable Sequence.Sequence.Stream.Observable 会以异步的方式不断的发射事件形成事件流,并且数据也会沿着事件流进行传播.下图是事件流的图像化表示: 其中从左到右的箭头代表时间轴,而三个圆圈则构成了可观察序列.而整个过程会按照从左到右的顺序.另外,事件可能在可观察序列生命周期内的任意时刻被触发. Obs

  • 结合Visual C#开发环境讲解C#中事件的订阅和取消订阅

    类或对象可以通过事件向其他类或对象通知发生的相关事情.发送(或引发)事件的类称为"发行者",接收(或处理)事件的类称为"订户". 在典型的 C# Windows 窗体或 Web 应用程序中,可订阅由控件(如按钮和列表框)引发的事件.可使用 Visual C# 集成开发环境 (IDE) 来浏览控件发布的事件,选择要处理的事件.IDE 会自动添加空事件处理程序方法和订阅事件的代码. 事件概述 事件具有以下特点: 发行者确定何时引发事件,订户确定执行何种操作来响应该事件.

  • 浅谈Angular 中何时取消订阅

    你可能知道当你订阅 Observable 对象或设置事件监听时,在某个时间点,你需要执行取消订阅操作,进而释放操作系统的内存.否则,你的应用程序可能会出现内存泄露. 接下来让我们看一下,需要在 ngOnDestroy 生命周期钩子中,手动执行取消订阅操作的一些常见场景. 手动释放资源场景 表单 export class TestComponent { ngOnInit() { this.form = new FormGroup({...}); // 监听表单值的变化 this.valueChan

  • 深入理解Angular4订阅(Subscribe)与取消

    订阅(Subscribe) 写过js的都知道,subscribe在很多地方都能看到它的身影,并且起到了很重要的作用.侦听http请求的返回,页面间传递参数- -说起订阅,就不能不提Observable,说起Observable就不能不提Subscribable- -等等,扯太远了.回到正题,subscribe是Observable类下的一个函数.从Observable的中文名:"可观察的"就能看出,Observable的作用是可以起到类似监听的作用,但它的监听往往都是在跨页面中,举个栗

  • RxJava取消订阅的各种方式的实现

    手动取消订阅 Consumer类型 Observable创建返回Disposable取消 public class SecondActivity extends AppCompatActivity { private static final String TAG = "SecondActivity"; private Disposable disposable; @Override protected void onCreate(Bundle savedInstanceState)

  • Redis Cluster集群动态扩容的实现

    目录 一.引言 二.Cluster集群增加操作 1.动态增加Master主服务器节点 2.动态增加Slave从服务器节点 三.Cluster集群删除操作 1.动态删除Slave从服务器节点 2.动态删除Master主服务器节点 四.总结 一.引言 上一篇文章我们一步一步的教大家搭建了Redis的Cluster集群环境,形成了3个主节点和3个从节点的Cluster的环境.当然,大家可以使用 Cluster info 命令查看Cluster集群的状态,也可以使用Cluster Nodes 命令来详细

随机推荐