Java延迟队列原理与用法实例详解

本文实例讲述了Java延迟队列原理与用法。分享给大家供大家参考,具体如下:

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……
 *
 * @author whd
 * @date 2017年9月24日 下午8:57:14
 */
public class Message implements Delayed {
    private int id;
    private String body; // 消息内容
    private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
    public int getId() {
        return id;
    }
    public String getBody() {
        return body;
    }
    public long getExcuteTime() {
        return excuteTime;
    }
    public Message(int id, String body, long delayTime) {
        this.id = id;
        this.body = body;
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }
    // 自定义实现比较方法返回 1 0 -1三个参数
    @Override
    public int compareTo(Delayed delayed) {
        Message msg = (Message) delayed;
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
    }
    // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

二、消息消费者

package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
    // 延时队列 ,消费者从其中获取消息进行消费
    private DelayQueue<Message> queue;
    public Consumer(DelayQueue<Message> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message take = queue.take();
                System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

三、延时队列

package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
     public static void main(String[] args) {
        // 创建延时队列
        DelayQueue<Message> queue = new DelayQueue<Message>();
        // 添加延时消息,m1 延时3s
        Message m1 = new Message(1, "world", 3000);
        // 添加延时消息,m2 延时10s
        Message m2 = new Message(2, "hello", 10000);
        //将延时消息放到延时队列中
        queue.offer(m2);
        queue.offer(m1);
        // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(new Consumer(queue));
        exec.shutdown();
      }
}

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类   第二、按照任务类组装的消息体类  第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

package com.test.delayqueue;
/**
 * 具体执行相关业务的业务类
 * @author whd
 * @date 2017年9月25日 上午12:49:32
 */
public class DelayOrderWorker implements Runnable {
    @Override
    public void run() {
        // TODO Auto-generated method stub
        //相关业务逻辑处理
        System.out.println(Thread.currentThread().getName()+" do something ……");
    }
}

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 延时队列中的消息体将任务封装为消息体
 *
 * @author whd
 * @date 2017年9月25日 上午12:48:30
 * @param <T>
 */
public class DelayOrderTask<T extends Runnable> implements Delayed {
    private final long time;
    private final T task; // 任务类,也就是之前定义的任务类
    /**
     * @param timeout
     *      超时时间(秒)
     * @param task
     *      任务
     */
    public DelayOrderTask(long timeout, T task) {
        this.time = System.nanoTime() + timeout;
        this.task = task;
    }
    @Override
    public int compareTo(Delayed o) {
        // TODO Auto-generated method stub
        DelayOrderTask other = (DelayOrderTask) o;
        long diff = time - other.time;
        if (diff > 0) {
            return 1;
        } else if (diff < 0) {
            return -1;
        } else {
            return 0;
        }
    }
    @Override
    public long getDelay(TimeUnit unit) {
        // TODO Auto-generated method stub
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
    @Override
    public int hashCode() {
        return task.hashCode();
    }
    public T getTask() {
        return task;
    }
}

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 延时队列管理类,用来添加任务、执行任务
 *
 * @author whd
 * @date 2017年9月25日 上午12:44:59
 */
public class DelayOrderQueueManager {
    private final static int DEFAULT_THREAD_NUM = 5;
    private static int thread_num = DEFAULT_THREAD_NUM;
    // 固定大小线程池
    private ExecutorService executor;
    // 守护线程
    private Thread daemonThread;
    // 延时队列
    private DelayQueue<DelayOrderTask<?>> delayQueue;
    private static final AtomicLong atomic = new AtomicLong(0);
    private final long n = 1;
    private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
    private DelayOrderQueueManager() {
        executor = Executors.newFixedThreadPool(thread_num);
        delayQueue = new DelayQueue<>();
        init();
    }
    public static DelayOrderQueueManager getInstance() {
        return instance;
    }
    /**
     * 初始化
     */
    public void init() {
        daemonThread = new Thread(() -> {
            execute();
        });
        daemonThread.setName("DelayQueueMonitor");
        daemonThread.start();
    }
    private void execute() {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("当前存活线程数量:" + map.size());
            int taskNum = delayQueue.size();
            System.out.println("当前延时任务数量:" + taskNum);
            try {
                // 从延时队列中获取任务
                DelayOrderTask<?> delayOrderTask = delayQueue.take();
                if (delayOrderTask != null) {
                    Runnable task = delayOrderTask.getTask();
                    if (null == task) {
                        continue;
                    }
                    // 提交到线程池执行task
                    executor.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 添加任务
     *
     * @param task
     * @param time
     *      延时时间
     * @param unit
     *      时间单位
     */
    public void put(Runnable task, long time, TimeUnit unit) {
        // 获取延时时间
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
        // 将任务封装成实现Delayed接口的消息体
        DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
        // 将消息体放到延时队列中
        delayQueue.put(delayOrder);
    }
    /**
     * 删除任务
     *
     * @param task
     * @return
     */
    public boolean removeTask(DelayOrderTask task) {
        return delayQueue.remove(task);
    }
}

测试类

package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
    public static void main(String[] args) {
        DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
        DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
        DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
        // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
        DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
        manager.put(work1, 3000, TimeUnit.MILLISECONDS);
        manager.put(work2, 6000, TimeUnit.MILLISECONDS);
        manager.put(work3, 9000, TimeUnit.MILLISECONDS);
    }
}

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现……

更多关于java算法相关内容感兴趣的读者可查看本站专题:《Java数据结构与算法教程》、《Java操作DOM节点技巧总结》、《Java文件与目录操作技巧汇总》和《Java缓存操作技巧汇总》

希望本文所述对大家java程序设计有所帮助。

(0)

相关推荐

  • Java 队列实现原理及简单实现代码

    Java 队列实现原理 "队列"这个单词是英国人说的"排".在英国"排队"的意思就是站到一排当中去.计算机科学中,队列是一种数据结构,有点类似栈,只是在队列中第一个插入的数据项也会最先被移除,而在栈中,最后插入的数据项最先移除.队列的作用就像电影院前的人们站成的排一样:第一个进入附属的人将最先到达队头买票.最后排队的人最后才能买到票. 队列和栈一样也被用作程序员的工具.它也可以用于模拟真实世界的环境,例如模拟人们在银行里排队等待,飞机等待起飞,或

  • java利用delayedQueue实现本地的延迟队列

    一.了解DelayQueue DelayQueue是什么? DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长. 注意:不能将null元素放置到这种队列中. DelayQueue能做什么? 在我们的业务中通常会有一些需求是这样的: 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知. 那么这类

  • 剖析Java中阻塞队列的实现原理及应用场景

    我们平时使用的一些常见队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • java使用数组和链表实现队列示例

    (1)用数组实现的队列: 复制代码 代码如下: //先自己定义一个接口  public interface NetJavaList {    public void add(Student t);    //继承该接口的类必须实现的方法    public Student get(int index);//队列的加入,取出,队列的大小    public int size(); } 定义一个学生类 复制代码 代码如下: class Student {      private String na

  • Java消息队列的简单实现代码

    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆. 我可以举个小例子先说明应用场景 假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有序的处理,先进后出(LIFO),先进先出(FIFO)可根据自己的情况进行定夺 stack  先进后出(LIFO)--------Java 对应的类 Stack 队列 先进先出(FIFO)--------java对应的类Queue 这两种都可用Linkedl

  • 一个简易的Java多页面队列爬虫程序

    之前写过很多单页面python爬虫,感觉python还是很好用的,这里用java总结一个多页面的爬虫,迭代爬取种子页面的所有链接的页面,全部保存在tmp路径下. 一. 序言 实现这个爬虫需要两个数据结构支持,unvisited队列(priorityqueue:可以适用pagerank等算法计算出url重要度)和visited表(hashset:可以快速查找url是否存在):队列用于实现宽度优先爬取,visited表用于记录爬取过的url,不再重复爬取,避免了环.java爬虫需要的工具包有http

  • java结合WebSphere MQ实现接收队列文件功能

    首先我们先来简单介绍下websphere mq以及安装使用简介 websphere mq  : 用于传输信息 具有跨平台的功能. 1 安装websphere mq 并启动 2 websphere mq 建立 queue Manager (如:MQSI_SAMPLE_QM) 3 建立queue 类型选择 Local类型 的 (如lq  ) 4 建立channels 类型选择Server Connection (如BridgeChannel) 接下来,我们来看实例代码: MQFileReceiver

  • 详解Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 1.消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 2.优势:异步.可靠 3.消息模型:点对点,发布/订阅 4.JMS中的对象 然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 1.多种语言和协议编写客户端

  • 解析Java中的队列和用LinkedList集合模拟队列的方法

    API中对队列的说明: public interface Queue<E> extends Collection<E> 在处理元素前用于保存元素的 collection.除了基本的 Collection 操作外,队列还提供其他的插入.提取和检查操作.每个方法都存在两种形式:一种抛出异常(操作失败时),另一种返回一个特殊值(null 或 false,具体取决于操作).插入操作的后一种形式是用于专门为有容量限制的 Queue 实现设计的:在大多数实现中,插入操作不会失败. 队列通常(但

  • java多线程消息队列的实现代码

    本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记 1.定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象.也就是说,它不依赖类特定的实例,被类的所有实例共享. private static List<Queue> queueCache = new LinkedList<Queue>(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. private Integer

随机推荐