如何使用JCTools实现Java并发程序

概述

在本文中,我们将介绍JCTools(Java并发工具)库。

简单地说,这提供了许多适用于多线程环境的实用数据结构。

非阻塞算法

传统上,在可变共享状态下工作的多线程代码使用锁来确保数据一致性和发布(一个线程所做的更改对另一个线程可见)。

这种方法有许多缺点:

  • 线程在试图获取锁时可能会被阻塞,在另一个线程的操作完成之前不会取得任何进展—这有效地防止了并行性
  • 锁争用越重,JVM处理调度线程、管理争用和等待线程队列的时间就越多,实际工作就越少
  • 如果涉及多个锁,并且它们以错误的顺序获取/释放,则可能出现死锁
  • 优先级反转的危险是可能的——高优先级线程被锁定,试图获得由低优先级线程持有的锁
  • 大多数情况下,使用粗粒度锁会严重损害并行性—细粒度锁需要更仔细的设计,增加锁开销,并且更容易出错

另一种方法是使用非阻塞算法,即任何线程的故障或挂起都不会导致另一个线程的故障或挂起的算法。

如果所涉及的线程中至少有一个能够在任意时间段内取得进展,即在处理过程中不会出现死锁,则非阻塞算法是无锁的。

此外,如果保证每个线程的进程,这些算法是无等待的。

下面是一个非阻塞堆栈示例,它定义了基本状态:

public class ConcurrentStack<E> {

  AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

  private static class Node <E> {
    public E item;
    public Node<E> next;

    // standard constructor
  }
}

还有一些API方法:

public void push(E item){
  Node<E> newHead = new Node<E>(item);
  Node<E> oldHead;

  do {
    oldHead = top.get();
    newHead.next = oldHead;
  } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
  Node<E> oldHead;
  Node<E> newHead;
  do {
    oldHead = top.get();
    if (oldHead == null) {
      return null;
    }
    newHead = oldHead.next;
  } while (!top.compareAndSet(oldHead, newHead));

  return oldHead.item;
}

我们可以看到,该算法使用细粒度比较和交换(CAS)指令,并且是无锁的(即使多个线程调用top.compareAndSet()同时,它们中的一个保证会成功)但不能无等待,因为不能保证CAS最终会对任何特定线程成功。

依赖

首先,让我们将JCTools依赖项添加到pom.xml文件:

<dependency>
  <groupId>org.jctools</groupId>
  <artifactId>jctools-core</artifactId>
  <version>2.1.2</version>
</dependency>

请注意,Maven Central上提供了最新的可用版本。

JCTools队列

该库提供了许多队列以在多线程环境中使用,即一个或多个线程以线程安全的无锁方式写入队列,一个或多个线程以线程安全的无锁方式从队列中读取。

所有队列实现的通用接口是org.jctools.queues.MessagePassingQueue。

队列类型
所有队列都可以根据其生产者/消费者策略进行分类:

  • 单个生产者,单个消费者–此类类使用前缀Spsc命名,例如SpscArrayQueue
  • 单个生产者,多个消费者–使用Spmc前缀,例如SpmcArrayQueue
  • 多个生产者,单个消费者-使用Mpsc前缀,例如MpscArrayQueue
  • 多个生产者、多个消费者—使用Mpmc前缀,例如MpmcArrayQueue

需要注意的是,在内部没有策略检查,也就是说,如果使用不正确,队列可能会无声地发生故障。

例如,下面的测试从两个线程填充单个生产者队列并通过,即使不能保证使用者看到来自不同生产者的数据:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

队列实现

总结以上分类,以下是JCTools队列列表:

  • SpscArrayQueue–单个生产者,单个消费者,在内部使用一个数组,限制容量
  • SpscLinkedQueue–单个生产者,单个消费者,内部使用链表,未绑定容量
  • SpscChunkedArrayQueue–单生产商、单消费者,从初始容量开始,一直增长到最大容量
  • SpscGrowableArrayQueue–单生产者、单消费者,从初始容量开始,一直增长到最大容量。这与SpscChunkedArrayQueue是相同的契约,唯一的区别是内部块管理。建议使用SpscChunkedArrayQueue,因为它有一个简化的实现
  • SpscUnboundedArrayQueue–单个生产者,单个消费者,在内部使用数组,未绑定容量
  • SpmcArrayQueue–单个生产者、多个使用者,在内部使用一个阵列,限制容量
  • MpscArrayQueue—多个生产者、单个消费者在内部使用一个阵列,限制容量
  • MpscLinkedQueue–多个生产者,单个消费者,在内部使用链表,未绑定容量
  • MpmcArrayQueue—多个生产者、多个消费者在内部使用一个阵列,限制容量

原子队列

前面提到的所有队列都使用sun.misc.Unsafe. 然而,随着java9和JEP-260的出现,这个API在默认情况下变得不可访问。

因此,有其他队列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能较差)而不是sun.misc.Unsafe.

它们是从上面的队列生成的,它们的名称中间插入了单词Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。

如果可能,建议使用“常规”队列,并且仅在sun.misc.Unsafe像Hot Java9+和JRockit一样被禁止/无效。

容量

所有JCTools队列也可能具有最大容量或未绑定。当队列已满且受容量限制时,它将停止接受新元素。

在以下示例中,我们:

  • 填满队列
  • 确保在此之后停止接受新元素
  • 从中排出,并确保之后可以添加更多元素

请注意,为了可读性,删除了几个代码语句。

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
  IntStream.range(0, queue.capacity()).forEach(i -> {
    assertThat(queue.offer(i)).isTrue();
  });
  assertThat(queue.offer(queue.capacity())).isFalse();
  startConsuming.countDown();
  awakeProducer.await();
  assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
 IntStream.range(0, 17).boxed().collect(toSet()));

其他数据结构工具

JCTools还提供了一些非队列数据结构。

它们都列在下面:

  • NonBlockingHashMap–一个无锁的ConcurrentHashMap替代方案,具有更好的伸缩性和通常更低的突变成本。它是实现sun.misc.Unsafe,因此,不建议在Java9+或JRockit环境中使用此类
  • NonBlockingHashMapLong–与NonBlockingHashMap类似,但使用基本长键
  • NonBlockingHashSet–一个简单的包装器,围绕着像JDK的java.util.Collections.newSetFromMap()一样的NonBlockingHashMap
  • NonBlockingIdentityHashMap–与NonBlockingHashMap类似,但按标识比较键。
  • NonBlockingSetInt–一个多线程位向量集,实现为一个原始long数组。在无声自动装箱的情况下工作无效

性能测试

让我们使用JMH来比较JDK的ArrayBlockingQueue和JCTools队列的性能。JMH是Sun/Oracle JVM gurus提供的一个开源微基准框架,它保护我们不受编译器/JVM优化算法的不确定性的影响。

请注意,为了提高可读性,下面的代码段遗漏了几个语句。

public class MpmcBenchmark {

  @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
  public volatile String implementation;

  public volatile Queue<Long> queue;

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(PRODUCER_THREADS_NUMBER)
  public void write(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && !queue.offer(1L)) {
      // intentionally left blank
    }
  }

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(CONSUMER_THREADS_NUMBER)
  public void read(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && queue.poll() == null) {
      // intentionally left blank
    }
  }
}

结果:

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

我们可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了两倍。

使用JCTools的缺点

使用JCTools有一个重要的缺点——不可能强制正确使用库类。例如,考虑在我们的大型成熟项目中开始使用MpscArrayQueue的情况(注意,必须有一个使用者)。

不幸的是,由于项目很大,有可能有人出现编程或配置错误,现在从多个线程读取队列。这个系统看起来像以前一样工作,但现在有可能消费者错过了一些信息。这是一个真正的问题,可能会有很大的影响,是很难调试。

理想情况下,应该可以运行具有特定系统属性的系统,该属性强制JCTools确保线程访问策略。例如,本地/测试/暂存环境(而不是生产环境)可能已启用它。遗憾的是,JCTools没有提供这样的属性。

另一个需要考虑的问题是,尽管我们确保JCTools比JDK的对应工具快得多,但这并不意味着我们的应用程序获得了与我们开始使用自定义队列实现时相同的速度。大多数应用程序不会在线程之间交换很多对象,而且大多是I/O绑定的。

结论

现在,我们对JCTools提供的实用程序类有了基本的了解,并了解了它们在重载下与JDK的对应类相比的性能。

总之,只有当我们在线程之间交换大量对象时,才有必要使用该库,即使这样,也有必要非常小心地保留线程访问策略。

以上示例的完整源代码地址:https://github.com/eugenp/tutorials/tree/master/libraries-5

JCTools git地址:https://github.com/JCTools/JCTools

以上就是如何使用JCTools实现Java并发程序的详细内容,更多关于使用JCTools实现Java并发程序的资料请关注我们其它相关文章!

(0)

相关推荐

  • java迭代器移除元素出现并发修改异常的原因及解决

    迭代器(Iterator的对象)主要用于遍历集合,体现的就是迭代器模式. Iterator接口定义了以下四种方法. boolean hasNext():如果集合还没遍历完就返回true. Object next():返回集合里的下一个元素. void remove():删除集合里上一次next方法返回的元素. void forEachRemaining(Consumer action):这是java8新增的默认方法,可用Lambda表达式遍历数组. 使用迭代器遍历元素时不能不能通过Collect

  • Java 并发编程中如何创建线程

    简介 线程是基本的调度单位,它被包含在进程之中,是进程中的实际运作单位,它本身是不会独立存在.一个进程至少有一个线程,进程中的多个线程共享进程的资源. Java中创建线程的方式有多种如继承Thread类.实现Runnable接口.实现Callable接口以及使用线程池的方式,线程池将在后面文章中单独介绍,这里先介绍另外三种方式. 继承Thread类 优点:在run方法里可以用this获取到当前线程. 缺点:由于Java不支持多继承,所以如果继承了Thread类后就不能再继承其他类. public

  • Java工作中常见的并发问题处理方法总结

    问题复现 1. "设备Aの奇怪分身" 时间回到很久很久以前的一个深夜,那时我开发的多媒体广告播放控制系统刚刚投产上线,公司开出的第一家线下生鲜店里,几十个大大小小的多媒体硬件设备正常联网后,正由我一台一台的注册及接入到已经上线的多媒体广告播控系统中. 注册过程简述如下: 每一个设备注册到系统中后,相应的在数据库设备表中都会新增一条记录,来存储这个设备的各项信息. 本来一切都有条不紊的进行着,直到设备A的注册打破了这默契的宁静-- 设备A注册完成后,我突然发现,数据库设备表中,新增了两条

  • java并发包中CountDownLatch和线程池的使用详解

    1.CountDownLatch 现在做的这个华为云TaurusDB比赛中,参考的之前参加过阿里的PolarDB大赛的两个大佬的代码,发现都有用到CountDownLatch这个类,之前看代码的时候也看过,但是没有搞得很明白,自己写也写不出来,在此自己先学习一下. 字面理解:CountDownLatch:数量减少的门栓. 创建这样一个门栓 CountDownLatch countDownLatch = new CountDownLatch(count); 参数:count,门栓的计数次数. 在所

  • java并发学习-CountDownLatch实现原理全面讲解

    CountDownLatch在多线程并发编程中充当一个计时器的功能,并且维护一个count的变量,并且其操作都是原子操作. 如下图,内部有下static final的Sync类继承自AQS. 该类主要通过countDown()和await()两个方法实现功能的,首先通过建立CountDownLatch对象,并且传入参数即为count初始值. 如果一个线程调用了await()方法,那么这个线程便进入阻塞状态,并进入阻塞队列. 如果一个线程调用了countDown()方法,则会使count-1:当c

  • Java利用Redis实现高并发计数器的示例代码

    业务需求中经常有需要用到计数器的场景:譬如一个手机号一天限制发送5条短信.一个接口一分钟限制多少请求.一个接口一天限制调用多少次等等.使用Redis的Incr自增命令可以轻松实现以上需求.以一个接口一天限制调用次数为例: /** * 是否拒绝服务 * @return */ private boolean denialOfService(String userId){ long count=JedisUtil.setIncr(DateUtil.getDate()+"&"+user

  • 详解Java并发编程之内置锁(synchronized)

    简介 synchronized在JDK5.0的早期版本中是重量级锁,效率很低,但从JDK6.0开始,JDK在关键字synchronized上做了大量的优化,如偏向锁.轻量级锁等,使它的效率有了很大的提升. synchronized的作用是实现线程间的同步,当多个线程都需要访问共享代码区域时,对共享代码区域进行加锁,使得每一次只能有一个线程访问共享代码区域,从而保证线程间的安全性. 因为没有显式的加锁和解锁过程,所以称之为隐式锁,也叫作内置锁.监视器锁. 如下实例,在没有使用synchronize

  • Java高并发BlockingQueue重要的实现类详解

    ArrayBlockingQueue 有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部. public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列元素 */ final Object

  • Java并发包之CopyOnWriteArrayList类的深入讲解

    前言 大家在学习Java的过程中,或者工作中,始终都绕不开集合.在单线程环境下,ArrayList就可以满足要求.多线程时,我们可以使用CopyOnWriteArrayList来保证数据安全.下面我们一起来看看CopyOnWriteArrayList类中的一些值得学习的方法. CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)上进行的,也就是使用了写时复制策略实现的. 说明:代码部分,均基于JDK1.8 一.添加元素

  • 如何使用JCTools实现Java并发程序

    概述 在本文中,我们将介绍JCTools(Java并发工具)库. 简单地说,这提供了许多适用于多线程环境的实用数据结构. 非阻塞算法 传统上,在可变共享状态下工作的多线程代码使用锁来确保数据一致性和发布(一个线程所做的更改对另一个线程可见). 这种方法有许多缺点: 线程在试图获取锁时可能会被阻塞,在另一个线程的操作完成之前不会取得任何进展-这有效地防止了并行性 锁争用越重,JVM处理调度线程.管理争用和等待线程队列的时间就越多,实际工作就越少 如果涉及多个锁,并且它们以错误的顺序获取/释放,则可

  • Java并发程序入门介绍

    今天看了看Java并发程序,写一写入门程序,并设置了线程的优先级. class Elem implements Runnable{ public static int id = 0; private int cutDown = 5; private int priority; public void setPriority(int priority){ this.priority = priority; } public int getPriority(){ return this.priori

  • Java并发程序刺客之假共享的原理及复现

    目录 前言 假共享(False Sharing) 缓存行 假共享 Java代码复现假共享 复现假共享 JDK解决假共享 从更低层次C语言看假共享 总结 前言 前段时间在各种社交平台“雪糕刺客”这个词比较火,简单的来说就是雪糕的价格非常高!其实在并发程序当中也有一个刺客,如果在写并发程序的时候不注意不小心,这个刺客很可能会拖累我们的并发程序,让我们并发程序执行的效率变低,让并发程序付出很大的代价,这和“雪糕刺客”当中的“刺客”的含义是一致的.这个并发程序当中的刺客就是——假共享(False Sha

  • 深入解析Java并发程序中线程的同步与线程锁的使用

    synchronized关键字 synchronized,我们谓之锁,主要用来给方法.代码块加锁.当某个方法或者代码块使用synchronized时,那么在同一时刻至多仅有有一个线程在执行该段代码.当有多个线程访问同一对象的加锁方法/代码块时,同一时间只有一个线程在执行,其余线程必须要等待当前线程执行完之后才能执行该代码段.但是,其余线程是可以访问该对象中的非加锁代码块的. synchronized主要包括两种方法:synchronized 方法.synchronized 块. synchron

  • JAVA并发编程有界缓存的实现详解

    JAVA并发编程有界缓存的实现 1.有界缓存的基类 package cn.xf.cp.ch14; /** * *功能:有界缓存实现基类 *时间:下午2:20:00 *文件:BaseBoundedBuffer.java *@author Administrator * * @param <V> */ public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head

  • Java 并发编程:volatile的使用及其原理解析

    Java并发编程系列[未完]: •Java 并发编程:核心理论 •Java并发编程:Synchronized及其实现原理 •Java并发编程:Synchronized底层优化(轻量级锁.偏向锁) •Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) •Java 并发编程:volatile的使用及其原理 一.volatile的作用 在<Java并发编程:核心理论>一文中,我们已经提到过可见性.有序性及原子性问题,通常情况下我们可以通过Synchroniz

  • Java并发编程示例(十):线程组

    对线程分组是Java并发API提供的一个有趣功能.我们可以将一组线程看成一个独立单元,并且可以随意操纵线程组中的线程对象.比如,可以控制一组线程来运行同样的任务,无需关心有多少线程还在运行,还可以使用一次中断调用中断所有线程的执行. Java提供了ThreadGroup类来控制一个线程组.一个线程组可以通过线程对象来创建,也可以由其他线程组来创建,生成一个树形结构的线程. 根据<Effective Java>的说明,不再建议使用ThreadGroup.建议使用Executor. --D瓜哥特此

  • Java并发编程示例(九):本地线程变量的使用

    共享数据是并发程序最关键的特性之一.对于无论是继承Thread类的对象,还是实现Runnable接口的对象,这都是一个非常周重要的方面. 如果创建了一个实现Runnable接口的类的对象,并使用该对象启动了一系列的线程,则所有这些线程共享相同的属性.换句话说,如果一个线程修改了一个属性,则其余所有线程都会受此改变的影响. 有时,我们更希望能在线程内单独使用,而不和其他使用同一对象启动的线程共享.Java并发接口提供了一种很清晰的机制来满足此需求,该机制称为本地线程变量.该机制的性能也非常可观.

  • Java并发编程示例(五):线程休眠与恢复

    有时,我们需要在指定的时间点中断正在执行的线程.比如,每分钟检查一次传感器状态的线程,其余时间,线程不需要做任何事情.在此期间,线程不需要使用计算机的任何资源.过了这段时间之后,并且当Java虚拟机调度了该线程,则该线程继续执行.为此,你可以使用Thread类的sleeep()方法.该方法以休眠的方式来推迟线程的执行,而且整数类型的参数则指明休眠的毫秒数.当调用sleep()方法,休眠时间结束后,Java虚拟机分配给线程CPU运行时间,线程就会继续执行. 另一种是用sleep()方法的方式是通过

  • Java并发编程(CyclicBarrier)实例详解

    Java并发编程(CyclicBarrier)实例详解 前言: 使用JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作.有时候,我们启动N个线程去做一件事情,只有当这N个线程都达到某一个临界点的时候,我们才能继续下面的工作,就是说如果这N个线程中的某一个线程先到达预先定义好的临界点,它必须等待其他N-1线程也到达这个临界点,接下来的工作才能继续,只要这N个线程中有1个线程没有到达所谓的临界点,其他线程就算抢先到达了临界点,也只能等待,只有所有这N

随机推荐