Java中常见的并发控制手段浅析

目录
  • 前言
  • 1.1 同步代码块
  • 1.2 CAS自旋方式
  • 1.3 锁
  • 1.4 阻塞队列
  • 1.5 信号量Semaphore
  • 1.6 计数器CountDownLatch
  • 1.7 栅栏 CyclicBarrier
  • 1.8 guava令牌桶
  • 1.9 滑动窗口TimeWindow
  • 1.10 小结

前言

单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些

  • 同步代码块
  • CAS自旋
  • 阻塞队列,令牌桶等

1.1 同步代码块

通过同步代码块,来确保同一时刻只会有一个线程执行对应的业务逻辑,常见的使用姿势如下

public synchronized doProcess() {
    // 同步代码块,只会有一个线程执行
}

一般推荐使用最小区间使用原则,尽量不要直接在方法上加synchronized,比如经典的双重判定单例模式

public class Single {
  private static volatile Single instance;
  private Single() {}
  public static Single getInstance() {
      if (instance == null) {
          synchronized(Single.class) {
              if (instance == null) instance = new Single();
          }
      }
      return instance;
  }
}

1.2 CAS自旋方式

比如AtomicXXX原子类中的很多实现,就是借助unsafe的CAS来实现的,如下

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

// unsafe 实现
// cas + 自选,不断的尝试更新设置,直到成功为止
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

1.3 锁

jdk本身提供了不少的锁,为了实现单实例的并发控制,我们需要选择写锁;如果支持多读,单实例写,则可以考虑读写锁;一般使用姿势也比较简单

private void doSome(ReentrantReadWriteLock.WriteLock writeLock) {
    try {
        writeLock.lock();
        System.out.println("持有锁成功 " + Thread.currentThread().getName());
        Thread.sleep(1000);
        System.out.println("执行完毕! " + Thread.currentThread().getName());
        writeLock.unlock();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void lock() throws InterruptedException {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();

    Thread.sleep(20000);
}

1.4 阻塞队列

借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化n个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制

demo版演示,下面指定队列长度为2,表示最大并发数控制为2;设置为1时,可以实现单线程的访问控制

AtomicInteger cnt = new AtomicInteger();

private void consumer(LinkedBlockingQueue<Integer> queue) {
    try {
        // 同步阻塞拿去数据
        int val = queue.take();
        Thread.sleep(2000);
        System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 添加数据
        System.out.println("结束 " + Thread.currentThread());
        queue.offer(cnt.getAndAdd(1));
    }
}

@Test
public void blockQueue() throws InterruptedException {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    queue.add(cnt.getAndAdd(1));
    queue.add(cnt.getAndAdd(1));

    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();

    Thread.sleep(10000);
}

1.5 信号量Semaphore

上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数

private void semConsumer(Semaphore semaphore) {
    try {
        //同步阻塞,尝试获取信号
        semaphore.acquire(1);
        System.out.println("成功拿到信号,执行: " + Thread.currentThread());
        Thread.sleep(2000);
        System.out.println("执行完毕,释放信号: " + Thread.currentThread());
        semaphore.release(1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void semaphore() throws InterruptedException {
    Semaphore semaphore = new Semaphore(2);

    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();

    Thread.sleep(20_000);
}

1.6 计数器CountDownLatch

计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一

这种场景下的使用姿势一般如下

重点:countDownLatch 计数为0时放行

@Test
public void countDown() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        try {
            System.out.println("do something in " + Thread.currentThread());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    new Thread(() -> {
        try {
            System.out.println("do something in t2: " + Thread.currentThread());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    countDownLatch.await();
    System.out.printf("结束");
}

1.7 栅栏 CyclicBarrier

CyclicBarrier的作用与上面的CountDownLatch相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而CountDownLatch则不行

一个简单的demo

private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {
    // 等待达到条件才放行
    try {
        System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());
        Thread.sleep(sleep);
        int index = barrier.await();
        System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void testCyclicBarrier() throws InterruptedException {
    // 到达两个工作线程才能继续往后面执行
    CyclicBarrier barrier = new CyclicBarrier(2);
    // 三秒之后,下面两个线程的才会输出 开始执行
    new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();

    Thread.sleep(4000);
    // 重置,可以再次使用
    barrier.reset();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    Thread.sleep(10000);
}

1.8 guava令牌桶

guava封装了非常简单的并发控制工具类RateLimiter,作为单机的并发控制首选

一个控制qps为2的简单demo如下:

private void guavaProcess(RateLimiter rateLimiter) {
    try {
        // 同步阻塞方式获取
        System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());
        rateLimiter.acquire();
        System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Test
public void testGuavaRate() throws InterruptedException {
    // 1s 中放行两个请求
    RateLimiter rateLimiter = RateLimiter.create(2.0d);
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();

    Thread.sleep(20_000);
}

输出:

准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219

1.9 滑动窗口TimeWindow

没有找到通用的滑动窗口jar包,一般来讲滑动窗口更适用于平滑的限流,解决瞬时高峰问题

一个供参考的实现方式:

固定大小队列,队列中每个数据代表一个时间段的计数,

访问 -》 队列头拿数据(注意不出队)-》判断是否跨时间段 -》 同一时间段,计数+1 -》跨时间段,新增数据入队,若

扔不进去,表示时间窗满,队尾数据出队

问题:当流量稀疏时,导致不会自动释放过期的数据

解决方案:根据时间段设置定时任务,模拟访问操作,只是将计数改为 + 0

1.10 小结

本文给出了几种单机版的并发控制的技术手段,主要目的是介绍了一些可选的方案,技术细节待后续补全完善,当然如果有其他的建议,欢迎评论交流

到此这篇关于Java中常见的并发控制手段的文章就介绍到这了,更多相关Java并发控制手段内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 简要分析Java多进程编程的并发控制

    进程间的通讯无非就是读写文件,socket通讯或者使用共享内存. java没法管理内存,其实他也是靠创建映像文件来实现的.  共享内存在java中的实现 在jdk1.4中提供的类MappedByteBuffer为我们实现共享内存提供了较好的方法.该缓冲区实际上是一个磁盘文件的内存映像.二者的变化将保持同步,即内存数据发生变化会立刻反映到磁盘文件中,这样会有效的保证共享内存的实现. 将共享内存和磁盘文件建立联系的是文件通道类:FileChannel.该类的加入是JDK为了统一对外部设备(文件.网络

  • Java并发控制机制详解

    在一般性开发中,笔者经常看到很多同学在对待java并发开发模型中只会使用一些基础的方法.比如Volatile,synchronized.像Lock和atomic这类高级并发包很多人并不经常使用.我想大部分原因都是来之于对原理的不属性导致的.在繁忙的开发工作中,又有谁会很准确的把握和使用正确的并发模型呢? 所以最近基于这个思想,本人打算把并发控制机制这部分整理成一篇文章.既是对自己掌握知识的一个回忆,也是希望这篇讲到的类容能帮助到大部分开发者. 并行程序开发不可避免地要涉及多线程.多任务的协作和数

  • Java中常见的并发控制手段浅析

    目录 前言 1.1 同步代码块 1.2 CAS自旋方式 1.3 锁 1.4 阻塞队列 1.5 信号量Semaphore 1.6 计数器CountDownLatch 1.7 栅栏 CyclicBarrier 1.8 guava令牌桶 1.9 滑动窗口TimeWindow 1.10 小结 前言 单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些 同步代码块 CAS自旋 锁 阻塞队列,令牌桶等 1.1 同步代码块 通过同步代码块,来确保同一时刻只会有一个线程执行

  • Java工作中常见的并发问题处理方法总结

    问题复现 1. "设备Aの奇怪分身" 时间回到很久很久以前的一个深夜,那时我开发的多媒体广告播放控制系统刚刚投产上线,公司开出的第一家线下生鲜店里,几十个大大小小的多媒体硬件设备正常联网后,正由我一台一台的注册及接入到已经上线的多媒体广告播控系统中. 注册过程简述如下: 每一个设备注册到系统中后,相应的在数据库设备表中都会新增一条记录,来存储这个设备的各项信息. 本来一切都有条不紊的进行着,直到设备A的注册打破了这默契的宁静-- 设备A注册完成后,我突然发现,数据库设备表中,新增了两条

  • 详解5种Java中常见限流算法

    目录 01固定窗口 02滑动窗口 03漏桶算法 04令牌桶 05滑动日志 06分布式限流 07总结 1.瞬时流量过高,服务被压垮? 2.恶意用户高频光顾,导致服务器宕机? 3.消息消费过快,导致数据库压力过大,性能下降甚至崩溃? ...... 在高并发系统中,出于系统保护角度考虑,通常会对流量进行限流:不但在工作中要频繁使用,而且也是面试中的高频考点. 今天我们将图文并茂地对常见的限流算法分别进行介绍,通过各个算法的特点,给出限流算法选型的一些建议,并给出Java语言实现的代码示例. 01固定窗

  • Java中同步与并发用法分析

    本文较为详细的分析了Java中同步与并发的用法.分享给大家供大家参考.具体分析如下: 1.同步容器类包括两部分:vector和hashtable 另一类是同步包装类,由Collections.synchronizedXXX创建.同步容器对容器的所有状态进行串行访问,从而实现线程安全. 它们存在如下问题: a) 对于符合操作,需要额外的锁保护.比如迭代,缺少则添加等条件运算. b) toString,hashCode,equals都会间接的调用迭代,都需要注意并发.   2.java5.0中的并发

  • Java中常见的日期操作(取值、转换、加减、比较)

    Java 的开发过程中免不了与 Date 类型纠缠,准备总结一下项目经常使用的日期相关操作,JDK 版本 1.7,如果能够帮助大家节约那么几分钟起身活动一下,去泡杯咖啡,便是极好的,嘿嘿.当然,我只提供了可行的解决方案,并不保证是最佳实践,欢迎讨论. 1. 日期取值 在旧版本 JDK 的时代,有不少代码中日期取值利用了 java.util.Date 类,但是由于 Date 类不便于实现国际化,其实从 JDK1.1 开始,就更推荐使用 java.util.Calendar 类进行时间和日期方面的处

  • Java中常见死锁与活锁的实例详解

    本文介绍了Java中常见死锁与活锁的实例详解,分享给大家,具体如下: 顺序死锁:过度加锁,导致由于执行顺序的原因,互相持有对方正在等待的锁 资源死锁:多个线程在相同的资源上发生等待 由于调用顺序而产生的死锁 public class Test { Object leftLock = new Object(); Object rightLock = new Object(); public static void main(String[] args) { final Test test = ne

  • java中常见的死锁以及解决方法代码

    在java中我们常常使用加锁机制来确保线程安全,但是如果过度使用加锁,则可能导致锁顺序死锁.同样,我们使用线程池和信号量来限制对资源的使用,但是这些被限制的行为可能会导致资源死锁.java应用程序无法从死锁中恢复过来,因此设计时一定要排序那些可能导致死锁出现的条件. 1.一个最简单的死锁案例 当一个线程永远地持有一个锁,并且其他线程都尝试获得这个锁时,那么它们将永远被阻塞.在线程A持有锁L并想获得锁M的同时,线程B持有锁M并尝试获得锁L,那么这两个线程将永远地等待下去.这种就是最简答的死锁形式(

  • java中常见的6种线程池示例详解

    之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的六种线程池如下 线程池名称 描述 FixedThreadPool 核心线程数与最大线程数相同 SingleThreadExecutor 一个线程的线程池 CachedThreadPool 核心线程为0,最大线程数为Integer. MAX_VALUE ScheduledThreadPool 指定核心线程数的定时

  • Java中多线程与并发_volatile关键字的深入理解

    一.volatile关键字 volatile是JVM提供的一种轻量级的同步机制,特性: 1.保证内存可见性 2.不保证原子性 3.防止指令重排序 二.JMM(Java Memory Model) Java内存模型中规定了所有的变量都存储在主内存中(如虚拟机物理内存中的一部分),每条线程还有自己的工作内存(如CPU中的高速缓存),线程的工作内存中保存了该线程使用到的变量到主内存的副本拷贝,线程对变量的所有操作(读取.赋值)都必须在工作内存中进行,而不能直接读写主内存中的变量.不同线程之间无法直接访

  • JAVA中常见异常类

    1. java.lang.nullpointerexception 这个异常大家肯定都经常遇到,异常的解释是"程序遇上了空指针",简单地说就是调用了未经初始化的对象或者是不存在的对象,这个错误经常出现在创建图片,调用数组这些操作中,比如图片未经初始化,或者图片创建时的路径错误等等.对数组操作中出现空指针,很多情况下是一些刚开始学习编程的朋友常犯的错误,即把数组的初始化和数组元素的初始化混淆起来了.数组的初始化是对数组分配需要的空间,而初始化后的数组,其中的元素并没有实例化,依然是空的,

随机推荐