JVM优先级线程池做任务队列的实现方法

前言

我们都知道 web 服务的工作大多是接受 http 请求,并返回处理后的结果。服务器接受的每一个请求又可以看是一个任务。一般而言这些请求任务会根据请求的先后有序处理,如果请求任务的处理比较耗时,往往就需要排队了。而同时不同的任务直接可能会存在一些优先级的变化,这时候就需要引入任务队列并进行管理了。可以做任务队列的东西有很多,Java 自带的线程池,以及其他的消息中间件都可以。

同步与异步

这个问题在之前已经提过很多次了,有些任务是需要请求后立即返回结果的,而有的则不需要。设想一下你下单购物的场景,付完钱后,系统只需要返回一个支付成功即可,后续的积分增加、优惠券发放、安排发货等等业务都不需要实时返回给用户的,这些就是异步的任务。大量的异步任务到达我们部署的服务上,由于处理效率的瓶颈,无法达到实时处理,因此与需要用队列将他们暂时保存起来,排队处理。

线程池

在 Java 中提到队列,我们除了想到基本的数据结构之外,应该还有线程池。线程池自带一套机制可以实现任务的排队和执行,可以满足单点环境下绝大多数异步化的场景。下面是典型的一个处理流程:

// 注入合适类型的线程池
@Autowired
private final ThreadPoolExecutor asyncPool;
@RequestMapping(value = "/async/someOperate", method = RequestMethod.POST)
public RestResult someOperate(HttpServletRequest request, String params,String callbackUrl {
  // 接受请求后 submit 到线程池排队处理
  asyncPool.submit(new Task(params,callbackUrl);
  return new RestResult(ResultCode.SUCCESS.getCode(), null) {{
    setMsg("successful!" + prop.getShowMsg());
  }};
}

// 异步任务处理
@Slf4j
public class Task extends Callable<RestResult> {
  private String params;
  private String callbackUrl;
  private final IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.class);
  private final ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.class);
  public ImageTask(String params,String callbackUrl) {
    this.params = params;
    this.callbackUrl = callbackUrl;
  }

  @Override
  public RestResult call() {
    try {
      // 业务处理
      CarDamageResult result = algorithmService.someOperate(this.params);
      // 回调
      return serviceUtils.callback(this.callbackUrl, this.caseNum, ResultCode.SUCCESS.getCode(), result, this.isAsync);
    } catch (ServiceException e) {
      return serviceUtils.callback(this.callbackUrl, this.caseNum, e.getCode(), null, this.isAsync);
    }
  }
}

对于线程池这里就不具体展开讲了,仅仅简单理了下具体的流程:

  1. 收到请求后,参数校验后传入线程池排队。
  2. 返回结果:“请求成功,正在处理”。
  3. 任务排到后由相应的线程处理,处理完后进行接口回调。

上面的例子描述了一个生产速度远远大于消费速度的模型,普通面向数据库开发的企业级应用,由于数据库的连接池开发的连接数较大,一般不需要这样通过线程池来处理,而一些 GPU 密集型的应用场景,由于显存的瓶颈导致消费速度慢时,就需要队列来作出调整了。

带优先级的线程池

更复杂的,例如考虑到任务的优先级,还需要对线程池进行重写,通过 PriorityBlockingQueue 来替换默认的阻塞队列。直接上代码。

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * @author Fururur
 * @create 2020-01-14-10:37
 */
@Data
public abstract class PriorityCallable<T> implements Callable<T> {
  private int priority;
}
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 优先级线程池的实现
 *
 * @author Fururur
 * @create 2019-07-23-10:19
 */
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

  private ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0);

  public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
  }

  public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                   long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
  }

  public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                   long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
  }

  public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                   long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
                   RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
  }

  private static PriorityBlockingQueue getWorkQueue() {
    return new PriorityBlockingQueue();
  }

  @Override
  public void execute(Runnable command) {
    int priority = local.get();
    try {
      this.execute(command, priority);
    } finally {
      local.set(0);
    }
  }

  public void execute(Runnable command, int priority) {
    super.execute(new PriorityRunnable(command, priority));
  }

  public <T> Future<T> submit(PriorityCallable<T> task) {
    local.set(task.getPriority());
    return super.submit(task);
  }

  public <T> Future<T> submit(Runnable task, T result, int priority) {
    local.set(priority);
    return super.submit(task, result);
  }

  public Future<?> submit(Runnable task, int priority) {
    local.set(priority);
    return super.submit(task);
  }

  @Getter
  @Setter
  protected static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
    private final static AtomicLong seq = new AtomicLong();
    private final long seqNum;
    private Runnable run;

    private int priority;

    PriorityRunnable(Runnable run, int priority) {
      seqNum = seq.getAndIncrement();
      this.run = run;
      this.priority = priority;
    }

    @Override
    public void run() {
      this.run.run();
    }

    @Override
    public int compareTo(PriorityRunnable other) {
      int res = 0;
      if (this.priority == other.priority) {
        if (other.run != this.run) {
          // ASC
          res = (seqNum < other.seqNum ? -1 : 1);
        }
      } else {
        // DESC
        res = this.priority > other.priority ? -1 : 1;
      }
      return res;
    }
  }
}

要点如下:

  1. 替换线程池默认的阻塞队列为 PriorityBlockingQueue,响应的传入的线程类需要实现 Comparable<T> 才能进行比较。
  2. PriorityBlockingQueue 的数据结构决定了,优先级相同的任务无法保证 FIFO,需要自己控制顺序。
  3. 需要重写线程池的 execute() 方法。看过线程池源码的会发现,执行 submit(task) 方法后,都会转化成 RunnableFuture<T> 再进一步执行,由于传入的 task 虽然实现了 Comparable<T> 到,但是内部转换成的 RunnableFuture<T> 并未实现,因此直接 submit 会抛出 Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 这样一个异常,所以需要重写 execute() 方法,构造一个 PriorityRunnable 作为中转。

总结

JVM 线程池是实现异步任务队列最简单最原生的一种方式,本文介绍了基本的使用流程和带有优先队列需求的用法。这种方法可有满足到一些简单的业务场景,但也存在一定的局限性:

  1. JVM 线程池是单机的,横向扩展多个服务下做负载均衡时,就会存在多个线程池了他们是分开工作的,无法很好的统一和管理,不太适合分布式场景。
  2. JVM 线程池是基于内存的,一旦服务挂了,会出现任务丢失的情况,可靠性低。
  3. 缺少作为任务队列的 ack 机制,一旦任务失败不会重新执行,且无法很好地对线程池队列进行监控。

显然简单的 JVM 线程池是无法 handle 到负载的业务场景的,这就需要引入其他中间件了,在接下来的文章中我们会继续探讨。

参考文献

ThreadPoolExecutor 优先级的线程池

implementing PriorityQueue on ThreadPoolExecutor

ThreadPoolExecutor 的 PriorityBlockingQueue 类型转化问题

大搜车异步任务队列中间件的建设实践

到此这篇关于JVM优先级线程池做任务队列的实现方法的文章就介绍到这了,更多相关java线程池优先级内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java虚拟机JVM性能优化(一):JVM知识总结

    Java应用程序是运行在JVM上的,但是你对JVM技术了解吗?这篇文章(这个系列的第一部分)讲述了经典Java虚拟机是怎么样工作的,例如:Java一次编写的利弊,跨平台引擎,垃圾回收基础知识,经典的GC算法和编译优化.之后的文章会讲JVM性能优化,包括最新的JVM设计--支持当今高并发Java应用的性能和扩展. 如果你是一个开发人员,你肯定遇到过这样的特殊感觉,你突然灵光一现,所有的思路连接起来了,你能以一个新的视角来回想起你以前的想法.我个人很喜欢学习新知识带来的这种感觉.我已经有过很多次这样

  • JVM的垃圾回收机制详解和调优

    文章来源:matrix.org.cn 作者:ginger547 1.JVM的gc概述 gc即垃圾收集机制是指jvm用于释放那些不再使用的对象所占用的内存.java语言并不要求jvm有gc,也没有规定gc如何工作.不过常用的jvm都有gc,而且大多数gc都使用类似的算法管理内存和执行收集操作. 在充分理解了垃圾收集算法和执行过程后,才能有效的优化它的性能.有些垃圾收集专用于特殊的应用程序.比如,实时应用程序主要是为了避免垃圾收集中断,而大多数OLTP应用程序则注重整体效率.理解了应用程序的工作负荷

  • JAVA中JVM的重排序详细介绍

    在并发程序中,程序员会特别关注不同进程或线程之间的数据同步,特别是多个线程同时修改同一变量时,必须采取可靠的同步或其它措施保障数据被正确地修改,这里的一条重要原则是:不要假设指令执行的顺序,你无法预知不同线程之间的指令会以何种顺序执行. 但是在单线程程序中,通常我们容易假设指令是顺序执行的,否则可以想象程序会发生什么可怕的变化.理想的模型是:各种指令执行的顺序是唯一且有序的,这个顺序就是它们被编写在代码中的顺序,与处理器或其它因素无关,这种模型被称作顺序一致性模型,也是基于冯·诺依曼体系的模型.

  • 基于jni调用时,jvm报错问题的深入分析

    执行如下的jni调用: 复制代码 代码如下: package jni;public class JNITransObject { public native TestJNI[] ObjectMethod(String text); static {  System.loadLibrary("JNITransObject"); } public static void main(String args[]) {  JNITransObject jniTransObject = new J

  • 详解JVM 运行时内存使用情况监控

    java 语言, 开发者不能直接控制程序运行内存, 对象的创建都是由类加载器一步步解析, 执行与生成与内存区域中的; 并且jvm有自己的垃圾回收器对内存区域管理, 回收; 但是我们已经可以通过一些工具来在程序运行时查看对应的jvm内存使用情况, 帮助更好的分析与优化我们的代码; 注: 查看系统里java进程信息 // 查看当前机器上所有运行的java进程名称与pid(进程编号) jps -l // 显示指定的jvm进程所有的属性设置和配置参数 jinfo pid 1 . jmap : 内存占用情

  • JVM类加载机制详解

    一.先看看编写出的代码的执行过程: 二.研究类加载机制的意义 从上图可以看出,类加载是Java程序运行的第一步,研究类的加载有助于了解JVM执行过程,并指导开发者采取更有效的措施配合程序执行. 研究类加载机制的第二个目的是让程序能动态的控制类加载,比如热部署等,提高程序的灵活性和适应性. 三.类加载的一般过程 原理:双亲委托模式 1.寻找jre目录,寻找jvm.dll,并初始化JVM: 2.产生一个Bootstrap Loader(启动类加载器): 3.Bootstrap Loader自动加载E

  • IntelliJ IDEA设置JVM运行参数的操作方法

    打开 IDEA 安装目录,看到有一个 bin 目录,其中有两个 vmoptions 文件,需针对不同的JDK进行配置: 32 位:idea.exe.vmoptions 64 位:idea64.exe.vmoptions -Xms512m -Xmx1024m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=225m -XX:+UseConcMarkSweepGC -XX:SoftRefLRUPolicyMSPerMB=50 -ea -Dsun.io.u

  • 优化Java虚拟机总结(jvm调优)

    堆设置 -Xmx3550m:设置JVM最大堆内存为3550M. -Xms3550m:设置JVM初始堆内存为3550M.此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存. -Xss128k:设置每个线程的栈大小.JDK5.0以后每个线程栈大小为1M,之前每个线程栈大小为256K.应当根据应用的线程所需内存大小进行调整.在相同物理内存下,减小这个值能生成更多的线程.但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右. -Xmn2g:设置堆

  • jvm内存溢出解决方法(jvm内存溢出怎么解决)

    java.lang.OutOfMemoryError: PermGen space 发现很多人把问题归因于: spring,hibernate,tomcat,因为他们动态产生类,导致JVM中的permanent heap溢出 .然后解决方法众说纷纭,有人说升级 tomcat版本到最新甚至干脆不用tomcat.还有人怀疑spring的问题,在spring论坛上讨论很激烈,因为spring在AOP时使用CBLIB会动态产生很多类. 但问题是为什么这些王牌的开源会出现同一个问题呢,那么是不是更基础的原

  • JVM优先级线程池做任务队列的实现方法

    前言 我们都知道 web 服务的工作大多是接受 http 请求,并返回处理后的结果.服务器接受的每一个请求又可以看是一个任务.一般而言这些请求任务会根据请求的先后有序处理,如果请求任务的处理比较耗时,往往就需要排队了.而同时不同的任务直接可能会存在一些优先级的变化,这时候就需要引入任务队列并进行管理了.可以做任务队列的东西有很多,Java 自带的线程池,以及其他的消息中间件都可以. 同步与异步 这个问题在之前已经提过很多次了,有些任务是需要请求后立即返回结果的,而有的则不需要.设想一下你下单购物

  • Java线程池的几种实现方法及常见问题解答

    工作中,经常会涉及到线程.比如有些任务,经常会交与线程去异步执行.抑或服务端程序为每个请求单独建立一个线程处理任务.线程之外的,比如我们用的数据库连接.这些创建销毁或者打开关闭的操作,非常影响系统性能.所以,"池"的用处就凸显出来了. 1. 为什么要使用线程池 在3.6.1节介绍的实现方式中,对每个客户都分配一个新的工作线程.当工作线程与客户通信结束,这个线程就被销毁.这种实现方式有以下不足之处: •服务器创建和销毁工作的开销( 包括所花费的时间和系统资源 )很大.这一项不用解释,可以

  • 详谈Java几种线程池类型介绍及使用方法

    一.线程池使用场景 •单个任务处理时间短 •将需处理的任务数量大 二.使用Java线程池好处 1.使用new Thread()创建线程的弊端: •每次通过new Thread()创建对象性能不佳. •线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom. •缺乏更多功能,如定时执行.定期执行.线程中断. 2.使用Java线程池的好处: •重用存在的线程,减少对象创建.消亡的开销,提升性能. •可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞

  • Java线程池的几种实现方法和区别介绍实例详解

    下面通过实例代码为大家介绍Java线程池的几种实现方法和区别: import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.

  • Java线程池的几种实现方法和区别介绍

    Java线程池的几种实现方法和区别介绍 import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.E

  • 线程池ThreadPoolExecutor使用简介与方法实例

    一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量 maximumPool

  • Java使用线程池实现socket编程的方法详解

    目录 前言 一.一个简单的C/S模型实现 1.服务器: 2.客户端: 二.线程池使用方法 1.新建一个线程池 2.用Runnable接口实现线程 3.创建线程对象并提交至线程池执行 三.结合起来 四.使用新的输入输出流 总结 前言 以多个客户端和一个服务端的socket通信为例,服务端启动时创建一个固定大小的线程池.服务端每接收到一个连接请求后(通信任务),交给线程池执行,任务类实现了Runnable接口,用于跟客户端进行读写操作,该类的对象作为任务通过execute(Runnable task

  • 深入了解Java线程池:从设计思想到源码解读

    目录 为什么需要线程池 线程池设计思路 线程池的工作机制 线程池的参数及使用 线程池的状态 提交任务 任务队列 线程工厂 拒绝策略 关闭线程池 Executors 静态工厂 合理地配置线程池 线程池的监控 源码分析 execute addWorker Worker runWorker getTask processWorkerExit 面试题 为什么需要线程池 我们知道创建线程的常用方式就是 new Thread() ,而每一次 new Thread() 都会重新创建一个线程,而线程的创建和销毁

  • Java 线程池详解及实例代码

    线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因. 例如Android中常见到的很多通用组件一般都离不开"池"的概念,如各种图片

  • 浅谈Android 的线程和线程池的使用

    Android 的线程和线程池 从用途上分,线程分为主线程和子线程:主线程主要处理和界面相关的事情,子线程则往往用于耗时操作. 主线程和子线程 主线程是指进程所拥有的线程.Android 中主线程交 UI 线程,主要作用是运行四大组件以及处理它们和用户的交互:子线程的作业则是执行耗时任务. Android 中的线程形态 1.AsyncTask AsyncTask 是一种轻量级的异步任务类,可以在线程池中执行后台任务,然后把执行的进度和最终结果传递给主线程并在主线程中更新 UI, AsyncTas

随机推荐