如何基于ThreadPoolExecutor创建线程池并操作

日常工作中很多地方很多效率极低的操作,往往可以改串行为并行,执行效率往往提高数倍,废话不多说先上代码

1、用到的guava坐标

<dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
    </dependency>

2、创建一个枚举保证线程池是单例

package com.hao.service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public enum ExecutorManager {

  INSTANCE;

  private ExecutorManager() {

  }

  private static int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();

  public static final ThreadPoolExecutor threadPoolExecutor =
    new ThreadPoolExecutor(AVAILABLEPROCESSORS * 50, AVAILABLEPROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(AVAILABLEPROCESSORS * 2000),
      new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());
}

3、创建一个方法类

package com.hao.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.google.common.base.Preconditions;

@Service
public class ExecutorContext {

  public ExecutorService executorService;
  private int DEFAULT_WAIT_SECONDS = 2;

  @PostConstruct
  public void init() {
    executorService = ExecutorManager.threadPoolExecutor;
  }

  public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
    Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
    LatchedCallables<T> latchAndCallables = wrapCallables(calls);
    List<Future<T>> futurres = new LinkedList<>();
    for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
      if (null != callable) {
        futurres.add(executorService.submit(callable));
      }
    }
    List<T> rets = new ArrayList<>();
    if (latchAndCallables.latch.await(milliseconds, TimeUnit.MILLISECONDS)) {
      for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
        rets.add(call.getResult());
      }
    } else {
      for (Future<T> future : futurres) {
        if (!future.isDone()) {
          future.cancel(true);
        }
      }
    }
    return rets;
  }

  public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
    Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
    LatchedCallables<T> latchAndCallables = wrapCallables(calls);
    for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
      executorService.submit(callable);
    }
    List<T> rets = new ArrayList<>();
    if (latchAndCallables.latch.await(seconds, TimeUnit.SECONDS)) {
      for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
        rets.add(call.getResult());
      }
    }
    return rets;
  }

  public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
    Preconditions.checkNotNull(calls, "callable empty.");
    return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
  }

  private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
    CountDownLatch latch = new CountDownLatch(callables.size());
    List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
    for (Callable<T> callable : callables) {
      wrapped.add(new CountdownedCallable<>(callable, latch));
    }

    LatchedCallables<T> returnVal = new LatchedCallables<>();
    returnVal.latch = latch;
    returnVal.wrappedCallables = wrapped;
    return returnVal;
  }

  public static class LatchedCallables<T> {
    public CountDownLatch latch;
    public List<CountdownedCallable<T>> wrappedCallables;
  }

  public static class CountdownedCallable<T> implements Callable<T> {
    private final Callable<T> wrapped;
    private final CountDownLatch latch;
    private T result;

    public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
      this.wrapped = wrapped;
      this.latch = latch;
    }

    @Override
    public T call() throws Exception {
      try {
        result = wrapped.call();
        return result;
      } finally {
        latch.countDown();
      }
    }

    public T getResult() {
      return result;
    }
  }

}

4、创建一个测试类

package com.hao;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.hao.bean.Employee;
import com.hao.service.EmployeeService;
import com.hao.service.ExecutorContext;

public class ExecutorTest extends BaseTest {

  @Autowired
  ExecutorContext executorContext;

  @Autowired
  EmployeeService employeeService;

  @Test
  public void test01() {
    long t0 = System.currentTimeMillis();
    List<Employee> employees = new ArrayList<Employee>();
    try {
      List<Callable<Integer>> calls = new ArrayList<Callable<Integer>>();
      Callable<Integer> able1 = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          Thread.sleep(5000);
          Employee employee = employeeService.getById(1L);
          employees.add(employee);
          return 1;
        }

      };
      calls.add(able1);
      Callable<Integer> able2 = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          Thread.sleep(5000);
          Employee employee = employeeService.getById(2L);
          employees.add(employee);
          return 2;
        }

      };
      calls.add(able2);
      Callable<Integer> able3 = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          Thread.sleep(5000);
          Employee employee = employeeService.getById(3L);
          employees.add(employee);
          return 3;
        }

      };
      calls.add(able3);

      executorContext.waitAllCallables(calls, 5000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    for (Employee employee : employees) {
      System.out.println(employee);
    }
    System.out.println(System.currentTimeMillis() - t0);
  }

}

5、执行结果如下

次工具类的好处在于能够像使用普通 service一样使用线程池完成并行操作,当然不要忘记将 ExecutorContext 置于能被sping扫描到的地方,

否则不能直接使用@Autowired 依赖注入

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 简单谈谈ThreadPoolExecutor线程池之submit方法

    jdk1.7.0_79 在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法.本文解析ThreadPoolExecutor#submit. 对于一个任务的执行有时我们不需要它返回结果,但是有我们需要它的返回执行结果.对于线程来讲,如果不需要它返回结果则实现Runnable,而如果需要执行结果的话则可以实现Callable.在线程池同样execute提供一个不需要返回结果的任务执行,而对

  • Java ThreadPoolExecutor 线程池的使用介绍

    Executors Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池. 从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个: newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待. newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60

  • Python线程池模块ThreadPoolExecutor用法分析

    本文实例讲述了Python线程池模块ThreadPoolExecutor用法.分享给大家供大家参考,具体如下: python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,而且也有ProcessPoolExecutor进程池模块,使用方法基本一致. 首先导入模块 from concurrent.futures import ThreadPoolExecutor 使用方法很简单,最常用的可能就

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • ThreadPoolExecutor线程池的使用方法

    ThreadPoolExecutor ThreadPoolExecutor线程池,java提供开发框架,管理线程的创建.销毁.优化.监控等. 有4种不同的任务队列: 1.ArrayBlockingQueue:基于数组结构的任务队列.此队列按先进先出的原则对任务进行排序. 2.LinkedBlockingQueue:基于链表结构的任务队列.此队列也是按先进先出的原则对任务进行排序.但性能比ArrayBlockingQueue高. 3.synchronousQueue:不存储元素的任务队列.每个插入

  • ThreadPoolExecutor线程池原理及其execute方法(详解)

    jdk1.7.0_79 对于线程池大部分人可能会用,也知道为什么用.无非就是任务需要异步执行,再者就是线程需要统一管理起来.对于从线程池中获取线程,大部分人可能只知道,我现在需要一个线程来执行一个任务,那我就把任务丢到线程池里,线程池里有空闲的线程就执行,没有空闲的线程就等待.实际上对于线程池的执行原理远远不止这么简单. 在Java并发包中提供了线程池类--ThreadPoolExecutor,实际上更多的我们可能用到的是Executors工厂类为我们提供的线程池:newFixedThreadP

  • Spring线程池ThreadPoolTaskExecutor配置详情

    本文介绍了Spring线程池ThreadPoolTaskExecutor配置,分享给大家,具体如下: 1. ThreadPoolTaskExecutor配置 <!-- spring thread pool executor --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 线

  • Java线程池ThreadPoolExecutor原理及使用实例

    引导 要求:线程资源必须通过线程池提供,不允许在应用自行显式创建线程: 说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题.如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗内存或者"过度切换"的问题. 线程池介绍线程池概述   线程池,顾名思义是一个放着线程的池子,这个池子的线程主要是用来执行任务的.当用户提交任务时,线程池会创建线程去执行任务,若任务超过了核心线程数的时候,会在一个任务队列里进行排队等待,这个详细流程,我们会后面细

  • Spring线程池ThreadPoolExecutor配置并且得到任务执行的结果

    用ThreadPoolExecutor的时候,又想知道被执行的任务的执行情况,这时就可以用FutureTask. ThreadPoolTask package com.paul.threadPool; import java.io.Serializable; import java.util.concurrent.Callable; public class ThreadPoolTask implements Callable<String>, Serializable { private s

  • java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    前言 定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行.但定时器线程池也还是线程池,最底层实现还是ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通ThreadPoolExecutor. 特点说明 1.构造函数 public ScheduledThreadPoolExecutor(int corePoolSize) { // 对于其他几个参数在ThreadPoolExecutor中都已经详细分析过了,所以这里,将不再展开 // 这里我们可以看到调用基类

随机推荐