Java如何处理延迟任务过程解析

1、利用延迟队列

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

package com.delqueue; 

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; 

/**
 * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… */
public class Message implements Delayed {
  private int id;
  private String body; // 消息内容
  private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。 

  public int getId() {
    return id;
  } 

  public String getBody() {
    return body;
  } 

  public long getExcuteTime() {
    return excuteTime;
  } 

  public Message(int id, String body, long delayTime) {
    this.id = id;
    this.body = body;
    this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
  } 

  // 自定义实现比较方法返回 1 0 -1三个参数
  @Override
  public int compareTo(Delayed delayed) {
    Message msg = (Message) delayed;
    return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
        : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
  } 

  // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
  }
}

二、消息消费者

package com.delqueue; 

import java.util.concurrent.DelayQueue; 

public class Consumer implements Runnable {
  // 延时队列 ,消费者从其中获取消息进行消费
  private DelayQueue<Message> queue; 

  public Consumer(DelayQueue<Message> queue) {
    this.queue = queue;
  } 

  @Override
  public void run() {
    while (true) {
      try {
        Message take = queue.take();
        System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

三、延时队列

package com.delqueue; 

import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; 

public class DelayQueueTest {
   public static void main(String[] args) {
      // 创建延时队列
      DelayQueue<Message> queue = new DelayQueue<Message>();
      // 添加延时消息,m1 延时3s
      Message m1 = new Message(1, "world", 3000);
      // 添加延时消息,m2 延时10s
      Message m2 = new Message(2, "hello", 10000);
      //将延时消息放到延时队列中
      queue.offer(m2);
      queue.offer(m1);
      // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
      ExecutorService exec = Executors.newFixedThreadPool(1);
      exec.execute(new Consumer(queue));
      exec.shutdown();
    }
}

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类 第二、按照任务类组装的消息体类 第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

package com.test.delayqueue;
/**
 * 具体执行相关业务的业务类
 * @author whd
 * @date 2017年9月25日 上午12:49:32
 */
public class DelayOrderWorker implements Runnable { 

  @Override
  public void run() {
    // TODO Auto-generated method stub
    //相关业务逻辑处理
    System.out.println(Thread.currentThread().getName()+" do something ……");
  }
}

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

package com.test.delayqueue; 

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; 

/**
 * 延时队列中的消息体将任务封装为消息体
 *
 * @author whd
 * @date 2017年9月25日 上午12:48:30
 * @param <T>
 */
public class DelayOrderTask<T extends Runnable> implements Delayed {
  private final long time;
  private final T task; // 任务类,也就是之前定义的任务类 

  /**
   * @param timeout
   *      超时时间(秒)
   * @param task
   *      任务
   */
  public DelayOrderTask(long timeout, T task) {
    this.time = System.nanoTime() + timeout;
    this.task = task;
  } 

  @Override
  public int compareTo(Delayed o) {
    // TODO Auto-generated method stub
    DelayOrderTask other = (DelayOrderTask) o;
    long diff = time - other.time;
    if (diff > 0) {
      return 1;
    } else if (diff < 0) {
      return -1;
    } else {
      return 0;
    }
  } 

  @Override
  public long getDelay(TimeUnit unit) {
    // TODO Auto-generated method stub
    return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
  } 

  @Override
  public int hashCode() {
    return task.hashCode();
  } 

  public T getTask() {
    return task;
  }
}

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

package com.test.delayqueue; 

import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; 

/**
 * 延时队列管理类,用来添加任务、执行任务
 *
 * @author whd
 * @date 2017年9月25日 上午12:44:59
 */
public class DelayOrderQueueManager {
  private final static int DEFAULT_THREAD_NUM = 5;
  private static int thread_num = DEFAULT_THREAD_NUM;
  // 固定大小线程池
  private ExecutorService executor;
  // 守护线程
  private Thread daemonThread;
  // 延时队列
  private DelayQueue<DelayOrderTask<?>> delayQueue;
  private static final AtomicLong atomic = new AtomicLong(0);
  private final long n = 1;
  private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); 

  private DelayOrderQueueManager() {
    executor = Executors.newFixedThreadPool(thread_num);
    delayQueue = new DelayQueue<>();
    init();
  } 

  public static DelayOrderQueueManager getInstance() {
    return instance;
  } 

  /**
   * 初始化
   */
  public void init() {
    daemonThread = new Thread(() -> {
      execute();
    });
    daemonThread.setName("DelayQueueMonitor");
    daemonThread.start();
  } 

  private void execute() {
    while (true) {
      Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
      System.out.println("当前存活线程数量:" + map.size());
      int taskNum = delayQueue.size();
      System.out.println("当前延时任务数量:" + taskNum);
      try {
        // 从延时队列中获取任务
        DelayOrderTask<?> delayOrderTask = delayQueue.take();
        if (delayOrderTask != null) {
          Runnable task = delayOrderTask.getTask();
          if (null == task) {
            continue;
          }
          // 提交到线程池执行task
          executor.execute(task);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  } 

  /**
   * 添加任务
   *
   * @param task
   * @param time
   *      延时时间
   * @param unit
   *      时间单位
   */
  public void put(Runnable task, long time, TimeUnit unit) {
    // 获取延时时间
    long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
    // 将任务封装成实现Delayed接口的消息体
    DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
    // 将消息体放到延时队列中
    delayQueue.put(delayOrder);
  } 

  /**
   * 删除任务
   *
   * @param task
   * @return
   */
  public boolean removeTask(DelayOrderTask task) { 

    return delayQueue.remove(task);
  }
}

测试类

package com.delqueue; 

import java.util.concurrent.TimeUnit; 

import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker; 

public class Test {
  public static void main(String[] args) {
    DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
    DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
    DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
    // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
    DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
    manager.put(work1, 3000, TimeUnit.MILLISECONDS);
    manager.put(work2, 6000, TimeUnit.MILLISECONDS);
    manager.put(work3, 9000, TimeUnit.MILLISECONDS);
  } 

}

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现

2、mq实现延迟消息

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装:

进入插件安装目录

{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)

下载插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下载的文件名称不规则就手动重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

关闭插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的,发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间

直接在maven工程的pom.xml文件中加入

接下来在 application.properties 文件中加入redis配置:

也很简单,代码如下:

实现消息发送

x-delay

在这里我设置的延迟时间是3秒。

消息消费者

直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。

接下来只需要用Junit运行一下发送消息的接口即可。

运行完后,可以看到如下信息:

消息发送时间:2018-05-03 12:44:53
3秒钟后,Spring Boot控制台会输出:
消息接收时间:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java实现的并发任务处理实例

    本文实例讲述了Java实现的并发任务处理方法.分享给大家供大家参考,具体如下: public void init() { super.init(); this.ioThreadPool = new ThreadPoolExecutor(50, 50, Long.MAX_VALUE, TimeUnit.SECONDS, new java.util.concurrent.LinkedTransferQueue<Runnable>(), new ThreadFactory() { AtomicLon

  • java实现多线程之定时器任务

    在Java中Timer是java.util包中的一个工具类,提供了定时器的功能.我们可以创建一个Timer对象,然后调用其schedule方法在某个特定的时间去执行一个特定的任务.并且你可以让其以特定频率一直执行某个任务,这个任务是用TimerTask来描述的,我们只需要将要进行的操作写在TimerTask类的run方法中即可.先附上两个小例子一遍让读者了解什么是定时器.接着再分析其中的一些源码实现. 第一个小例子: package com.zkn.newlearn.thread; import

  • 在Java Web项目中添加定时任务的方法

    在Java Web程序中加入定时任务,这里介绍两种方式:1.使用监听器注入:2.使用Spring注解@Scheduled注入. 推荐使用第二种形式. 一.使用监听器注入 ①:创建监听器类: import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; public class TimerDataTaskListener implements ServletContextListener

  • 最流行的java后台框架spring quartz定时任务

    配置quartz 在spring中需要三个jar包: quartz-1.8.5.jar.commons-collections-3.2.1.jar.commons-logging-1.1.jar 首先要配置我们的spring.xml xmlns 多加下面的内容. xmlns:task="http://www.springframework.org/schema/task" 然后xsi:schemaLocation多加下面的内容. http://www.springframework.o

  • Java实现任务超时处理方法

    任务超时处理是比较常见的需求,比如在进行一些比较耗时的操作(如网络请求)或者在占用一些比较宝贵的资源(如数据库连接)时,我们通常需要给这些操作设置一个超时时间,当执行时长超过设置的阈值的时候,就终止操作并回收资源.Java中对超时任务的处理有两种方式:一种是基于异步任务结果的超时获取,一种则是使用延时任务来终止超时操作.下文将详细说明. 一.基于异步任务结果的超时获取 基于异步任务结果的获取通常是跟线程池一起使用的,我们向线程池提交任务时会返回一个Future对象,在调用Future的get方法

  • Java应用多机器部署解决大量定时任务问题

    今天来说一个Java多机部署下定时任务的处理方案. 需求: 有两台服务器同时部署了同一套代码, 代码中写有spring自带的定时任务,但是每次执行定时任务时只需要一台机器去执行. 当拿到这个需求时我脑子中立马出现了两个简单的解决方案: 利用ip进行判断, 两台机器ip肯定不一样, 指定某一台机器的ip运行. 只在一台机器上部署定时任务的代码. 最后两个方案又都被自己否决了. 第一条,如果指定ip的机器出现了问题怎么办? 例如说宕机了, 那么该制定ip的机器上的定时任务是不是就无法运行了?如果以后

  • Java定时任务:利用java Timer类实现定时执行任务的功能

    一.概述 在java中实现定时执行任务的功能,主要用到两个类,Timer和TimerTask类.其中Timer是用来在一个后台线程按指定的计划来执行指定的任务. TimerTask一个抽象类,它的子类代表一个可以被Timer计划的任务,具体要执行的代码写在TimerTask需要被实现的run方法中. 二.先看一个最简单的例子 我们通过代码来说明 import java.text.SimpleDateFormat; import java.util.Date; import java.util.T

  • Quartz实现JAVA定时任务的动态配置的方法

    先说点无关本文的问题,这段时间特别的不爽,可能有些同学也遇到过.其实也可以说是小事一桩,但感觉也是不容忽视的.我刚毕业时的公司,每个人每次提交代码都有着严格的规范,像table和space的缩进都有严格的要求,可以说你不遵守开发规范就相当于线上bug问题,还是比较严重的.现在发现外面的公司真的是没那么重视这个不重要却又特别重要的问题啊,啊啊啊啊啊啊!!! 什么是动态配置定时任务? 回归正题,说下这次主题,动态配置.没接触过定时任务的同学可以先看下此篇:JAVA定时任务实现的几种方式 定时任务实现

  • Java如何处理延迟任务过程解析

    1.利用延迟队列 延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到-- 应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用: 简单的延时队列要有三部分:第一实现了Delayed接口的消息体.第二消费消息的消费者.第三存放消息的延时队列,那下面就来看看延时队列demo. 一.消息体 package com.delqueue

  • Springboot实现Java邮件任务过程解析

    1.maven引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> 2.application.properties配置发送邮箱 //用户邮箱 spring.mail.username=753029781@qq.com //QQ邮箱开通第三

  • Jackson优雅序列化Java枚举类过程解析

    1. 前言 在Java开发中我们为了避免过多的魔法值,使用枚举类来封装一些静态的状态代码.但是在将这些枚举的意思正确而全面的返回给前端却并不是那么顺利,我们通常会使用Jackson类库序列化对象为JSON,今天就来讲一个关于使用Jackson序列化枚举的通用性技巧. 2. 通用枚举范式 为了便于统一处理和规范统一的风格,建议指定一个统一的抽象接口,例如: /** * The interface Enumerator. */ public interface Enumerator { /** *

  • 手动编译并运行Java项目实现过程解析

    现在Java开发基本上就是IDE调试,如果跨平台打个jar包过去运行一般就可以了,但是有些情况比如需要引入外部依赖的时候,这个时候是不能直接运行的,还需要引入一些外部的参数,并不是简单的javac和java的关系了,下面来详细说一下 一般情况下,在本地都是使用eclipse开发工具进行开发,很多东西基本上就不用我们考虑了,如果我们要将项目放到Linux下运行的话,那么就需要进行转移的操作,当然有Maven.Ant这样的自动化部署工具,简直是太方便了,为了做到更进一步认识的话,我们纯手动的去打包一

  • redis发布订阅Java代码实现过程解析

    前言 Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. 为了实现redis的发布订阅机制,首先要打开redis服务:其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码: <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> &

  • Java虚拟机启动过程探索

    目录 一.序言 二.Java虚拟机 (一)配置JVM装载环境 (二)命令行参数解析 (三)执行main方法 1.新建JVM实例 2.加载入口类 3.查找main方法 4.执行main方法 三.解析字节码 (一)解释字节码 1.基于栈指令集 2.基于寄存器指令集 (二)编译字节码 1.C1 编译器 2.C2 编译器 3.分层编译 四.小结 一.序言 当我们在编写Java应用的时候,很少会注意Java程序是如何被运行的,如何被操作系统管理和调度的.带着好奇心,探索一下Java虚拟机启动过程. 1.素

  • Java中类加载过程全面解析

    类文件加载的顺序 1.先加载执行父类的静态变量及静态初始化块(执行先后顺序按排列的先后顺序) 2.再加载执行本类的静态变量及静态初始化块 只要类没有被销毁,静态变量及静态初始化块只会执行1次,后续再对该类进行其他操作也不会再执行这两个步骤. 类实例创建过程 只有在调用new方法时才会创建类的实例 1.按照上面类文件加载的顺序(类已被加载则跳过此步) 2.父类的非静态变量及非静态初始化块 3.父类的构造方法 4.本类的非静态变量及非静态初始化块 5.本类的构造方法 4.类实例销毁时候,首先销毁子类

  • Java实现简单双色球摇奖功能过程解析

    这篇文章主要介绍了Java实现简单双色球摇奖功能过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 双色球:从1-33号球中选取6个红球,且红球不重复 从1-16号球中选取一个篮球 话不多说 上代码~~~ package Javaee; import java.util.Arrays; import java.util.Random; public class DoubleChromosphere { public static void

  • 使用java NIO及高速缓冲区写入文件过程解析

    这篇文章主要介绍了使用java NIO及高速缓冲区写入文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码如下 byte[] bytes = Files.readAllBytes(Paths.get("E:\\pdf\\aaa\\html\\text.txt").normalize()); String text = IOUtils.toString(bytes); String xml = text.substring(

  • Java对象转json的方法过程解析

    这篇文章主要介绍了Java对象转json的方法过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1. jsonlib:个人感觉最麻烦的一个需要导入的包也多,代码也相对多一些. 2.Gson:google的 3.FastJson:阿里巴巴的,个人觉得这个比较好,而且据说这个也是性能最好一个. 下面就贴出三种写法的代码,读者可以任选其一去使用.关于demo里面所使用的jar包,可以自行去下载. Jsonlib: package json; i

随机推荐