Kotlin协程Job生命周期结构化并发详解

目录
  • 1.Job的生命周期
  • 2.Deffered
  • 3.Job与结构化并发
  • 4.launch和async的使用场景

前面在学习协程启动方式的时候在launch的源码中有一个返回值是Jobasync的返回Deferred也是实现了Job,那么而也就是说launchasync在创建一个协程的时候也会创建一个对应的Job对象。还提到过Job是协程的句柄,那么Job到底是什么?它有什么用?

1.Job的生命周期

先看一下Job的源码,这里只保留了跟标题相关的内容

public interface Job : CoroutineContext.Element {

    // ------------ 状态查询API ------------

    /**
    * 当该Job处于活动状态时,返回true——它已经开始,没有完成,也没有取消。
    * 如果没有取消或失败,等待其子任务完成的Job仍被认为是活动的。
    */
    public val isActive: Boolean

    /**
    * 当Job因任何原因完成时返回true。作业被取消或失败并已完成其执行也被视为完成。
    * Job只有在所有子任务完成后才算完成。
    */
    public val isCompleted: Boolean

    /**
    *如果该作业因任何原因被取消,无论是通过显式调用cancel,还是因为它失败或其子或父作业被取消,
    * 则返回true。在一般情况下,它并不意味着任务已经完成,因为它可能仍然在完成它正在做的事情,
    * 并等待它的子任务完成。
    */
    public val isCancelled: Boolean

    // ------------ 操控状态API ------------

    /**
    * 如果Job所在的协程还没有被启动那么调用这个方法就会启动协程
    * 如果这个协程被启动了返回true,如果已经启动或者执行完毕了返回false
    */
    public fun start(): Boolean

    /**
    * 取消此Job,可用于指定错误消息或提供有关取消原因的其他详细信息
    */
    public fun cancel(cause: CancellationException? = null)

    /**
    * 取消此Job
    */
    public fun cancel(): Unit = cancel(null)

    public fun cancel(cause: Throwable? = null): Boolean

    // ------------ 等待状态API ------------

    /**
    * 挂起协程,知道任务完成再恢复
    */
    public suspend fun join()

    // ------------ 完成状态回调API ------------

    /**
    * 注册Job完成时同步调用的处理程序.
    * 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null
    * 否则,该处理程序将在此Job完成时调用一次。
    */
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

    /**
    * 注册在取消或完成此Job时同步调用的处理程序。
    * 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null,
    * 除非将invokeImmediately设置为false。否则,
    * 当Job取消或完成时将调用一次handler。
    */
    public fun invokeOnCompletion(
        onCancelling: Boolean = false,
        invokeImmediately: Boolean = true,
        handler: CompletionHandler): DisposableHandle
}
复制代码

从源码中可以发现这几个函数和变量跟Actviity或者Fragment非常像,所以我们可以总结出两个结论:

  • Job可以监测协程的生命周期
  • Job可以操控协程

在例子中使用这几个函数和变量再来校验一下上面的结论:

fun main() = runBlocking {
    val job = launch {
        delay(1000L)
    }
    job.log()
    job.cancel()
    job.log()
}

fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}
        ================================
    """.trimIndent()
    )
}

//输出结果
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1
//================================
复制代码

Job.log用了扩展函数,方便调用Job中的状态监测返回值。

上面的代码通过launch创建了一个协程,接收了Job的返回值,这里用这个job对象做了三件事:

  • 第一个job.log() launch的创建标志着协程已经被启动所以在第一个job.log()的日志中isActive返回值是true;
  • job.cancel() 这里调用了job的取消函数将协程任务取消;
  • 第二个job.log() 上面的代码将协程任务取消了,然后再次获取协程状态发现isActivte返回false,isCancelled返回true。

上面的代码也印证了前面提出的结论,还有一个函数start没使用,再来调用它之后输出的日志:

fun main() = runBlocking {
    //变化1
    val job = launch(start = CoroutineStart.LAZY) {
        delay(1000L)
    }
    job.log()
    //变化2
    job.start()
    job.log()
    job.cancel()
    job.log()
}

fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}
        ================================
    """.trimIndent()
    )
}

//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1
//================================
复制代码

上面的代码增加了两处修改:

  • 变化1:协程在创建出来的时候就已经被启动,因此为了查看调用Job.start()前的日志需要加上懒启动
  • 变化2:调用start函数启动协程

从输出结果来看没有调用start函数前isActive返回true,调用后就返回了true,当使用懒启动后在调用cancel函数与前面使用cancel函数输出的日志是一样的,可以得知懒启动后对协程的生命周期并没有设么影响(这可能是句废话)。

现在还有最后一个变量没有看isCompleted,在上面的代码中添加一个延时函数,等协程任务结束再打印日志

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(1000L)
    }
    job.log()
    job.start()
    job.log()
    job.cancel()
    delay(2000L)		//变化在这里
    job.log()
}

fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}
        ================================
    """.trimIndent()
    )
}

//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:true
//isCancelled:true
//Thread:main @coroutine#1
//================================
复制代码

从输出结果中看到当调用isCancelisCompleted也返回了true,也就是说任务结束了。

上面的代码为了监测isCompleted的状态加了一个延时函数delay,但是这种方式并不建议使用,因为这个时间他不是固定的,例如从后台请求数据或者下载文件,这种情况下的时间是完全无法预知的。

现在假设已经知道协程执行完毕需要delay(1000L)的时间,如果将协程内的delay时长设置的大于外部的delay时长,会带来什么问题?

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(4000L)
    }
    job.log()
    job.start()
    job.log()
    delay(1000L)
    job.log()
    println("Process end!")
}

fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}
        ================================
    """.trimIndent()
    )
}

//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//Process end!
复制代码

由输出结果可知isCompleted状态是false,协程任务是否执行完毕不得而知。另外当println("Process end!")执行完毕后程序并没有立即输出Process finished with exit code 0,这是因为runBlocking 会一直阻塞,等到 job 任务执行完毕以后才真正退出。

那要如何解决这个问题?

//Job#join 

/**
* 挂起协程,知道任务完成再恢复
*/
public suspend fun join()
复制代码

joinJob中的一个挂起函数,调用后会挂起当前程序的执行流程,等待job当中的协程任务执行完毕然后再恢复当前程序的执行流程。

join将任务挂起后再恢复,那要如何知道任务是否执行完毕了?invokeOnCompletion可以监听任务执行的状态

//Job#invokeOnCompletion
/**
* 注册Job完成时同步调用的处理程序.
* 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null
* 否则,该处理程序将在此Job完成时调用一次。
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

//Job#invokeOnCompletion
/**
* 注册在取消或完成此Job时同步调用的处理程序。
* 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null,
* 除非将invokeImmediately设置为false。否则,
* 当Job取消或完成时将调用一次handler。
*/
public fun invokeOnCompletion(
        onCancelling: Boolean = false,
        invokeImmediately: Boolean = true,
        handler: CompletionHandler): DisposableHandle
复制代码

joininvokeOnCompletion的使用如下:

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(4000L)
    }
    job.log()
    job.start()
    job.log()
    //新增
    job.join()
    //新增
    job.invokeOnCompletion {
        println("==========Task status==========")
        job.log()
    }
    println("Process end!")
}

fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}
        ================================
    """.trimIndent()
    )
}

//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//==========Task status==========
//isActive:false
//isCompleted:true
//isCancelled:false
//Thread:main @coroutine#1
//================================
//Process end!
复制代码

可以看到加入joininvokeOnCompletion之后isCompleted的状态就正确了,同时Process end!输出后Process finished with exit code 0也会很快的输出,这说明任务确实执行完毕了。

在讲协程的启动方式的时候提出一个观点:launch的返回值Job代表的是协程的句柄。那么Job是协程的句柄该怎么理解?

句柄: 是指一个中间媒介,可以操控一个东西。就类似于遥控器操作空调场景中遥控器就是句柄,开关控制灯具场景中开关就是句柄。

所以Job和协程的关系就类似于遥控器和空调,开关和灯具。Job可以监测协程的运行状态也可以控制协程的运行状态。那么Job就和遥控器、开关一样看做是一个句柄。

2.Deffered

launch直接创建了Jobasync通过Deffered间接创建了Job对象,但是它并没有在 Job 的基础上扩展出很多其他功能,而接收一个返回值是依靠 await() 方法,那await方法是如何实现的?

fun main() = runBlocking {
    val deferred = async {
        logX("Coroutine start!")
        delay(1000L)
        logX("Coroutine end!")
        "Coroutine result!"
    }
    val result = deferred.await()
    println("Result = $result")
    logX("Process end!")
}

fun logX(any: Any?) {
    println(
        """
================================
$any
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
    )
}

//输出结果:
//Coroutine start!
//Thread:main @coroutine#2
//================================
//================================
//Coroutine end!
//Thread:main @coroutine#2
//================================
//Result = Coroutine result!
//================================
//Process end!
//Thread:main @coroutine#1
复制代码

从输出结果来看,await方法可以获取协程执行结果外,好像还会阻塞协程的执行流程,直到协程任务执行完毕。看一下await的源码

//Deferred#await
public interface Deferred<out T> : Job {
    ...
    public suspend fun await(): T
    ...
}
复制代码

从源码来看await也是一个挂起函数,它跟join是一样的,看似阻塞的过程其实是协程的挂起恢复能力。

所以,总的来说,Deferred 只是比 Job 多了一个 await() 挂起函数而已,通过这个挂起函数,就可以等待协程执行完毕的同时,还可以直接拿到协程的执行结果。

3.Job与结构化并发

在其他地方看过这么一句话:协程的优势在于结构化并发, 这句话该如何理解?

这句话可以理解为带有结构和层级的并发,用代码表现就像这样:

fun main() = runBlocking {
    val parentJob: Job
    var childJob1: Job? = null
    var childJob2: Job? = null
    var childJob3: Job? = null

    parentJob = launch {
        childJob1 = launch {
            delay(1000L)
        }

        childJob2 = launch {
            delay(3000L)
        }

        childJob3 = launch {
            delay(5000L)
        }
    }

    delay(500L)

    parentJob.children.forEachIndexed { index, job ->
        when (index) {
            0 -> println("childJob1 === childJob1 is ${childJob1 === job}")
            1 -> println("childJob2 === childJob2 is ${childJob2 === job}")
            2 -> println("childJob3 === childJob3 is ${childJob3 === job}")
        }
    }

    parentJob.join()
    logX("Process end!")
}

//输出结果:
//childJob1 === childJob1 is true
//childJob2 === childJob2 is true
//childJob3 === childJob3 is true
//================================
//Process end!
//Thread:main @coroutine#1
复制代码

上面的代码是父子层级,父Job使用launch启动了协程同时它的内部还有三个Job,三个子Job是并发执行的,同时也是用过launch启动的协程,调用了parentJob.join()那么挂起的时间就是childJob3的时长—5秒,因为它要等待所有任务都执行完毕才会恢复执行,然后通过children.forEachIndexed进行遍历并分别对比他们与三个子Job的引用是否相等“===”代表了引用相等,即是否是同一个对象)。图示如下

前面讲过,Job可以调用cancel方法取消执行,那么当调用parentJob.cancel会有什么样的情况?

fun main() = runBlocking {
    val parentJob: Job
    var childJob1: Job? = null
    var childJob2: Job? = null
    var childJob3: Job? = null

    parentJob = launch {
        childJob1 = launch {
            println("childJob1 start")
            delay(1000L)
            println("childJob1 end")
        }

        childJob2 = launch {
            println("childJob2 start")
            delay(3000L)
            println("childJob2 start")
        }

        childJob3 = launch {
            println("childJob3 start")
            delay(5000L)
            println("childJob3 start")
        }
    }

    delay(500L)

    parentJob.cancel()
    logX("Process end!")
}

//输出结果:
//childJob1 start
//childJob2 start
//childJob3 start
//================================
//Process end!
//Thread:main @coroutine#1
复制代码

parentJob.cancel调用后,每个子Job只是输出了start,这就可以得出一个结论:父Job取消后子Job也会依次跟着取消。如果调用任何一个子Jobcancel则不会对父Job和其他子Job产生影响。

到这里对于开头的那句协程的优势在于结构化并发就有更更好的理解了,这是Kotlin协程的第二大优势。

4.launch和async的使用场景

  • launch: 主要用来发起一些不需要任何结果的耗时任务,这个任务在执行中可以改变它的执行状态。
  • async: 主要用来发起一些需要结果的耗时任务,以及与挂起函数结合,优化并发。

以上就是Kotlin协程Job生命周期结构化并发详解的详细内容,更多关于Kotlin协程Job的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin协程Job生命周期结构化并发详解

    目录 1.Job的生命周期 2.Deffered 3.Job与结构化并发 4.launch和async的使用场景 前面在学习协程启动方式的时候在launch的源码中有一个返回值是Job,async的返回Deferred也是实现了Job,那么而也就是说launch和async在创建一个协程的时候也会创建一个对应的Job对象.还提到过Job是协程的句柄,那么Job到底是什么?它有什么用? 1.Job的生命周期 先看一下Job的源码,这里只保留了跟标题相关的内容 public interface Jo

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • Servlet方法生命周期及执行原理详解

    目录 快速入门 创建JavaEE项目 实现接口中的抽象方法 执行 执行原理 Servlet中的生命周期方法 1.被创建:执行init方法,只执行一次 2.提供服务:执行service方法,执行多次 3.被销毁:执行destroy方法,只执行一次 Servlet3.0 Servlet体系结构 GenericServlet HttpServlet Servlet相关配置 快速入门 创建JavaEE项目 定义一个类,实现Servlet接口 public class ServletDemo1 imple

  • Python获取协程返回值的四种方式详解

    目录 介绍 源码 依次执行结果 介绍 获取协程返回值的四种方式: 1.通过ensure_future获取,本质是future对象中的result方 2.使用loop自带的create_task, 获取返回值 3.使用callback, 一旦await地方的内容运行完,就会运行callback 4.使用partial这个模块向callback函数中传入值 源码 import asyncio from functools import partial async def talk(name): pr

  • vue组件生命周期钩子使用示例详解

    目录 组件生命周期图 组件生命周期钩子 1.beforeCreate 2.created 3.beforeMount 4.mounted 5.beforeUpdate 6.updated 7.activated 8.deactivated 9.beforeDestroy 10.destroyed 11.errorCaptured 组件生命周期图 组件生命周期钩子 所有的生命周期钩子自动绑定 一.组件的生命周期:一个组件从创建到销毁的整个过程 二.生命周期钩子:在一个组件生命周期中,会有很多特殊的

  • Vue生命周期与setup深入详解

    目录 生命周期 生命周期函数 不同API的生命周期 setup介绍 1. 访问Props 2. setup上下文 3. 与渲染函数一起使用 生命周期 下图对比了vue3(左)和vue2(右)的生命周期:vue3将destoryed该名成了unmounted,相应的beforeDestory改成了beforeUnmounted.除此之外在组合式API中新增了个钩子函数:setup.它发生在beforeCreate之前. 先简单介绍下setup函数: setup() 钩子是在组件中使用组合式 API

  • 利用Kotlin的协程实现简单的异步加载详解

    前言 众所周知在android中当执行程序的耗时超过5秒时就会引发ANR而导致程序崩溃.由于UI的更新操作是在UI主线程进行的,理想状态下每秒展示60帧时人眼感受不到卡顿,1000ms/60帧,即每帧绘制时间不应超过16.67ms.如果某项操作的耗时超过这一数值就会导致UI卡顿.因此在实际的开发中我通常把耗时操作放在一个新的线程中(比如从网络获取数据,从SD卡读取图片等操作),但是呢在android中UI的更新只能在UI主线程中进行更新,因此当我们在非UI线程中执行某些操作的时候想要更新UI就需

  • Android中Activity生命周期和启动模式详解

    Activity生命周期经典图解: 按键对生命周期的影响: BACK键: 当我们按BACK键时,我们这个应用程序将结束,这时候我们将先后调用onPause()->onStop()->onDestory()三个方法. 再次启动App时,会执行onCreate()->onStart()->onResume() HOME键: 当我们打开应用程序时,比如浏览器,我正在浏览NBA新闻,看到一半时,我突然想听歌,这时候我们会选择按HOME键,然后去打开音乐应用程序,而当我们按HOME的时候,A

  • Servlet生命周期与工作原理详解

    本文为大家分享了Servlet生命周期与工作原理,供大家参考,具体内容如下 Servlet生命周期分为三个阶段: 1.初始化阶段  调用init()方法 2.响应客户请求阶段 调用service()方法 3.终止阶段 调用destroy()方法 Servlet初始化阶段: 在下列时刻Servlet容器装载Servlet: 1.Servlet容器启动时自动装载某些Servlet,实现它只需要在web.XML文件中的<Servlet></Servlet>之间添加如下代码: <lo

  • python time模块时间戳 与 结构化时间详解

    目录 time模块 1. 时间戳 1.1 time.time() 1.2 时间戳 转 字符串 2. 结构化时间 2.1 本地时间 2.2 格林威治时间(零时区) 2.3 结构化时间 转 字符串 2.4 字符串 转 结构化时间 2.5 结构化数据的属性 2.6 结构化时间 转 时间戳 time模块 1:概述 时间表示的分类 时间戳 格式化的时间字符串 结构化时间 时间戳:时间戳表示的是从1970年1月1日整0点到目前秒的偏移量,数据类型是浮点型,主要用来让计算机看的 格式化的时间字符串:如 201

随机推荐