Java多线程之CAS算法实现线程安全

前言

对于线程安全,我们有说不尽的话题。大多数保证线程安全的方法是添加各种类型锁,使用各种同步机制,用限制对共享的、可变的类变量并发访问的方式来保证线程安全。文本从另一个角度,使用“比较交换算法”(CompareAndSwap)实现同样的需求。我们实现一个简单的“栈”,并逐步重构代码来进行讲解。

本文通俗易懂,不会涉及到过多的底层知识,适合初学者阅读(言外之意是各位大神可以绕道了)。

旅程开始

1.先定个小目标,实现一个“栈”

“栈”(stack)是大家经常使用的抽象数据类型(啥?!不知道,请自行百度)。“栈”满足“后进先出”特性。我们用链表数据结构完成一个简单的实现:

public class Stack<E> {
//链表结构头部节点
private Node<E> head;
/**
* 入栈
* @param item
*/
public void push(E item) {
//为新插入item创建一个新node
Node<E> newHead = new Node<>(item);
if(head!=null){
//将新节点的下一个节点指向原来的头部
newHead.next = head;
}
//将头部指向新的节点
head=newHead;
}
/**
* 出栈
* @return
*/
public E pop() {
if(head==null){
//当前链表为空
return null;
}
//暂存当前节点。
Node<E> oldHead=head;
//将当前节点指向当前节点的下一个节点
head=head.next;
//从暂存的当前节点记录返回数据
return oldHead.item;
}
/**
* 链表中的节点
* @param <E>
*/
private static class Node<E> {
//节点保存的数据
public final E item;
//指向下一个链表中下一个节点
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}

代码使用链表数据结构实现“栈”,在Stack中维护一个链表的“头部节点”,通过对头部节点的操作完成入栈和出栈操作。

我们运行代码测试一下:

public static void main(String[] args) {
Stack<Integer> stack=new Stack<>();
for (int i = 0; i < 3; i++) {
//入栈1、2、3
stack.push(i+1);
}
for (int i = 0; i < 3; i++) {
//出栈3、2、1
System.out.println(stack.pop());
}
}

结果为:

3
2
1

我们使用入栈方法向Stack插入1、2、3,使用出栈方法打印为3、2、1,符合预期。

2.让多线程捣捣乱

前面我们已经测试过我们的方法,符合我们对Stack功能的预期,那是不是任何情况先我们的“栈”都能正常工作呢?

我们运行如下代码:

public static void main(String[] args) {
Stack<Integer> stack=new Stack<>();
int max=3;
Thread[] threads=new Thread[max];
for (int i = 0; i < max; i++) {
int temp=i;
//入栈1、2、3
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
stack.push(temp+1);
}
});
thread.start();
threads[temp]=thread;
}
//等待所有线程完成。
for (int i = 0; i < max; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
}
}
for (int i = 0; i < max; i++) {
//出栈3、2、1
System.out.println(stack.pop());
}
}

你可能运行了很多次,每次运行时除了打印顺序(3、2、1或2、3、1或1、2、3)有变化之外也没有发现其他异常,你可能会说打印顺序变化很正常呀,因为我们的将入栈操作放到异步线程中操作,三个线程的执行过程由系统调度,所以入栈操作的内容自然每次都有可能不同。

好吧,你说的没错,至少从大量运行的结果上看是这样的,但是这就是多线程编程的奇(tao)幻(yan)之处,也许你运行一次没有问题,两次没有问题,一万次也没有问题,但是终有一次你会得到那个意想不到的结果(你也不想得到,因为那是bug)。这就像一个“黑天鹅事件”,小概率但是一定会发生,且发生后对你的系统影响不堪设想。
下面让我带你看看如何得到意料之外的结果:

我们使用调试模式运行上面的程序在Stack中push()方法第一行打一个断点,然后按照表格中的顺序切换不同的线程以单步调试(step over)方式运行run方法中的每一步,直到遇到Resume。

执行顺序 thread-0 thread-1 thread-2
1 Node<E> newHead = new Node<>(item); -- --
2 head=newHead; -- --
3 (Resume) -- --
4 -- Node<E> newHead = new Node<>(item); --
5 -- -- Node<E> newHead = new Node<>(item);
6 -- newHead.next = head; --
7 -- -- newHead.next = head;
8 -- head=newHead; --
9 -- -- head=newHead;
10 -- (Resume)
11 -- -- (Resume)

当你再次看到打印结果,你会发现结果为3、1、null,“黑天鹅”出现了。

异常结果是如何产生的?

1.当thread-0执行到顺序3时,head表示的链表为node(1)。

2.当thread-1执行到顺序10时,head表示的链表为node(2)->node(1)。

3.当thread-2执行到顺序11时,head表示的链表为node(3)->node(1)。

当三个线程都执行完毕之后,head的最终表示为node(3)->node(1),也就是说thread-2将thread-1的执行结果覆盖了。

语句newHead.next = head;是对头部节点的读取。语句head=newHead;是对头部节点的写入操作。这两条语句组成了一个“读取——设置——写入”语句模式(就像n=n+1)。
如果一个线程执行了共享头部变量读取语句,切换其他线程执行了修改共享变量的值,再切回到第一个线程后,第一个线程中修改头部结点的数据就不是最新的数据为依据的,所以修改之后其他线程的修改就被覆盖了。
只有保证这两条语句及中间语句以原子方式执行,才能避免多线程覆盖问题。
大家可以任意调整代码中读取头部节点和写入头部节点的调试顺序,制造多线程交错读写观察不同的异常结果。

为什么我们直接执行无法看到异常结果呢?

因为我们的run方法很简单,在CPU分配的时间片内能运行完,没有出现在不同的运行周期中交错运行的状态。所以我们才要用调试模式这种交错运行。

为什么上文中我说过这种异常一定会发生?

原因在于我们在Stack类中对共享的、可变的变量head进行的多线程读写操作。

怎么才能保证类Stack在多线程情况下运行正确?

引用一段《JAVA并发编程实践》中的话:

无论何时,只要有多于一个的线程访问给定的状态变量,而且其中某个线程会写入该变量,此时必须使用同步来协调线程对该变量的访问。

好吧,看来我们必须采用“同步”方法了,来保障我们的Stack类在多线程并行和单线程串行的情况下都有正确的结果,也就是说将Stack变成一个线程安全的类。

3.让你捣乱,请家长!

既然多线程总来捣乱,我们就请他的家长,让家长管管他,守守规矩,不在捣乱。
我们已经知道了Stack类问什么不能再多线程下正确的运行的原因,所有我们要限制多线程对Stack类中head变量的并发写入,Stack方法中push()和pop()方法都会对head进行写操作,所以要限制这两个方法不能多线程并发访问,所以我们想到了synchronized关键字。

程序重构:

public class SynchronizedStack<E> {
//链表结构头部节点
private Node<E> head;
/**
* 入栈
* @param item
*/
public synchronized void push(E item) {
//为新插入item创建一个新node
Node<E> newHead = new Node<>(item);
if(head!=null){
//将新节点的下一个节点指向原来的头部
newHead.next = head;
}
//将头部指向新的节点
head=newHead;
}
/**
* 出栈
* @return
*/
public synchronized E pop() {
if(head==null){
//当前链表为空
return null;
}
//暂存当前节点。
Node<E> oldHead=head;
//将当前节点指向当前节点的下一个节点
head=head.next;
//从暂存的当前节点记录返回数据
return oldHead.item;
}
/**
* 链表中的节点
* @param <E>
*/
private static class Node<E> {
//节点保存的数据
public final E item;
//指向下一个链表中下一个节点
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}

Stack类替换为SynchronizedStack类的测试方法。

public static void main(String[] args) {
SynchronizedStack<Integer> stack=new SynchronizedStack<>();
int max=3;
Thread[] threads=new Thread[max];
for (int i = 0; i < max; i++) {
int temp=i;
//入栈1、2、3
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
stack.push(temp+1);
}
});
thread.start();
threads[temp]=thread;
}
//等待所有线程完成。
for (int i = 0; i < max; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
}
}
for (int i = 0; i < max; i++) {
//出栈3、2、1
System.out.println(stack.pop());
}
}

我们再次运行第二章为多线程准备的测试方法,发现当执行一个线程的方法时,其他线程的方法均被阻塞,只能等到第一个线程方法执行完成之后才能执行其他线程方法。

我们只不过是在push()pop()方法上加入了synchronized 关键字,就将这两个方法编程了同步方法,在多线程并发的情况下也如同单线程串行调用一般,方法再不能在线程间交替运行,也就不能对head变量做并发更改了,这样修改的Stack类就是线程安全的了。

除了synchronized关键字,还有其他的方式实现加锁吗?

除了synchronized关键字还可以使用java.util.concurrent.locks包中各种锁来保证同步,但是大概思路都是相同的,都是使用阻塞其他线程的方式在达到防止并发写入的目的。

阻塞线程是否会影响执行效率?

如果和不加通过的“栈”类相比,在多线程执行的之后效率一定会有影响,因为同步方法限制了线程之间的并发性,但是为了保证“栈”类的在多线程环境时功能正确,我们不得不做出效率和正确性的权衡。

必须要对整个方法加上锁吗?

我们上面已经分析了需要加锁的范围,只要保证读取头部节点和写入头部节点之间的语句原子性就可以。所以我们可以这样执行。

/**
* 入栈
*
* @param item
*/
public void push(E item) {
//为新插入item创建一个新node
Node<E> newHead = new Node<>(item);
synchronized (this) {
if (head != null) {
//将新节点的下一个节点指向原来的头部
newHead.next = head;
}
//将头部指向新的节点
head = newHead;
}
}
/**
* 出栈
*
* @return
*/
public E pop() {
synchronized (this) {
if (head == null) {
//当前链表为空
return null;
}
//暂存当前节点。
Node<E> oldHead = head;
//将当前节点指向当前节点的下一个节点
head = head.next;
//从暂存的当前节点记录返回数据
return oldHead.item;
}
}

通过synchronized块实现,因为方法比较简单,所以也没有很明显的缩小加锁范围。

除了加锁的方式,是否还有其他方式?

当然,我们还有无锁化编程来解决线程之间同步的问题。这就是下面要介绍的比较交换算法。

4.换个思路,乐观一点

加锁实现线程同步的方式是预防性方式。无论共享变量是否会被并发修改,我们都只允许同一时刻只有一个线程运行方法来阻止并发发生。这就相当于我们假设并发一定会发生,所以比较悲观。

现在我们换一种思路,乐观一点,不要假设对变量的并发修改一定发生,这样也就不用对方法加锁阻止多线程并行运行方法了。但是一旦发生了并发修改,我们想法发解决就是了,解决的方法就是将这个操作重试一下。

继续重构“栈”代码:

public class TreiberStack<E> {
private AtomicReference<Node<E>> headNode = new AtomicReference<>();
public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = headNode.get();
newHead.next = oldHead;
} while (!headNode.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = headNode.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!headNode.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node<E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}

这个就是大名鼎鼎的Treiber Stack,我也只是做了一次代码的搬运工。
我们来看看TreiberStack和我们前面的Stack有什么不同。

首先关注第一行:

private AtomicReference<Node<E>> headNode = new AtomicReference<>();

我们用了一个AtomicReference类存储链表的头部节点,这个类可以获取存储对象的最新值,并且在修改存储值时候采用比较交换算法保证原子操作,具体大家可以自行百度。

然后重点关注pop()push()方法中都有的一个代码结构:

//略...
do {
oldHead = headNode.get();
//略...
} while (!headNode.compareAndSet(oldHead, newHead));
//略...

我们AtomicReferenceget()方法最新的获取头部节点,然后调用AtomicReferencecompareAndSet()将设置新头部节点,如果当前线程执行这两端代码的时候如果有其他已经修改了头部节点的值,'compareAndSet()'方法返回false ,表明修改失败,循环继续,否则修改成功,跳出循环。

这样一个代码结构和synchronized关键字修饰的方法一样,都保证了对于头部节点的读取和写入操作及中间代码在一个线程下原子执行,前者是通过其他线程修改过就重试的方式,后者通过阻塞其他线程的方式,一个是乐观的方式,一个是悲观的方式。

大家可以按照前面的例子自己写测试方法测试。

后记

我们通过对“栈”的一步一步代码重构,逐步介绍了什么是线程安全及保证线程安全的各种方法。这里需要说明一点,对于一个类来说,是否需要支持线程安全是由类的使用场景决定,不是有类所提供的功能决定的,如果一个类不会被应用于多线程的情况下也就无需将他转化为线程安全的类。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java并发的CAS原理与ABA问题的讲解

    CAS原理 在计算机科学中,比较和交换(Compare And Swap)是用于实现多线程同步的原子指令. 它将内存位置的内容与给定值进行比较,只有在相同的情况下,将该内存位置的内容修改为新的给定值. 这是作为单个原子操作完成的. 原子性保证新值基于最新信息计算; 如果该值在同一时间被另一个线程更新,则写入将失败. 操作结果必须说明是否进行替换; 这可以通过一个简单的布尔响应(这个变体通常称为比较和设置),或通过返回从内存位置读取的值来完成(摘自维基本科) CAS流程 以AtomicIntege

  • java使用MulticastSocket实现组播

    组播是一种允许源进程将数据包发送到多个目标进程的网络技术.组播源将数据包发送到特定组播组,只有属于该组播组的进程才能接收到数据包.这些进程可以是在同一个物理网络,也可以来自不同的物理网络(只要有组播路由器支持). 组播分为无连接和面向连接组播,但是基本的组播机制是无连接的,我们这里所讲的也是无连接组播. 我们说过使用MulticastSocket类,这个类叫组播数据报套接字类,主要用来发送和接收IP组播报文.MulticastSocket是DatagramSocket的子类,它增加了加入和离开组

  • 学习非阻塞的同步机制CAS

    在研究线程池的执行原理时,看到一段不断循环重试的代码,不理解它的原理,看注释这是CAS的实现,所以学会之后记录下来. 锁有什么劣势 在多线程并发下,可以通过加锁来保证线程安全性,但多个线程同时请求锁,很多情况下避免不了要借助操作系统,线程挂起和恢复会存在很大的开销,并存在很长时间的中断.一些细粒度的操作,例如同步容器,操作往往只有很少代码量,如果存在锁并且线程激烈地竞争,调度的代价很大. 总结来说,线程持有锁,会让其他需要锁的线程阻塞,产生多种风险和开销.加锁是一种悲观方法,线程总是设想在自己持

  • Java多线程之CAS算法实现线程安全

    前言 对于线程安全,我们有说不尽的话题.大多数保证线程安全的方法是添加各种类型锁,使用各种同步机制,用限制对共享的.可变的类变量并发访问的方式来保证线程安全.文本从另一个角度,使用"比较交换算法"(CompareAndSwap)实现同样的需求.我们实现一个简单的"栈",并逐步重构代码来进行讲解. 本文通俗易懂,不会涉及到过多的底层知识,适合初学者阅读(言外之意是各位大神可以绕道了). 旅程开始 1.先定个小目标,实现一个"栈" "栈&q

  • Java多线程之Interrupt中断线程详解

    一.测试代码 https://gitee.com/zture/spring-test/blob/master/multithreading/src/test/java/cn/diswares/blog/InterruptTests.java 二.测试 为了方便理解简介中 interrupt 的概念, 写个 DEMO 测试一下 /** * 调用 interrupt 并不会影响线程正常运行 */ @Test public void testInvokeInterrupt() throws Inter

  • java并发编程之cas详解

    CAS(Compare and swap)比较和替换是设计并发算法时用到的一种技术.简单来说,比较和替换是使用一个期望值和一个变量的当前值进行比较,如果当前变量的值与我们期望的值相等,就使用一个新值替换当前变量的值.这听起来可能有一点复杂但是实际上你理解之后发现很简单,接下来,让我们跟深入的了解一下这项技术. CAS的使用场景 在程序和算法中一个经常出现的模式就是"check and act"模式.先检查后操作模式发生在代码中首先检查一个变量的值,然后再基于这个值做一些操作.下面是一个

  • java多线程之CyclicBarrier的使用方法

    java多线程之CyclicBarrier的使用方法 public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = n

  • Java多线程之readwritelock读写分离的实现代码

    在多线程开发中,经常会出现一种情况,我们希望读写分离.就是对于读取这个动作来说,可以同时有多个线程同时去读取这个资源,但是对于写这个动作来说,只能同时有一个线程来操作,而且同时,当有一个写线程在操作这个资源的时候,其他的读线程是不能来操作这个资源的,这样就极大的发挥了多线程的特点,能很好的将多线程的能力发挥出来. 在Java中,ReadWriteLock这个接口就为我们实现了这个需求,通过他的实现类ReentrantReadWriteLock我们可以很简单的来实现刚才的效果,下面我们使用一个例子

  • Java多线程之volatile关键字及内存屏障实例解析

    前面一篇文章在介绍Java内存模型的三大特性(原子性.可见性.有序性)时,在可见性和有序性中都提到了volatile关键字,那这篇文章就来介绍volatile关键字的内存语义以及实现其特性的内存屏障. volatile是JVM提供的一种最轻量级的同步机制,因为Java内存模型为volatile定义特殊的访问规则,使其可以实现Java内存模型中的两大特性:可见性和有序性.正因为volatile关键字具有这两大特性,所以我们可以使用volatile关键字解决多线程中的某些同步问题. volatile

  • java多线程之Phaser的使用详解

    前面的文章中我们讲到了CyclicBarrier.CountDownLatch的使用,这里再回顾一下CountDownLatch主要用在一个线程等待多个线程执行完毕的情况,而CyclicBarrier用在多个线程互相等待执行完毕的情况. Phaser是java 7 引入的新的并发API.他引入了新的Phaser的概念,我们可以将其看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段.所以Phaser特别适合使用在重复执行或者重用的情况. 基本使用 在CyclicBar

  • 深入分析Java并发编程之CAS

    在Java并发编程的世界里,synchronized 和 Lock 是控制多线程并发环境下对共享资源同步访问的两大手段.其中 Lock 是 JDK 层面的锁机制,是轻量级锁,底层使用大量的自旋+CAS操作实现的. 学习并发推荐<Java并发编程的艺术> 那什么是CAS呢?CAS,compare and swap,即比较并交换,什么是比较并交换呢?在Lock锁的理念中,采用的是一种乐观锁的形式,即多线程去修改共享资源时,不是在修改之前就加锁,而是乐观的认为没有别的线程和自己争锁,就是通过CAS的

  • java多线程之Future和FutureTask使用实例

    Executor框架使用Runnable 作为其基本的任务表示形式.Runnable是一种有局限性的抽象,然后可以写入日志,或者共享的数据结构,但是他不能返回一个值. 许多任务实际上都是存在延迟计算的:执行数据库查询,从网络上获取资源,或者某个复杂耗时的计算.对于这种任务,Callable是一个更好的抽象,他能返回一个值,并可能抛出一个异常.Future表示一个任务的周期,并提供了相应的方法来判断是否已经完成或者取消,以及获取任务的结果和取消任务. public interface Callab

  • Java多线程之Park和Unpark原理

    一.基本使用 它们是 LockSupport 类中的方法 // 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(暂停线程对象) 应用:先 park 再 unpark Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debu

随机推荐