Java 实现协程的方法

协程(Coroutine)这个词其实有很多叫法,比如有的人喜欢称为纤程(Fiber),或者绿色线程(GreenThread)。其实究其本质,对于协程最直观的解释是线程的线程。虽然读上去有点拗口,但本质上就是这样。

协程的核心在于调度那块由他来负责解决,遇到阻塞操作,立刻放弃掉,并且记录当前栈上的数据,阻塞完后立刻再找一个线程恢复栈并把阻塞的结果放到这个线程上去跑,这样看上去好像跟写同步代码没有任何差别,这整个流程可以称为coroutine,而跑在由coroutine负责调度的线程称为Fiber。

java协程的实现

早期,在JVM上实现协程一般会使用kilim,不过这个工具已经很久不更新了,现在常用的工具是Quasar,而本文章会全部基于Quasar来介绍。

下面尝试通过Quasar来实现类似于go语言的coroutine以及channel。

为了能有明确的对比,这里先用go语言实现一个对于10以内自然数分别求平方的例子。

func counter(out chan<- int) {
 for x := 0; x < 10; x++ {
  out <- x
 }
 close(out)
}

func squarer(out chan<- int, in <-chan int) {
 for v := range in {
  out <- v * v
 }
 close(out)
}

func printer(in <-chan int) {
 for v := range in {
  fmt.Println(v)
 }
}

func main() {
 //定义两个int类型的channel
 naturals := make(chan int)
 squares := make(chan int)

 //产生两个Fiber,用go关键字
 go counter(naturals)
 go squarer(squares, naturals)
 //获取计算结果
 printer(squares)
}

上面这个例子,通过channel两解耦两边的数据共享。对于这个channel,大家可以理解为Java里的SynchronousQueue。下面我直接上Quasar版JAVA代码的,几乎可以原封不动的复制go语言的代码。

public class Example {

 private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
  Integer v;
  while ((v = in.receive()) != null) {
   System.out.println(v);
  }
 }

 public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
  //定义两个Channel
  Channel<Integer> naturals = Channels.newChannel(-1);
  Channel<Integer> squares = Channels.newChannel(-1);

  //运行两个Fiber实现.
  new Fiber(() -> {
   for (int i = 0; i < 10; i++)
    naturals.send(i);
   naturals.close();
  }).start();

  new Fiber(() -> {
   Integer v;
   while ((v = naturals.receive()) != null)
    squares.send(v * v);
   squares.close();
  }).start();

  printer(squares);
 }
}

两者对比,看上去Java似好像更复杂些,没办法这就是Java的风格,而且这还是通过第三方的库来实现的。

说到这里各位肯定对Fiber很好奇了。也许你会表示怀疑Fiber是不是如上面所描述的那样,下面我们尝试用Quasar建立一百万个Fiber,看看内存占用多少,我先尝试了创建百万个Thread。

for (int i = 0; i < 1_000_000; i++) {
 new Thread(() -> {
  try {
   Thread.sleep(10000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }).start();
}

很不幸,直接报Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,这是情理之中的。下面是通过Quasar建立百万个Fiber。

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
 int FiberNumber = 1_000_000;
 CountDownLatch latch = new CountDownLatch(1);
 AtomicInteger counter = new AtomicInteger(0);

 for (int i = 0; i < FiberNumber; i++) {
  new Fiber(() -> {
   counter.incrementAndGet();
   if (counter.get() == FiberNumber) {
    System.out.println("done");
   }
   Strand.sleep(1000000);
  }).start();
 }
 latch.await();
}

我这里加了latch,阻止程序跑完就关闭,Strand.sleep其实跟Thread.sleep一样,只是这里针对的是Fiber。

最终控制台是可以输出done的,说明程序已经创建了百万个Fiber,设置Sleep是为了让Fiber一直运行,从而方便计算内存占用。官方宣称一个空闲的Fiber大约占用400Byte,那这里应该是占用400MB堆内存,但是这里通过jmap -heap pid显示大约占用了1000MB,也就是说一个Fiber占用1KB。

Quasar是怎么实现Fiber的

其实Quasar实现的coroutine的方式与Go语言很像,只不过前者是使用框架来实现,而go语言则是语言内置的功能。

不过如果你熟悉了Go语言的调度机制的话,那么对于Quasar的调度机制就会好理解很多了,因为两者有很多相似之处。

Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。

Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,当然也可以在编译期间这么干。go语言的内置了自己的调度器,而Quasar则是默认使用ForkJoinPool这个具有work-stealing功能的线程池来当调度器。work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。

那这里你会问了,Quasar怎么知道修改哪些字节码呢,其实也很简单,Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑,如果你在方法上定义了@Suspendable注解,那Quasar会对调用该注解的方法做类似下面的事情。

这里假设你在方法f上定义了@Suspendable,同时去调用了有同样注解的方法g,那么所有调用f的方法会插入一些字节码,这些字节码的逻辑就是记录当前Fiber栈上的状态,以便在未来可以动态的恢复。(Fiber类似线程也有自己的栈)。在suspendable方法链内Fiber的父类会调用Fiber.park,这样会抛出SuspendExecution异常,从而来停止线程的运行,好让Quasar的调度器执行调度。这里的SuspendExecution会被Fiber自己捕获,业务层面上不应该捕获到。如果Fiber被唤醒了(调度器层面会去调用Fiber.unpark),那么f会在被中断的地方重新被调用(这里Fiber会知道自己在哪里被中断),同时会把g的调用结果(g会return结果)插入到f的恢复点,这样看上去就好像g的return是f的local variables了,从而避免了callback嵌套。

上面说了一大堆,其实简单点来讲就是,想办法让运行中的线程栈停下来,然后让Quasar的调度器介入。

JVM线程中断的条件有两个:

1、抛异常

2、return。

而在Quasar中,一般就是通过抛异常的方式来达到的,所以你会看到上面的代码会抛出SuspendExecution。但是如果你真捕获到这个异常,那就说明有问题了,所以一般会这么写。

@Suspendable
public int f() {
 try {
  // do some stuff
  return g() * 2;
 } catch(SuspendExecution s) {
  //这里不应该捕获到异常.
  throw new AssertionError(s);
 }
}

以上就是Java 实现协程的方法的详细内容,更多关于Java 实现协程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java实现ECDSA签名算法

    ECDSA签名算法 package com.albedo.security; /** * DSA 加解密实现 */ public class ECDSAUtils extends Base { //字符编码 public static final String ALGORITHM = "EC"; public static final String SIGN_ALGORITHM = "SHA1withECDSA"; /** * ECDSA 验签 * * @param

  • Java将byte[]转图片存储到本地的案例

    Java中,将字节数组转成图片的有很多种方式,今天在这里记录其中一种,方便以后查询,也可以提供给没有接触的童鞋做一个参考. 首先是将图片转成字节数组 import sun.misc.BASE64Encoder; import java.io.*; // 传入图片路径,获取图片 FileInputStream fis = new FileInputStream("/Users/curry/error.png"); BufferedInputStream bis = new Buffere

  • JAVA WSIMPORT生成WEBSERVICE客户端401认证过程图解

    概述 wsimport是jdk自带的命令,可以根据wsdl文档生成客户端中间代码,基于生成的代码编写客户端,可以省很多麻烦. 先看两张截图: 使用浏览器打开webservice出现的界面: 使用wsimport生成webservice客户端出现401错误: 需用到的命令参数如下: 1. -d <directory> 在指定的目录生成class文件 2. -p <pkg> 指定生成文件的包结构 3. -Xauthfile <filepath> 在格式文件进行授权信息 4.

  • Java FastJson使用教程

    Fastjson 是一个 Java 库,可以将 Java 对象转换为 JSON 格式,当然它也可以将 JSON 字符串转换为 Java 对象. Fastjson 可以操作任何 Java 对象,即使是一些预先存在的没有源码的对象. Fastjson 源码地址:https://github.com/alibaba/fastjson Fastjson 扩展阅读:https://www.w3cschool.cn/fastjson/ 一.Fastjson 特性 提供服务器端.安卓客户端两种解析工具,性能表

  • 解决JavaMail附件名字过长导致的乱码问题

    问题背景: 公司有个业务场景是审核客户机构通过后,给客户发送一封邮件,并将机构相关材料以附件形式一块发送,有些附件名正常,有些就乱了,如下图: 后来发现是附近名称过长导致的! 问题原因:java mail中设置附件名称会采用 base64格式进行编码,如果附件名称过长会被进行切割,将剩下字符抹去,所以导致不知道这是什么格式的文件. 注:虽然将文件格式被改变了,但是若强制转换成原格式(右键->另存为->xxx.pdf) 仍然可以进行打开,文件内容也并非改变(这是测试后的结果) 解决方案: 由于是

  • Java 将List中的实体类按照某个字段进行分组并存放至Map中操作

    1.JDK1.8之前: 假设有实体类User,里面有字段id,我们将相同id的User进行分组,并存放在Map中.(例子不是很恰当,但很能说明问题) public static void main(String[] args) { List<User> list = new ArrayList<>(); list.add(new User(1, 1)); list.add(new User(1, 2)); list.add(new User(2, 1)); list.add(new

  • Java日志框架用法及常见问题解决方案

    日志定义: 在计算机领域,日志文件(logfile)是一个记录了发生在运行中的操作系统或其他软件中的事件的文件,或者记录了在网络聊天软件的用户之间发送的消息. 日志记录(Logging):是指保存日志的行为.最简单的做法是将日志写入单个存放日志的文件. 日志级别优先级: ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 日志框架的作用: ①:跟踪用户对系统访问,记录了系统行为的时间.地点.状态等相关信息

  • Java比较对象大小两种常用方法

    引入原因: Java中的对象,正常情况下,只能进行比较:== 或!= ,不能使用 < 或 > ,但是在开发时需要用到比较对象的大小 1.Comparable接口的使用(自然排序) 1.像String .包装类等实现了Comparable接口,重写了compareTo()方法,给出了比较两个对象大小的方法 2.像String .包装类等重写了compareTo()方法后,默认执行了从小到大的排序 3.重写compareTo()的规则: 如果当前对象this大于形参对象obj,则返回正整数,如果当

  • JAVA调用SAP WEBSERVICE服务实现流程图解

    调用SAP WebService服务需要转换操作 1.通过浏览器访问SAP WebService地址,进行验证并生成wsdl文件地址并不是可以直接转化的wsdl: 直接访问地址并进行验证后: 另存文件wsdl 2. 通过Idea生成java可执行代码存放wsdl 选中 wsdl文件,选择Tools ->WebService -> Generate Java Code From Wsdl- 最开始引用路径会是一个绝对路径,我们修改Service引用wsdl方式 3. 调用service 此时便可

  • java jdk1.8 使用stream流进行list 分组归类操作

    我就废话不多说了,大家还是直接看代码吧~ import com.alibaba.fastjson.JSON; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * @author czw */ public class Foo{ private String name; private String type; private Double typeValue; p

  • Java中将File转化为MultipartFile的操作

    话不多说直接上代码,简单明了 import java.io.File; import java.io.FileInputStream; import org.springframework.web.multipart.MultipartFile; import org.springframework.mock.web.MockMultipartFile; import org.apache.http.entity.ContentType; File pdfFile = new File("D:/

随机推荐