Kotlin协程低级api startCoroutine与ContinuationInterceptor

目录
  • 聊一聊kotlin协程“低级”api
  • startCoroutine
  • ContinuationInterceptor
  • 实战
    • kotlin协程api中的 async await
    • 通过startCoroutine与ContinuationInterceptor实现自定义的 async await
  • 本章代码
  • 最后

聊一聊kotlin协程“低级”api

Kotlin协程已经出来很久了,相信大家都有不同程度的用上了,由于最近处理的需求有遇到协程相关,因此今天来聊一Kotlin协程的“低级”api,首先低级api并不是它真的很“低级”,而是kotlin协程库中的基础api,我们一般开发用的,其实都是通过低级api进行封装的高级函数,本章会通过低级api的组合,实现一个自定义的async await 函数(下文也会介绍kotlin 高级api的async await),涉及的低级api有startCoroutineContinuationInterceptor

startCoroutine

我们知道,一个suspend关键字修饰的函数,只能在协程体中执行,伴随着suspend 关键字,kotlin coroutine common库(平台无关)也提供出来一个api,用于直接通过suspend 修饰的函数直接启动一个协程,它就是startCoroutine

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
    作为Receiver
    receiver: R,
    当前协程结束时的回调
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}

可以看到,它的Receiver是(suspend R.() -> T),即是一个suspend修饰的函数,那么这个有什么作用呢?我们知道,在普通函数中无法调起suspend函数(因为普通函数没有隐含的Continuation对象,这里我们不在这章讲,可以参考kotlin协程的资料)

但是普通函数是可以调起一个以suspend函数作为Receiver的函数(本质也是一个普通函数)

其中startCoroutine就是其中一个,本质就是我们直接从外部提供了一个Continuation,同时调用了resume方法,去进入到了协程的世界

startCoroutine实现
createCoroutineUnintercepted(completion).intercepted().resume(Unit)

这个原理我们就不细讲下去原理,之前也有写过相关的文章。通过这种调用,我们其实就可以实现在普通的函数环境,开启一个协程环境(即带有了Continuation),进而调用其他的suspend函数。

ContinuationInterceptor

我们都知道拦截器的概念,那么kotlin协程也有,就是ContinuationInterceptor,它提供以AOP的方式,让外部在resume(协程恢复)前后进行自定义的拦截操作,比如高级api中的Diapatcher就是。当然什么是resume协程恢复呢,可能读者有点懵,我们还是以上图中出现的mySuspendFunc举例子

mySuspendFunc是一个suspned函数
::mySuspendFunc.startCoroutine(object : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext
    override fun resumeWith(result: Result<Unit>) {
    }
})

它其实等价于

val continuation = ::mySuspendFunc.createCoroutine(object :Continuation<Unit>{
    override val context: CoroutineContext
        get() = EmptyCoroutineContext
    override fun resumeWith(result: Result<Unit>) {
        Log.e("hello","当前协程执行完成的回调")
    }
})
continuation.resume(Unit)

startCoroutine方法就相当于创建了一个Continuation对象,并调用了resume。创建Continuation可通过createCoroutine方法,返回一个Continuation,如果我们不调用resume方法,那么它其实什么也不会执行,只有调用了resume等执行方法之后,才会执行到后续的协程体(这个也是协程内部实现,感兴趣可以看看之前文章)

而我们的拦截器,就相当于在continuation.resume前后,可以添加自己的逻辑。我们可以通过继承ContinuationInterceptor,实现自己的拦截器逻辑,其中需要复写的方法是interceptContinuation方法,用于返回一个自己定义的Continuation对象,而我们可以在这个Continuation的resumeWith方法里面(当调用了resume之后,会执行到resumeWith方法),进行前后打印/其他自定义操作(比如切换线程)

class ClassInterceptor() :ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>):Continuation<T> by continuation{
    override fun resumeWith(result: Result<T>) {
        Log.e("hello","MyContinuation start ${result.getOrThrow()}")
        continuation.resumeWith(result)
        Log.e("hello","MyContinuation end ")
    }
}

其中的key是ContinuationInterceptor,协程内部会在每次协程恢复的时候,通过coroutineContext取出key为ContinuationInterceptor的拦截器,进行拦截调用,当然这也是kotlin协程内部实现,这里简单提一下。

实战

kotlin协程api中的 async await

我们来看一下kotlin Coroutine 的高级api async await用法

CoroutineScope(Dispatchers.Main).launch {
    val block = async(Dispatchers.IO) {
        // 阻塞的事项
    }
    // 处理其他主线程的事务
    // 此时必须需要async的结果时,则可通过await()进行获取
    val result =  block.await()
}

我们可以通过async方法,在其他线程中处理其他阻塞事务,当主线程必须要用async的结果的时候,就可以通过await等待,这里如果结果返回了,则直接获取值,否则就等待async执行完成。这是Coroutine提供给我们的高级api,能够将任务简单分层而不需要过多的回调处理。

通过startCoroutine与ContinuationInterceptor实现自定义的 async await

我们可以参考其他语言的async,或者Dart的异步方法调用,都有类似这种方式进行线程调用

async {
    val result = await {
        suspend 函数
    }
    消费result
}

await在async作用域里面,同时获取到result后再进行消费,async可以直接在普通函数调用,而不需要在协程体内,下面我们来实现一下这个做法。

首先我们想要限定await函数只能在async的作用域才能使用,那么首先我们就要定义出来一个Receiver,我们可以在Receiver里面定义出自己想要暴露的方法

interface AsyncScope {
    fun myFunc(){
    }
}
fun async(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个作用 1.充当receiver 2.completion,接收回调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}
注意这个类,resumeWith 只会跟startCoroutine的这个协程绑定关系,跟await的协程没有关系
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
    Continuation<Unit>, AsyncScope {
    override fun resumeWith(result: Result<Unit>) {
        // 这个是干嘛的 == > 完成的回调
        Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
    }
}

上面我们定义出来一个async函数,同时定义出来了一个AsyncStub的类,它有两个用处,第一个是为了充当Receiver,用于规范后续的await函数只能在这个Receiver作用域中调用,第二个作用是startCoroutine函数必须要传入一个参数completion,是为了收到当前协程结束的回调resumeWith中可以得到当前协程体结束回调的信息

await方法里面
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
    // 自定义的Receiver函数
    myFunc()
    Thread{
         切换线程执行await中的方法
        it.resumeWith(Result.success(block()))
    }.start()
}

在await中,其实是一个扩展函数,我们可以调用任何在AsyncScope中定义的方法,同时这里我们模拟了一下线程切换的操作(Dispatcher的实现,这里不采用Dispatcher就是想让大家知道其实Dispatcher.IO也是这样实现的),在子线程中调用it.resumeWith(Result.success(block())),用于返回所需要的信息

通过上面定的方法,我们可以实现

async {
    val result = await {
        suspend 函数
    }
    消费result
}
public interface ContinuationInterceptor : CoroutineContext.Element
//而CoroutineContext.Element又是继承于CoroutineContext
CoroutineContext.Element:CoroutineContext

而我们的拦截器,正是CoroutineContext的子类,我们把上文的ClassInterceptor修改一下

class ClassInterceptor() : ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
    Continuation<T> by continuation {
    private val handler = Handler(Looper.getMainLooper())
    override fun resumeWith(result: Result<T>) {
        Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
        handler.post {
            continuation.resumeWith(Result.success(自定义内容))
        }
        Log.e("hello", "MyContinuation end ")
    }
}

同时把async默认参数CoroutineContext实现一下即可

fun async(
    context: CoroutineContext = ClassInterceptor(),
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个作用 1.充当receiver 2.completion,接收回调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}

此后我们就可以直接通过,完美实现了一个类js协程的调用,同时具备了自动切换线程的能力

async {
    val result = await {
        test()
    }
    Log.e("hello", "result is $result   ${Looper.myLooper() == Looper.getMainLooper()}")
}

结果

E  start 
  E  MyContinuation start kotlin.Unit
  E  MyContinuation end 
  E  end 
  E  执行阻塞函数 test 1923
  E  MyContinuation start 自定义内容数值
  E  MyContinuation end 
  E  result is 自定义内容的数值   true
  E  AsyncStub resumeWith 2 kotlin.Unit

最后,这里需要注意的是,为什么拦截器回调了两次,因为我们async的时候开启了一个协程,同时await的时候也开启了一个,因此是两个。AsyncStub只回调了一次,是因为AsyncStub被当作complete参数传入了async开启的协程block.startCoroutine,因此只是async中的协程结束才会被回调。

本章代码

class ClassInterceptor() : ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
    Continuation<T> by continuation {
    private val handler = Handler(Looper.getMainLooper())
    override fun resumeWith(result: Result<T>) {
        Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
        handler.post {
            continuation.resumeWith(Result.success(6 as T))
        }
        Log.e("hello", "MyContinuation end ")
    }
}
interface AsyncScope {
    fun myFunc(){
    }
}
fun async(
    context: CoroutineContext = ClassInterceptor(),
    block: suspend AsyncScope.() -> Unit
) {
    // 这个有两个作用 1.充当receiver 2.completion,接收回调
    val completion = AsyncStub(context)
    block.startCoroutine(completion, completion)
}
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
    Continuation<Unit>, AsyncScope {
    override fun resumeWith(result: Result<Unit>) {
        // 这个是干嘛的 == > 完成的回调
        Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
    }
}
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
    myFunc()
    Thread{
        it.resumeWith(Result.success(block()))
    }.start()
}

模拟阻塞

fun test(): Int {
    Thread.sleep(5000)
    Log.e("hello", "执行阻塞函数 test ${Thread.currentThread().id}")
    return 5
}
async {
    val result = await {
        test()
    }
    Log.e("hello", "result is $result   ${Looper.myLooper() == Looper.getMainLooper()}")
}

最后

我们通过协程的低级api,实现了一个与官方库不同版本的async await,同时也希望通过对低级api的设计,也能对Coroutine官方库的高级api的实现有一定的了解。

以上就是Kotlin协程低级api startCoroutine与ContinuationInterceptor 的详细内容,更多关于Kotlin协程低级api的资料请关注我们其它相关文章!

(0)

相关推荐

  • Room Kotlin API的使用入门教程

    Room 是 SQLite 的封装,它使 Android 对数据库的操作变得非常简单,也是迄今为止我最喜欢的 Jetpack 库.在本文中我会告诉大家如何使用并且测试 Room Kotlin API,同时在介绍过程中,我也会为大家分享其工作原理. 我们将基于 Room with a view codelab 为大家讲解.这里我们会创建一个存储在数据库的词汇表,然后将它们显示到屏幕上,同时用户还可以向列表中添加单词. 定义数据库表 在我们的数据库中仅有一个表,就是保存词汇的表.Word 类代表表中

  • Kotlin协程的线程调度示例详解

    目录 引言 一.协程的分发器作用 1.1 测试代码 1.2 CoroutineScope.launch 1.2.1 newCoroutineContext 1.3 startCoroutineCancellable 1.3.1 intercepted() 1.3.2 CoroutineDispatcher 1.3.3 小结 1.4 DispatchedContinuation 1.5 DefaultScheduler 1.5.1 SchedulerCoroutineDispatcher 1.5.

  • Kotlin启动协程的三种方式示例详解

    目录 1.launch启动协程 2.runBlocking启动协程 3.async启动协程 1.launch启动协程 fun main() = runBlocking { launch { delay(1000L) println("World!") } println("Hello") } fun main() { GlobalScope.launch { delay(1000L) println("World!") } println(&qu

  • Kotlin协程Context应用使用示例详解

    目录 1.Context的应用 2.万物皆有 Context 1.CoroutineScope 2.Job 3.Dispatcher 4.CoroutineExceptionHandler 1.Context的应用 Context在启动协程模式中就已经遇到过叫CoroutineContext,它的意思就是协程上下文,线程的切换离不开它. 在启动协程模式中也说明过为什么不用传递Context,因为它有一个默认值EmptyCoroutineContext,需要注意的是这个Context是不可以切换线

  • Android使用Kotlin API实践WorkManager

    WorkManager 提供了一系列 API 可以更加便捷地规划异步任务,即使在应用被关闭之后或者设备重启之后,仍然需要保证立即执行的或者推迟执行的任务被正常处理.对于 Kotlin 开发者,WorkManager 为协程提供了最佳的支持.在本文中,我将通过实践 WorkManager codelab 为大家展示 WorkManager 中与协程相关的基本操作.那么让我们开始吧! WorkManager 基础 当您需要某个任务保持运行状态,即使用户切换到别的界面或者用户将应用切换到后台,甚至设备

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • Kotlin协程启动createCoroutine及创建startCoroutine原理

    目录 createCoroutine 和 startCoroutine startCoroutine调用 createCoroutineUnintercepted intercepted resume 结语 createCoroutine 和 startCoroutine 协程到底是怎么创建和启动的?本篇文章带你揭晓. 在Continuation.kt文件中,有2个基础API,这里单独提出来说一下,方便后面我们理解launch. public fun <T> (suspend () ->

  • Kotlin协程到底是如何切换线程的

    随着kotlin在Android开发领域越来越火,协程在各个项目中的应用也逐渐变得广泛 但是协程到底是什么呢? 协程其实是个古老的概念,已经非常成熟了,但大家对它的概念一直存在各种疑问,众说纷纷 有人说协程是轻量级的线程,也有人说kotlin协程其实本质是一套线程切换方案 显然这对初学者不太友好,当不清楚一个东西是什么的时候,就很难进入为什么和怎么办的阶段了 本文主要就是回答这个问题,主要包括以下内容 1.关于协程的一些前置知识 2.协程到底是什么? 3.kotlin协程的一些基本概念,挂起函数

  • Kotlin协程操作之创建启动挂起恢复详解

    目录 一.协程的创建 1.start方法 2.CoroutineStart类 3.startCoroutineCancellable方法 4.createCoroutineUnintercepted方法 5.createCoroutineFromSuspendFunction方法 二.协程的启动 1.ContinuationImpl类 2.resumeCancellableWith方法 3.BaseContinuationImpl类 4.invokeSuspend方法 三.协程的挂起与恢复 下面

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程launch启动流程原理详解

    目录 1.launch启动流程 反编译后的Java代码 2.协程是如何被启动的 1.launch启动流程 已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的呢,直接进入launch的源码开始看起. fun main() { coroutineTest() Thread.sleep(2000L) } val block = suspend { println("Hello") delay(1000L) println(&q

  • Kotlin协程上下文与上下文元素深入理解

    目录 一.EmptyCoroutineContext 二.CombinedContext 三.Key与Element 四.CoroutineContext 五.AbstractCoroutineContextKey与AbstractCoroutineContextElement 一.EmptyCoroutineContext EmptyCoroutineContext代表空上下文,由于自身为空,因此get方法的返回值是空的,fold方法直接返回传入的初始值,plus方法也是直接返回传入的cont

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程的启动方式介绍

    目录 1.GlobalScope.launch 2.runBlocking 启动协程 3.async启动协程 启动协程的基本方式 1.GlobalScope.launch 代码示例: fun testGlobalScope() { GlobalScope.launch { println("Coroutinue started!") delay(1000L) println("Hello World!") } println("After launch!&

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

随机推荐