JAVA多线程之实现用户任务排队并预估排队时长

目录
  • 实现流程
  • 排队论简介
  • 代码具体实现
  • 接口测试
  • 补充知识
    • BlockingQueue
    • 阻塞与非阻塞

实现流程

初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。

假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。

排队论简介

排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:

代码具体实现

任务队列初始化 TaskQueue

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 初始化队列及线程池
 * @author tarzan
 *
 */
@Component
public class TaskQueue {
    //处理器队列
    public static BlockingQueue<TaskProcessor> taskProcessors;
    //等待任务队列
    public static BlockingQueue<CompileTask> waitTasks;
    //处理任务队列
    public static BlockingQueue<CompileTask> executeTasks;
    //线程池
    public static ExecutorService exec;
    //初始处理器数(计算机cpu可用线程数)
    public static Integer processorNum=Runtime.getRuntime().availableProcessors();

    /**
     * 初始化处理器、等待任务、处理任务队列及线程池
     */
    @PostConstruct
    public static void initEquipmentAndUsersQueue(){
        exec = Executors.newCachedThreadPool();
        taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
        //将空闲的设备放入设备队列中
        setFreeDevices(processorNum);
        waitTasks =new LinkedBlockingQueue<CompileTask>();
        executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
    }

    /**
     * 将空闲的处理器放入处理器队列中
     */
    private static void setFreeDevices(int num) {
        //获取可用的设备
        for (int i = 0; i < num; i++) {
            TaskProcessor dc=new TaskProcessor();
            try {
                taskProcessors.put(dc);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static CompileTask getWaitTask(Long clazzId) {
        return get(TaskQueue.waitTasks,clazzId);
    }

    public static CompileTask getExecuteTask(Long clazzId) {
        return get(TaskQueue.executeTasks,clazzId);
    }

    private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
        CompileTask compileTask =null;
        if (CollectionUtils.isNotEmpty(users)){
            Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
            if(optional.isPresent()){
                compileTask =  optional.get();
            }
        }
        return compileTask;
    }

    public static Integer getSort(Long clazzId) {
        AtomicInteger index = new AtomicInteger(-1);
        BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
        if (CollectionUtils.isNotEmpty(compileTasks)){
            compileTasks.stream()
                    .filter(e -> {
                        index.getAndIncrement();
                        return e.getClazzId().longValue() == clazzId.longValue();
                    })
                    .findFirst();
        }
        return index.get();
    }

    //单位秒
    public static int estimatedTime(Long clazzId){
        return  estimatedTime(60,getSort(clazzId)+1);
    }

    //单位秒
    public static int estimatedTime(int cellMs,int num){
         int a= (num-1)/processorNum;
         int b= cellMs*(a+1);
        return  b;
    }

编译任务类 CompileTask

import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;

import java.util.Date;

@Data
public class CompileTask implements Runnable {
    //当前请求的线程对象
    private Long clazzId;
    //用户id
    private Long userId;
    //当前请求的线程对象
    private Thread thread;
    //绑定处理器
    private TaskProcessor taskProcessor;
    //任务状态
    private Integer status;
    //开始时间
    private Date startTime;
    //结束时间
    private Date endTime;

    private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);

    private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);

    @Override
    public void run() {
        compile();
    }

    /**
     * 编译
     */
    public void compile() {
        try {
            //取出一个设备
            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
            //取出一个任务
            CompileTask compileTask = TaskQueue.waitTasks.take();
            //任务和设备绑定
            compileTask.setTaskProcessor(taskProcessor);
            //放入
            TaskQueue.executeTasks.put(compileTask);
            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
            //切换用户数据源
            dataSourceService.switchDataSource(userId);
            //添加进度
            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
        } catch (InterruptedException e) {
            System.err.println( e.getMessage());
        }
    }

}

任务处理器 TaskProcessor 

import lombok.Data;

import java.util.Date;

@Data
public class TaskProcessor {

    /**
     * 释放
     */
    public  static Boolean release(CompileTask task)  {
        Boolean flag=false;
        Thread thread=task.getThread();
        synchronized (thread) {
            try {
                if(null!=task.getTaskProcessor()){
                    TaskQueue.taskProcessors.put(task.getTaskProcessor());
                    TaskQueue.executeTasks.remove(task);
                    task.setEndTime(new Date());
                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
                    flag=true;
                    System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return flag;
        }
    }

}

Controller控制器接口实现

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("task")
@Api(value = "数据编译任务", tags = "数据编译任务")
public class CompileTaskController {

    @ApiOperation(value = "添加等待请求 @author Tarzan Liu")
    @PostMapping("compile/{clazzId}")
    public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
        if(checkUser!=null){
            return  R.fail("已经正在排队!");
        }
        checkUser=TaskQueue.getExecuteTask(clazzId);
        if(checkUser!=null){
            return  R.fail("正在执行编译!");
        }
        //获取当前的线程
        Thread thread=Thread.currentThread();
        //创建当前的用户请求对象
        CompileTask compileTask =new CompileTask();
        compileTask.setThread(thread);
        compileTask.setClazzId(clazzId);
        compileTask.setStartTime(new Date());
        //将当前用户请求对象放入队列中
        try {
            TaskQueue.waitTasks.put(compileTask);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TaskQueue.exec.execute(compileTask);
        return R.data(TaskQueue.waitTasks.size()-1);
    }

    @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")
    @PostMapping("sort/{clazzId}")
    public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.getSort(clazzId));
    }

    @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")
    @PostMapping("estimate/time/{clazzId}")
    public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.estimatedTime(clazzId));
    }

    @ApiOperation(value = "任务释放 @author Tarzan Liu")
    @PostMapping("release/{clazzId}")
    public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
        CompileTask task=TaskQueue.getExecuteTask(clazzId);
        if(task==null){
            return  R.fail("资源释放异常");
        }
        return R.status(TaskProcessor.release(task));
    }

    @ApiOperation(value = "执行 @author Tarzan Liu")
    @PostMapping("exec")
    public R exec() {
        Long start=System.currentTimeMillis();
        for (Long i = 1L; i < 100; i++) {
            compile(i);
        }
        System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");
        return R.status(true);
    }
}

接口测试

根据任务id查询该任务前还有多少个任务待执行

根据任务id查询该任务预估执行完成的剩余时间,单位秒

补充知识

BlockingQueue

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

入队

offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞

put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞

offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞

被唤醒

等待时间超时

当前线程被中断

出队

poll():如果没有元素,直接返回null;如果有元素,出队

take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞

poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

被唤醒

等待时间超时

当前线程被中断 

到此这篇关于JAVA多线程之实现用户任务排队并预估排队时长的文章就介绍到这了,更多相关JAVA 多线程 用户任务排队内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java实现排队论的原理

    引入: 前段时间去银行办业务,排队的人那是真多,自己正式办理业务也就不到5分钟,但是却足足等了两个小时(相信很多人都遇到过这种情况),对这种服务水平真的是无语了,但是问题又来了,银行应该开几个窗口,既能保证整体的服务质量,又能保证资源资源的利用率呢?下面我们就通过排队论来模拟这个问题. 排队论简介 排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支.我们下面对排队论做下简化处理,先看下图: 我们在图的左侧安排若干个蓝色服务台,右侧为可能会过来

  • Java多线程之多种锁和阻塞队列

    一.悲观锁和乐观锁 1.1. 乐观锁 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制. 乐观锁适用于多读的应用类型,乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的. CAS全称 Compare And Swap(比较与交换),是一种无锁算法.在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步.java.util

  • Java 阻塞队列和线程池原理分析

    目录 [1]阻塞队列 一.什么是阻塞队列? 二.阻塞队列有什么用? 三.阻塞队列的简单实用 [2]Java 线程池 一.我们为什么需要Java 线程池?使用它的好处是什么? 二.Java中主要提供了哪几种线程的线程池? 三.线程类的继承关系 四.ThreadPoolExecutor参数的含义 corePoolSize 五.线程池工作流程(机制) 六.关于两种提交方法的比较 [1]阻塞队列 一.什么是阻塞队列? ① 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满. ②

  • 什么是Java多线程,如何实现

    目录 什么是进程? 什么是线程? 什么是线程安全? 添加一个状态呢? 如何确保线程安全? synchronized lock 总结 什么是进程? 电脑中时会有很多单独运行的程序,每个程序有一个独立的进程,而进程之间是相互独立存在的.比如下图中的QQ.酷狗播放器.电脑管家等等. 什么是线程? 进程想要执行任务就需要依赖线程.换句话说,就是进程中的最小执行单位就是线程,并且一个进程中至少有一个线程. 那什么是多线程?提到多线程这里要说两个概念,就是串行和并行,搞清楚这个,我们才能更好地理解多线程.

  • java项目中的多线程实践记录

    项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等. 通常会使用spring自带的线程池处理,做到对线程的定制化处理和更好的可控,建议使用自定义的线程池. 主要涉及到的几个点: 1. 自定义线程工厂(ThreadFactoryBuilder),主要用于线程的命名,方便追踪 2. 自定义的线程池(ThreadPoolExecutorUtils),可以按功能优化配置参数 3. 一个抽象的多线程任务处理接口(OperationThreadService

  • JAVA多线程之实现用户任务排队并预估排队时长

    目录 实现流程 排队论简介 代码具体实现 接口测试 补充知识 BlockingQueue 阻塞与非阻塞 实现流程 初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理. 假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行. 排队论简介 排队论是研究系统随

  • Java轻松使用工具类实现获取MP3音频时长

    获取mp3格式音频时长. Maven依赖 <dependency> <groupId>org</groupId> <artifactId>jaudiotagger</artifactId> <version>2.0.1</version> </dependency> 代码 import org.jaudiotagger.audio.AudioFileIO; import org.jaudiotagger.aud

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

  • java多线程消息队列的实现代码

    本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记 1.定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象.也就是说,它不依赖类特定的实例,被类的所有实例共享. private static List<Queue> queueCache = new LinkedList<Queue>(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. private Integer

  • java多线程之线程,进程和Synchronized概念初解

    一.进程与线程的概念 (1)在传统的操作系统中,程序并不能独立运行,作为资源分配和独立运行的基本单位都是进程. 在未配置 OS 的系统中,程序的执行方式是顺序执行,即必须在一个程序执行完后,才允许另一个程序执行:在多道程序环境下,则允许多个程序并发执行.程序的这两种执行方式间有着显著的不同.也正是程序并发执行时的这种特征,才导致了在操作系统中引入进程的概念. 自从在 20 世纪 60 年代人们提出了进程的概念后,在 OS 中一直都是以进程作为能拥有资源和独立运行的基本单位的.直到 20 世纪 8

  • JAVA多线程编程实例详解

    本文实例讲述了JAVA多线程编程.分享给大家供大家参考,具体如下: 进程是系统进行资源调度和分配的一个独立单位. 进程的特点 独立性:进程是系统中独立存在的实体,拥有自己的独立资源和私有空间.在没有经过进程本身允许的情况下,不能直接访问其他进程. 动态性:进程与程序的区别在于,前者是一个正在系统中活动的指令,而后者仅仅是一个静态的指令集合 并发性:多个进程可以在单个处理器上并发执行,而不受影响. 并发性和并行性的区别: 并行性:在同一时刻,有多条指令在多个处理器上同时执行(多个CPU) 并发性:

  • Java多线程并发编程和锁原理解析

    这篇文章主要介绍了Java多线程并发编程和锁原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等方案后,问题得到解决. 加锁方案见下文. 二.乐观锁 & 悲观锁 1.乐观锁 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁

  • Java多线程文件分片下载实现的示例代码

    多线程下载介绍 多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据组装成完整的数据文件,这样便大大加快了下载效率.常见的下载器,迅雷,QQ旋风等都采用了这种技术. 分片下载 所谓分片下载就是要利用多线程的优势,将要下载的文件一块一块的分配到各个线程中去下载,这样就极大的提高了下载速度. 技术难点 并不能说是什么难点,只能说没接触过不知道罢了. 1.如何请

  • 实例代码讲解JAVA多线程

    进程与线程 进程是程序的一次动态执行过程,它需要经历从代码加载,代码执行到执行完毕的一个完整的过程,这个过程也是进程本身从产生,发展到最终消亡的过程.多进程操作系统能同时达运行多个进程(程序),由于 CPU 具备分时机制,所以每个进程都能循环获得自己的CPU 时间片.由于 CPU 执行速度非常快,使得所有程序好像是在同时运行一样. 多线程是实现并发机制的一种有效手段.进程和线程一样,都是实现并发的一个基本单位.线程是比进程更小的执行单位,线程是进程的基础之上进行进一步的划分.所谓多线程是指一个进

  • Java多线程中Lock锁的使用总结

    多核时代 摩尔定律告诉我们:当价格不变时,集成电路上可容纳的晶体管数目,约每隔18个月便会增加一倍,性能也将提升一倍.换言之,每一美元所能买到的电脑性能,将每隔18个月翻两倍以上.然而最近摩尔定律似乎遇到了麻烦,目前微处理器的集成度似乎到了极限,在目前的制造工艺和体系架构下很难再提高单个处理器的速度了,否则它就被烧坏了.所以现在的芯片制造商改变了策略,转而在一个电路板上集成更多的处理器,也就是我们现在常见的多核处理器. 这就给软件行业带来麻烦(也可以说带来机会,比如说就业机会,呵呵).原来的情况

随机推荐