java基于quasar实现协程池的方法示例

业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多。不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)

一个线程可以多个协程,一个进程也可以单独拥有多个协程。

线程进程都是同步机制,而协程则是异步。

协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。

线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。

协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。

线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

废话不多说,直接上代码:

导入包:

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>

WorkTools工具类:

package com.example.ai;
 
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;
 
import java.util.concurrent.ArrayBlockingQueue;
 
 
public class WorkTools {
    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;
    //队列默认任务为100
    private static int TASK_COUNT = 100;
 
    //工做协程数组
    private Fiber[] workThreads;
    //等待队列
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;
 
    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;
 
 
    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {
        this(WORK_NUM,TASK_COUNT);
    }
 
    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {
        if (workerNum <= 0) {
            workerNum = WORK_NUM;
        }
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        this.workerNum = workerNum;
        taskQueue = new ArrayBlockingQueue(taskCount);
        workThreads = new Fiber[workerNum];
        for (int i = 0; i < workerNum; i++) {
            int finalI = i;
            workThreads[i] = new Fiber<>(new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    SuspendableRunnable runnable = null;
                    while (true){
                        try{
                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();
                        }catch (Exception e){
                            System.out.println(e.getMessage());
                        }
                        //存在任务则运行。
                        if(runnable != null){
                            runnable.run();
                        }
 
                        runnable = null;
                    }
                }
            });  //new一个工做协程
 
            workThreads[i].start();  //启动工做协程
 
        }
 
        Runtime.getRuntime().availableProcessors();
    }
    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {
        try {
            taskQueue.put(task);   //put:阻塞接口的插入
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("阻塞");
        }
    }
    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {
        //工做协程中止工做,且置为null
        System.out.println("ready close thread...");
        for (int i = 0; i < workerNum; i++) {
 
            workThreads[i] = null; //help gc
        }
        taskQueue.clear();  //清空等待队列
    }
    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
    }
}

测试代码:

package com.example.ai;

import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;

@SpringBootApplication

public class AiApplication {

    @SneakyThrows
    public static void main(String[] args) {
        //等待协程任务完毕后再结束主线程
        CountDownLatch cdl = new CountDownLatch(50);
        //开启5个协程,50个任务列队。
        WorkTools myThreadPool = new WorkTools(5, 50);
        for (int i = 0; i< 50; i++){
            int finalI = i;
            myThreadPool.execute(new SuspendableRunnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                    try {
                        //延迟1秒
                        Thread.sleep(1000);
                        cdl.countDown();
                    } catch (InterruptedException e) {
                        System.out.println("阻塞中");
                    }
                }
            });

        }
        //阻塞
        cdl.await();
    }

}

具体代码都有注释了,自行了解。我也是以线程池写法实现。

当前为解决问题:在协程阻塞过程中Fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~

到此这篇关于java基于quasar实现协程池的方法示例的文章就介绍到这了,更多相关java quasar协程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java协程框架quasar和kotlin中的协程对比分析

    目录 前言 快速体验 添加依赖 添加javaagent 线程VS协程 协程代码 多线程代码 协程完胜 后记 前言 早就听说Go语言开发的服务不用任何架构优化,就可以轻松实现百万级别的qps.这得益于Go语言级别的协程的处理效率.协程不同于线程,线程是操作系统级别的资源,创建线程,调度线程,销毁线程都是重量级别的操作.而且线程的资源有限,在java中大量的不加限制的创建线程非常容易将系统搞垮.接下来要分享的这个开源项目,正是解决了在java中只能使用多线程模型开发高并发应用的窘境,使得java也能

  • java基于quasar实现协程池的方法示例

    业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍.所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多.不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~) 一个线程可以多个协程,一个进程也可以单独拥有多个协程. 线程进程都是同步机制,而协程则是异步. 协程能保留上一次调用时的状态,每次过程重入时,就相当于进

  • Go简单实现协程池的实现示例

    目录 MPG模型 通道的特性 首先就是进程.线程.协程讲解老三样. 进程: 本质上是一个独立执行的程序,进程是操作系统进行资源分配和调度的基本概念,操作系统进行资源分配和调度的一个独立单位. 线程: 是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一个进程中可以并发多个线程,每条线程执行不同的任务,切换受系统控制. 协程:  又称为微线程,是一种用户态的轻量级线程,协程不像线程和进程需要进行系统内核上的上下文切换,协程的上下文切换是由用户自己决定的,有自己的上下

  • golang 40行代码实现通用协程池

    代码仓库 goroutine-pool golang的协程管理 golang协程机制很方便的解决了并发编程的问题,但是协程并不是没有开销的,所以也需要适当限制一下数量. 不使用协程池的代码(示例代码使用chan实现,代码略啰嗦) func (p *converter) upload(bytes [][]byte) ([]string, error) { ch := make(chan struct{}, 4) wg := &sync.WaitGroup{} wg.Add(len(bytes))

  • php基于协程实现异步的方法分析

    本文实例讲述了php基于协程实现异步的方法.分享给大家供大家参考,具体如下: github上php的协程大部分是根据这篇文章实现的:http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html. 它们最终的结果都是把回调变成了优雅的顺序执行的代码,但还是阻塞的,不是真正的异步. 比如最热门的:https://github.com/recoilphp/recoil 先安装: compo

  • golang协程池设计详解

    Why Pool go自从出生就身带"高并发"的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量 for { // 监听tcp rw, e := l.Accept() if e != nil { ....... } tempDelay = 0

  • golang协程池模拟实现群发邮件功能

    比如批量群发邮件的功能 因为发送邮件是个比较耗时的操作, 如果是传统的一个个执行 , 总体耗时比较长 可以使用golang实现一个协程池 , 并行发送邮件 pool包下的pool.go文件 package pool import "log" //具体任务,可以传参可以自定义操作 type Task struct { Args interface{} Do func(interface{})error } //协程的个数 var Nums int //任务通道 var JobChanne

  • 详解Python生成器和基于生成器的协程

    一.什么是生成器 Generator 1.生成器就是可以生成值的函数 2.当一个函数里有了 yield关键字就成了生成器 3.生成器可以挂起执行并且保持当前执行的状态 代码示例: def simple_gen(): yield 'hello' yield 'world' gen = simple_gen() print(type(gen)) # 'generator' object print(next(gen)) # 'hello' print(next(gen)) # 'world' 二.基

  • GO实现协程池管理的方法

    使用channel实现协程池 通过 Channel 实现 Goroutine Pool,缺点是会造成协程的频繁开辟和注销,但好在简单灵活通用. package main import ( "fmt" "io/ioutil" "net/http" "sync" ) // Pool goroutine Pool type Pool struct { queue chan int wg *sync.WaitGroup } // Ne

  • Golang协程池gopool设计与实现

    目录 Goroutine 协程池 gopool 核心实现 Pool Task Worker 整体来看 三个角色的定位 使用 sync.Pool 进行性能优化 Goroutine Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的.所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景. Goroutine 的优势: 与线程相比,Goroutines 成本很低. 它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需

  • PHP生成器(generator)和协程的实现方法详解

    本文实例讲述了PHP生成器(generator)和协程的实现方法.分享给大家供大家参考,具体如下: 先说一些废话 PHP 5.5 以来,新的诸多特性又一次令 PHP 焕发新的光彩,虽然在本文写的时候已是 PHP 7 alpha 2 发布后的一段时间,但此时国内依旧是 php 5.3 的天下.不过我认为新的特性迟早会因为旧的版本的逐渐消失而变得越发重要,尤其是 PHP 7 的正式版出来后,因此本文的目的就是为了在这之前,帮助一些 PHPer 了解一些他们从没有了解的东西.所以打算将以本篇作为博客中

随机推荐