Java redisTemplate阻塞式处理消息队列

目录
  • Redis 消息队列
  • redis五种数据结构
  • 队列生产者
  • 队列消费者
  • 测试类
  • 并发情况下使用increment递增
  • 补充

Redis 消息队列

redis五种数据结构

队列生产者

package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;
import java.util.UUID;

/**
 * <p>
 * 队列生产者
 * </p>
 *
 * @SINCE 2021/11/30 21:03
 * @AUTHOR dispark
 * @Date: 2021/11/30 21:03
 */
@Slf4j
public class QueueProducer implements Runnable {

    /**
     * 生产者队列 key
     */
    public static final String QUEUE_PRODUCTER = "queue-producter";

    private RedisTemplate<String, Object> redisTemplate;

    public QueueProducer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void run() {
        Random random = new Random();
        while (true) {
            try {
                Thread.sleep(random.nextInt(600) + 600);
                // 1.模拟生成一个任务
                UUID queueProducerId = UUID.randomUUID();
                // 2.将任务插入任务队列:queue-producter
                redisTemplate.opsForList().leftPush(QUEUE_PRODUCTER, queueProducerId.toString());
                log.info("生产一条数据 >>> {}", queueProducerId.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

队列消费者

package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;

/**
 * <p>
 * 队列消费者
 * </p>
 *
 * @SINCE 2021/11/30 21:14
 * @AUTHOR dispark
 * @Date: 2021/11/30 21:14
 */
@Slf4j
public class QueueConsumer implements Runnable {
    public static final String QUEUE_PRODUCTER = "queue-producter";
    public static final String TMP_QUEUE = "tmp-queue";

    private RedisTemplate<String, Object> redisTemplate;

    public QueueConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 功能描述: 取值 - <brpop:阻塞式> - 推荐使用
     *
     * @author dispark
     * @date 2021/11/30 21:17
     */
    @Override
    public void run() {
        Random random = new Random();
        while (true) {
            // 1.从任务队列"queue-producter"中获取一个任务,并将该任务放入暂存队列"tmp-queue"
            Long ququeConsumerId = redisTemplate.opsForList().rightPush(QUEUE_PRODUCTER, TMP_QUEUE);
            // 2.处理任务----纯属业务逻辑,模拟一下:睡觉
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 3.模拟成功和失败的偶然现象,模拟失败的情况,概率为2/13
            if (random.nextInt(13) % 7 == 0) {
                // 4.将本次处理失败的任务从暂存队列"tmp-queue"中,弹回任务队列"queue-producter"
                redisTemplate.opsForList().rightPush(TMP_QUEUE, QUEUE_PRODUCTER);
                log.info(ququeConsumerId + "处理失败,被弹回任务队列");
            } else {
                // 5. 模拟成功的情况,将本次任务从暂存队列"tmp-queue"中清除
                redisTemplate.opsForList().rightPop(TMP_QUEUE);
                log.info(ququeConsumerId + "处理成功,被清除");
            }
        }
    }
}

测试类

    @Test
    public void QueueThreadTotalEntry() throws Exception {
        // 1.启动一个生产者线程,模拟任务的产生
        new Thread(new QueueProducer(redisTemplate)).start();
        Thread.sleep(15000);
        // 2.启动一个线程者线程,模拟任务的处理
        new Thread(new QueueConsumer(redisTemplate)).start();
        // 3.主线程
        Thread.sleep(Long.MAX_VALUE);
    }

并发情况下使用increment递增

线程一:

Long increment = redisTemplate.opsForValue().increment("increment", 1L);
            log.info("队列消费者 >> increment递增: {}", increment);

线程二:

Long increment = redisTemplate.opsForValue().increment("increment", 1L);
            log.info("生产者队列 >> increment递增: {}", increment);

补充

redisTemplate处理/获取redis消息队列

(参考代码)

/**
 * redis消息队列
 */
@Component
public class RedisQueue {
    @Autowired
    private RedisTemplate redisTemplate;

    /** ---------------------------------- redis消息队列 ---------------------------------- */
    /**
     * 存值
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lpush(String key, Object value) {
        try {
            redisTemplate.opsForList().leftPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 取值 - <rpop:非阻塞式>
     * @param key 键
     * @return
     */
    public Object rpop(String key) {
        try {
            return redisTemplate.opsForList().rightPop(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 取值 - <brpop:阻塞式> - 推荐使用
     * @param key 键
     * @param timeout 超时时间
     * @param timeUnit 给定单元粒度的时间段
     *                 TimeUnit.DAYS          //天
     *                 TimeUnit.HOURS         //小时
     *                 TimeUnit.MINUTES       //分钟
     *                 TimeUnit.SECONDS       //秒
     *                 TimeUnit.MILLISECONDS  //毫秒
     * @return
     */
    public Object brpop(String key, long timeout, TimeUnit timeUnit) {
        try {
            return redisTemplate.opsForList().rightPop(key, timeout, timeUnit);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 查看值
     * @param key 键
     * @param start 开始
     * @param end 结束 0 到 -1代表所有值
     * @return
     */
    public List<Object> lrange(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

}

以上就是Java redisTemplate阻塞式处理消息队列的详细内容,更多关于Java redisTemplate 处理消息队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spring boot项目redisTemplate实现轻量级消息队列的方法

    背景 公司项目有个需求, 前端上传excel文件, 后端读取数据.处理数据.返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发 一.本文涉及知识点 excel文件读写--阿里easyexcel sdk 文件上传.下载--腾讯云对象存储 远程服务调用--restTe

  • Redis延迟队列和分布式延迟队列的简答实现

    最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现. 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队列. 我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图.DelayQueue实现了Delay.BlockingQue

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • 基于Redis延迟队列的实现代码

    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户. 2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单. 解决方案分析 方案一: 采用通过定时任务采用数据库/非关系型数据库轮询方案. 优点: 1. 实现简单,对于项目前期这样是最容易的解决方案. 缺点: 1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询. 2. 服务资源浪费,因为轮询需

  • Java redisTemplate阻塞式处理消息队列

    目录 Redis 消息队列 redis五种数据结构 队列生产者 队列消费者 测试类 并发情况下使用increment递增 补充 Redis 消息队列 redis五种数据结构 队列生产者 package cn.stylefeng.guns.knowledge.modular.knowledge.schedule; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; i

  • java 打造阻塞式线程池的实例详解

    java 打造阻塞式线程池的实例详解 原来以为tiger已经自带了这种线程池,就是在任务数量超出时能够阻塞住投放任务的线程,主要想用在JMS消息监听. 开始做法: 在ThreadPoolExcecutor中代入new ArrayBlockingQueue(MAX_TASK). 在任务超出时报错:RejectedExecutionException. 后来不用execute方法加入任务,直接getQueue().add(task), 利用其阻塞特性.但是发现阻塞好用了,但是任务没有被处理.一看Qu

  • Java分布式学习之Kafka消息队列

    目录 介绍 Kafka核心相关名称 kafka集群安装 kafka使用 kafka文件存储 Springboot整合kafka 介绍 Apache Kafka 是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统. 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. 注意:Kafka并没有遵循JMS规范(

  • Java消息队列的简单实现代码

    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆. 我可以举个小例子先说明应用场景 假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有序的处理,先进后出(LIFO),先进先出(FIFO)可根据自己的情况进行定夺 stack  先进后出(LIFO)--------Java 对应的类 Stack 队列 先进先出(FIFO)--------java对应的类Queue 这两种都可用Linkedl

  • C++基于消息队列的多线程实现示例代码

    前言 实现消息队列的关键因素是考量不同线程访问消息队列的同步问题.本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类.定义如下: template <class Mutex> class lock_guard; lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态:而 l

  • C++消息队列(定义,结构,如何创建,发送与接收)

    目录 一.定义 二.结构 三.消息队列的创建 四.消息队列的发送与接收 五.小结 一.定义 1.消息队列是一种先进先出的队列型数据结构,实际上是系统内核中的一个内部链表.消息被顺序插入队列中,其中发送进程将消息添加到队列末尾,接受进程从队列头读取消息.2.多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息.发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除. 二.结构 1.消息队列中消息本身由消息类型和消息数据组成,通常使用如下结构:

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • Java消息队列JMS实现原理解析

    一.什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述).我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合. 二.为什么需要JMS 在JA

  • Java进程间通信之消息队列

    目录 消息队列 1.消息队列的原理 2.消息队列的接口: 2.1创建消息队列 2.2向消息队列发送消息 2.3接收消息: 2.4操作消息队列的接口 2.5代码测试: 信号量: 信号量的原理 总结 消息队列 1.消息队列的原理 1.1 msgqueue采用链表来实现消息队列, 该链表是由系统内核维护, 1.2 系统中可能有很多的msgqueue, 每个MQ用消息队列描述符(消息队列ID: qid) 来区分,qid是唯一 的,用来区分不同的MQ. 1.3在进行进程间通信时,一个进程将消息加到MQ尾端

随机推荐