详解Java中CountDownLatch异步转同步工具类

使用场景

由于公司业务需求,需要对接socket、MQTT等消息队列。
众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线。无法像http请求有回复。
下发指令给硬件时,需要校验此次数据下发是否成功。
用户体验而言,点击按钮就要知道此次的下发成功或失败。

如上图模型,

第一种方案使用Tread.sleep
优点:占用资源小,放弃当前cpu资源
缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响应第二种方案使用CountDownLatch

package com.lzy.demo.delay;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountDownLatchPool {

    //countDonw池
    private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
    //延迟队列
    private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();

    private volatile static boolean flag =false;
    //单线程池
    private final static ExecutorService t = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1));

    public static void addCountDownLatch(Integer messageId) {
        CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) );
        if(countDownLatch == null){
            countDownLatch = countDownLatchMap.get(messageId);
        }
        try {
            addDelayQueue(messageId);
            countDownLatch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("阻塞等待结束~~~~~~");
    }

    public static void removeCountDownLatch(Integer messageId){
        CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
        if(countDownLatch == null)
            return;
        countDownLatch.countDown();
        countDownLatchMap.remove(messageId);
        System.out.println("清除Map数据"+countDownLatchMap);
    }

    private static void addDelayQueue(Integer messageId){
        delayQueue.add(new MessageDelayQueueUtil(messageId));
        clearMessageId();
    }

    private static void clearMessageId(){
        synchronized (CountDownLatchPool.class){
            if(flag){
                return;
            }
            flag = true;
        }
        t.execute(()->{
            while (delayQueue.size() > 0){
                System.out.println("进入线程并开始执行");
                try {
                    MessageDelayQueueUtil take = delayQueue.take();
                    Integer messageId1 = take.getMessageId();
                    removeCountDownLatch(messageId1);
                    System.out.println("清除队列数据"+messageId1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            flag = false;
            System.out.println("结束end----");
        });
    }

    public static void main(String[] args) throws InterruptedException {
        /*
        测试超时清空map
        new Thread(()->addCountDownLatch(1)).start();
        new Thread(()->addCountDownLatch(2)).start();
        new Thread(()->addCountDownLatch(3)).start();
        */
        //提前创建线程,清空countdown
        new Thread(()->{
            try {
                Thread.sleep(500L);
                removeCountDownLatch(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //开始阻塞
        addCountDownLatch(1);
    	//通过调整上面的sleep我们发现阻塞市场取决于countDownLatch.countDown()执行时间
    	System.out.println("阻塞结束----");
    }
}
class MessageDelayQueueUtil implements Delayed {

    private Integer messageId;
    private long avaibleTime;

    public Integer getMessageId() {
        return messageId;
    }

    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }

    public long getAvaibleTime() {
        return avaibleTime;
    }

    public void setAvaibleTime(long avaibleTime) {
        this.avaibleTime = avaibleTime;
    }

    public MessageDelayQueueUtil(Integer messageId){
        this.messageId = messageId;
        //avaibleTime = 当前时间+ delayTime
        //重试3次,每次3秒+1秒的延迟
        this.avaibleTime=3000*3+1000 + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
    }
}

由于socket并不确定每次都会有数据返回,所以map的数据会越来越大,最终导致内存溢出
需定时清除map内的无效数据。
可以使用DelayedQuene延迟队列来处理,相当于给对象添加一个过期时间

使用方法 addCountDownLatch 等待消息,异步回调消息清空removeCountDownLatch

到此这篇关于详解Java中CountDownLatch异步转同步工具类的文章就介绍到这了,更多相关CountDownLatch异步转同步工具类内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java countDownLatch如何实现多线程任务阻塞等待

    我这里需要通过多线程去处理数据,然后在所有数据都处理完成后再往下执行.这里就用到了CountDownLatch.把countdownlatch作为参数传入到每个线程类里,在线程中处理完数据后执行countdown方法.在所有countdownlatch归零后,其await方法结束阻塞状态而往下执行. 具体代码如下: 将多线程任务提交线程池 @Bean(name = "ggnews_executor") public Executor postExecutor() { ThreadPoo

  • JAVA CountDownLatch(倒计时计数器)用法实例

    这篇文章主要介绍了JAVA CountDownLatch(倒计时计数器)用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 方法说明: public void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程.如果当前计数大于零,则将计数减少.如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程. 如果当前计数等于零,则不发生任何操作. public boolean await(long timeout

  • 详解Java线程同步器CountDownLatch

    Java程序有的时候在主线程中会创建多个线程去执行任务,然后在主线程执行完毕之前,把所有线程的任务进行汇总,以前可以用线程的join方法,但是这个方法不够灵活,我们可以使用CountDownLatch类,实现更优雅,而且使用线程池的话,可没有办法调用线程的join方法的呀! 一.简单使用CountDownLatch 直接使用线程: package com.example.demo.study; import java.util.concurrent.CountDownLatch; public

  • Java并发编程之CountDownLatch源码解析

    一.前言 CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0.可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的. 二.使用 一个线程等待其它线程执行完再继续执行 ...... CountDownLatch cdl = new CountDownLatch(10); Executor

  • 浅谈java并发之计数器CountDownLatch

    CountDownLatch简介 CountDownLatch顾名思义,count + down + latch = 计数 + 减 + 门闩(这么拆分也是便于记忆=_=) 可以理解这个东西就是个计数器,只能减不能加,同时它还有个门闩的作用,当计数器不为0时,门闩是锁着的:当计数器减到0时,门闩就打开了. 如果你感到懵比的话,可以类比考生考试交卷,考生交一份试卷,计数器就减一.直到考生都交了试卷(计数器为0),监考老师(一个或多个)才能离开考场.至于考生是否做完试卷,监考老师并不关注.只要都交了试

  • 详解Java中CountDownLatch异步转同步工具类

    使用场景 由于公司业务需求,需要对接socket.MQTT等消息队列. 众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线.无法像http请求有回复. 下发指令给硬件时,需要校验此次数据下发是否成功. 用户体验而言,点击按钮就要知道此次的下发成功或失败. 如上图模型, 第一种方案使用Tread.sleep 优点:占用资源小,放弃当前cpu资源 缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响

  • 详解Java中String JSONObject JSONArray List<实体类>转换

    JSON使用阿里的fastJson为依赖包 gradle依赖管理如下: compile group: 'com.alibaba', name: 'fastjson', version:'1.2.41' 1.String转JSONObject 前言:String 是JSONObject格式的字符串 eg: JSONObject jSONObject = JSONObject.parseObject(String); 2.String转JSONArray 前言:String 是JSONArray格式

  • 详解Java中异步转同步的六种方法

    目录 一.问题 应用场景 二.分析 三.实现方法 1.轮询与休眠重试机制 2.wait/notify 3.Lock Condition 4.CountDownLatch 5.CyclicBarrier 6.LockSupport 一.问题 应用场景 应用中通过框架发送异步命令时,不能立刻返回命令的执行结果,而是异步返回命令的执行结果. 那么,问题来了,针对应用中这种异步调用,能不能像同步调用一样立刻获取到命令的执行结果,如何实现异步转同步? 二.分析 首先,解释下同步和异步 同步,就是发出一个调

  • Java中CountDownLatch进行多线程同步详解及实例代码

    Java中CountDownLatch进行多线程同步详解 CountDownLatch介绍 在前面的Java学习笔记中,总结了Java中进行多线程同步的几个方法: 1.synchronized关键字进行同步. 2.Lock锁接口及其实现类ReentrantLock.ReadWriteLock锁实现同步. 3.信号量Semaphore实现同步. 其中,synchronized关键字和Lock锁解决的是多个线程对同一资源的并发访问问题.信号量Semaphore解决的是多副本资源的共享访问问题. 今天

  • 详解java中保持compareTo和equals同步

    详解java中保持compareTo和equals同步 摘要 : 介绍重写equlas()和comparable接口,两者进行不相同的判断.从而使两者的对应的list.indexOf()与 Collections.binarySearch()得到的不一样. 在Java中我们常使用Comparable接口来实现排序,其中compareTo是实现该接口方法.我们知道compareTo返回0表示两个对象相等,返回正数表示大于,返回负数表示小于.同时我们也知道equals也可以判断两个对象是否相等,那么

  • 详解Java中HashSet和TreeSet的区别

    详解Java中HashSet和TreeSet的区别 1. HashSet HashSet有以下特点: 不能保证元素的排列顺序,顺序有可能发生变化 不是同步的 集合元素可以是null,但只能放入一个null 当向HashSet集合中存入一个元素时,HashSet会调用该对象的hashCode()方法来得到该对象的hashCode值,然后根据 hashCode值来决定该对象在HashSet中存储位置. 简单的说,HashSet集合判断两个元素相等的标准是两个对象通过equals方法比较相等,并且两个

  • 详解java中的四种代码块

    在java中用{}括起来的称为代码块,代码块可分为以下四种: 一.简介 1.普通代码块: 类中方法的方法体 2.构造代码块: 构造块会在创建对象时被调用,每次创建时都会被调用,优先于类构造函数执行. 3.静态代码块: 用static{}包裹起来的代码片段,只会执行一次.静态代码块优先于构造块执行. 4.同步代码块: 使用synchronized(){}包裹起来的代码块,在多线程环境下,对共享数据的读写操作是需要互斥进行的,否则会导致数据的不一致性.同步代码块需要写在方法中. 二.静态代码块和构造

  • 详解Java中AbstractMap抽象类

    jdk1.8.0_144 下载地址:http://www.jb51.net/softs/551512.html AbstractMap抽象类实现了一些简单且通用的方法,本身并不难.但在这个抽象类中有两个方法非常值得关注,keySet和values方法源码的实现可以说是教科书式的典范. 抽象类通常作为一种骨架实现,为各自子类实现公共的方法.上一篇我们讲解了Map接口,此篇对AbstractMap抽象类进行剖析研究. Java中Map类型的数据结构有相当多,AbstractMap作为它们的骨架实现实

  • 详解java中的阻塞队列

    阻塞队列简介 阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同: 其次是一个支持阻塞操作的队列,即: 当队列满时,会阻塞执行插入操作的线程,直到队列不满. 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空. 阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁: 而对于阻塞与唤醒机制则有与锁绑定的Condition实现 应用场景:生产者消费者模式 java中的阻塞队列 java中的阻塞队列根据容量可以分为有界队列和无界队

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

随机推荐