java并发编程之同步器代码示例

同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger

队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它内部使用了一个volatiole修饰的int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,这时就需要使用同步器来提供的3个方法(getState()、setState(intnewState)/和compareAndSetState(intexpect,intupdate))来进行操作,因为他们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取个释放的方法来供自定义同步组件使用,同步器既可以独占式的获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReadWriteLock、和CountDownLatch等)。

同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。他们二者直接的关系就是:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节;同步器则是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好的隔离了使用者与实现者所需关注的领域。

同步器的设计是基于模版方法模式实现的,使用者需要继承同步器并重写这顶的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模版方法,而这些模版方法将会调用使用者重写的方法。

同步器提供的模版方法基本上分为3类:独占式获取锁与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模版方法来实现自己的同步语义。倒计数器锁存器是一次性障碍,允许一个或者多个线程等待一个或者多个其它线程来做某些事情。CountDownLatch的唯一构造器带一个int类型的参数,这个int参数是指允许所有在等待线程被处理之前,必须在锁存器上调用countDown方法的次数。

EG:

package hb.java.thread;
import java.util.concurrent.CountDownLatch;
/**
 *
 * @author hb
 *     CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0
 *     ,就只有阻塞等待了。 *JAVA同步器之
 *     CountDownLatch(不能循环使用,如果需要循环使用可以考虑使用CyclicBarrier) 两种比较常规用法: 1:new
 *     CountDownLatch(1);所有的线程在开始工作前需要做一些准备工作,当所有的线程都准备到位后再统一执行时有用 2:new
 *     CountDownLatch(THREAD_COUNT);当所有的线程都执行完毕后,等待这些线程的其他线程才开始继续执行时有用
 */
public class CountDownLatchTest {
	private static final int THREAD_COUNT = 10;
	// 在调用startSingal.countDown()之前调用了startSingal.await()的线程一律等待,直到startSingal.countDown()的调用
	private static final CountDownLatch startSingal = new CountDownLatch(1);
	// 在finishedSingal的初始化记数量通过调用finishedSingal.countDown()减少为0时调用了finishedSingal.await()的线程一直阻塞
	private static final CountDownLatch finishedSingal = new CountDownLatch(
				THREAD_COUNT);
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					System.out.println(Thread.currentThread().getName()
												+ " prepared!!");
					try {
						startSingal.await();
					}
					catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName()
												+ " finished!!");
					finishedSingal.countDown();
				}
				;
			}
			.start();
		}
		Thread.sleep(1000);
		startSingal.countDown();
		// 所有的线程被唤醒,同时开始工作.countDown 方法的线程等到计数到达零时才继续
		finishedSingal.await();
		// 等待所有的线程完成!!
		System.out.println("All task are finished!!");
	}
}
package hb.java.thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 *
 * JAVA同步器之Barrier(能够循环使用,当计数器增加到Barrier的初始化计数器之后马上会被置为0为下一次循环使用做准备)
 * Barrier能够为指定的一个或多个(一般为多个)线程设置一道屏障,只有当所有的线程都到达该屏障后才能一起冲过该屏障继续其他任务 一般可以new
 * CyclicBarrier(ThreadCount)来进行初始化,也可以new
 * CyclicBarrier(ThreadCount,RunableAction)当初始化数量的线程都调用
 * 了await()方法后触发RunableAction线程,也可以通过初始化一个new
 * CyclicBarrier(ThreadCount+1)的Barrier在前置线程未执行完成时一直阻塞一个或多个
 * 后续线程,这一点类似于CountDownLatch
 */
public class BarrierTest {
	private static final int THREAD_COUNT = 10;
	private static final CyclicBarrier barrier = new CyclicBarrier(
	      THREAD_COUNT + 1, new Runnable() {
		@Override
		        public void run() {
			System.out.println("All task are prepared or finished!!");
		}
	}
	);
	public static void main(String[] args) throws InterruptedException,
	      BrokenBarrierException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					try {
						System.out.println(Thread.currentThread().getName()
						                + " prepared!!");
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					// do something
					System.out.println(Thread.currentThread().getName()
					              + " finished!!");
				}
				;
			}
			.start();
		}
		barrier.await();
		// --------------开始准备循环使用--------------
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					// do something
					System.out.println(Thread.currentThread().getName()
					              + " finished!!");
					try {
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				;
			}
			.start();
		}
		barrier.await();
	}
}
package hb.java.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
	final static Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
	public static void main(String[] args) {
		new Producer("Producer", exchanger).start();
		new Consumer("Consumer", exchanger).start();
	}
	static class Producer extends Thread {
		private Exchanger<List<String>> exchanger;
		/**
     *
     */
		public Producer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/*
     * (non-Javadoc)
     *
     * @see java.lang.Thread#run()
     */
		@Override
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("product " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("get results from consumer");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	static class Consumer extends Thread {
		private Exchanger<List<String>> exchanger;
		/**
     *
     */
		public Consumer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/*
     * (non-Javadoc)
     *
     * @see java.lang.Thread#run()
     */
		@Override
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("consumed " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("got products from produces");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

总结

以上就是本文关于java并发编程之同步器代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:

深入分析java并发编程中volatile的实现原理

Javaweb应用使用限流处理大量的并发请求详解

java并发学习之BlockingQueue实现生产者消费者详解

如有不足之处,欢迎留言指出。

(0)

相关推荐

  • Java从同步容器到并发容器的操作过程

    引言 容器是Java基础类库中使用频率最高的一部分,Java集合包中提供了大量的容器类来帮组我们简化开发,我前面的文章中对Java集合包中的关键容器进行过一个系列的分析,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性,最简单的就是通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行.这种方式虽然可以达到线程安全的目的,但存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁.其次这种一刀切的做法在高并发情况下性

  • Java同步容器和并发容器详解

    同步容器 在 Java 中,同步容器主要包括 2 类: Vector.Stack.HashTableCollections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXxxx 等方法) Collections类中提供的静态工厂方法创建的类 Vector 实现了 List 接口,Vector 实际上就是一个数组,和 ArrayList 类似,但是Vector 中的方法都是 synchronized 方法,即进行了同步措施. Stack 也是一个同步容器,它

  • 详解Java利用同步块synchronized()保证并发安全

    本文实例为大家分享了Java利用同步块synchronized()保证并发安全的具体代码,供大家参考,具体内容如下 package day10; /** * 同步块 * 有效地缩小同步范围 * 可以在保证并发安全的同时尽可能提高并发效率 * * 实例:模拟两个人同时进店买衣服,为提高效率 * 只在试衣服阶段进行同步排队过程,其他阶段无需排队. * @author kaixu * */ public class SyncDemo2 { public static void main(String[

  • Java中同步与并发用法分析

    本文较为详细的分析了Java中同步与并发的用法.分享给大家供大家参考.具体分析如下: 1.同步容器类包括两部分:vector和hashtable 另一类是同步包装类,由Collections.synchronizedXXX创建.同步容器对容器的所有状态进行串行访问,从而实现线程安全. 它们存在如下问题: a) 对于符合操作,需要额外的锁保护.比如迭代,缺少则添加等条件运算. b) toString,hashCode,equals都会间接的调用迭代,都需要注意并发.   2.java5.0中的并发

  • 浅谈Java并发 J.U.C之AQS:CLH同步队列

    CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态. 在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread).状态(waitStatus).前驱节点(prev).后继节点(next),其定义如下: static final class Node

  • Java并发之传统线程同步通信技术代码详解

    本文研究的主要是Java并发之传统线程同步通信技术的相关代码示例,具体介绍如下. 先看一个问题: 有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次--如此往返执行50次. 看完这个问题,很明显要用到线程间的通信了, 先分析一下思路:首先肯定要有两个线程,然后每个线程中肯定有个50次的循环,因为每个线程都要往返执行任务50次,主线程的任务是执行5次,子线程的任务是执行10次.线程间通信技术主要用到wait()方法和notify()方法.wait()方

  • java并发编程之同步器代码示例

    同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作.最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger 队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它内部使用了一个volatiole修饰的int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作. 同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽

  • java并发编程包JUC线程同步CyclicBarrier语法示例

    目录 1.创建CyclicBarrier障碍 2.在CyclicBarrier障碍处等待 3.CyclicBarrierAction 4.CyclicBarrier例子 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口.ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue.PriorityBlockingQueue.SynchronousQueue.BlockingDeque接口.ConcurrentHashMap

  • Java并发编程同步器CountDownLatch

    CountDownLatch 在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景.在 CountDownLatch 出现之前般都使用线程的join()方法来实现这一点,但是 join 方法不够灵活,不能够满足不同场景的需要,所以 JDK 开发组提供了 CountDownLatch 这个类,我们前面介绍的例子使用 CoumtDownLatch 会更优雅. 使用CountDownLatch 的代码如下: package LockSu

  • Java多线程编程实现socket通信示例代码

    流传于网络上有关Java多线程通信的编程实例有很多,这一篇还算比较不错,代码可用.下面看看具体内容. TCP是Tranfer Control Protocol的 简称,是一种面向连接的保证可靠传输的协议.通过TCP协议传输,得到的是一个顺序的无差错的数据流.发送方和接收方的成对的两个socket之间必须建 立连接,以便在TCP协议的基础上进行通信,当一个socket(通常都是server socket)等待建立连接时,另一个socket可以要求进行连接,一旦这两个socket连接起来,它们就可以

  • Java编程异常简单代码示例

    练习1 写一个方法void triangle(int a,int b,int c),判断三个参数是否能构成一个三角形.如果不能则抛出异常IllegalArgumentException,显示异常信息:a,b,c "不能构成三角形":如果可以构成则显示三角形三个边长.在主方法中得到命令行输入的三个整数,调用此方法,并捕获异常. 两边之和大于第三边:a+b>c 两边之差小于第三边:c-a package 异常; import java.util.Arrays; import java

  • java编程队列数据结构代码示例

    队列是一种特殊的线性表,只允许在表的前端进行删除,在表的后端进行插入,表的前端称为(front)队头,表的后端称为(rear)队尾. 所以队列跟生活的场景很是相似,在电影院买电影票,人们排成一排,第一个人进入队尾最先到达队头后买票进入影院,后面排队的人按照排队的次序买到票后进入影院. 所以 队列是一种先进先出的数据结构(FIFO). 编程实现对循环链队列的入队和出队操作. ⑴根据输入的队列长度n和各元素值建立一个带头结点的循环链表表示的队列(循环链队列),并且只设一个尾指针来指向尾结点,然后输出

  • Java并发编程Callable与Future的应用实例代码

    本文主要探究的是java并发编程callable与future的使用,分享了相关实例代码,具体介绍如下. 我们都知道实现多线程有2种方式,一种是继承Thread,一种是实现Runnable,但这2种方式都有一个缺陷,在任务完成后无法获取返回结果.要想获得返回结果,就得使用Callable,Callable任务可以有返回值,但是没法直接从Callable任务里获取返回值:想要获取Callabel任务的返回值,需要用到Future.所以Callable任务和Future模式,通常结合起来使用. 试想

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • Java并发编程(CyclicBarrier)实例详解

    Java并发编程(CyclicBarrier)实例详解 前言: 使用JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作.有时候,我们启动N个线程去做一件事情,只有当这N个线程都达到某一个临界点的时候,我们才能继续下面的工作,就是说如果这N个线程中的某一个线程先到达预先定义好的临界点,它必须等待其他N-1线程也到达这个临界点,接下来的工作才能继续,只要这N个线程中有1个线程没有到达所谓的临界点,其他线程就算抢先到达了临界点,也只能等待,只有所有这N

  • Java 并发编程之线程挂起、恢复与终止

    挂起和恢复线程 Thread 的API中包含两个被淘汰的方法,它们用于临时挂起和重启某个线程,这些方法已经被淘汰,因为它们是不安全的,不稳定的.如果在不合适的时候挂起线程(比如,锁定共享资源时),此时便可能会发生死锁条件--其他线程在等待该线程释放锁,但该线程却被挂起了,便会发生死锁.另外,在长时间计算期间挂起线程也可能导致问题. 下面的代码演示了通过休眠来延缓运行,模拟长时间运行的情况,使线程更可能在不适当的时候被挂起: public class DeprecatedSuspendResume

随机推荐