Kotlin协程基础元素梳理分析

Kotlin 协程的基础元素:Continuation、SafeContinuation、CoroutineContext、CombinedContext、CancellationException、intrinsics。CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类。

Continuation 是什么?

class Call<T>(callBack: Call.CallBack<T>) {
    fun cancel() {
    }
    interface CallBack<T> {
        fun onSuccess(data: T)
        fun onFail(throwable: Throwable)
    }
}
suspend fun <T : Any> Call<T>.await(): T =
    suspendCancellableCoroutine<T> { continuation ->
        val call = Call(object : Call.CallBack<T> {
            override fun onSuccess(data: T) {
                continuation.resume(data, onCancellation = null)
            }
            override fun onFail(throwable: Throwable) {
                continuation.resumeWithException(throwable)
            }
        })
        continuation.invokeOnCancellation {
            call.cancel()
        }
    }

要实现挂起函数的时候,可以使用 suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,在它们的 Lambda 当中,我们可以使用它暴露出来的 continuation 对象,把程序的执行结果或异常传到外部去。常用于实现挂起函数内部逻辑。

fun checkLength() {
    runBlocking {
        val result = getStrLengthSuspend("Kotlin")
        println(result)
    }
}
suspend fun getStrLengthSuspend(text:String):Int = suspendCoroutine {
    continuation ->
    thread {
        Thread.sleep(1000L)
        continuation.resume(text.length)
    }
}
Log
6
Process finished with exit code 0

使用 suspendCoroutine{}实现了挂起函数,在它内部,使用 continuation.resume() 的方式,传出了挂起函数的返回值。

为什么以 continuation.resume() 这样异步的方式传出结果,挂起函数就能接收到结果呢?

suspend fun getStrLengthSuspend(text: String): Int = suspendCoroutine { continuation ->
    thread {
        Thread.sleep(1000L)
        continuation.resume(text.length)
    }
}
fun checkLength2() {
    val func = ::getStrLengthSuspend as (String, Continuation<Int>) -> Any?
    func("Kotlin", object :Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println(result.getOrNull())
        }
    })
    Thread.sleep(2000L)
}
Log
6
Process finished with exit code 0

把函数强转成了带有 Continuation 的函数类型,然后通过匿名内部类的方式,创建了一个 Continuation 对象传了进去。挂起函数的本质,就是 Callback!

把 Continuation 改为 Callback:

fun getStrLength() {
    func2("Kotlin", object : MyCallBack<Int> {
        override fun resume(value: Int) {
            println(value)
        }
    })
}
fun func2(text: String, callBack: MyCallBack<Int>) {
    thread {
        Thread.sleep(1000L)
        callBack.resume(text.length)
    }
}
interface MyCallBack<T> {
    fun resume(value: T)
}
Log
6
Process finished with exit code 0

Continuation 改成 Callback 以后,使用匿名内部类创建 Callback 用于接收异步结果;异步函数内部,使用 callback.resume() 将结果传出去。

Kotlin 协程当中的 Continuation,作用其实就相当于 Callback,它既可以用于实现挂起函数,往挂起函数的外部传递结果;也可以用于调用挂起函数,我们可以创建 Continuation 的匿名内部类,来接收挂起函数传递出来的结果。

Java 代码中如何调用 Kotlin 的挂起函数吗?

  public static void main(String[] args) {
        SuspendFromJavaDo.INSTANCE.getUserInfo(new Continuation<String>() {
            @NonNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }
            @Override
            public void resumeWith(@NonNull Object o) {
                System.out.println(o + "");
            }
        });
    }
object SuspendFromJavaDo {
    suspend fun getUserInfo():String {
        delay(1000L)
        return "Kotlin"
    }
}

所以,实现挂起函数逻辑的时候,常用到 suspendCoroutine{}、suspendCancellableCoroutine{}。

Continuation.kt源码

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

suspendCoroutineUninterceptedOrReturn{}实现了suspendCoroutine{}。

val safe = SafeContinuation(c.intercepted()) 包裹Continuation。

block(safe)调用 Lambda 当中的逻辑。

safe.getOrThrow() 取出 block(safe) 的运行结果,Continuation 当中是可以存储 result 的。这个 Result 可能是正确的结果,也可能是异常。

@Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

为什么这个高阶函数的源代码会是抛出异常呢?

“Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic.”suspendCoroutineUninterceptedOrReturn 是一个编译器内建函数,它是由 Kotlin 编译器来实现的。

suspendCoroutineUninterceptedOrReturn 这个高阶函数的参数,它会接收一个 Lambda,类型是(Continuation<T>) -> Any?,这里的“Any?”类型,其实就能代表当前这个挂起函数是否真正挂起。

fun test() {
    runBlocking {
        val result = testNoSuspendCoroutinue()
        println(result)
    }
}
private suspend fun testNoSuspendCoroutinue() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    return@suspendCoroutineUninterceptedOrReturn "Kotlin"
}
Log
Kotlin
Process finished with exit code 0

直接使用 suspendCoroutineUninterceptedOrReturn 实现了挂起函数,并且,在它的 Lambda 当中,我们并没有调用 continuation.resume()。在挂起函数的外部确实也可以接收到这个结果。

 private static final Object testNoSuspendCoroutinue(Continuation $completion) {
      int var2 = false;
      if ("Kotlin" == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
         DebugProbesKt.probeCoroutineSuspended($completion);
      }
      return "Kotlin";
   }

这个函数其实就是一个伪挂起函数,它的内部并不会真正挂起。这样,当我们从外部调用这个函数的时候,这个函数会立即返回结果。

private suspend fun testSuspendCoroutinues() =
    suspendCoroutineUninterceptedOrReturn<String> { continuation ->
        thread {
            Thread.sleep(2000L)
            continuation.resume("Kotlin")
        }
        return@suspendCoroutineUninterceptedOrReturn kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
    }
fun main() {
    runBlocking {
        val testSuspendCoroutinues = testSuspendCoroutinues()
        println(testSuspendCoroutinues)
    }
}
Kotlin
Process finished with exit code 0

使用了 continuation.resume(),挂起函数的外部也能接收到这个结果。

   private static final Object testSuspendCoroutinues(Continuation $completion) {
      int var2 = false;
      ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new TestCoroutine777Kt$testSuspendCoroutinues$2$1($completion)), 31, (Object)null);
      Object var10000 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        //将 var10000 赋值为 COROUTINE_SUSPENDED 这个挂起标志位。
      if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
         DebugProbesKt.probeCoroutineSuspended($completion);
      }
//返回挂起标志位,代表 testSuspendCoroutinues() 这个函数会真正挂起。
      return var10000;
   }
BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               this.label = 1;
               var10000 = TestCoroutine777Kt.testSuspendCoroutinues(this);
               if (var10000 == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            String testSuspendCoroutinues = (String)var10000;
            System.out.println(testSuspendCoroutinues);
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 1, (Object)null);

由于 suspend 修饰的函数,既可能返回 CoroutineSingletons.COROUTINE_SUSPENDED,也可能返回实际结果,甚至可能返回 null,为了适配所有的可能性,CPS 转换后的函数返回值类型就只能是 Any? 了。

suspendCoroutineUninterceptedOrReturn{}这个高阶函数的作用了:它可以将挂起函数当中的 Continuation 以参数的形式暴露出来,在它的 Lambda 当中,我们可以直接返回结果,这时候它就是一个“伪挂起函数”;或者,我们也可以返回 COROUTINE_SUSPENDED 这个挂起标志位,然后使用 continuation.resume() 传递结果。相应的,suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,只是对它的一种封装而已。

到此这篇关于Kotlin协程基础元素梳理分析的文章就介绍到这了,更多相关Kotlin协程基础元素内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kotlin协程与并发深入全面讲解

    目录 协程与并发 1.协程并发问题 2.协程处理并发的手段 协程与并发 Kotlin协程是基于线程执行的.经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同. 在java的世界里,并发往往是多个线程一起工作,存在共享的变量.需要处理好同步问题.要避免把协程与线程的概念混淆. runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) } L

  • Kotlin 协程的取消机制详细解读

    目录 引言 协程的状态 取消协程的用法 协程取消的有效性 如何写出可以取消的代码 在 finally 中释放资源 使用不可取消的 block CancellationException 超时取消 异步的超时和资源 取消检查的底层原理 引言 在 Java 语言中提供了线程中断的能力,但并不是所有的线程都可以中断的,因为 interrupt 方法并不是真正的终止线程,而是将一个标志位标记为中断状态,当运行到下一次中断标志位检查时,才能触发终止线程. 但无论如何,终止线程是一个糟糕的方案,因为在线程的

  • Kotlin协程之Flow异常示例处理

    目录 示例 一.catch方法 catchImpl方法 二. onCompletion方法 1.unsafeFlow方法 2.ThrowingCollector类 三. retryWhen方法 示例 代码如下: launch(Dispatchers.Main) { // 第一部分 flow { emit(1) throw NullPointerException("e") }.catch { Log.d("liduo", "onCreate1: $it&q

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • Kotlin协程之Flow触发与消费示例解析

    目录 示例 一.Flow的触发与消费 1.onEach方法 2.transform方法 3.collect方法 二.多消费过程的执行 三.总结 示例 代码如下: launch(Dispatchers.Main) { val task = flow { emit(2) emit(3) }.onEach { Log.d("liduo", "$it") } task.collect() } 一.Flow的触发与消费 在Kotlin协程:Flow基础原理的分析中,流的触发与

  • Kotlin协程基础元素梳理分析

    Kotlin 协程的基础元素:Continuation.SafeContinuation.CoroutineContext.CombinedContext.CancellationException.intrinsics.CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类. Continuation 是什么? class Call<T>(callBack: Call.CallBack<T

  • Kotlin协程上下文与上下文元素深入理解

    目录 一.EmptyCoroutineContext 二.CombinedContext 三.Key与Element 四.CoroutineContext 五.AbstractCoroutineContextKey与AbstractCoroutineContextElement 一.EmptyCoroutineContext EmptyCoroutineContext代表空上下文,由于自身为空,因此get方法的返回值是空的,fold方法直接返回传入的初始值,plus方法也是直接返回传入的cont

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • 一篇文章揭开Kotlin协程的神秘面纱

    前言 Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱. 理论 它是什么 这是别人翻译: 协程把异步编程放入库中来简化这类操作.程序逻辑在协程中顺序表述,而底层的库会将其转换为异步操作.库会将相关的用户代码打包成回调,订阅相关事件,调度其执行到不同的线程(甚至不同的机器),而代码依然想顺序执行那么简单. 我的理解:子任务程协作运行,优雅的处理异步问题解决方案. 它能干什么? 我在做安卓开发,它能替换掉Handler,AsyncTask 甚至

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程启动createCoroutine及创建startCoroutine原理

    目录 createCoroutine 和 startCoroutine startCoroutine调用 createCoroutineUnintercepted intercepted resume 结语 createCoroutine 和 startCoroutine 协程到底是怎么创建和启动的?本篇文章带你揭晓. 在Continuation.kt文件中,有2个基础API,这里单独提出来说一下,方便后面我们理解launch. public fun <T> (suspend () ->

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • kotlin 协程上下文异常处理详解

    目录 引言 一.协程上下文 1.CoroutineContext 2.CorountineScope 3.子协程继承父协程 二.协程的异常传递 1.协程的异常传播 2.不同上下文(没有继承关系)之间协程异常会怎么样? 3.向用户暴露异常 三.协程的异常处理 使用SupervisorJob 异常捕获器CoroutineExceptionHandler Android中全局异常的处理 引言 从前面我们可以大致了解了协程的玩法,如果一个协程中使用子协程,那么该协程会等待子协程执行结束后才真正退出,而达

随机推荐