java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore

目录
  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • 总结

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据,

这时我们就可以使用CountDownLatch实现主线程等待所有sheet线程完成sheet的解析操作后,再继续执行自己的任务。

public class CountDownLatchTest {
    private static class WorkThread extends Thread {
        private CountDownLatch cdl;
        public WorkThread(String name, CountDownLatch cdl) {
            super(name);
            this.cdl = cdl;
        }
        public void run() {
            System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());
            System.out.println(this.getName() + "我要统计每个sheet的行数");
            try {
                cdl.await();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis());
        }
    }
    private static class sheetThread extends Thread {
        private CountDownLatch cdl;
        public sheetThread(String name, CountDownLatch cdl) {
            super(name);
            this.cdl = cdl;
        }
        public void run() {
            try {
                System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());
                Thread.sleep(1000); //模拟任务执行耗时
                cdl.countDown();
                System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis() + " sheet的行数为:" + (int) (Math.random()*100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        CountDownLatch cdl = new CountDownLatch(2);
        WorkThread wt0 = new WorkThread("WorkThread", cdl );
        wt0.start();
        sheetThread dt0 = new sheetThread("sheetThread1", cdl);
        sheetThread dt1 = new sheetThread("sheetThread2", cdl);
        dt0.start();
        dt1.start();
    }
}

执行结果:

WorkThread启动了,时间为1640054503027
WorkThread我要统计每个sheet的行数
sheetThread1启动了,时间为1640054503028
sheetThread2启动了,时间为1640054503029
sheetThread2执行完了,时间为1640054504031 sheet的行数为:6
sheetThread1执行完了,时间为1640054504031 sheet的行数为:44
WorkThread执行完了,时间为1640054505036

可以看到,首先WorkThread执行await后开始等待,WorkThread在等待sheetThread1和sheetThread2都执行完自己的任务后,WorkThread立刻继续执行后面的代码。

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。

由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。

用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

我们继续根据上面的测试案例流程,一步一步的分析CountDownLatch 源码。

第一步看CountDownLatch的构造方法,传入一个不能小于0的int类型的参数作为计数器

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
/**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

看它的注释,说的非常清楚,Sync就是CountDownLatch的同步控制器了,而它也是继承了AQS,并且第3行注释说到使用了AQS的state去代表count值。

第二步就是工作线程调用await()方法

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

如果线程中断,抛出异常,否则开始调用tryAcquireShared(1),其内部类Sync的实现也非常简单,就是判断state也就是CountDownLatch的计数是否等于0,

如果等于0,则该方法返回1,第5行的if判断不成立,否则该方法返回-1,第5行的if判断成立,继续执行doAcquireSharedInterruptibly(1)。

/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个方法其实就是去获取共享模式下的锁,获取失败就park住。正如我们测试案例中的WorkThread线程应该次数就被park住了,那么它又是何时被唤醒的呢?

下面就到countDown()方法了

public void countDown() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared(1)方法尝试去释放共享锁

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

在for循环中,先获取CountDownLatch的计数也就是当前state,如果等于0返回false,否则将state更新为state-1,并返回最新的state是否等于0。

因此在我们的测试案例中,我们需要调用两次countDown方法,才会将全局的state更新为0,然后继续执行doReleaseShared()方法。

/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

LockSupport.unpark(s.thread),唤醒线程的方法被调用后,WorkThread线程就可以继续执行了。

至此我们简单分析了整个测试案例中CountDownLatch的代码流程。

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,相当于一个并发控制器,构造的时候传入可供管理的信号量的数值,这个数值就是用来控制并发数量的,

每个线程执行前先通过acquire方法获取信号,执行后通过release归还信号 。每次acquire返回成功后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,

acquire调用就会阻塞,等待有release调用释放信号后,acquire才会得到信号并返回。

下面我们看个测试案例

public class SemaphoreTest {
    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(5);
        Runnable runnable = () -> {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "获得了信号量>>>>>,时间为" + System.currentTimeMillis());
                Thread.sleep(1000);
          System.out.println(Thread.currentThread().getName() + "释放了信号量<<<<<,时间为" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        };
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++)
            threads[i] = new Thread(runnable);
        for (int i = 0; i < threads.length; i++)
            threads[i].start();
    }
}

执行结果:

Thread-0获得了信号量>>>>>,时间为1640058647604
Thread-1获得了信号量>>>>>,时间为1640058647604
Thread-2获得了信号量>>>>>,时间为1640058647604
Thread-3获得了信号量>>>>>,时间为1640058647605
Thread-4获得了信号量>>>>>,时间为1640058647605
Thread-0释放了信号量<<<<<,时间为1640058648606
Thread-1释放了信号量<<<<<,时间为1640058648606
Thread-5获得了信号量>>>>>,时间为1640058648607
Thread-4释放了信号量<<<<<,时间为1640058648607
Thread-3释放了信号量<<<<<,时间为1640058648607
Thread-7获得了信号量>>>>>,时间为1640058648607
Thread-8获得了信号量>>>>>,时间为1640058648607
Thread-2释放了信号量<<<<<,时间为1640058648606
Thread-6获得了信号量>>>>>,时间为1640058648607
Thread-9获得了信号量>>>>>,时间为1640058648607
Thread-7释放了信号量<<<<<,时间为1640058649607
Thread-6释放了信号量<<<<<,时间为1640058649607
Thread-8释放了信号量<<<<<,时间为1640058649607
Thread-9释放了信号量<<<<<,时间为1640058649608
Thread-5释放了信号量<<<<<,时间为1640058649607

我们使用for循环同时创建10个线程,首先是线程 0 1 2 3 4获得了信号量,再后面的10行打印结果中,线程1到5分别释放信号量,相同线程间隔也是1000毫秒,然后线程5 6 7 8 9才能继续获得信号量,而且保持最大获取信号量的线程数小于等于5。

看下Semaphore的构造方法

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

它支持传入一个int类型的permits,一个布尔类型的fair,因此Semaphore也有公平模式与非公平模式。

/**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
        final int getPermits() {
            return getState();
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

第9行代码可见Semaphore也是通过AQS的state来作为信号量的计数的

第12行 getPermits() 方法获取当前的可用的信号量,即还有多少线程可以同时获得信号量

第15行nonfairTryAcquireShared方法尝试获取共享锁,逻辑就是直接将可用信号量减去该方法请求获取的数量,更新state并返回该值。

第24行tryReleaseShared 方法尝试释放共享锁,逻辑就是直接将可用信号量加上该方法请求释放的数量,更新state并返回。

再看下Semaphore的公平锁

/**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

看尝试获取共享锁的方法中,多了个 if (hasQueuedPredecessors) 的判断,在java多线程6:ReentrantLock

分析过hasQueuedPredecessors其实就是判断当前等待队列中是否存在等待线程,并判断第一个等待的线程(head.next)是否是当前线程。

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

一组线程同时被唤醒,让我们想到了ReentrantLock的Condition,它的signalAll方法可以唤醒await在同一个condition的所有线程。

下面我们还是从一个简单的测试案例先了解下CyclicBarrier的用法

public class CyclicBarrierTest extends Thread {
    private CyclicBarrier cb;
    private int sleepSecond;
    public CyclicBarrierTest(CyclicBarrier cb, int sleepSecond) {
        this.cb = cb;
        this.sleepSecond = sleepSecond;
    }
    public void run() {
        try {
            System.out.println(this.getName() + "开始, 时间为" + System.currentTimeMillis());
            Thread.sleep(sleepSecond * 1000);
            cb.await();
            System.out.println(this.getName() + "结束, 时间为" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            public void run() {
                System.out.println("CyclicBarrier的barrierAction开始运行, 时间为" + System.currentTimeMillis());
            }
        };
        CyclicBarrier cb = new CyclicBarrier(2, runnable);
        CyclicBarrierTest cbt0 = new CyclicBarrierTest(cb, 3);
        CyclicBarrierTest cbt1 = new CyclicBarrierTest(cb, 6);
        cbt0.start();
        cbt1.start();
    }
}

执行结果:

Thread-1开始, 时间为1640069673534
Thread-0开始, 时间为1640069673534
CyclicBarrier的barrierAction开始运行, 时间为1640069679536
Thread-1结束, 时间为1640069679536
Thread-0结束, 时间为1640069679536

可以看到Thread-0和Thread-1同时运行,而自定义的线程barrierAction是在6000毫秒后开始执行,说明Thread-0在await之后,等待了3000毫秒,和Thread-1一起继续执行的。

看下CyclicBarrier 的一个更高级的构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

parties就是设定需要多少线程在屏障前等待,只有调用await方法的线程数达到才能唤醒所有的线程,还有注意因为使用CyclicBarrier的线程都会阻塞在await方法上,所以在线程池中使用CyclicBarrier时要特别小心,如果线程池的线程过少,那么就会发生死锁。

Runnable barrierAction用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

/**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

首先是ReentrantLock加锁,全局的count值-1,然后判断count是否等于0,如果不等于0,则循环,condition执行await等待,直到触发、中断、中断或超时,如果count值等于0,先执行barrierAction线程,然后condition开始唤醒所有等待的线程。

简单是使用之后,有人会觉得CyclicBarrierCountDownLatch有点像,其实它们两者有些细微的差别:

1:CountDownLatch是在多个线程都进行了latch.countDown()后才会触发事件,唤醒await()在latch上的线程,而执行countDown()的线程,是不会阻塞的;

CyclicBarrier是一个栅栏,用于同步所有调用await()方法的线程,线程执行了await()方法之后并不会执行之后的代码,而只有当执行await()方法的线程数等于指定的parties之后,这些执行了await()方法的线程才会同时运行。

2:CountDownLatch不能循环使用,计数器减为0就减为0了,不能被重置;CyclicBarrier本是就是支持循环使用parties,而且提供了reset()方法,可以重置计数器。

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • Java线程并发工具类CountDownLatch原理及用法

    一.CountDownLatch [1]CountDownLatch是什么? CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或 多个线程一直等待. 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行: 确保某个计算在其需要的所有资源都被初始化之后才继续执行; 确保某个服务在其依赖的所有其他服务都已经启动之后才启动; 等待直到某个操作所有参与者都准备就绪再继续执行: CountD

  • Java线程的并发工具类实现原理解析

    目录 一.fork/join 1. Fork-Join原理 2. 工作窃取 3. 代码实现 二.CountDownLatch 三.CyclicBarrier 四.Semaphore 五.Exchange 六.Callable.Future.FutureTask 在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应

  • 通俗易懂学习java并发工具类-Semaphore,Exchanger

    1. 控制资源并发访问--Semaphore Semaphore可以理解为信号量,用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源. Semaphore就相当于一个许可证,线程需要先通过acquire方法获取该许可证,该线程才能继续往下执行,否则只能在该方法出阻塞等待.当执行完业务功能后,需要通过release()方法将许可证归还,以便其他线程能够获得许可证继续执行. Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接.假如有多个线程读取

  • Java并发工具类LongAdder原理实例解析

    LongAdder实现原理图 高并发下N多线程同时去操作一个变量会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性.既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源. LongAdder则是内部维护一个Cells数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同

  • Java并发工具类Exchanger的相关知识总结

    一.Exchanger的理解 Exchanger 属于java.util.concurrent包: Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类; 一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据. 二.Exchanger类中常用方法 public Exchanger() 无参构造方法.表示创建一个新的交换器. public V exchange(V

  • java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore

    目录 CountDownLatch Semaphore CyclicBarrier 总结 CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. 假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据, 这时我们就可以使用CountDownLatch实现主线程等待所有sheet线程完成sheet的解析操作后,再继续执行自己的任务. public class CountDownLatchTest { private static

  • Java多线程之同步工具类CountDownLatch

    前言: CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行.例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行. 1 CountDownLatch主要方法 void await():如果当前count大于0,当前线程将会wait,直到count等于0或者中断. PS:当count等于0的时候,再去调用await() , 线程将不会阻塞,而是立即运行.后面可以通过源码分析得到. boolean await(long t

  • Java多线程之同步工具类CyclicBarrier

    目录 1 CyclicBarrier方法说明 2 CyclicBarrier实例 3 CyclicBarrier源码解析 CyclicBarrier构造函数 await方法 nextGeneration的源码 breakBarrier源码 isBroken方法 reset方法 getNumberWaiting方法 前言: CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到达到某个公共屏障点.与CountDownLatch不同的是该barrier在释放线程等待后可以重用,所以

  • Java多线程之同步工具类Exchanger

    目录 1 Exchanger 介绍 2 Exchanger 实例 exchange等待超时 3 实现原理 1 Exchanger 介绍 前面分别介绍了CyclicBarrier.CountDownLatch.Semaphore,现在介绍并发工具类中的最后一个Exchange. Exchanger 是一个用于线程间协作的工具类,Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange 方法交换数据,如果第一个线程先执行e

  • 教你如何使用Java多线程编程LockSupport工具类

    LockSupport类 用于创建锁和其他同步类的基本线程阻塞原语,此类与使用它的每个线程关联一个许可.如果获得许可,将立即返回对park的调用,并在此过程中消耗掉它:否则may会被阻止.调用unpark可使许可证可用(如果尚不可用).(不过与信号量不同,许可证不会累积.最多只能有一个.) 方法park和unpark提供了有效的阻塞和解阻塞线程的方法,这些线程不会遇到导致已弃用的方法Thread.suspend和Thread.resume无法用于以下问题:由于许可,在调用park的一个线程与试图

  • Java多线程同步工具类CountDownLatch详解

    目录 简介 核心方法 CountDownLatch如何使用 CountDownLatch运行流程 运用场景 总结 简介 CountDownLatch是一个多线程同步工具类,在多线程环境中它允许多个线程处于等待状态,直到前面的线程执行结束.从类名上看CountDown既是数量递减的意思,我们可以把它理解为计数器. 核心方法 countDown():计数器递减方法. await():使调用此方法的线程进入等待状态,直到计数器计数为0时主线程才会被唤醒. await(long, TimeUnit):在

  • Java并发工具类Future使用示例

    目录 前言 Future使用示例 FutureTask 前言 Future是一个接口类,定义了5个方法: boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws Interrupte

随机推荐