带你快速搞定java并发库

目录
  • 一、总览
  • 二、Executor总览
  • 三、继承结构
  • 四、怎么保证只有一个线程
  • 五、怎么保证时间可以定时执行
  • 六、使用
  • 总结

一、总览

计算机程序 = 数据 + 算法。

并发编程的一切根本原因是为了保证数据的正确性,线程的效率性。

Java并发库共分为四个大的部分,如下图

Executor 和 future 是为了保证线程的效率性

Lock 和数据结构 是为了维持数据的一致性。

Java并发编程的时候,思考顺序为,

对自己的数据要么加锁。要么使用提供的数据结构,保证数据的安全性

调度线程的时候使用Executor更好的调度。

二、Executor总览

Executor 提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

相当于manager,老板让manager去执行一件任务,具体的是谁执行,什么时候执行,就不管了。

看上图的继承关系,介绍几个

内置的线程池基本上都在这里

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

三、继承结构

构造函数

包含一个定时的service

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;
    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }

四、怎么保证只有一个线程

定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //  延迟执行
    delayedExecute(t);
    return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 加入任务队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 确保执行
            ensurePrestart();
    }
}
// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

五、怎么保证时间可以定时执行

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

在每次执行的时候会把下一次执行的时间放进任务中

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
 * Returns the trigger time of a delayed action.
 */
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //注意这里
            LockSupport.parkNanos(this, nanos);
        }
        else //注意这里
            LockSupport.park(this);
    }
}

总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。

注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常

六、使用

ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
Runnable runnable1 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("11111111111111");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Runnable runnable2 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("222");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS);
single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);

11111111111111 222 11111111111111 222 11111111111111

在项目中要注意关闭线程池

actionService = Executors.newSingleThreadScheduledExecutor();
        actionService.scheduleWithFixedDelay(() -> {
            try {
                Thread.currentThread().setName("robotActionService");
                Integer robotId = robotQueue.poll();
                if (robotId == null) {
                    //    关闭线程池
                    actionService.shutdown();
                } else {
                    int aiLv = robots.get(robotId);
                    if (actionQueueMap.containsKey(aiLv)) {
                        ActionQueue actionQueue = actionQueueMap.get(aiLv);
                        actionQueue.doAction(robotId);
                    }
                }
            } catch (Exception e) {
                //    捕捉异常
                LOG.error("",e);
            }
        }, 1, 1, TimeUnit.SECONDS);

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • Java并发编程之详解ConcurrentHashMap类

    前言 由于Java程序员常用的HashMap的操作方法不是同步的,所以在多线程环境下会导致存取操作数据不一致的问题,Map接口的另一个实现类Hashtable 虽然是线程安全的,但是在多线程下执行效率很低.为了解决这个问题,在java 1.5版本中引入了线程安全的集合类ConcurrentMap. java.util.concurrent.ConcurrentMap接口是Java集合类框架提供的线程安全的map,这意味着多线程同时访问它,不会影响map中每一条数据的一致性.ConcurrentM

  • 详解Java高并发编程之AtomicReference

    目录 一.AtomicReference 基本使用 1.1.使用 synchronized 保证线程安全性 二.了解 AtomicReference 2.1.使用 AtomicReference 保证线程安全性 2.2.AtomicReference 源码解析 2.2.1.get and set 2.2.2.lazySet 方法 2.2.3.getAndSet 方法 2.2.4.compareAndSet 方法 2.2.5.weakCompareAndSet 方法 一.AtomicReferen

  • [java并发编程之深入理解]Synchronized的使用

    1.为什么要使用synchronized 在并发编程中存在线程安全问题,主要原因有:1.存在共享数据 2.多线程共同操作共享数据.关键字synchronized可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块,同时synchronized可以保证一个线程的变化可见(可见性),即可以代替volatile. 2.实现原理 synchronized可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,同时它还可以保证共享变量的内存可见性 3.synchronized的三种应

  • Java并发编程必备之Future机制

    前言 Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常. Callable接口使用泛型去定义它的返回类型.Executors类提供了一些有用的方法在线程池中执行Callable内的任务.由于Callable任务是并行的,我们必须等待它返回的结果.而线程是属于异步计算模型,所以不可能直接从别的线程中得到函数返回值. java.util.concurrent.Futur

  • Java并发编程之详解CyclicBarrier线程同步

    CyclicBarrier线程同步 java.util.concurrent.CyclicBarrier提供了一种多线程彼此等待的同步机制,可以把它理解成一个障碍,所有先到达这个障碍的线程都将将处于等待状态,直到所有线程都到达这个障碍处,所有线程才能继续执行. 举个例子:CyclicBarrier的同步方式有点像朋友们约好了去旅游,在景点入口处集合,这个景点入口就是一个Barrier障碍,等待大家都到了才一起进入景点游览参观. 进入景点后大家去爬山,有的人爬得快,有的人爬的慢,大家约好了山顶集合

  • 带你快速搞定java并发库

    目录 一.总览 二.Executor总览 三.继承结构 四.怎么保证只有一个线程 五.怎么保证时间可以定时执行 六.使用 总结 一.总览 计算机程序 = 数据 + 算法. 并发编程的一切根本原因是为了保证数据的正确性,线程的效率性. Java并发库共分为四个大的部分,如下图 Executor 和 future 是为了保证线程的效率性 Lock 和数据结构 是为了维持数据的一致性. Java并发编程的时候,思考顺序为, 对自己的数据要么加锁.要么使用提供的数据结构,保证数据的安全性 调度线程的时候

  • 带你快速搞定java多线程

    目录 1.什么是线程 2.线程的状态 3.怎么通俗理解进程,线程? 4.线程和进程的区别 5.什么是线程安全 6.如何创建线程 总结: 1.什么是线程 线程是操作系统调度的最小单元,也叫轻量级进程.它被包含在进程之中,是进程中的实际运作单位.同一进程可以创建多个线程,每个进程都有自己独立的一块内存空间.并且能够访问共享的内存变量. 2.线程的状态 线程的状态一般看到的也就是Runable 和blocked ,最多的还是blocked,因为cpu的时间片很短,切换的很快等待IO,等待临界资源.大概

  • 带你快速搞定java多线程(3)

    目录 一.锁的概念 二.synchronized 的使用方式 三.synchronized 的实现原理列 小结 四.线程池是什么 五.为什么要用线程池? 六.看下类图,从整体上理解下 七.线程池的创建 八.线程池核心参数说明 九.几个疑问点 9.1.是怎么保证线程不销毁的? 9.2 提交任务有哪几种方式? 9.3 拒绝策略都有哪些? 9.4 线程池的关闭 9.5 初始化线程池时线程数的选择 十.总结 一.锁的概念 先来聊聊这几个概念,总不能聊起来的时候啥也不知道,只知道干活也没有用. 公平锁:当

  • 带你快速搞定java IO

    目录 一.IO底层是怎么回事? 二.梳理类的结构 三.IO类大点兵 四.来波实例展示 1.访问操作文件(FileInputStream/FileReader ,FileOutputStream/FileWriter) 2.缓存流的使用(BufferedInputStream/BufferedOutputStream,BufferedReader/BufferedWriter) 3.获取键盘输入 总结: 一.IO底层是怎么回事? 操作系统就是管家,电脑的设备就是资源,如果进程先要操作资源,必须要进

  • 带你快速搞定java多线程(4)

    目录 1.AQS 是什么? 2.AQS 模型 3.AQS state 4.AQS 两种资源共享方式: 5.模板方式实现自定义 6.锁的分类:公平锁和非公平锁,乐观锁和悲观锁 7.CAS 8.总结 1.AQS 是什么? AQS 是类 AbstractQueuedSynchronizer的简称,也是常用锁的基类,比如常见的ReentrantLock,Semaphore,CountDownLatch 等等. AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架.是Java提供的一种模板

  • 带你快速搞定java多线程(5)

    目录 1.介绍 2.countdownlantch的用法. 3.如何利用AQS 实现 CountDownLatch 4.总结 1.介绍 CountDownLantch 倒数计时器,一个同步辅助类,一个线程(或者多个),等待另外N个线程完成某个事情后才能执行.用给定的计数初始化CountDownLatch,其含义是要被等待执行完的线程个数. 每次调用CountDown(),计数减1,执行到await()函数会阻塞等待线程的执行,直到计数为0. CountDownLantch 无法重置 2.coun

  • 带你快速搞定java多线程(2)

    目录 1.Future的类图结构,从整体上看下Future的结构 2.future的使用,说的再多都么什么用,来个例子悄悄怎么用的. 3.通俗理解 4.原理 5.总结 1.Future的类图结构,从整体上看下Future的结构 首先看下future接口的函数,共有5个方法. get() 获取执行的结果,另外一个重载是有时间限制的get ,如果超时会有异常 isDone() 判断future 结果是否处理完成 cancel 取消任务 2.future的使用,说的再多都么什么用,来个例子悄悄怎么用的

  • 带你快速搞定java数组

    目录 1.数组的定义 2.array 遍历 3.List和array 之间的转换 1.数组转list 2.list 转数组 3.Arrays工具类 4.可能遇到的问题 总结 1.数组的定义 先声明后使用 数据类型 [] 数组名称 = new 数据类型[长度];String[] arr3 = new String[5]; 数据类型 数组名称[] = new 数据类型[长度];String arr[] = new String[5]; 直接初始化 String[] arrs = {"1",

  • 10分钟搞定Java并发队列

    前言 如果按照用途与特性进行粗略的划分,JUC 包中包含的工具大体可以分为 6 类: 执行者与线程池 并发队列 同步工具 并发集合 锁 原子变量 在并发系列中,主要讲解了 执行者与线程池,同步工具,锁 , 在分析源码时,或多或少的提及到了「队列」,队列在 JUC 中也是多种多样存在,所以本文就以「远看」视角,帮助大家快速了解与区分这些看似「杂乱」的队列 并发队列 Java 并发队列按照实现方式来进行划分可以分为 2 种: 阻塞队列 非阻塞队列 如果你已经看完并发系列锁的实现,你已经能够知道他们实

  • 带你轻松搞定Java面向对象的编程--数组,集合框架

    目录 一.数组 1.数组的定义 2.数组的声明 3.数组的初始化 二.集合概述 三.Collection接口 1.Collection接口概述 2.集合框架的三个组件 3.Iterator接口 四.List接口 1.ArrayList类 2.LinkedList类 五.Set接口 1.HashSet类 六.Map接口 1.HashMap类 七.泛型 总结 一.数组 1.数组的定义 数组是为了解决同类数据整合摆放而提出的,可以理解为一组具有相同类型的变量的集合,它的每个元素都具有相同的数据类型.

随机推荐