redis stream 实现消息队列的实践

目录
  • redis 实现消息对列4中方法
    • 发布订阅
    • list 队列
    • zset 队列
    • Stream 队列
  • 基本命令
    • xadd 生产消息
    • 读取消息
    • xgroup 消费者组
    • xreadgroup 消费消息
    • Pending 等待列表
    • 消息确认
    • 消息转移
    • 信息监控
  • SpringBoot 整合
  • 参考文档:

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

基于redis实现消息队列的方式有很多:

  • PUB/SUB,订阅/发布模式
  • 基于List的 LPUSH+BRPOP 的实现

redis 实现消息对列4中方法

发布订阅

发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。

list 队列

生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

zset 队列

生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。

zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

Stream 队列

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

提供消息ack机制。

基本命令

xadd 生产消息

往 stream 内创建消息 语法为:

XADD key ID field string [field string …]

# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
xadd stream1 * name zs age 23

读取消息

读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
xread count 1 streams stream1 0
#表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
xread count 1 streams msg 1649143363972-0

#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
xread count 1 block 0 streams stream1 $ 

xrange stream - + 10

XRANGE key startID endID count

#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
xrange stream1 - + 10

xgroup 消费者组

redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

创建消费者组:

#消费消息首先得创建消费者组
# 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
xgroup create stream1 group1 0

#查询stream1内的所有消费者组信息
xinfo groups stream1

xreadgroup 消费消息

通过xreadgroup可以在消费者组内创建消费者消费消息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#创建消费者读取消息
#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 >

Pending 等待列表

通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

XPENDING key group [start end count] [consumer]

#查看pending列表
# 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1组内的所有消费者pending类表
xpending stream1 group1 - + 10

消息确认

当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

消息转移

当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

信息监控

redis提供了xinfo来查看stream的信息

#查看sream信息
xinfo stream steam1
#查询消费者组信息
xinfo groups group1 

#查询消费者信息
xinfo consumers consumer1

SpringBoot 整合

1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 编写消费者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    public final String streamName      = "emailStream";
    public final String groupName       = "emailGroup";
    public final String consumerName    = "emailConsumer";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        //log.info("stream名称-->{}",message.getStream());
        //log.info("消息ID-->{}",message.getId());
        log.info("消息内容-->{}",message.getValue());

        Map<String, String> msgMap = message.getValue();

        if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
            //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务
            log.info("消费异常-->"+message);
           return;
        }

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //消息应答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
	//我们可以启动定时任务不断监听pending列表,处理死信消息
}

3 配置redis

序列化配置

@EnableCaching
@Configuration
public class RedisConfig {

    /**
     * 设置redis序列化规则
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        return jackson2JsonRedisSerializer;
    }

    /**
     * RedisTemplate配置
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // key序列化
        redisTemplate.setKeySerializer(stringSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash key序列化
        redisTemplate.setHashKeySerializer(stringSerializer);
        // Hash value序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

消费者组和消费者配置

@Slf4j
@Configuration
public class RedisStreamConfig {

    @Autowired
    private EmailConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(3))
                //count 数量(一次只获取一条消息)
                .batchSize(1)
                //序列化规则
                .serializer( stringRedisSerializer )
                .build();
    }

    /**
     * 开启监听器接收消息
     */
    @Bean
    public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if( !redisTemplate.hasKey(emailConsumer.streamName)){
            redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",emailConsumer.streamName);
        }

        //创建消费者组
        try {
            redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在",emailConsumer.groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
            Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
            StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
            emailConsumer
        );

        listenerContainer.start();
        return listenerContainer;
    }

}

4.生产者生产消息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){

    StreamOperations streamOperations = redisTemplate.opsForStream();

    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);

        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add("emailStream",msgMap);
    }
    return "success";
}

参考文档:

redis Stream 消息队列

SpringBoot整合redis stream 实现消息队列

到此这篇关于redis stream 实现消息队列的实践的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redis 中使用 list,streams,pub/sub 几种方式实现消息队列的问题

    目录 使用Redis实现消息队列 基于List的消息队列 分析下源码实现 基于Streams的消息队列 分析下源码实现 stream的结构 streamCG消费者组 streamConsumer消费者结构 发布订阅 普通的订阅 基于模式(pattern)的发布/订阅 看下源码实现 总结 参考 使用 Redis 实现消息队列 Redis 中也是可以实现消息队列 不过谈到消息队列,我们会经常遇到下面的几个问题 1.消息如何防止丢失: 2.消息的重复发送如何处理: 3.消息的顺序性问题: 关于 mq

  • redis stream 实现消息队列的实践

    目录 redis 实现消息对列4中方法 发布订阅 list 队列 zset 队列 Stream 队列 基本命令 xadd 生产消息 读取消息 xgroup 消费者组 xreadgroup 消费消息 Pending 等待列表 消息确认 消息转移 信息监控 SpringBoot 整合 参考文档: Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于redis实现消息队列的方式有很多: PUB/S

  • 使用 Redis 流实现消息队列的代码

    在介绍了 Redis 流的基本功能之后, 现在是时候使用这些功能来构建一些实际的应用了. 消息队列作为流的典型应用之一, 具有非常好的示范性, 因此我们将使用 Redis 流的相关功能构建一个消息队列应用, 这个消息队列跟我们之前使用其他 Redis 数据结构构建的消息队列具有相似的功能. 代码清单 10-1 展示了一个具有基本功能的消息队列实现: 代码最开头的是几个转换函数, 它们负责对程序的相关输入输出进行转换和格式化: MessageQueue 类用于实现消息队列, 它的添加消息.移除消息

  • SpringBoot中使用Redis Stream实现消息监听示例

    目录 Demo环境 仓库地址 POM依赖 配置监听消息类 监听俩个stream的实现 [问题补充]确认完消息删除消息 [问题补充]自动初始化stream的key和group问题-最新更新-2021年12月4日 Demo环境 JDK8 Maven3.6.3 springboot2.4.3 redis_version:6.2.1 仓库地址 https://gitee.com/hlovez/redismq.git. POM依赖 <?xml version="1.0" encoding=

  • PHP使用php-resque库配合Redis实现MQ消息队列的教程

    消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验. 为了实现类似的需求,Web项目中一般的实现方法是使用消息队列(Message Queue),比如MemcacheQ,RabbitMQ等等,都是很著名的产品. 消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本.正是因为消息队列实在太简单了,当拿着消息

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • 深入理解redis分布式锁和消息队列

    最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了. redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String loc

  • Redis 使用 List 实现消息队列的优缺点

    目录 什么是消息队列 消息队列满足哪些特性 消息有序性 重复消息处理 可靠性 List 实现消息队列 LPUSH RPOP 实时消费问题 重复消费 消息可靠性 需要注意的是 Redission 实战 添加依赖 Java 代码实战 单元测试 总结 需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存. 分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦.流量消峰.实现最终一致性. 目前市面上已经有 RabbitMQ.RochetMQ.A

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

  • 消息队列应用场景介绍

    一.什么是队列 队列(Queue)是一种常见的数据结构,其最大的特点就是先进先出(First In First Out),作为最基础的数据结构,队列应用很广泛.比如火车站排队买票等等.可以用下图表示队列: 其中a1.a2.an表示队列中的数据.数据从队尾入队列,然后从队头出队列. 二.什么是消息队列 消息队列(Message Queue)是一种使用队列(Queue)作为底层存储数据结构,可以用于解决不同进程与应用程序之间通讯的分布式消息容器,也可以称为消息中间件. 目前比较常用的消息队列有Act

随机推荐