redis实现队列的阻塞、延时、发布和订阅

目录
  • 普通队列
  • 阻塞队列
  • 发布订阅模式
  • 延时队列和优先级队列
  • 应用场景

Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。

普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左进右出的队列
  • rpush+lpop:左出右进的队列

下面使用redis的命令来模拟普通队列。
使用lpush命令生产消息:

>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"

使用rpop命令消费消息:

>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"

下面使用Java代码来实现普通队列。

生产者SingleProducer

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

/**
 * 生产者
 */
public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

消费者SingleConsumer:

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

  • 消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。
  • 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

阻塞队列

消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,

使用redis的brpop命令来模拟阻塞队列。

>brpop queue:single 30

可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。

Java代码实现如下:

生产者与普通队列的生产者一致。

消费者BlockConsumer:

package com.morris.redis.demo.queue.block;

import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * 消费者
 */
public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }
}

缺点:无法实现一次生产多次消费。

发布订阅模式

Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。

发布:PUBLISH指令可用于发布一条消息,格式:

PUBLISH channel message

返回值表示订阅了该消息的数量。

订阅:SUBSCRIBE指令用于接收一条消息,格式:

SUBSCRIBE channel

使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。

回复分为三种类型:

  • 如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量
  • 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
  • 如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

下面使用redis的命令来模拟发布订阅模式。

生产者:

127.0.0.1:6379> publish queue hello
(integer) 1
127.0.0.1:6379> publish queue hi
(integer) 1

消费者:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "hello"
1) "message"
2) "queue"
3) "hi"

Java代码实现如下:

生产者PubsubProducer:

​
package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;

/**
 * 生产者
 */
public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

​

消费者PubsubConsumer:

package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * 消费者
 */
public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }
}

消费者可以启动多个,每个消费者都能收到所有的消息。

可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。

Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。

同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE \*不会匹配到channel.\*,所以要取消订阅channel.\*就要这样写PUBSUBSCRIBE channel.\*。

Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Random;

/**
 * 生产者
 */
public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int second = random.nextInt(30); // 随机订单失效时间
            jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
        }
        jedis.close();
    }
}

消费者:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time < now) {
                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
                        System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    break;
                }
            }
        }
    }
}

应用场景

延时队列可用于订单超时失效的场景
二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。

到此这篇关于redis实现队列的阻塞、延时、发布和订阅的文章就介绍到这了,更多相关redis 队列阻塞、延时、发布和订阅内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java redisTemplate阻塞式处理消息队列

    目录 Redis 消息队列 redis五种数据结构 队列生产者 队列消费者 测试类 并发情况下使用increment递增 补充 Redis 消息队列 redis五种数据结构 队列生产者 package cn.stylefeng.guns.knowledge.modular.knowledge.schedule; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; i

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • 基于Redis实现阻塞队列

    日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理. 这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味. 也可以使用基于 JVM 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失. 大部分的项目都会用到 Redis

  • redis实现延时队列的两种方式(小结)

    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时.按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢:2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟.所以就想到用延迟队列的方式去实现. 一,redis的过期key监控 1,开启过期key监听 在redis的配置里把这个注释去掉 notify-keyspace-events Ex 然后重启redis 2,使用redis过期监听实现延迟队列 继承KeyExpirationEventMess

  • redis实现队列的阻塞、延时、发布和订阅

    目录 普通队列 阻塞队列 发布订阅模式 延时队列和优先级队列 应用场景 Redis不仅可作为缓存服务器,还可以用作消息队列.它的列表类型天生支持用作消息队列.如下图所示: 由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1). 普通队列 可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop. lpush+rpop:左进右出的队列 rpush+lpo

  • PHP基于Redis消息队列实现发布微博的方法

    本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]https://github.com/ErikDubbelboer/phpRedisAdmin.git[/url] cd phpRedisAdmin git clone [url]https://github.com/nrk/predis.git[/url] vendor 首先安装上述的Redis图形化管理

  • PHP使用redis消息队列发布微博的方法示例

    本文实例讲述了PHP使用redis消息队列发布微博的方法.分享给大家供大家参考,具体如下: 在一些用户发布内容应用中,可能出现1秒上万个用户同时发布消息的情况,此时使用mysql可能会出现" too many connections"错误,当然把Mysql的max_connections参数设置为更大数,不过这是一个治标不治本的方法.而使用redis的消息队列,把用户发布的消息暂时存储在消息队列中,然后使用多个cron程序把消息队列中的数据插入到Mysql.这样就有效的降低了Mysql

  • go+redis实现消息队列发布与订阅的详细过程

    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息. 1.生产者随机发布消息,用rpush发布.2.消费者用lpop订阅消费,一旦没有消息,随机休眠.redis做消息队列的缺点:没有持久化.一旦消息没有人消费,积累到一定程度后就会丢失 package main import ( "fmt

  • Redis 实现队列原理的实例详解

    Redis 实现队列原理的实例详解 场景说明: ·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 ·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求 ·抢购场景,先入先出的模式 命令: rpush + blpop 或 lpush + brpop rpush : 往列表右侧推入数据 blpop : 客户端阻塞直到队列有值输出 简单队列: simple.php $stmt = $pdo->prepare('select id, c

  • SpringBoot+Redis实现消息的发布与订阅的示例代码

    目录 1.什么是redis的发布与订阅 2.Redis发布订阅 3.命令行实现功能 订阅主题 模式匹配订阅 发布消息 取消订阅 测试 4.SpringBoot实现功能 Springboot整合Redis 配置消息监听 测试 1.什么是redis的发布与订阅 在官网的文档介绍中有一行介绍:Redis是一个快速稳定的发布/订阅消息系统. 2.Redis发布订阅 机制 Redis提供了发布与订阅的功能,可以用于消息的传输,Redis的发布订阅机制包括三部分,发布者.订阅者和Channel(主题或者队列

  • PHP+Redis 消息队列 实现高并发下注册人数统计的实例

    前言 现在越来越多的网站开始注重统计和用户行为分析,作为网站经常使用的功能,如何让统计性能更加高,这也是我们需要考虑的事情.本篇通过Redis来优化统计功能(以注册人数统计为例). 传统的统计功能都是直接操作数据库把数据插入表中.这样做,对数据库的性能消耗就会比较大. 思路: 这里我们用到了redis的队列,注册的时候先添加到队列,然后在处理的时候出队,并且把人数添加redis里. 代码: <?php //register.php $redis = new Redis(); $redis->c

  • redis中队列消息实现应用解耦的方法

    消息队列的场景 我们都知道,消息是在两台计算机之间传送的数据单位,这个"消息"可以非常简单,例如只包含文本字符串,也可以更复杂,可能包含嵌入对象.而所谓的"消息队列"是在消息的传输过程中保存消息的容器.在web程序中,可能我们需要将用户的请求数据更新或者添加到数据库中,但是在高炳发的情况下,虽然作为用户的我们不知道后台是什么原因,但是依旧会抱怨或者吐槽这个程序反应缓慢,比如在过去的几年里,你有没有吐槽过12306抢票很难?反应很慢?有没有在使用某个程序的时候收到&q

  • Spring boot+redis实现消息发布与订阅的代码

    一.创建spring boot项目 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId>

  • Redis延迟队列和分布式延迟队列的简答实现

    最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现. 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队列. 我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图.DelayQueue实现了Delay.BlockingQue

随机推荐