SpringBoot中使用Redis Stream实现消息监听示例

目录
  • Demo环境
  • 仓库地址
  • POM依赖
  • 配置监听消息类
  • 监听俩个stream的实现
  • 【问题补充】确认完消息删除消息
  • 【问题补充】自动初始化stream的key和group问题-最新更新-2021年12月4日

Demo环境

  • JDK8
  • Maven3.6.3
  • springboot2.4.3
  • redis_version:6.2.1

仓库地址

https://gitee.com/hlovez/redismq.git.

POM依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>vip.huhailong</groupId>
	<artifactId>redismq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>redismq</name>
	<description>base redis stream mq</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

这里是一个简单的Demo,所以关于redis的一些序列化配置就省略了。

配置监听消息类

配置监听消息类,这里类需要实现StreamListener接口,该接口下只有一个要实现的方法——onMessage方法,代码:

package vip.huhailong.redismq.redistool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * @author Huhailong
 * @Description 监听消息
 * @Date 2021/3/10.
 */
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到来自redis的消息");
        System.out.println("message id "+entries.getId());
        System.out.println("stream "+entries.getStream());
        System.out.println("body "+entries.getValue());
    }

}

配置完该类后我们再创建一个类将该监听器注入进去,代码:

package vip.huhailong.redismq.config;

import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import vip.huhailong.redismq.redistool.ListenerMessage;

import java.time.Duration;

/**
 * @author Huhailong
 * @Description
 * @Date 2021/3/12.
 */
@Configuration
public class RedisStreamConfig {

    @Autowired
    private ListenerMessage streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }
}

代码分析:

首先将我们实现了StreamListener的监听器类注入。

subscription方法返回的是一个Subscription类型,它是与当前正在运行任务的链接,可以理解为订阅的链接,它有俩个方法

  • boolean await(Duration timeout): 当订阅变为活动或超时时,同步阻塞呼叫将返回
  • boolean isActive(): 如果当前正在订阅则为true

代码中的var是使用了Lombok的可变局部变量。主要是为了方便

StreamMessageListenerContainer: 消息侦听容器,不能在外部实现。创建后,StreamMessageListenerContainer可以订阅Redis流并使用传入的消息。 StreamMessageListenerContainer允许多个流读取请求,并为每个读取请求返回一个Subscription句柄。取消订阅最终将终止后台轮询。使用键和值序列化器转换消息以支持各种序列化策略。具体文档

__StreamMessageListenerContainerOptions: __ 它是上面4的选项,代码中pollTimeout表示轮询超时时间

create方法使用给定的RedisConnectionFactory和上面的配置选项创建侦听器。

接下来使用receiveAutoAck创建一个新订阅,注意这里接受到消息后会被自动的确认,如果不想自动确认请使用其他的创建订阅方式。该方法共有三个参数

  • 消费组 consumer group ,它不能为null (Consumer类型)
  • stream offset ,stream的偏移量(StreamOffset 类型)
  • listener 不能为null (StreamListener<K,V> 类型)

代码中表示消费者来自名称为mygroup的组,消费者名称为huhailong,这里偏移量设置为了lastConsumed,它表示读取ID大于消费者组使用的最后一个元素的所有新到达的元素。

现在就可以运行项目来验证了,将项目运行起来后通过终端给对应key的stream添加一条消息

> XADD mystream * message springboot

这时可以看到spring boot的控制台打印除了一下消息:

message id 1615532778588-0
stream mystream
body {message=springboot}

说明侦听成功,它会一直处于监听状态,只要对应key的stream添加了新的消息都会被侦听到,到此也就简单的实现了消息队列功能。

监听俩个stream的实现

有网友想实现俩个或俩个以上的stream监听,可以这样实现(有其他更好的方法请留言评论,谢谢)
在上面监听一个stream的基础上在RedisStreamConfig类里增加监听方法,如下:

@Configuration
public class RedisStreamConfig {

    @Autowired
    private ListenerMessage streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
		initStream("mystream","mygroup");	//详细描述请看下方的问题补充——初始化key和group
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }

    @Bean
    public Subscription subscription2(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
		initStream("mystream","mygroup");	//详细描述请看下方的问题补充——初始化key和group
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }
}

记得创建该stream和对应的组,让后启动程序,在控制台模拟添加数据,如图:

然后idea控制台的打印可以看到分别接收到了来自不同的stream的消息

其他类和配置与监听一个的时候相同,不需要改变。

【问题补充】确认完消息删除消息

消息确认完后消息实际上没有在redis中消失,这也是redis stream中的一个特性,如果要想真正的删除该消息,需要我们进行指定字段ID删除,如下:

在RedisUtil中添加删除方法:

public void delField(String key, String fieldId){
        redisTemplate.opsForStream().delete(key,fieldId);
}

然后再ListenerMessage中的onMessage方法里处理完消息后添加删除该消息的方法:

@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    RedisUtil redisUtil;

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到来自redis的消息");
        System.out.println("message id "+entries.getId().getValue());
        System.out.println("stream "+entries.getStream());
        System.out.println("body "+entries.getValue());
        //下面是删除该消息的方法
        redisUtil.delField("mystream",entries.getId().getValue());
    }

}

【问题补充】自动初始化stream的key和group问题-最新更新-2021年12月4日

最近看到评论大部分有报找不到key和group的错信息,这个是因为没有初始化导致的,当时在写这个demo的时候为了简单测试直接写死了我已经存在的key和group,部分朋友不知道这个是需要初始化的,所以我更新了一下代码,增加了初始化key和group的方法,这样大家直接运行代码就可以了,不需要在做多余的操作。
RedisUtil中增加增加的代码:

public RecordId addStream(String key, Map<String,Object> message){
        RecordId add = redisTemplate.opsForStream().add(key, message);
        return add;	//返回增加后的id
    }

public void addGroup(String key, String groupName){
     redisTemplate.opsForStream().createGroup(key,groupName);
 }

/**
* 用来判断key是否存在
*/
public boolean hasKey(String key){
     if(key==null){
         return false;
     }else{
         return redisTemplate.hasKey(key);
     }

 }

然后再RedisStreamConfig中增加初始化方法,这里注意要判断key是否存在:

private void initStream(String key, String group){
	//判断key是否存在,如果不存在则创建
    boolean hasKey = redisUtil.hasKey(key);
    if(!hasKey){
        Map<String,Object> map = new HashMap<>();
        map.put("field","value");
        RecordId recordId = redisUtil.addStream(key, map);
        redisUtil.addGroup(key,group);
        //将初始化的值删除掉
        redisUtil.delField(key,recordId.getValue());
        log.info("stream:{}-group:{} initialize success",key,group);
    }
}

然后再上面配置监听的方法里调用这个初始化方法就可以了

@Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        initStream("mystream","mygroup");	//调用初始化
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }

https://gitee.com/hlovez/redismq.git已更新,代码已优化整理上传

到此这篇关于SpringBoot中使用Redis Stream实现消息监听示例的文章就介绍到这了,更多相关SpringBoot 消息监听内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring Boot监听Redis Key失效事件实现定时任务的示例

    业务场景 我们以订单功能为例说明下: 生成订单后一段时间不支付订单会自动关闭.最简单的想法是设置定时任务轮询,但是每个订单的创建时间不一样,定时任务的规则无法设定,如果将定时任务执行的间隔设置的过短,太影响效率. 还有一种想法,在用户进入订单界面的时候,判断时间执行相关操作.方式可能有很多,在这里介绍一种监听 Redis 键值对过期时间来实现订单自动关闭. 实现思路 在生成订单时,向 Redis 中增加一个 KV 键值对,K 为订单号,保证通过 K 能定位到数据库中的某个订单即可,V 可为任意值

  • spring boot+redis 监听过期Key的操作方法

    前言: 在订单业务中,有时候需要对订单设置有效期,有效期到了后如果还未支付,就需要修改订单状态.对于这种业务的实现,有多种不同的办法,比如: 1.使用querytz,每次生成一个订单,就创建一个定时任务,到期后执行业务代码: 2.rabbitMq中的延迟队列: 3.对Redis的Key进行监控: 1.引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • springboot+redis过期事件监听实现过程解析

    1 修改 redis.conf配置文件: K Keyspace events, published with keyspace@ prefix事件 E Keyevent events, published with keyevent@ prefix g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, - $ String commands l List commands s Set commands h Hash co

  • SpringBoot如何整合redis实现过期key监听事件

    可以用于简单的过期订单取消支付.7天自动收货场景中 1.Spring Boot整合redis 参考 https://www.jb51.net/article/170687.htm 2.打开redis服务的配置文件添加notify-keyspace-events Ex 如果是注释了,就取消注释 Linux安装redis:https://www.jb51.net/article/193265.htm Windows安装redis:https://www.jb51.net/article/176040

  • SpringBoot如何监控Redis中某个Key的变化(自定义监听器)

    目录 SpringBoot 监控Redis中某个Key的变化 1.声明 2.基本理念 3.实现和创建监听 4.基本demo的其他配置 5.基本测试 6.小结一下 SpringBoot自定义监听器 原理 示例 SpringBoot 监控Redis中某个Key的变化 1.声明 当前内容主要为本人学习和基本测试,主要为监控redis中的某个key的变化(感觉网上的都不好,所以自己看Spring源码直接写一个监听器) 个人参考: Redis官方文档 Spring-data-Redis源码 2.基本理念

  • SpringBoot中使用Redis Stream实现消息监听示例

    目录 Demo环境 仓库地址 POM依赖 配置监听消息类 监听俩个stream的实现 [问题补充]确认完消息删除消息 [问题补充]自动初始化stream的key和group问题-最新更新-2021年12月4日 Demo环境 JDK8 Maven3.6.3 springboot2.4.3 redis_version:6.2.1 仓库地址 https://gitee.com/hlovez/redismq.git. POM依赖 <?xml version="1.0" encoding=

  • redis stream 实现消息队列的实践

    目录 redis 实现消息对列4中方法 发布订阅 list 队列 zset 队列 Stream 队列 基本命令 xadd 生产消息 读取消息 xgroup 消费者组 xreadgroup 消费消息 Pending 等待列表 消息确认 消息转移 信息监控 SpringBoot 整合 参考文档: Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于redis实现消息队列的方式有很多: PUB/S

  • spring整合redis消息监听通知使用的实现示例

    目录 问题引入 1.1 过期问题描述 1.2 常用解决方案分析 1.3.整合SpringData Redis开发 spring整合redis监听消息 1. 配置监听redis消息 2 测试消息 结合redis的key失效机制和消息完成过期优惠券处理 1 模拟过期代金卷案例 2 配置redis中key失效的消息监听 3 接收失效消息完成过期代金卷处理 问题引入 在电商系统中,秒杀,抢购,红包优惠卷等操作,一般都会设置时间限制,比如订单15分钟不付款自动关闭,红包有效期24小时等等.那对于这种需求最

  • springboot中使用redis并且执行调试lua脚本

    目录 原因: 1.创建一个基本的web项目 2.配置redis 3.测试redis 的lua脚本 4.技术点 5.调试方式 1.进入服务关闭关闭正在运行的服务器 2.从命令行启动redis 3.在lua脚本中增加打印 4.运行代码 6.总结 今天有个项目需要使用redis,并且有使用脚本的需求.但是因为之前没有写过,所以还有一点点不熟悉,今天记录一下. 原因: 原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令插入. 1.创建一个基本的web项目 文件 ->新建 -> 项目,

  • springboot中使用redis由浅入深解析

    正文 很多时候,我们会在springboot中配置redis,但是就那么几个配置就配好了,没办法知道为什么,这里就详细的讲解一下 这里假设已经成功创建了一个springboot项目. redis连接工厂类 第一步,需要加上springboot的redis jar包 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redi

  • springboot中使用redis的方法代码详解

    特别说明: 本文针对的是新版 spring boot 2.1.3,其 spring data 依赖为 spring-boot-starter-data-redis,且其默认连接池为 lettuce ​redis 作为一个高性能的内存数据库,如果不会用就太落伍了,之前在 node.js 中用过 redis,本篇记录如何将 redis 集成到 spring boot 中.提供 redis 操作类,和注解使用 redis 两种方式.主要内容如下: •docker 安装 redis •springboo

  • 在SpringBoot中添加Redis及配置方法

    在实际的开发中,会有这样的场景.有一个微服务需要提供一个查询的服务,但是需要查询的数据库表的数据量十分庞大,查询所需要的时间很长. 此时就可以考虑在项目中加入缓存. 引入依赖 在maven项目中引入如下依赖.并且需要在本地安装redis. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifac

  • SpringBoot中使用redis做分布式锁的方法

    一.模拟问题 最近在公司遇到一个问题,挂号系统是做的集群,比如启动了两个相同的服务,病人挂号的时候可能会出现同号的情况,比如两个病人挂出来的号都是上午2号.这就出现了问题,由于是集群部署的,所以单纯在代码中的方法中加锁是不能解决这种情况的.下面我将模拟这种情况,用redis做分布式锁来解决这个问题. 1.新建挂号明细表 2.在idea上新建项目 下图是创建好的项目结构,上面那个parent项目是其他项目不用管它,和新建的没有关系 3.开始创建controller,service,dao(mapp

  • SpringBoot中使用Redis的完整实例

    一.在SpringBoot中使用Redis的一套军体拳 1.导包 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.0.RELEASE</version> </dependency> 2.导入工具类 packag

随机推荐