Spring boot+redis实现消息发布与订阅的代码

一.创建spring boot项目

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.41</version>
  </dependency>

二.编辑yml配置文件

server:
 port: 7888
# 日志配置
logging:
 config: classpath:log/logback.xml
 level:
 cn.com.dhcc: info
 org.springframework: info
 org.springframework.web: info
 com.alibaba.nacos.client.naming: error
spring:
 redis:
  host: localhost
  port: 6379
  password: *********
  database: 1
  jedis:
  pool:
  max-idle: 8
  max-active: 8
  max-wait: -1
  min-idle: 0
  timeout: 5000

三.配置Redis

@Configuration
public class RedisConfiguration {

 /**
  * 实例化 RedisTemplate 对象
  *
  * @return
  */
 @Bean("RedisTemplateS")
 public RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
  RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
  initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
  return redisTemplate;
 }

 /**
  * 设置数据存入 redis 的序列化方式,并开启事务
  *
  * @param redisTemplate
  * @param factory
  */
 private void initDomainRedisTemplate(@Qualifier("RedisTemplateS") RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
  // 如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to
  // String!
  redisTemplate.setKeySerializer(new StringRedisSerializer());
  redisTemplate.setHashKeySerializer(new StringRedisSerializer());

  FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<Object>(Object.class);
  redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
  redisTemplate.setValueSerializer(fastJsonRedisSerializer);
  //redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
  //redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  // 开启事务
  redisTemplate.setEnableTransactionSupport(true);
  redisTemplate.setConnectionFactory(factory);
 }

 /**
  * 注入封装RedisTemplate @Title: redisUtil @return RedisUtil @date
  *
  */
 @Bean(name = "redisUtils")
 public RedisUtils redisUtil(@Qualifier("RedisTemplateS") RedisTemplate<String, Object> redisTemplate) {
  RedisUtils redisUtil = new RedisUtils();
  redisUtil.setRedisTemplate(redisTemplate);
  return redisUtil;
 }

四.编写RedisUtil消息发布方法

public class RedisUtils {
 private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);

 private RedisTemplate<String, Object> redisTemplate;

 public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
  this.redisTemplate = redisTemplate;
 }

 public void publish(String channal ,Object obj) {
  redisTemplate.convertAndSend(channal,obj );
 }
}

五.配置消息监听

@Configuration
public class RedisMessageListener {

 /**
  * 创建连接工厂
  * @param connectionFactory
  * @param listenerAdapter
  * @return
  */
 @Bean
 public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
             MessageListenerAdapter listenerAdapter,MessageListenerAdapter listenerAdapter2){
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  //接受消息的key
  container.addMessageListener(listenerAdapter,new PatternTopic("phone"));
  return container;
 }

 /**
  * 绑定消息监听者和接收监听的方法
  * @param receiver
  * @return
  */
 @Bean
 public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver){
  return new MessageListenerAdapter(receiver,"receiveMessage");
 }

 /**
  * 注册订阅者
  * @param latch
  * @return
  */
 @Bean
 ReceiverRedisMessage receiver(CountDownLatch latch) {
  return new ReceiverRedisMessage(latch);
 }

 /**
  * 计数器,用来控制线程
  * @return
  */
 @Bean
 public CountDownLatch latch(){
  return new CountDownLatch(1);//指定了计数的次数 1
 }
}

六.消息订阅方法

public class ReceiverRedisMessage {

 private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);
 private CountDownLatch latch;

 @Autowired
 public ReceiverRedisMessage(CountDownLatch latch) {
  this.latch = latch;
 }

 /**
  * 队列消息接收方法
  *
  * @param jsonMsg
  */
 public void receiveMessage(String jsonMsg) {
  log.info("[开始消费REDIS消息队列phone数据...]");
  try {
   log.info("监听者收到消息:{}", jsonMsg);
   JSONObject exJson = JSONObject.parseObject(jsonMsg);
   User user = JSON.toJavaObject(exJson, User.class);
   System.out.println("转化为对象 :"+user);
   log.info("[消费REDIS消息队列phone数据成功.]");
  } catch (Exception e) {
   log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage());
  }
  latch.countDown();
 }
}

七.定时消息发布测试

@EnableScheduling
@Component
public class PublisherController {

 private static final Logger log = LoggerFactory.getLogger(PublisherController.class);

 @Autowired
 private RedisUtils redisUtils;

 @Scheduled(fixedRate = 5000)
 public String pubMsg() {
  User user=new User(1, "尚***", 26,"男","陕西省xxxx市xxxxxx县");
  redisUtils.publish("phone", user);
  log.info("Publisher sendes Topic... ");
  return "success";
 }
}

八.测试结果

九.发布对象User实体

public class User implements Serializable {

 /**
  *
  */
 private static final long serialVersionUID = 1L;
 private int id;
 private String name;
 private int age;
 private String sex;
 private String address;
  .....................
}

到此这篇关于Spring boot+redis实现消息发布与订阅的文章就介绍到这了,更多相关Spring boot redis消息发布与订阅内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python redis操作实例分析【连接、管道、发布和订阅等】

    本文实例讲述了Python redis操作.分享给大家供大家参考,具体如下: 一.redis redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与me

  • 在SpringBoot中添加Redis及配置方法

    在实际的开发中,会有这样的场景.有一个微服务需要提供一个查询的服务,但是需要查询的数据库表的数据量十分庞大,查询所需要的时间很长. 此时就可以考虑在项目中加入缓存. 引入依赖 在maven项目中引入如下依赖.并且需要在本地安装redis. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifac

  • Spring Boot单元测试中使用mockito框架mock掉整个RedisTemplate的示例

    概述 当我们使用单元测试来验证应用程序代码时,如果代码中需要访问Redis,那么为了保证单元测试不依赖Redis,需要将整个Redis mock掉.在Spring Boot中结合mockito很容易做到这一点,如下代码: import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration

  • SpringBoot集成Redisson实现分布式锁的方法示例

    上篇 <SpringBoot 集成 redis 分布式锁优化>对死锁的问题进行了优化,今天介绍的是 redis 官方推荐使用的 Redisson ,Redisson 架设在 redis 基础上的 Java 驻内存数据网格(In-Memory Data Grid),基于NIO的 Netty 框架上,利用了 redis 键值数据库.功能非常强大,解决了很多分布式架构中的问题. Github的wiki地址: https://github.com/redisson/redisson/wiki 官方文档

  • nodejs redis 发布订阅机制封装实现方法及实例代码

     nodejs redis 发布订阅机制封装 最近项目使用redis,对publish 和 subscribe的使用进行了了解,并进行了封装. var config = require('../config/config'); var log = require("./loghelp"); var redis = require("redis"); function initialclient(param) { var option={ host: config.r

  • Redis 订阅发布_Jedis实现方法

    我想到使用Redis的订阅发布模式是用来解决推送问题的-. 对于概念性的叙述,多多少少还是要提一下的: 什么是Redis发布订阅?Redis发布订阅是一种消息通信模式,发送者通过通道A发送消息message,订阅过通道A的客户端就可以接收到消息message.嗯度娘上面的解释要比我所说的好多了,而我所理解的就是:所谓的订阅发布模式,其实和我们看电视,听广播差不多,在我们没有调台(换频道)的时候,那个频道也是在传递消息的(发布).我们换到那个频道上(订阅)就能接收到消息了.是的,虽然可能有些不恰当

  • Spring Boot整合Redis的完整步骤

    前言 实际 开发 中 缓存 处理是必须的,不可能我们每次客户端去请求一次 服务器 ,服务器每次都要去 数据库 中进行查找,为什么要使用缓存?说到底是为了提高系统的运行速度.将用户频繁访问的内容存放在离用户最近,访问速度最 快的 地方,提高用户的响 应速度,今天先来讲下在 springboot 中整合 redis 的详细步骤. 一.Spring Boot对Redis的支持 Spring对Redis的支持是使用Spring Data Redis来实现的,一般使用Jedis或者lettuce(默认),

  • Spring boot+redis实现消息发布与订阅的代码

    一.创建spring boot项目 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId>

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • spring boot+redis 监听过期Key的操作方法

    前言: 在订单业务中,有时候需要对订单设置有效期,有效期到了后如果还未支付,就需要修改订单状态.对于这种业务的实现,有多种不同的办法,比如: 1.使用querytz,每次生成一个订单,就创建一个定时任务,到期后执行业务代码: 2.rabbitMq中的延迟队列: 3.对Redis的Key进行监控: 1.引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • spring boot+ redis 接口访问频率限制的实现

    生产环境下可以解决的问题: 1.短信验证码请求评率限制(防止抓包短信轰炸) 2.热点数据请求评率限制(防止数据库爆炸) @Component public class BlackInterceper implements HandlerInterceptor { @Autowired private RedisTemplate<String, Object> redisTemplate; private Logger log = LoggerFactory.getLogger(this.get

  • Spring Boot应用通过Docker发布部署的流程分析

    目录 手动部署 1.idea创建spring boot项目 2.项目打成 Jar 包 3.构建 docker image 4.查看并运行镜像 插件部署 运行推送命令 将Spring Boot项目部署到docker中有两种方法,手动部署和插件部署 手动部署 1.idea创建spring boot项目 pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http:/

  • 关于Spring Boot项目的 log4j2 核弹漏洞问题(一行代码配置搞定)

    看到群里还有小伙伴说公司里还特别建了800+人的群在处理... 好在很快就有了缓解措施和解决方案.同时,log4j2官方也是速度影响发布了最新的修复版本.各应用方也可以执行较为稳定的修复方案了. 不过我看到群里发出来的各种修复方法,还真是不好看...所以这里也提一下Spring Boot用户怎么修复最简单吧. 最简修复方式 有些小伙伴其实想到了直接通过Spring Boot的Starter去解决,所以还给Spring Boot提了Issue,希望spring-boot-starter-log4j

  • Spring Boot/VUE中路由传递参数的实现代码

    在路由时传递参数,一般有两种形式,一种是拼接在url地址中,另一种是查询参数.如:http://localhost:8080/router/tang/101?type=spor&num=12.下面根据代码看一下,VUE 和 Spring Boot 中各自是如何处理传递和接受参数的. Spring Boot package com.tang.demo1.controller; import org.springframework.web.bind.annotation.*; @RestContro

  • spring boot springjpa 支持多个数据源的实例代码

    1.SpringBoot的程序启动类 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.web

  • Spring boot通过HttpSessionListener监听器统计在线人数的实现代码

    首先说下,这个统计在线人数有个缺陷,一个人在线可以同时拥有多个session,导致统计有一定的不准确行. 接下来,开始代码的编写, 第一步:实现HttpSessionListener中的方法,加上注解@WebListener @WebListener public class SessionListener implements HttpSessionListener{ public void sessionCreated(HttpSessionEvent arg0) { // TODO Aut

随机推荐