java并发编程之同步器代码示例
同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger
队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它内部使用了一个volatiole修饰的int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,这时就需要使用同步器来提供的3个方法(getState()、setState(intnewState)/和compareAndSetState(intexpect,intupdate))来进行操作,因为他们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取个释放的方法来供自定义同步组件使用,同步器既可以独占式的获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReadWriteLock、和CountDownLatch等)。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。他们二者直接的关系就是:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节;同步器则是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好的隔离了使用者与实现者所需关注的领域。
同步器的设计是基于模版方法模式实现的,使用者需要继承同步器并重写这顶的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模版方法,而这些模版方法将会调用使用者重写的方法。
同步器提供的模版方法基本上分为3类:独占式获取锁与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模版方法来实现自己的同步语义。倒计数器锁存器是一次性障碍,允许一个或者多个线程等待一个或者多个其它线程来做某些事情。CountDownLatch的唯一构造器带一个int类型的参数,这个int参数是指允许所有在等待线程被处理之前,必须在锁存器上调用countDown方法的次数。
EG:
package hb.java.thread; import java.util.concurrent.CountDownLatch; /** * * @author hb * CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0 * ,就只有阻塞等待了。 *JAVA同步器之 * CountDownLatch(不能循环使用,如果需要循环使用可以考虑使用CyclicBarrier) 两种比较常规用法: 1:new * CountDownLatch(1);所有的线程在开始工作前需要做一些准备工作,当所有的线程都准备到位后再统一执行时有用 2:new * CountDownLatch(THREAD_COUNT);当所有的线程都执行完毕后,等待这些线程的其他线程才开始继续执行时有用 */ public class CountDownLatchTest { private static final int THREAD_COUNT = 10; // 在调用startSingal.countDown()之前调用了startSingal.await()的线程一律等待,直到startSingal.countDown()的调用 private static final CountDownLatch startSingal = new CountDownLatch(1); // 在finishedSingal的初始化记数量通过调用finishedSingal.countDown()减少为0时调用了finishedSingal.await()的线程一直阻塞 private static final CountDownLatch finishedSingal = new CountDownLatch( THREAD_COUNT); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < THREAD_COUNT; i++) { new Thread("Task " + i) { public void run() { System.out.println(Thread.currentThread().getName() + " prepared!!"); try { startSingal.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " finished!!"); finishedSingal.countDown(); } ; } .start(); } Thread.sleep(1000); startSingal.countDown(); // 所有的线程被唤醒,同时开始工作.countDown 方法的线程等到计数到达零时才继续 finishedSingal.await(); // 等待所有的线程完成!! System.out.println("All task are finished!!"); } }
package hb.java.thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * * JAVA同步器之Barrier(能够循环使用,当计数器增加到Barrier的初始化计数器之后马上会被置为0为下一次循环使用做准备) * Barrier能够为指定的一个或多个(一般为多个)线程设置一道屏障,只有当所有的线程都到达该屏障后才能一起冲过该屏障继续其他任务 一般可以new * CyclicBarrier(ThreadCount)来进行初始化,也可以new * CyclicBarrier(ThreadCount,RunableAction)当初始化数量的线程都调用 * 了await()方法后触发RunableAction线程,也可以通过初始化一个new * CyclicBarrier(ThreadCount+1)的Barrier在前置线程未执行完成时一直阻塞一个或多个 * 后续线程,这一点类似于CountDownLatch */ public class BarrierTest { private static final int THREAD_COUNT = 10; private static final CyclicBarrier barrier = new CyclicBarrier( THREAD_COUNT + 1, new Runnable() { @Override public void run() { System.out.println("All task are prepared or finished!!"); } } ); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { for (int i = 0; i < THREAD_COUNT; i++) { new Thread("Task " + i) { public void run() { try { System.out.println(Thread.currentThread().getName() + " prepared!!"); barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } // do something System.out.println(Thread.currentThread().getName() + " finished!!"); } ; } .start(); } barrier.await(); // --------------开始准备循环使用-------------- for (int i = 0; i < THREAD_COUNT; i++) { new Thread("Task " + i) { public void run() { // do something System.out.println(Thread.currentThread().getName() + " finished!!"); try { barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } ; } .start(); } barrier.await(); } }
package hb.java.thread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; public class ExchangerTest { final static Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); public static void main(String[] args) { new Producer("Producer", exchanger).start(); new Consumer("Consumer", exchanger).start(); } static class Producer extends Thread { private Exchanger<List<String>> exchanger; /** * */ public Producer(String threadName, Exchanger<List<String>> exchanger) { super(threadName); this.exchanger = exchanger; } /* * (non-Javadoc) * * @see java.lang.Thread#run() */ @Override public void run() { List<String> products = new ArrayList<String>(); for (int i = 0; i < 10; i++) { products.add("product " + i); } try { List<String> results = exchanger.exchange(products); System.out.println("get results from consumer"); for (String s : results) { System.out.println(s); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class Consumer extends Thread { private Exchanger<List<String>> exchanger; /** * */ public Consumer(String threadName, Exchanger<List<String>> exchanger) { super(threadName); this.exchanger = exchanger; } /* * (non-Javadoc) * * @see java.lang.Thread#run() */ @Override public void run() { List<String> products = new ArrayList<String>(); for (int i = 0; i < 10; i++) { products.add("consumed " + i); } try { List<String> results = exchanger.exchange(products); System.out.println("got products from produces"); for (String s : results) { System.out.println(s); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
总结
以上就是本文关于java并发编程之同步器代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:
深入分析java并发编程中volatile的实现原理
Javaweb应用使用限流处理大量的并发请求详解
java并发学习之BlockingQueue实现生产者消费者详解
如有不足之处,欢迎留言指出。