Java并发中的ABA问题学习与解决方案

目录
  • 1.简介
  • 2.Compare and swap
  • 3. ABA问题
    • 3.1 ABA问题的实际场景:账户余额修改
    • 3.2 账户余额修改时产生的问题
  • 4.银行取款问题代码演示
  • 5.值类型与引用类型的场景
  • 6. 解决方法
  • 7. Java中的解决方法
  • 8. 总结

1.简介

我们将了解在并发编程中的ABA问题。同时学习引起该问题的根因及问题解决办法。

2.Compare and swap

为了理解根本原因,首先回顾一下Compare and swap的概念。Compare and Swap (CAS)在无锁算法中是一种常见的技术。能够保证并发修改共享数据时,一个线程将共享内存修改后,另一线程尝试对共享内存的修改会失败。

我们每次更新时,通过两种信息来实现:要更新的值及原始值。首先Compare and swap 会比较原始值和当前获取到的值。如果相等,那么将值更新为要设置的值。

3. ABA问题

当执行campare and swap会出现失败的情况。例如,一个线程先读取共享内存数据值A,随后因某种原因,线程暂时挂起,同时另一个线程临时将共享内存数据值先改为B,随后又改回为A。随后挂起线程恢复,并通过CAS比较,最终比较结果将会无变化。这样会通过检查,这就是ABA问题。 在CAS比较前会读取原始数据,随后进行原子CAS操作。这个间隙之间由于并发操作,最终可能会带来问题。

3.1 ABA问题的实际场景:账户余额修改

为了通过实例演示ABA问题。我们创建一个银行账户类,该类维护一个整型变量记录账户余额。该类有两个函数:一个用于存钱,一个用于取钱。这些操作使用CAS来修改账户余额。

3.2 账户余额修改时产生的问题

我们来考虑两个线程操作同一个账户时的场景。当线程1取钱时,先读取余额,随后通过CAS操作进行比较。然后,可能由于某些原因,线程1可能发生阻塞。与此同时,线程2同样通过CAS机制,在线程1挂起时,在同一个账户上执行两个操作。首先,改变原始值,这个值已经被线程1在刚才读取。随后线程2又将这个值改为原始值。

一旦线程1恢复后,在线程1看来,没有发生任何变化。cas将会执行成功。

4.银行取款问题代码演示

创建一个Account类,balance记录账户余额。transactionCount记录成功执行的事务数。currentThreadCASFailureCount来记录CAS操作失败的次数。

接着我们实现一个存款的方法deposit,与取款方法withdraw。为了演示ABA问题,同时实现一个maybeWait方法进行延迟等待。

最终的代码如下:

    public class Account {
        private AtomicInteger balance;
        private AtomicInteger transactionCount;
        private ThreadLocal<Integer> currentThreadCASFailureCount;
        public Account() {
            this.balance = new AtomicInteger(0);
            this.transactionCount = new AtomicInteger(0);
            this.currentThreadCASFailureCount = new ThreadLocal<>();
            this.currentThreadCASFailureCount.set(0);
        }
        public int getBalance() {
            return balance.get();
        }
        public int getTransactionCount() {
            return transactionCount.get();
        }
        public int getCurrentThreadCASFailureCount() {
            return Optional.ofNullable(currentThreadCASFailureCount.get()).orElse(0);
        }
        public boolean withdraw(int amount) {
            int current = getBalance();
            maybeWait();
            boolean result = balance.compareAndSet(current, current - amount);
            if (result) {
                transactionCount.incrementAndGet();
            } else {
                int currentCASFailureCount = currentThreadCASFailureCount.get();
                currentThreadCASFailureCount.set(currentCASFailureCount + 1);
            }
            return result;
        }
        private void maybeWait() {
            if ("thread1".equals(Thread.currentThread().getName())) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        public boolean deposit(int amount) {
            int current = balance.get();
            boolean result = balance.compareAndSet(current, current + amount);
            if (result) {
                transactionCount.incrementAndGet();
            } else {
                int currentCASFailureCount = currentThreadCASFailureCount.get();
                currentThreadCASFailureCount.set(currentCASFailureCount + 1);
            }
            return result;
        }
    }

接着我们对上述代码进行测试。通过maybeWait方法,模拟出现ABA问题。

    @Test
    public void abaProblemTest() throws InterruptedException {
        final int defaultBalance = 50;
        final int amountToWithdrawByThread1 = 20;
        final int amountToWithdrawByThread2 = 10;
        final int amountToDepositByThread2 = 10;
        Assert.assertEquals(0, account.getTransactionCount());
        Assert.assertEquals(0, account.getCurrentThreadCASFailureCount());
        account.deposit(defaultBalance);
        Assert.assertEquals(1, account.getTransactionCount());
        Thread thread1 = new Thread(() -> {
            // this will take longer due to the name of the thread
            Assert.assertTrue(account.withdraw(amountToWithdrawByThread1));
            // thread 1 fails to capture ABA problem
            Assert.assertNotEquals(1, account.getCurrentThreadCASFailureCount());
        }, "thread1");
        Thread thread2 = new Thread(() -> {
            Assert.assertTrue(account.deposit(amountToDepositByThread2));
            Assert.assertEquals(defaultBalance + amountToDepositByThread2, account.getBalance());
            // this will be fast due to the name of the thread
            Assert.assertTrue(account.withdraw(amountToWithdrawByThread2));
            // thread 1 didn't finish yet, so the original value will be in place for it
            Assert.assertEquals(defaultBalance, account.getBalance());
            Assert.assertEquals(0, account.getCurrentThreadCASFailureCount());
        }, "thread2");
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        // compareAndSet operation succeeds for thread 1
        Assert.assertEquals(defaultBalance - amountToWithdrawByThread1, account.getBalance());
        //but there are other transactions
        Assert.assertNotEquals(2, account.getTransactionCount());
        // thread 2 did two modifications as well
        Assert.assertEquals(4, account.getTransactionCount());
    }

5.值类型与引用类型的场景

上面的例子中使用了getBalance()方法获取了一个值类型数据。由于使用的是值类型,虽然出现ABA问题,但未对结果造成影响。如果我们操作的是引用类型,那么最终会保存不同的引用对象,会带来意外的结果。

对于引用类型,下面以链栈为例说明。

  • 线程A希望将A结点出栈,此时读取栈顶元素A,准备执行CAS操作,此时由于某种原因阻塞。
  • 线程B开始执行,执行出栈A、B。随后将D、C、A结点压入栈中。
  • 线程A恢复执行。接着执行CAS,比较发现栈顶结点A没有被修改。随后将栈顶结点改为B。由于B线程在第二步时,已经将B结点移除,A线程修改后发生错误。栈的结构发生破坏。

接着我们通过下面的代码进行演示:

    static class Stack {
        private AtomicReference<Node> top = new AtomicReference<>();
        static class Node {
            String value;
            Node next;
            public Node (String value) {
                this.value = value;
            }
        }
        //出栈
        public Node pop(int time) {
            Node newTop;
            Node oldTop;
            do {
                oldTop = top.get();
                if (oldTop == null) {
                    return null;
                }
                newTop = oldTop.next;
                try {
                    //休眠一段时间,模拟ABA问题
                    TimeUnit.SECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (!top.compareAndSet(oldTop, newTop));
            return oldTop;
        }
        public void push (Node node) {
            Node oldTop;
            do {
                oldTop = top.get();
                node.next = oldTop;
            } while (!top.compareAndSet(oldTop, node));
        }
        public AtomicReference<Node> getTop() {
            return top;
        }
    }
    @Test
    public void testStack() throws Exception{
        Stack stack = new Stack();
        Stack.Node a = new Stack.Node("A");
        Stack.Node b = new Stack.Node("B");
        // 初始化栈结构
        stack.push(b);
        stack.push(a);
        // ABA 测试
        Thread t1 = new Thread(() -> {
            stack.pop(2);
        });

        Stack.Node c = new Stack.Node("C");
        Stack.Node d = new Stack.Node("D");
        Thread t2 = new Thread(() -> {
            stack.pop(0);
            stack.pop(0);
            stack.push(d);
            stack.push(c);
            stack.push(a);
        });
        //
        t1.start();
        t2.start();
        TimeUnit.SECONDS.sleep(5);
        Stack.Node top = stack.getTop().get();
        do {
            System.out.println(top.value);
            top = top.next;
        } while (top != null);
    }

6. 解决方法

  • hazard pointer:首先出现问题是因为,多个线程操作共享数据,并未感知到别的线程正在对共享数据进行操作。通过hazard pointer介绍[1],其基本思想就是每个线程维护一个操作列表,在操作一个结点时将其记录。如果一个线程要做结点变更,先搜索线程操作列表,看是否有其它线程操作。如果有则此次操作执行失败。
  • 不变性:从上述栈的例子中可以看到,在对结点A进行比较时,由于A依然是多个线程共享并复用,因此CAS会成功。如果每次操作时,新创建对象而不是复用。这样CAS就会正常提示失败。但这样可能会创建大量对象。

7. Java中的解决方法

Java中提供了两个类来解决这个问题。

  • AtomicStampedReference
  • AtomicMarkableReference

在原有类的基础上,除了比较与修改期待的值外,增加了一个时间戳。对时间戳也进行CAS操作。这也称为双重CAS。从上例中看到。每次修改一个结点,其时间戳都发生变化。这样即使共享一个复用结点,最终CAS也能返回正常的结果。

8. 总结

本文介绍了CAS产生ABA问题的背景,通用解决办法及Java中的解决办法。对于值类型有时发生ABA问题可能并不会造成问题。但对于引用类型,就可能造成歧义,同时破坏数据结构。通过链栈的演示,我们可以有所了解ABA产生的问题。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 浅谈Java中ABA问题及避免

    本文主要研究的是关于Java中ABA问题及避免的相关内容,具体如下. 在<Java并发实战>一书的第15章中有一个用原子变量实现的并发栈,代码如下: public class Node { public final String item; public Node next; public Node(String item){ this.item = item; } } public class ConcurrentStack { AtomicReference<Node> top

  • 详解java 中的CAS与ABA

    1. 独占锁: 属于悲观锁,有共享资源,需要加锁时,会以独占锁的方式导致其它需要获取锁才能执行的线程挂起,等待持有锁的钱程释放锁.传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁.Java中synchronized和ReentrantLock等独占锁就是悲观锁的思想. 1.1 乐观锁的操作 多线程并发修改一个值时的实现: public class SimulatedCAS { //加volatile的目的是利用其happens-before原则

  • Java并发的CAS原理与ABA问题的讲解

    CAS原理 在计算机科学中,比较和交换(Compare And Swap)是用于实现多线程同步的原子指令. 它将内存位置的内容与给定值进行比较,只有在相同的情况下,将该内存位置的内容修改为新的给定值. 这是作为单个原子操作完成的. 原子性保证新值基于最新信息计算; 如果该值在同一时间被另一个线程更新,则写入将失败. 操作结果必须说明是否进行替换; 这可以通过一个简单的布尔响应(这个变体通常称为比较和设置),或通过返回从内存位置读取的值来完成(摘自维基本科) CAS流程 以AtomicIntege

  • Java并发中的ABA问题学习与解决方案

    目录 1.简介 2.Compare and swap 3. ABA问题 3.1 ABA问题的实际场景:账户余额修改 3.2 账户余额修改时产生的问题 4.银行取款问题代码演示 5.值类型与引用类型的场景 6. 解决方法 7. Java中的解决方法 8. 总结 1.简介 我们将了解在并发编程中的ABA问题.同时学习引起该问题的根因及问题解决办法. 2.Compare and swap 为了理解根本原因,首先回顾一下Compare and swap的概念.Compare and Swap (CAS)

  • 聊聊Java并发中的Synchronized

    1 引言 在多线程并发编程中Synchronized一直是元老级角色,很多人都会称呼它为重量级锁,但是随着Java SE1.6对Synchronized进行了各种优化之后,有些情况下它并不那么重了,本文详细介绍了Java SE1.6中为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁,以及锁的存储结构和升级过程. 2 术语定义 术语 英文 说明 CAS Compare and Swap 比较并设置.用于在硬件层面上提供原子性操作.在 Intel 处理器中,比较并交换通过指令cmpxch

  • java 并发中的原子性与可视性实例详解

    java 并发中的原子性与可视性实例详解 并发其实是一种解耦合的策略,它帮助我们把做什么(目标)和什么时候做(时机)分开.这样做可以明显改进应用程序的吞吐量(获得更多的CPU调度时间)和结构(程序有多个部分在协同工作).做过java Web开发的人都知道,Java Web中的Servlet程序在Servlet容器的支持下采用单实例多线程的工作模式,Servlet容器为你处理了并发问题. 原子性 原子是世界上的最小单位,具有不可分割性.比如 a=0:(a非long和double类型) 这个操作是不

  • 浅析java并发中的Synchronized关键词

    如果在多线程的环境中,我们经常会遇到资源竞争的情况,比如多个线程要去同时修改同一个共享变量,这时候,就需要对资源的访问方法进行一定的处理,保证同一时间只有一个线程访问. java提供了synchronized关键字,方便我们实现上述操作. 为什么要同步 我们举个例子,我们创建一个类,提供了一个setSum的方法: public class SynchronizedMethods { private int sum = 0; public void calculate() { setSum(get

  • JAVA并发中VOLATILE关键字的神奇之处详解

    并发编程中的三个概念: 1.原子性 在Java中,对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行. 2.可见性 对于可见性,Java提供了volatile关键字来保证可见性. 当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值. 而普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保

  • java并发中DelayQueue延迟队列原理剖析

    介绍 DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作. 源码分析 DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的Priority

  • 分析java并发中的wait notify notifyAll

    一.前言 java 面试是否有被问到过,sleep 和 wait 方法的区别,关于这个问题其实不用多说,大多数人都能回答出最主要的两点区别: sleep 是线程的方法, wait / notify / notifyAll 是 Object 类的方法: sleep 不会释放当前线程持有的锁,到时间后程序会继续执行,wait 会释放线程持有的锁并挂起,直到通过 notify 或者 notifyAll 重新获得锁. 另外还有一些参数.异常等区别,不细说了.本文重点记录一下 wait / notify

  • Java并发中的Fork/Join 框架机制详解

    什么是 Fork/Join 框架 Fork/Join 框架是一种在 JDk 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务.通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果. 这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个

  • 浅谈Java并发中ReentrantLock锁应该怎么用

    目录 1.重入锁 说明 2.中断响应 说明 3.锁申请等待限时 tryLock(long, TimeUnit) tryLock() 4.公平锁 说明 源码(JDK8) 重入锁可以替代关键字 synchronized . 在 JDK5.0 的早期版本中,重入锁的性能远远优于关键字 synchronized , 但从 JDK6.0 开始, JDK 在关键字 synchronized 上做了大量的优化,使得两者的性能差距并不大. 重入锁使用 ReentrantLock 实现 1.重入锁 package

  • Java并发中死锁、活锁和饥饿是什么意思

    解答 死锁是指两个或者两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们将无法推进下去. 如果线程的智力不够, 且都秉承着"谦让"的原则,主动将资源释放给他人使用,那么就会导致资源不断地在两个线程间跳动,而没有一个线程可以同时拿到所有资源正常执行.这种情况就是活锁. 饥饿是指某一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行.比如它的线程优先级可能太低,而高优先级的线程不断抢占它需要的资源,导致低优先级线程无法工作.  补充

随机推荐