SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

目录
  • Redis命令行下使用发布订阅
    • publish 发布
    • subscribe 订阅
  • SpringBoot中使用Redis的发布订阅功能
    • 发布者
    • 订阅者
    • 消息监听容器

注意:redis的发布订阅模式不可以将消息进行持久化,订阅者发生网络断开、宕机等可能导致错过消息。

Redis命令行下使用发布订阅

publish 发布

发布者通过以下命令可以往指定channel发布message

redis> publish channel message

subscribe 订阅

订阅者通过以下命令可以订阅一个或多个频道,如果频道不存在则会创建

redis> subscribe channel [channel ...]

对于redis的发布订阅的命令就这么简单。那么接下来我们在springboot中如何使用发布订阅的功能呢?

SpringBoot中使用Redis的发布订阅功能

添加依赖配置redis信息和连接池什么的就不说了,如果添加的有commons-pool2依赖的话,会自动帮我们配置redis连接池的

发布者

相对于订阅者来说,发布者的实现方式很简单,以下方式就可以往channel中发送message了。

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void publish(){
    // 使用高级的redisTemplate
    redisTemplate.convertAndSend("channel","message");

    // 使用低级的connection 实际上redisTemplate的底层就是使用的下面的方式
    redisTemplate.execute(new RedisCallback<Object>() {
          @Override
          public Object doInRedis(RedisConnection connection) throws DataAccessException {
              connection.publish("channel".getBytes(StandardCharsets.UTF_8), "message".getBytes(StandardCharsets.UTF_8));
              return null;
         }
     }, true);
     // true这个参数意思是 是否将redis连接暴露给回调代码,大多数情况下设置true就可以了,往后深入的话可以看到
     RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 如果为false的话会创建redis连接的代理
}

订阅者

订阅者因为涉及到连接、线程等 所以内容相对会多一点

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void subscribe() {
        redisTemplate.execute(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                // 我定义了一个全局的 ConcurrentHashMap 用来存放连接 因为后面的取消订阅的线程要和订阅的线程用同一个连接
                map.put("connection",connection);

                // subscribe 按频道订阅 该方法会阻塞该线程 只有取消订阅才会释放该线程
                connection.subscribe(new MessageListener() {
                    @Override
                    public void onMessage(Message message, byte[] pattern) {
                        log.info("接收到消息");
                        System.out.println(new String(message.getBody()));
                    }
                }, "channelOne".getBytes(StandardCharsets.UTF_8), "channelTwo".getBytes(StandardCharsets.UTF_8));

                // 按模式订阅 pSubscribe 只有取消订阅才会释放该线程
//                connection.pSubscribe(new MessageListener() {
//                    @Override
//                    public void onMessage(Message message, byte[] pattern) {
//                        System.out.println(new String(message.getBody()));
//                    }
//                }, "patternOne".getBytes(StandardCharsets.UTF_8), "patternOne".getBytes(StandardCharsets.UTF_8));
                return null;
            }
        }, true);
    }

如何取消订阅呢?从刚才的map里取到连接

    RedisConnection the = map.get("connection");
    Subscription subscription = the.getSubscription();
    subscription.unsubscribe();

消息监听容器

上面的那种订阅为低级订阅,由于连接在调用subscribe的时候会导致当前线程阻塞,这种方式需要对每个监听器连接和线程管理,所以spring提供了RedisMessageListenerContainer类来帮我们完成这些工作。

RedisMessageListenerContainer顾名思义可以知道它是一个消息监听容器
详情请参考官方文档

如何实现

@Configuration
public class DefaultMessageListenerContainerConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 官方推荐我们使用自定义的线程池或者使用TaskExecutor
        container.setTaskExecutor(executor());
        container.addMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody()));
            }
        }, new ChannelTopic("message"));
        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }
}

这个时候我们在redis命令行内使用 publish channel message 的时候,我们的spring程序就可以订阅到消息了。

再说下 MessageListenerAdapter
我们可以通过 MessageListenerAdapter 消息接收者包装进去,消息接收者不会和redis有任何耦合。
官方文档给了spring传统的xml的方式配置的,下面我给出基于configuration配置的代码

public interface MessageDelegate {
    void handleMessage(String message);
}

public class DefaultMessageDelegate implements MessageDelegate {
    @Override
    public void handleMessage(String message) {
        System.out.println(message);
    }
}

@Configuration
public class MessageListenerContainerConfig {

    @Autowired
    private DefaultMessageDelegate defaultMessageDelegate;

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.setTaskExecutor(executor());

        Map<MessageListenerAdapter, Collection<? extends Topic>> map = new HashMap<>();
        List<ChannelTopic> channelTopics = new ArrayList<>();
        ChannelTopic channelTopic = new ChannelTopic("message");
        channelTopics.add(channelTopic);
        map.put(messageListenerAdapter, channelTopics);
        container.setMessageListeners(map);

        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        // handleMessage 参数消息来的时候要调用的方法 默认是 handleMessage
        return new MessageListenerAdapter(defaultMessageDelegate, "handleMessage");
    }
}

如果我们要在程序运行时添加订阅或者取消订阅的时候该怎么办呢?
我们需要提前准备好消息侦听器,添加的时候把侦听器注入到消息容器
取消的时候就调用消息容器的remove方法把侦听器删除掉即可。

到此这篇关于SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用的文章就介绍到这了,更多相关SpringBoot Redis发布订阅模式内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot基于Redis发布订阅集群下WebSocket的解决方案

    一.背景 单机节点下,WebSocket连接成功后,可以直接发送消息.而多节点下,连接时通过nginx会代理到不同节点. 假设一开始用户连接了node1的socket服务.触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点.这就导致了消息发送失败. 为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送. 二.解决方案(s

  • SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

    目录 Redis命令行下使用发布订阅 publish 发布 subscribe 订阅 SpringBoot中使用Redis的发布订阅功能 发布者 订阅者 消息监听容器 注意:redis的发布订阅模式不可以将消息进行持久化,订阅者发生网络断开.宕机等可能导致错过消息. Redis命令行下使用发布订阅 publish 发布 发布者通过以下命令可以往指定channel发布message redis> publish channel message subscribe 订阅 订阅者通过以下命令可以订阅一

  • .net core如何使用Redis发布订阅

    Redis是一个性能非常强劲的内存数据库,它一般是作为缓存来使用,但是他不仅仅可以用来作为缓存,比如著名的分布式框架dubbo就可以用Redis来做服务注册中心.接下来介绍一下.net core 使用Redis的发布/订阅功能. Redis 发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis 客户端可以订阅任意数量的通道. 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 -- client2 .

  • nodejs redis 发布订阅机制封装实现方法及实例代码

     nodejs redis 发布订阅机制封装 最近项目使用redis,对publish 和 subscribe的使用进行了了解,并进行了封装. var config = require('../config/config'); var log = require("./loghelp"); var redis = require("redis"); function initialclient(param) { var option={ host: config.r

  • redis发布订阅_动力节点Java学院整理

    其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果博主发表了文章,那么100个人就会同时收到通知邮件,除了这个场景还能找到其他场景么,当然有啦,你想想,如果你要在内存里面做一个读写分离的程序,为了维持数据的完整性,你是不是需要保证在写入的时候,也要分发到各个读内存的程序中呢?所以说场景还是很多的,在于你的挖掘~~~ 下面还是从基本命令入手: 一:命令简介 从redis手册上面可以看到,其实"发布.订阅"

  • JavaScript设计模式之观察者模式与发布订阅模式详解

    本文实例讲述了JavaScript设计模式之观察者模式与发布订阅模式.分享给大家供大家参考,具体如下: 学习了一段时间设计模式,当学到观察者模式和发布订阅模式的时候遇到了很大的问题,这两个模式有点类似,有点傻傻分不清楚,博客起因如此,开始对观察者和发布订阅开始了Google之旅.对整个学习过程做一个简单的记录. 观察者模式 当对象间存在一对多关系时,则使用观察者模式(Observer Pattern).比如,当一个对象被修改时,则会自动通知它的依赖对象.观察者模式属于行为型模式.在观察模式中共存

  • gRPC的发布订阅模式及REST接口和超时控制

    前言 上篇文章 gRPC,爆赞 直接爆了,内容主要包括:简单的 gRPC 服务,流处理模式,验证器,Token 认证和证书认证. 在多个平台的阅读量都创了新高,在 oschina 更是获得了首页推荐,阅读量到了 1w+,这已经是我单篇阅读的高峰了. 看来只要用心写还是有收获的. 这篇咱们还是从实战出发,主要介绍 gRPC 的发布订阅模式,REST 接口和超时控制. 相关代码我会都上传到 GitHub,感兴趣的小伙伴可以去查看或下载. 发布和订阅模式 发布订阅是一个常见的设计模式,开源社区中已经存

  • Docker Compose+Nestjs构建Dapr Redis发布订阅分布式应用

    目录 Dapr(分布式应用程序运行时)介绍 实战 Dapr 的 Redis 发布/订阅应用 1. 创建项目 2. 创建 Dapr Placement 服务 3. 创建 Redis Publish 服务 4. 创建 Dapr Pub-Sub 组件 5. 创建 Redis Dapr Sidecar 6. 创建 NestJS Server 7. 为 NestJS 订阅服务器创建 Dockerfile 8. 将 NestJS 订阅服务添加到 docker-compose 文件 9. 创建 Dapr 订阅

  • node.js 发布订阅模式的实例

    实例如下: //导入内置模块 let EventEmitter = require('events'); let util=require('util'); //Man继承EventEmitter util.inherits(Man,EventEmitter); //创建一个函数 function Man(){} //实例化函数 let man=new Man(); function findGirl() { console.log('找新的女朋友') } function saveMoney(

  • JavaScript中发布/订阅模式的简单实例

    上次研究观察者模式,很多文章说它也叫Subscribe/Publish(发布/订阅模式).可在<Javascript设计模式>一书中,这两种模式还是有些区别的.书中原话如下: 1.Observer模式要求希望接收到主题通知者的观察者必须订阅内容改变的事件. 2.Subscribe/Publish模式使用了一个主题/事件通道,这个通道介于订阅者和发布者之间.该事件系统允许代码定义应用程序的特定事件,该事件可以传递自定义参数,自定义参数包含订阅者所需要的值.其目的是避免订阅者和发布者产生依赖关系.

随机推荐