Java实现Redis延时消息队列

目录
  • 什么是延时任务
  • 延时任务的特点
  • 实现思路:
  • 代码实现
    • 1.消息模型
    • 2.RedisMq 消息队列实现类
    • 3.消息生产者
    • 4.消息消费者
    • 5. 消息执接口
    • 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求

什么是延时任务

延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了。只要在前一天下班前指定第二天要发送资讯的时间,到了第二天指定的时间点资讯就能准时发出去了。如果大家有运营过公众号,就会知道公众号后台也有文章定时发送的功能。总而言之,延时任务的使用还是很广泛的。

延时任务的特点

  • 时间有序性
  • 时间具体性
  • 任务中携带详细的信息 ,通常包括 任务ID, 任务的类型 ,时间点。

实现思路:

将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
根据id拿到消息池的具体消息进行消费
消费成功,删除改队列和消息
消费失败,让该消息重新回到队列

代码实现

1.消息模型

import lombok.Data;
import lombok.experimental.Accessors;

import javax.validation.constraints.NotNull;
import java.io.Serializable;

/**
 * Redis 消息队列中的消息体
 * @author shikanatsu
 */
@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {

    /** 消息队列组 **/
    private String group;

    /**
     * 消息id
     */
    private String id;

    /**
     * 消息延迟/ 秒
     */
    @NotNull(message = "消息延时时间不能为空")
    private long delay;

    /**
     * 消息存活时间 单位:秒
     */
    @NotNull(message = "消息存活时间不能为空")
    private int ttl;
    /**
     * 消息体,对应业务内容
     */
    private Object body;
    /**
     * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
     * 用来消除时间的影响
     */
    private long createTime;
}

2.RedisMq 消息队列实现类

package com.shixun.base.redisMq;

import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Redis消息队列
 *
 * @author shikanatsu
 */
@Component
public class RedisMq {

    /**
     * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link MSG_POOL}
     * 的消息体body作为值存储
     */
    public static final String MSG_POOL = "Message:Pool:";

    /**
     * zset队列 名称 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";

//    private static final int SEMIH = 30 * 60;

    @Resource
    private RedisService redisService;

    /**
     * 存入消息池
     *
     * @param message
     * @return
     */
    public boolean addMsgPool(RedisMessage message) {
        if (null != message) {
            redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
            return true;
        }
        return false;
    }

    /**
     * 从消息池中删除消息
     *
     * @param id
     * @return
     */
    public void deMsgPool(String group, String id) {
        redisService.remove(MSG_POOL + group + id);
    }

    /**
     * 向队列中添加消息
     *
     * @param key
     * @param score 优先级
     * @param val
     * @return 返回消息id
     */
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key, val, score);
    }

    /**
     * 从队列删除消息
     *
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
}

3.消息生产者

import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 消息生产者
 *
 * @author shikanatsu
 */
@Component
public class MessageProvider {

    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

    @Resource
    private RedisMq redisMq;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public boolean sendMessage(@Validated RedisMessage message) {
        Assert.notNull(message);
        //The priority is if there is no creation time
//        message.setCreateTime(System.currentTimeMillis());
        message.setId(IdUtil.fastUUID());
        Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        try {
            redisMq.addMsgPool(message);
            redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
            logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}",message.toString(),new Date(),sdf.format(delayTime));
        }catch (Exception e){
            e.printStackTrace();
            logger.error("RedisMq 消息发送失败,当前时间:{}",new Date());
            return false;
        }
        return true;
    }
}

4.消息消费者

/**
 * Redis消息消费者
 * @author shikanatsu
 */
@Component
public class RedisMqConsumer {

    private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);

    @Resource
    private RedisMq redisMq;

    @Resource
    private RedisService redisService;

    @Resource
    private MessageProvider provider;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //@Scheduled(cron = "*/1 * * * * ? ")
    /**
     Instead of a thread loop, you can use Cron expressions to perform periodic tasks
     */
    public void baseMonitor(RedisMqExecute mqExecute){
        String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
        //The query is currently expired
        Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (Object id : set) {
                long  score = redisService.getScore(queueName, id.toString()).longValue();
                //Once again the guarantee has expired , And then perform the consumption
                if (current >= score) {
                    String str = "";
                    RedisMessage message = null;
                    String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
                    try {
                        message = (RedisMessage)redisService.get(msgPool + id.toString());
                        log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
                        if(null==message){
                            return;
                        }
                        //Do something ; You can add a judgment here and if it fails you can add it to the queue again
                        mqExecute.execute(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //If an exception occurs, it is put back into the queue
                        // todo:  If repeated, this can lead to repeated cycles
                        log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
                        provider.sendMessage(message);
                    } finally {
                        redisMq.deMessage(queueName, id.toString());
                        redisMq.deMsgPool(message.getGroup(),id.toString());
                    }
                }
            }
        }
    }
}

5. 消息执接口

/**
 * @author shikanatsu
 */

public interface RedisMqExecute {

    /**
     * 获取队列名称
     * @return
     */
    public String getQueueName();

    /**
     * 统一的通过执行期执行
     * @param message
     * @return
     */
    public boolean execute(RedisMessage message);

    /**
     * Perform thread polling
     */

    public void   threadPolling();

}

6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求

/**
 * 订单执行
 *
 * @author shikanatsu
 */
@Service
public class OrderMqExecuteImpl implements RedisMqExecute {

    private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);

    public final static String name = "orderPoll:";

    @Resource
    private RedisMqConsumer redisMqConsumer;

    private RedisMqExecute mqExecute = this;

    @Resource
    private OrderService orderService;

    @Override
    public String getQueueName() {
        return name;
    }

    @Override
    /**
     * For the time being, only all orders will be processed. You can change to make orders
     */
    public boolean execute(RedisMessage message) {
        logger.info("Do orderMqPoll ; Time:{}",new Date());
  //Do
        return true;
    }

    @Override
    /**  通过线程去执行轮询的过程,时间上可以自由控制 **/
    public void threadPolling() {
        ThreadUtil.execute(() -> {
            while (true) {
                redisMqConsumer.baseMonitor(mqExecute);
                ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
            }
        });
    }
}

使用事例
 1. 实现RedisMqExecute 接口 创建对应的轮询或者采取定时器的方式执行 和实现具体的任务。
 2.  通过MessageProvider 实现相对应的消息服务和绑定队列组,通过队列组的方式执行。
 3. 提示: 采取线程的方式需要在项目启动过程中执行,采取定时器或者调度的方式可以更加动态的调整。

到此这篇关于Java实现Redis延时消息队列的文章就介绍到这了,更多相关Java Redis延时消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • php+redis实现消息队列功能示例

    本文实例讲述了php+redis实现消息队列功能.分享给大家供大家参考,具体如下: 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',

  • go+redis实现消息队列发布与订阅的详细过程

    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息. 1.生产者随机发布消息,用rpush发布.2.消费者用lpop订阅消费,一旦没有消息,随机休眠.redis做消息队列的缺点:没有持久化.一旦消息没有人消费,积累到一定程度后就会丢失 package main import ( "fmt

  • 浅谈使用java实现阿里云消息队列简单封装

    一.前言 最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉. 现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪.日志.监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发

  • 基于python实现操作redis及消息队列

    操作 redis import redis redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8) redis= redis.Redis(connection_pool=redisPool) redis.set('key','values') redis.get('com') redis.append('keys','values') redis.delete('keys') print(redis.get

随机推荐