Java中有界队列的饱和策略(reject policy)原理解析

我们在使用ExecutorService的时候知道,在ExecutorService中有个一个Queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列(ExecutorService.newCachedThreadPool)和有界的队列(ExecutorService newFixedThreadPool(int nThreads))。

无界队列很好理解,我们可以无限制的向ExecutorService提交任务。那么对于有界队列来说,如果队列满了该怎么处理呢?

今天我们要介绍一下java中ExecutorService的饱和策略(reject policy)。

以ExecutorService的具体实现ThreadPoolExecutor来说,它定义了4种饱和策略。分别是AbortPolicy,DiscardPolicy,DiscardOldestPolicy和CallerRunsPolicy。

如果要在ThreadPoolExecutor中设定饱和策略可以调用setRejectedExecutionHandler方法,如下所示:

    ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(20));
    threadPoolExecutor.setRejectedExecutionHandler(
        new ThreadPoolExecutor.AbortPolicy()
    );

上面的例子中我们定义了一个初始5个,最大10个工作线程的Thread Pool,并且定义其中的Queue的容量是20。如果提交的任务超出了容量,则会使用AbortPolicy策略。

AbortPolicy

AbortPolicy意思是如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecutionException异常:

  public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() +
                         " rejected from " +
                         e.toString());
    }
  }

上面的代码中,rejectedExecution方法中我们直接抛出了RejectedExecutionException异常。

DiscardPolicy

DiscardPolicy将会悄悄的丢弃提交的任务,而不报任何异常。

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
  }

DiscardOldestPolicy

DiscardOldestPolicy将会丢弃最老的任务,保存最新插入的任务。

  public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
      }
    }
  }

我们看到在rejectedExecution方法中,poll了最老的一个任务,然后使用ThreadPoolExecutor提交了一个最新的任务。

CallerRunsPolicy

CallerRunsPolicy和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度。我们看下是怎么实现的:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
        r.run();
      }
    }
  }

在rejectedExecution方法中,直接调用了 r.run()方法,这会导致该方法直接在调用者的主线程中执行,而不是在线程池中执行。从而导致主线程在该任务执行结束之前不能提交任何任务。从而有效的阻止了任务的提交。

使用Semaphore

如果我们并没有定义饱和策略,那么有没有什么方法来控制任务的提交速度呢?考虑下之前我们讲到的Semaphore,我们可以指定一定的资源信号量来控制任务的提交,如下所示:

public class SemaphoreUsage {

  private final Executor executor;
  private final Semaphore semaphore;

  public SemaphoreUsage(Executor executor, int count) {
    this.executor = executor;
    this.semaphore = new Semaphore(count);
  }

  public void submitTask(final Runnable command) throws InterruptedException {
    semaphore.acquire();
    try {
      executor.execute(() -> {
            try {
              command.run();
            } finally {
              semaphore.release();
            }
          }
      );
    } catch (RejectedExecutionException e) {
      semaphore.release();
    }
  }
}

本文的例子可参考https://github.com/ddean2009/learn-java-concurrency/tree/master/rejectPolicy

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java定义队列结构,并实现入队、出队操作完整示例

    本文实例讲述了Java定义队列结构,并实现入队.出队操作.分享给大家供大家参考,具体如下: package com.example.demo; import java.util.ArrayList; public class Queue { ArrayList<Object> list = new ArrayList<>(); //入队 public void in(Object o) { list.add(o); } //出队 public Object out() { Obje

  • java数组实现队列及环形队列实现过程解析

    这篇文章主要介绍了java数组实现队列及环形队列实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码内容 ArrayQueue---用数组实现队列 package com.structure; import java.util.Scanner; /** * @auther::9527 * @Description: 数组模拟队列 * @program: jstl2 * @create: 2019-10-05 08:58 */ pub

  • Java的优先队列PriorityQueue原理及实例分析

    这篇文章主要介绍了Java的优先队列PriorityQueue原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.优先队列概述 优先队列PriorityQueue是Queue接口的实现,可以对其中元素进行排序, 可以放基本数据类型的包装类(如:Integer,Long等)或自定义的类 对于基本数据类型的包装器类,优先队列中元素默认排列顺序是升序排列 但对于自己定义的类来说,需要自己定义比较器 二.常用方法 peek()//返回队首元素

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • Java引用队列和虚引用实例分析

    本文实例讲述了Java引用队列和虚引用.分享给大家供大家参考,具体如下: 一 点睛 引用队列由ReferenceQueue类表示,它用于保存被回收后对象的引用.当联合使用软引用.弱引用和引用队列时,系统在回收被引用的对象之后,将把它所回收对象对应的引用添加到关联的引用队列中.而虚引用在对象被释放之前,将把它对应的虚引用添加到它关联的引用队列中,这使得可以在对象被回收之前采取行动. 虚引用无法获取它所对应的对象. 二 实战 1 代码 import java.lang.ref.*; public c

  • java优先队列PriorityQueue中Comparator的用法详解

    在使用java的优先队列PriorityQueue的时候,会看到这样的用法. PriorityQueue<Integer> queue = new PriorityQueue<Integer>(new Comparator<Integer>(){ @Override public int compare(Integer o1, Integer o2){ return o1.compareTo(o2); } }); 那这样到底构造的是最大优先还是最小优先队列呢? 看看源码

  • java阻塞队列实现原理及实例解析

    这篇文章主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 阻塞队列与普通队列的不同在于.当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完

  • Java中有界队列的饱和策略(reject policy)原理解析

    我们在使用ExecutorService的时候知道,在ExecutorService中有个一个Queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列(ExecutorService.newCachedThreadPool)和有界的队列(ExecutorService newFixedThreadPool(int nThreads)). 无界队列很好理解,我们可以无限制的向ExecutorService提交任务.那么对于有界队列来说,如果队列满了该怎么处理呢? 今天我们要介绍一下j

  • java 中 阻塞队列BlockingQueue详解及实例

    java 中 阻塞队列BlockingQueue详解及实例 BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  • Java中消息队列任务的平滑关闭详解

    前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景. 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1.问题背

  • java中switch case语句需要加入break的原因解析

    java中switch case语句需要加入break的原因解析            java 中使用switch case语句需要加入break 做了具体的实例分析,及编译源码,在源码中分析应该如何使用,大家可以参考下: 假设我们有如下这样一个switch语句: public static void test(int index) { switch (index) { case 1: System.out.println(1); case 2: System.out.println(2);

  • Java中字符串String的+和+=及循环操作String原理详解

    String对象是不可变的:意思就是无论是对String的新增或修改,出现一个全新的String内容时,都意味着诞生了一个新的对象.但是如果内容不变的话,增加的只是对象的引用而已. 例如: String a = "ljh"; String b = "ljh"; String c = "ljh"; System.out.println(a==b); System.out.println(b==c); 结果都是true 但是这种不可变性会产生一些性能

  • JavaScript中的return布尔值的用法和原理解析

    首先return作为返回关键字,他有以下两种返回方式 1.返回控制与函数结果 语法为:return 表达式; 语句结束函数执行,返回调用函数,而且把表达式的值作为函数的结果 2.返回控制无函数结果 语法为:return;在大多数情况下,为事件处理函数返回false,可以防止默认的事件行为.例如,默认情况下点击一个<A>元素,页面会跳转到该元素href属性指定的页. 例如:<a href="http:www.baidu.com;alert(11);return false;ale

  • 剖析Java中阻塞队列的实现原理及应用场景

    我们平时使用的一些常见队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动

  • 解析Java中的队列和用LinkedList集合模拟队列的方法

    API中对队列的说明: public interface Queue<E> extends Collection<E> 在处理元素前用于保存元素的 collection.除了基本的 Collection 操作外,队列还提供其他的插入.提取和检查操作.每个方法都存在两种形式:一种抛出异常(操作失败时),另一种返回一个特殊值(null 或 false,具体取决于操作).插入操作的后一种形式是用于专门为有容量限制的 Queue 实现设计的:在大多数实现中,插入操作不会失败. 队列通常(但

  • java中struts2实现文件上传下载功能实例解析

    本文实例讲述了java中struts2实现文件上传下载功能实现方法.分享给大家供大家参考.具体分析如下: 1.文件上传 首先是jsp页面的代码 在jsp页面中定义一个上传标签 复制代码 代码如下: <tr>      <td align="right" bgcolor="#F5F8F9"><b>附件:</b></td>      <td bgcolor="#FFFFFF">

  • JAVA的LIST接口的REMOVE重载方法调用原理解析

    前言 说真的,平常看源码都是自己看完自己懂,很少有写出来的冲动. 但是在写算法的时候,经常用到java中各种集合,其中也比较常用到remove方法. remove有重载函数,分别传入参数是索引index或者数据Object(指定泛型后自动转换),如果指定泛型是其他数据类型还好,但是指定的是Integer或者是int的话,或者就有点懵了. 这曾经也困惑过我,所以我就唯有用实践解惑了. 测试类设计 测试类一 public class Text { public void remove(int ind

随机推荐