SpringBoot整合Redis实现消息发布与订阅的示例代码

当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步!

能实现发送与接收信息的中间介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现。

1.添加SpringBoot 集成Redis maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置Redis配置 RedisConfig.java

@Configuration
public class RedisConfig {
    @Value("${redis.server.nodes}")
    private String redisServerNodes;

    @Value("${redis.server.password}")
    private String redisServerPassword;

    //Redis集群配置,单机的redis注释掉该方法,在application配置文件里面配置就可以了
    @Bean
    public RedisClusterConfiguration getRedisClusterConfiguration() {

        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();

        String[] serverArray = redisServerNodes.split(",");
        Set<RedisNode> nodes = new HashSet<RedisNode>();
        for (String ipPort : serverArray) {
            String[] ipAndPort = ipPort.split(":");
            nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));
        }
        redisClusterConfiguration.setClusterNodes(nodes);
        RedisPassword pwd = RedisPassword.of(redisServerPassword);
        redisClusterConfiguration.setPassword(pwd);
        return redisClusterConfiguration;
    }

    //指定redisTemplate类型,如下为<String, Object>
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        // 使用JSON格式序列化对象,对缓存数据key和value进行转换
        Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class);
        // 解决查询缓存转换异常的问题
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(objectMapper);

        // 设置RedisTemplate模板API的序列化方式为JSON
        template.setDefaultSerializer(jacksonSeial);
        return template;
    }

     /**
     * Redis消息监听器容器
     * 这个容器加载了RedisConnectionFactory和消息监听器
     * 可添加多个不同话题的redis监听器,需要将消息监听器和消息频道绑定,
     * 通过反射调用消息订阅处理器的相关方法进行业务处理
     *
     * @param  redisConnectionFactory       连接工厂
     * @param  listener                     Redis消息监听器
     * @param  MessageListenerAdapter       Redis消息监听适配器
     * @return RedisMessageListenerContainer    消息监听容器
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                   RedisMessageListener listener,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        // 所有的订阅消息,都需要在这里进行注册绑定
        // new PatternTopic(TOPIC_NAME1) 表示发布信息的频道
        // 可以添加多个频道以及配置不同的频道
        container.addMessageListener(listener, new PatternTopic(SystemConstants.TOPIC_NAME1));
        container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2));

        /**
         * 设置序列化对象
         * 特别注意:1. 发布的时候和订阅方都需要设置序列化
         *         2. 设置序列化对象必须放在 {加入消息监听器} 这步后面,不然接收器接收不到消息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);
        return container;
    }

    /**
     * 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
     * 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage
     *
     * @param redisMessageReceiver 消息接收对象
     * @return 消息监听适配器
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisMessageReceiver redisMessageReceiver) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "onMessage");
        Jackson2JsonRedisSerializer<Object> seria = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
}

3.Redis的订阅主要有两种实现方式

方式一:编写Redis监听类RedisMessageListener,实现Redis的监听接口MessageListener,并重写onMessage方法

方式二:编写Redis消息监听适配器类,并在RedisConfig.java中配置消息监听适配器bean

方式一 与 方式二 主要是实现订阅Redis推送的消息后的具体操作,这两种方式可以同时使用来订阅多个频道里的消息

//方式一:
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private CacheManager cacheManager;

    @Override
    public void onMessage(Message message, byte[] pattern) {
    // 接收的topic
    log.info("接收消息频道:" + new String(pattern));
    //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
    MessageDto<?> messageDto = (MessageDto<?>) redisTemplate.getValueSerializer().deserialize(message.getBody());
    //MessageDto<T>为自己编写的一个消息对象类(如自定义有:String data,String title,T content 等属性)
    log.info("接收消息内容:{}", messageDto);
    //根据消息内容做具体业务逻辑。。。。。。。。。
    //。。。。。。。。。。。。。。。。。。。。。。
    }
}
//方式二
@Slf4j
@Component
public class RedisMessageReceiver {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /**
     * 方法命名规则必须为function(Object messageDto) / function(Object messageDto,String topic)
     * @param messageDto 消息对象
     * @param topic 消息频道名称
     */
    public void onMessage(Object messageDto , String topic) {
        // 接收消息频道
        log.info("接收消息频道:" + topic);
        //接收消息内容
        log.info("接收消息内容:{}",messageDto);
    }
}

4.编写Redis消息的推送工具类,在需要推送消息的地方使用来向Redis推送消息(如:操作数据的地方)

@Slf4j
@Component
public class RedisMessageUtils {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /**
     * 向通道发布消息
     */
    public void sendMessage(String topic, Object message) {
        if (!StringUtils.hasText(topic)) {
            return;
        }
        try {
            redisTemplate.convertAndSend(topic, message);
            log.info("发送消息成功,topic:{},message:{}", topic, message);
        } catch (Exception e) {
            log.info("发送消息失败,topic:{},message:{}", topic, message);
            e.printStackTrace();
        }
    }
}

5.使用

@RestController
@RequestMapping("/user")
public class UserController{

  @Autowired
  private UserService userService;

  @PostMapping("/getUsers")
  public List<User> queryUsers(@RequestBody UserDto userDto){
   List<User> users=userService.queryUsers(userDto);
   //发送测试消息
   RedisMessageUtils.sendMessage(SystemConstants.TOPIC_NAME2, new MessageDto());
   return users;
  }
}

成功示例:

2099-12-31 23:59:59.999 [http-nio-8888-exec-1] INFO  com.xxx.yyy.util.RedisMessageUtils : 发送消息成功,topic:TOPIC2,message:MessageDto(data=null, title=null, content=null)
2099-12-31 23:59:59.999 [container-2] INFO  com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息频道:TOPIC2
2099-12-31 23:59:59.999 [container-2] INFO  com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息内容:MessageDto(data=null, title=null, content=null)

以上就是SpringBoot整合Redis实现消息发布与订阅的示例代码的详细内容,更多关于SpringBoot Redis消息发布 订阅的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot 整合Redis 数据库的方法

    Redis简介 Redis(官网: https://redis.io )是一个基于内存的日志型可持久化的缓存数据库,保存形式为key-value格式,Redis完全免费开源,它使用ANSI C语言编写.与其他的key - value缓存产品一样,Redis具有以下三个特点. • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用: • Redis不仅支持简单的key-value类型数据,同时还提供字符串.链表.集合.有序集合和哈希等数据结构的存储: • R

  • springboot 集成redis哨兵主从的实现

    目录 一.环境 二.POM文件 三.application.yml配置 四.reidsTemplate配置 五.单元测试(JUnit4) 一.环境 spring boot 2.3.12.RELEASEJDK 1.8IntelliJ IDEA开发工具Redis哨兵主从搭建 二.POM文件 pom文件其他忽略,只展示和redis有关系统的依赖 <dependency> <groupId>org.springframework.boot</groupId> <artif

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • SpringBoot+Redis+Lua分布式限流的实现

    Redis支持LUA脚本的主要优势 LUA脚本的融合将使Redis数据库产生更多的使用场景,迸发更多新的优势: 高效性:减少网络开销及时延,多次redis服务器网络请求的操作,使用LUA脚本可以用一个请求完成 数据可靠性:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入. 复用性:LUA脚本执行后会永久存储在Redis服务器端,其他客户端可以直接复用 可嵌入性:可嵌入JAVA,C#等多种编程语言,支持不同操作系统跨平台交互 简单强大:小巧轻便,资源占用率低,支持过程化和对象化的编程

  • SpringBoot整合Redis使用@Cacheable和RedisTemplate

    对之前网站做了一些很简单的优化,给用户列表加了一个分页功能. 分页就更好考虑加载速度,如果换一页就要等几秒,那体验感是非常差的. 因此想到了加一个redis缓存. springboot整合redis有两种方式: 一.使用注解,@EnableCaching @Cacheable . . . 等 二.使用RedisTemplate 两者都能操作缓存,使用RedisTemplate 操作肯定是比使用注解灵活.方便.但是从理论上来讲注解方式速度应该更快,因为使用注解如果在缓存中有就直接从缓存中取,不用进

  • Spring Boot整合 NoSQL 数据库 Redis详解

    目录 引言 一.环境准备 二.构建Spring Boot项目 三.引入Redis依赖 四.Reds相关配置 五.添加Redis配置类 六.测试一下 引言 在日常的开发中,除了使用 Spring Boot 这个企业级快速构建项目的框架之外,随着业务数据量的大幅度增加,对元数据库造成的压力成倍剧增.在此背景下, Redis 这个 NoSQL 数据库已然整个项目架构中的不可或缺的一部分,懂得如何 Spring Boot 整合 Redis ,是当今开发人员必备的一项技能,接下来对整合步骤进行详细说明.

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • SpringBoot整合Redis正确的实现分布式锁的示例代码

    前言 最近在做分块上传的业务,使用到了Redis来维护上传过程中的分块编号. 每上传完成一个分块就获取一下文件的分块集合,加入新上传的编号,手动接口测试下是没有问题的,前端通过并发上传调用就出现问题了,并发的get再set,就会存在覆盖写现象,导致最后的分块数据不对,不能触发分块合并请求. 遇到并发二话不说先上锁,针对执行代码块加了一个JVM锁之后问题就解决了. 仔细一想还是不太对,项目是分布式部署的,做了负载均衡,一个节点的代码被锁住了,请求轮询到其他节点还是可以进行覆盖写,并没有解决到问题啊

  • SpringBoot+Redis实现消息的发布与订阅的示例代码

    目录 1.什么是redis的发布与订阅 2.Redis发布订阅 3.命令行实现功能 订阅主题 模式匹配订阅 发布消息 取消订阅 测试 4.SpringBoot实现功能 Springboot整合Redis 配置消息监听 测试 1.什么是redis的发布与订阅 在官网的文档介绍中有一行介绍:Redis是一个快速稳定的发布/订阅消息系统. 2.Redis发布订阅 机制 Redis提供了发布与订阅的功能,可以用于消息的传输,Redis的发布订阅机制包括三部分,发布者.订阅者和Channel(主题或者队列

  • Spring boot+redis实现消息发布与订阅的代码

    一.创建spring boot项目 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId>

  • springboot整合mybatis-plus实现多表分页查询的示例代码

    1.新建一个springboot工程 2.需要导入mybatis和mybatis-plus的依赖文件 <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.1.1</version> </dependency> <dependency> &l

  • Springboot使用redis实现接口Api限流的示例代码

    前言 该篇介绍的内容如题,就是利用redis实现接口的限流(  某时间范围内 最大的访问次数 ) . 正文 惯例,先看下我们的实战目录结构: 首先是pom.xml 核心依赖: <!--用于redis数据库连接--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId>

  • SpringBoot整合Redis、ApachSolr和SpringSession的示例

    本文介绍了SpringBoot整合Redis.ApachSolr和SpringSession,分享给大家,具体如下: 一.简介 SpringBoot自从问世以来,以其方便的配置受到了广大开发者的青睐.它提供了各种starter简化很多繁琐的配置.SpringBoot整合Druid.Mybatis已经司空见惯,在这里就不详细介绍了.今天我们要介绍的是使用SpringBoot整合Redis.ApacheSolr和SpringSession. 二.SpringBoot整合Redis Redis是大家比

  • SpringBoot整合WxJava开启消息推送的实现

    目录 1.引入 WxJava 依赖 2.申请微信小程序 3.微信小程序配置信息 4.消息推送配置 5.接收消息推送的 API 6.消息推送测试 接入微信小程序消息推送服务,可以3种方式选择其一: 1.开发者服务器接收消息推送2.云函数接收消息推送3.微信云托管服务接收消息推送 开发者服务器接收消息推送,开发者需要按照如下步骤完成: 1.填写服务器配置2.验证服务器地址的有效性3.据接口文档实现业务逻辑,接收消息和事件 1.引入 WxJava 依赖 <!-- web支持 --> <depe

  • Springboot整合Redis与数据持久化

    目录 Springboot整合Redis 使用json方式存储 序列化方式存储数据 MySQL与Redis一致性解决同步问题 Redis持久化机制 全量同步与增量同步 RDB与AOF RDB AOF Springboot整合Redis 有两种存储数据的方式: 方案1:在Redis存放一个对象 使用json序列化与反序列化 方案2:直接使用redis自带序列化方式存储对象 maven依赖 <parent> <groupId>org.springframework.boot</g

随机推荐