java并发编程_线程池的使用方法(详解)

一、任务和执行策略之间的隐性耦合

Executor可以将任务的提交和任务的执行策略解耦

只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题

1.线程饥饿死锁

类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够

定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁

2.线程池大小

注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池

如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小

3.配置ThreadPoolExecutor线程池

实例:

1.通过Executors的工厂方法返回默认的一些实现

2.通过实例化ThreadPoolExecutor(.....)自定义实现

线程池的队列

1.无界队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张

如:单例和固定大小的线程池用的就是此种

2.有界队列:如果新任务到达,队列满则使用饱和策略

3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队

SynchronousQueue直接将任务移交给工作线程

机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务

如:CacheThreadPool就是使用的这种策略

饱和策略:

setRejectedExecutionHandler来修改饱和策略

1.终止Abort(默认):抛出异常由调用者处理

2.抛弃Discard

3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务

4.CallerRuns:回退任务,有调用者线程自行处理

4.线程工厂ThreadFactoy

每当创建线程时:其实是调用了线程工厂来完成

自定义线程工厂:implements ThreadFactory

可以定制该线程工厂的行为:如UncaughtExceptionHandler等

public class MyAppThread extends Thread {
  public static final String DEFAULT_NAME = "MyAppThread";
  private static volatile boolean debugLifecycle = false;
  private static final AtomicInteger created = new AtomicInteger();
  private static final AtomicInteger alive = new AtomicInteger();
  private static final Logger log = Logger.getAnonymousLogger();

  public MyAppThread(Runnable r) {
    this(r, DEFAULT_NAME);
  }

  public MyAppThread(Runnable runnable, String name) {
    super(runnable, name + "-" + created.incrementAndGet());
    //设置该线程工厂创建的线程的 未捕获异常的行为
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread t,
                     Throwable e) {
        log.log(Level.SEVERE,
            "UNCAUGHT in thread " + t.getName(), e);
      }
    });
  }

  public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created " + getName());
    try {
      alive.incrementAndGet();
      super.run();
    } finally {
      alive.decrementAndGet();
      if (debug) log.log(Level.FINE, "Exiting " + getName());
    }
  }

  public static int getThreadsCreated() {
    return created.get();
  }

  public static int getThreadsAlive() {
    return alive.get();
  }

  public static boolean getDebug() {
    return debugLifecycle;
  }

  public static void setDebug(boolean b) {
    debugLifecycle = b;
  }
}

5.扩展ThreadPoolExecutor

可以被自定义子类覆盖的方法:

1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行

2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行

3.terminated:在线程池关闭时,可以用来释放资源等

二、递归算法的并行化

1.循环

在循环中,每次循环操作都是独立的

//串行化
  void processSequentially(List<Element> elements) {
    for (Element e : elements)
      process(e);
  }
  //并行化
  void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
      exec.execute(new Runnable() {
        public void run() {
          process(e);
        }
      });
  }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代
  public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
      results.add(n.compute());
      sequentialRecursive(n.getChildren(), results);
    }
  }
  //并行 计算compute 和串行迭代
  public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
      exec.execute(() -> results.add(n.compute()));
      parallelRecursive(exec, n.getChildren(), results);
    }
  }
  //调用并行方法的操作
  public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
      throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
  }

实例:

public class ConcurrentPuzzleSolver <P, M> {
  private final Puzzle<P, M> puzzle;
  private final ExecutorService exec;
  private final ConcurrentMap<P, Boolean> seen;
  protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

  public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
    this.puzzle = puzzle;
    this.exec = initThreadPool();
    this.seen = new ConcurrentHashMap<P, Boolean>();
    if (exec instanceof ThreadPoolExecutor) {
      ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
      tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }
  }

  private ExecutorService initThreadPool() {
    return Executors.newCachedThreadPool();
  }

  public List<M> solve() throws InterruptedException {
    try {
      P p = puzzle.initialPosition();
      exec.execute(newTask(p, null, null));
      // 等待ValueLatch中闭锁解开,则表示已经找到答案
      PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
      return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
    } finally {
      exec.shutdown();//最终主线程关闭线程池
    }
  }

  protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
    return new SolverTask(p, m, n);
  }

  protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
    SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
      super(pos, move, prev);
    }
    public void run() {
      //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
      //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
      if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
        return; // already solved or seen this position
      }
      if (puzzle.isGoal(pos)) {
        solution.setValue(this);
      } else {
        for (M m : puzzle.legalMoves(pos))
          exec.execute(newTask(puzzle.move(pos, m), m, this));
      }
    }
  }
}

以上这篇java并发编程_线程池的使用方法(详解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • java ThreadPoolExecutor 并发调用实例详解

    java ThreadPoolExecutor 并发调用实例详解 概述 通常为了提供任务的处理速度,会使用一些并发模型,ThreadPoolExecutor中的invokeAll便是一种. 代码 package test.current; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util

  • Java多线程并发编程(互斥锁Reentrant Lock)

    Java 中的锁通常分为两种: 通过关键字 synchronized 获取的锁,我们称为同步锁,上一篇有介绍到:Java 多线程并发编程 Synchronized 关键字. java.util.concurrent(JUC)包里的锁,如通过继承接口 Lock 而实现的 ReentrantLock(互斥锁),继承 ReadWriteLock 实现的 ReentrantReadWriteLock(读写锁). 本篇主要介绍 ReentrantLock(互斥锁). ReentrantLock(互斥锁)

  • java高并发锁的3种实现示例代码

    初级技巧 - 乐观锁 乐观锁适合这样的场景:读不会冲突,写会冲突.同时读的频率远大于写. 以下面的代码为例,悲观锁的实现: public Object get(Object key) { synchronized(map) { if(map.get(key) == null) { // set some values } return map.get(key); } } 乐观锁的实现: public Object get(Object key) { Object val = null; if((

  • java 并发中的原子性与可视性实例详解

    java 并发中的原子性与可视性实例详解 并发其实是一种解耦合的策略,它帮助我们把做什么(目标)和什么时候做(时机)分开.这样做可以明显改进应用程序的吞吐量(获得更多的CPU调度时间)和结构(程序有多个部分在协同工作).做过java Web开发的人都知道,Java Web中的Servlet程序在Servlet容器的支持下采用单实例多线程的工作模式,Servlet容器为你处理了并发问题. 原子性 原子是世界上的最小单位,具有不可分割性.比如 a=0:(a非long和double类型) 这个操作是不

  • 详谈锁和监视器之间的区别_Java并发

    在面试中你可能遇到过这样的问题:锁(lock)和监视器(monitor)有什么区别? 嗯,要回答这个问题,你必须深入理解Java的多线程底层是如何工作的. 简短的答案是,锁为实现监视器提供必要的支持.详细答案如下. 锁(lock) 逻辑上锁是对象内存堆中头部的一部分数据.JVM中的每个对象都有一个锁(或互斥锁),任何程序都可以使用它来协调对对象的多线程访问.如果任何线程想要访问该对象的实例变量,那么线程必须拥有该对象的锁(在锁内存区域设置一些标志).所有其他的线程试图访问该对象的变量必须等到拥有

  • Java并发之嵌套管程锁死详解

    ·嵌套管程死锁是如何发生的 ·具体的嵌套管程死锁的例子 ·嵌套管程死锁 vs 死锁 嵌套管程锁死类似于死锁, 下面是一个嵌套管程锁死的场景: Thread 1 synchronizes on A Thread 1 synchronizes on B (while synchronized on A) Thread 1 decides to wait for a signal from another thread before continuing Thread 1 calls B.wait()

  • java并发编程_线程池的使用方法(详解)

    一.任务和执行策略之间的隐性耦合 Executor可以将任务的提交和任务的执行策略解耦 只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题 1.线程饥饿死锁 类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁:表现为池不够 定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁 2.线程池大小 注意:线程池的大小还受其他的限制,如其他资源池:

  • Java并发编程加锁导致的活跃性问题详解方案

    目录 死锁(Deadlock) 死锁的解决和预防 1.超时释放锁 2.按顺序加锁 3.死锁检测 活锁(Livelock) 避免活锁 饥饿 解决饥饿 性能问题 上下文切换 什么是上下文切换? 减少上下文切换的方法 资源限制 什么是资源限制 资源限制引发的问题 如何解决资源限制的问题 我们主要处理锁带来的问题. 首先就是最出名的死锁 死锁(Deadlock) 什么是死锁 死锁是当线程进入无限期等待状态时发生的情况,因为所请求的锁被另一个线程持有,而另一个线程又等待第一个线程持有的另一个锁 导致互相等

  • Java实现手写线程池实例并测试详解

    前言 在之前的文章中介绍过线程池的核心原理,在一次面试中面试官让手写线程池,这块知识忘记的差不多了,因此本篇文章做一个回顾. 希望能够加深自己的印象以及帮助到其他的小伙伴儿们 在线程池核心原理篇介绍过线程池的核心原理,今天来模拟线程池和工作队列的流程,以及编写代码和测试类进行测试.下面附下之前线程池的核心流程: 在线程池核心原理的源码中,涉及到了一系列的流程,包括线程池队列数量是否已满,运用什么样的拒绝策略等.在我们手写线程池的代码中,不需要考虑那么多因素,只需要模拟简单的情景和过程,因此整体来

  • Java中四种线程池的使用示例详解

    在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 本文详细的给大家介绍了关于Java中四种线程池的使用,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: FixedThreadPool 由Executors的newFixedThreadPool方法创建.它是一种线程数量固定的线程

  • Java面向对象编程中final关键字的使用方法详解

    在Java中通过final关键字来声明对象具有不变性(immutable),这里的对象包括变量,方法,类,与C++中的const关键字效果类似. immutable指对象在创建之后,状态无法被改变 可以从三个角度考虑使用final关键字: 代码本身:不希望final描述的对象所表现的含义被改变 安全:final对象具有只读属性,是线程安全的 效率:无法修改final对象本身,对其引用的操作更为高效 final 变量 定义final Object a,则a只能被初始化一次,一旦初始化,a的数据无法

  • Java并发编程之线程之间的共享和协作

    一.线程间的共享 1.1 ynchronized内置锁 用处 Java支持多个线程同时访问一个对象或者对象的成员变量 关键字synchronized可以修饰方法或者以同步块的形式来进行使用 它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中 它保证了线程对变量访问的可见性和排他性(原子性.可见性.有序性),又称为内置锁机制. 对象锁和类锁 对象锁是用于对象实例方法,或者一个对象实例上的 类锁是用于类的静态方法或者一个类的class对象上的 类的对象实例可以有很多个,但是每个类只有

  • Java 并发编程之线程挂起、恢复与终止

    挂起和恢复线程 Thread 的API中包含两个被淘汰的方法,它们用于临时挂起和重启某个线程,这些方法已经被淘汰,因为它们是不安全的,不稳定的.如果在不合适的时候挂起线程(比如,锁定共享资源时),此时便可能会发生死锁条件--其他线程在等待该线程释放锁,但该线程却被挂起了,便会发生死锁.另外,在长时间计算期间挂起线程也可能导致问题. 下面的代码演示了通过休眠来延缓运行,模拟长时间运行的情况,使线程更可能在不适当的时候被挂起: public class DeprecatedSuspendResume

  • Java并发编程之线程间的通信

    一.概念简介 1.线程通信 在操作系统中,线程是个独立的个体,但是在线程执行过程中,如果处理同一个业务逻辑,可能会产生资源争抢,导致并发问题,通常使用互斥锁来控制该逻辑.但是在还有这样一类场景,任务执行是有顺序控制的,例如常见的报表数据生成: 启动数据分析任务,生成报表数据: 报表数据存入指定位置数据容器: 通知数据搬运任务,把数据写入报表库: 该场景在相对复杂的系统中非常常见,如果基于多线程来描述该过程,则需要线程之间通信协作,才能有条不紊的处理该场景业务. 2.等待通知机制 如上的业务场景,

  • Java并发编程之线程中断

    目录 线程中断: void interrupted()方法:中断线程,例如,当线程A运行时,线程B可以调用线程A的interrupted()方法来设置线程的中断标志为true并立即返回.设置标志仅仅是为了设置标志,线程A实际并没有被中断,它会继续往下执行,如果线程A因为调用了wait()方法,join()方法或者sleep()方法而引起的阻塞挂起,这时候若线程B调用线程A的interrupted()方法,线程A回调用这些方法的地方会抛出InterruptedException异常而返回. boo

  • Java并发编程之线程安全性

    目录 1.什么是线程安全性 2.原子性 2.1 竞争条件 2.2 复合操作 3.加锁机制 3.1 内置锁 3.2 重入 4.用锁保护状态 5.活跃性与性能 1.什么是线程安全性 当多个线程访问某个类时,不管运行时环境采用何种调用方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的. 无状态的对象一定是线程安全的,比如:Servlet. 2.原子性 2.1 竞争条件 由于不恰当的执行时序而出现不正确的结果的情况,就是竞争

随机推荐