Java 多线程并发编程提高数据处理效率的详细过程

工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息。需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询、更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作。

因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率。

这里使用可重用固定线程数的线程池 FixedThreadPool,并使用 CountDownLatch 并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:

  • 首先,通过 Runtime.getRuntime().availableProcessors() 方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。
  • 其次,判断任务的特性,如果为计算密集型任务则设置线程数为 CPU 线程数+1,如果为 IO 密集型任务则设置线程数为 2 * CPU 线程数,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。
  • 之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象 CountDownLatch,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。
  • 然后,调用 executorService.execute() 方法,重写 run 方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。
  • 最后,当所有子线程执行完成后,关闭线程池。

在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:

public ResponseData updateHostDept() {
		// ...
		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;
        // the number of each group data
        int eachGroupNum = hostMapList.size() / threadNum;
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executorService.execute(()->{
                try {
                    for (Map map : group) {
                    	// update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                	// let counter minus one
                    countDownLatch.countDown();
                }
            });
        }
        try {
        	// main thread donnot execute until all child threads finish
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executorService.shutdown();
        return ResponseData.success();
}

那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min 下降到了 8-10 min,大大提高了执行效率。

需要注意的是,这里使用的 newFixedThreadPool 创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个无界队列,默认值为 Integer.MAX_VALUE 极有可能会造成 OOM 问题。因此,一般可以使用 ThreadPoolExecutor 来创建线程池,自己可以指定等待队列中的线程个数,避免产生 OOM 问题。

public ResponseData updateHostDept() {
		// ...
		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;
        // the number of each group data
        int eachGroupNum = hostMapList.size() / threadNum;
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executor.execute(()->{
                try {
                    for (Map map : group) {
                    	// update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                	// let counter minus one
                    countDownLatch.countDown();
                }
            });
        }
        try {
        	// main thread donnot execute until all child threads finish
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executor.shutdown();
        return ResponseData.success();
}

在上述的代码中,核心线程数和最大线程数分别为 5 和 8,并没有设置的很大的值,因为如果如果设置的很大,线程间频繁的上下文切换也会增加时间消耗,反而不能最大程度上发挥多线程的优势。至于如何选择合适的参数,需要根据机器的参数以及任务的类型综合考虑决定。

最后补充一点,如果想要通过非编码的方式获取机器的 CPU 线程个数也很简单,windows 系统通过任务管理器,选择 “性能”,便可以查看 CPU 线程个数的情况,如下图所示:

从上图可以看到,我的机器中内核是八个 CPU,但是通过超线程技术一个物理的 CPU 核心可以模拟成两个逻辑 CPU 线程,因此我的机器是支持8核16线程的。

到此这篇关于Java 多线程并发编程提高数据处理效率的文章就介绍到这了,更多相关Java 多线程提高数据处理效率内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java多线程并发编程和锁原理解析

    这篇文章主要介绍了Java多线程并发编程和锁原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等方案后,问题得到解决. 加锁方案见下文. 二.乐观锁 & 悲观锁 1.乐观锁 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁

  • Java多线程之并发编程的基石CAS机制详解

    目录 一.CAS机制简介 1.1.悲观锁和乐观锁更新数据方式 1.2.什么是CAS机制 1.3.CAS与sychronized比较 1.4.Java中都有哪些地方应用到了CAS机制呢? 1.5.CAS 实现自旋锁 1.6.CAS机制优缺点 1>ABA问题 2>可能会消耗较高的CPU 3>不能保证代码块的原子性 二.Java提供的CAS操作类--Unsafe类 2.1.Unsafe类简介 2.2.Unsafe类的使用 三.CAS使用场景 3.1.使用一个变量统计网站的访问量 3.2.现在我

  • Java使用JDBC向MySQL数据库批次插入10W条数据(测试效率)

    使用JDBC连接MySQL数据库进行数据插入的时候,特别是大批量数据连续插入(100000),如何提高效率呢? 在JDBC编程接口中Statement 有两个方法特别值得注意: 通过使用addBatch()和executeBatch()这一对方法可以实现批量处理数据. 不过值得注意的是,首先需要在数据库链接中设置手动提交,connection.setAutoCommit(false),然后在执行Statement之后执行connection.commit(). import java.io.Bu

  • Java并发编程之volatile与JMM多线程内存模型

    目录 一.通过程序看现象 二.为什么会产生这种现象(JMM模型)? 三.MESI 缓存一致性协议 一.通过程序看现象 在开始为大家讲解Java 多线程缓存模型之前,我们先看下面的这一段代码.这段代码的逻辑很简单:主线程启动了两个子线程,一个线程1.一个线程2.线程1先执行,sleep睡眠2秒钟之后线程2执行.两个线程使用到了一个共享变量shareFlag,初始值为false.如果shareFlag一直等于false,线程1将一直处于死循环状态,所以我们在线程2中将shareFlag设置为true

  • 深入理解Java多线程与并发编程

    一.多线程三大特性 多线程有三大特性:原子性.可见性.有序性. 原子性 (跟数据库的事务特性中的原子性类似,数据库的原子性体现是dml语句执行后需要进行提交): 理解:即一个操作或多个操作,要么全部执行并且执行的过程中不会被任何因素打断,要么都不执行. 一个很经典的例子就是银行账户转账问题: 比如从账户A向账户B转1000元,那么必然包括2个操作:从账户A减去1000元,往账户B加上1000元.这2个操作必须要具备原子性才能保证不出现一些意外的问题. 我们操作数据也是如此,比如i = i+1:其

  • Java 多线程并发编程提高数据处理效率的详细过程

    工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息.需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询.更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作. 因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率. 这里使用可重用固定线程数的线程池 FixedThreadPo

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

  • Java多线程并发编程(互斥锁Reentrant Lock)

    Java 中的锁通常分为两种: 通过关键字 synchronized 获取的锁,我们称为同步锁,上一篇有介绍到:Java 多线程并发编程 Synchronized 关键字. java.util.concurrent(JUC)包里的锁,如通过继承接口 Lock 而实现的 ReentrantLock(互斥锁),继承 ReadWriteLock 实现的 ReentrantReadWriteLock(读写锁). 本篇主要介绍 ReentrantLock(互斥锁). ReentrantLock(互斥锁)

  • Java多线程并发编程 Volatile关键字

    volatile 关键字是一个神秘的关键字,也许在 J2EE 上的 JAVA 程序员会了解多一点,但在 Android 上的 JAVA 程序员大多不了解这个关键字.只要稍了解不当就好容易导致一些并发上的错误发生,例如好多人把 volatile 理解成变量的锁.(并不是) volatile 的特性: 具备可见性 保证不同线程对被 volatile 修饰的变量的可见性. 有一被 volatile 修饰的变量 i,在一个线程中修改了此变量 i,对于其他线程来说 i 的修改是立即可见的. 如: vola

  • 深入探究Java多线程并发编程的要点

    关键字synchronized synchronized关键可以修饰函数.函数内语句.无论它加上方法还是对象上,它取得的锁都是对象,而不是把一段代码或是函数当作锁. 1,当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一段时间只能有一个线程得到执行,而另一个线程只有等当前线程执行完以后才能执行这块代码. 2,当一个线程访问object中的一个synchronized(this)同步代码块时,其它线程仍可以访问这个object中是其它非synchr

  • Java多线程并发编程 并发三大要素

    一.原子性 原子,一个不可再被分割的颗粒.原子性,指的是一个或多个不能再被分割的操作. int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到朱内存. 虽然读取和写入都是原子操作,但合起来就不属于原子操作,我们又叫这种为"复合操作". 我们可以用synchronized 或 Lock 来把这个复合操作"变成"原子操作. 例子: private synchronized void increase

  • Java多线程并发编程 Synchronized关键字

    synchronized 关键字解析 同步锁依赖于对象,每个对象都有一个同步锁. 现有一成员变量 Test,当线程 A 调用 Test 的 synchronized 方法,线程 A 获得 Test 的同步锁,同时,线程 B 也去调用 Test 的 synchronized 方法,此时线程 B 无法获得 Test 的同步锁,必须等待线程 A 释放 Test 的同步锁才能获得从而执行对应方法的代码. 综上,正确使用 synchronized 关键字可确保原子性. synchronized 关键字的特

  • Java多线程并发与并行和线程与进程案例

    目录 一.并发与并行 二.线程与进程 三.创建线程类 前言: 程序在没有跳转语句的前提下,都是由上至下依次执行,那现在想要设计一个程序,边打游戏边听歌,怎么设计? 要解决上述问题,咱们得使用多进程或者多线程来解决. 一.并发与并行 并发:指两个或多个事件在同一个时间段内发生. 并行:指两个或多个事件在同一时刻发生(同时发生). 在操作系统中,安装了多个程序,并发指的是在一段时间内宏观上有多个程序同时运行,这在单 CPU 系统中,每一时刻只能有一道程序执行,即微观上这些程序是分时的交替运行,只不过

  • Java 高并发编程之最实用的任务执行架构设计建议收藏

    目录 前言 1.业务架构 2.技术架构 3.物理架构 高并发任务执行架构 需求场景 业务架构设计 技术架构设计 初始设计 演化阶段一 演化阶段二 演化阶段三 代码设计 总结 前言 随着互联网与软件的发展,除了程序员,架构师也是越来越火的职业.他们伴随着项目的整个生命过程,他们更像是传统工业的设计师,将项目当做生命一般细心雕琢. 目前对于项目架构而言,基本都会需要设计的几个架构. 1.业务架构 项目或者产品的市场定位.需求范围.作用场景都是需要在项目启动初期进行系统性分析的.在设计业务架构中,架构

随机推荐