Kotlin协程之Flow基础原理示例解析

目录
  • 引言
  • 一.Flow的创建
  • 二.Flow的消费
    • 1.SafeFlow类
    • 2.AbstractFlow类
    • 3. SafeCollector类
    • 4.消费过程中的挂起

引言

本文分析示例代码如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.IO) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector<in T> {
    // 可挂起,非线程安全
    public suspend fun emit(value: T)
}

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

     // 核心方法
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 创建SafeCollector对象,对collector进行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 调用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 释放拦截的续体
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

    ...
    // 保存上下文中元素数量,用于检查上下文是否变化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 执行结束后的续体
    private var completion: Continuation<Unit>? = null

    // 协程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 挂起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    // 释放拦截的续体
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 发射数据
    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }

    // 重载的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        // 从续体中获取上下文
        val currentContext = uCont.context
        // 保证当前协程的Job是active的
        currentContext.ensureActive()
        // 获取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文发生变化
        if (previousContext !== currentContext) {
            // 检查上下文是否发生异常
            checkContext(currentContext, previousContext, value)
        }
        // 保存续体
        completion = uCont
        // 调用emitFun方法,传入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // 检查上下文变化,防止并发
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次执行过程中发生了异常
        if (previousContext is DownstreamExceptionElement) {
            // 抛出异常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 检查上下文是否发生变化,如果变化,则抛出异常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }

    // 用于抛出异常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心执行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) {
    // 获取当前suspend方法续体
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 调用重载的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 抛出异常
            throw e
        }
    }
}

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 尝试获取异常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果没有异常,则恢复flow方法续体的执行
    completion?.resumeWith(result as Result<Unit>)
    // 返回挂起标识,这里挂起的是消费过程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

以上就是Kotlin协程之Flow基础原理示例解析的详细内容,更多关于Kotlin协程Flow原理的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • Kotlin协程启动createCoroutine及创建startCoroutine原理

    目录 createCoroutine 和 startCoroutine startCoroutine调用 createCoroutineUnintercepted intercepted resume 结语 createCoroutine 和 startCoroutine 协程到底是怎么创建和启动的?本篇文章带你揭晓. 在Continuation.kt文件中,有2个基础API,这里单独提出来说一下,方便后面我们理解launch. public fun <T> (suspend () ->

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程的启动方式介绍

    目录 1.GlobalScope.launch 2.runBlocking 启动协程 3.async启动协程 启动协程的基本方式 1.GlobalScope.launch 代码示例: fun testGlobalScope() { GlobalScope.launch { println("Coroutinue started!") delay(1000L) println("Hello World!") } println("After launch!&

  • Kotlin协程开发之Flow的融合与Channel容量及溢出策略介绍

    目录 一.协程间的通信 1.通道容量 2.溢出策略 二.FusibleFlow接口 三.ChannelFlow类 一.协程间的通信 当需要进行协程间的通信时,可以调用Channel方法,创建一个Channel接口指向的对象,通过调用该对象的send方法和receive方法实现消息的发送与接收.协程对Channel接口的实现,本质上与阻塞队列类似,这里不再赘述. 1.通道容量 事实上,send方法与receive方法并没有定义在Channel接口中,而是分别定义在SendChannel接口和Rec

  • kotlin 协程上下文异常处理详解

    目录 引言 一.协程上下文 1.CoroutineContext 2.CorountineScope 3.子协程继承父协程 二.协程的异常传递 1.协程的异常传播 2.不同上下文(没有继承关系)之间协程异常会怎么样? 3.向用户暴露异常 三.协程的异常处理 使用SupervisorJob 异常捕获器CoroutineExceptionHandler Android中全局异常的处理 引言 从前面我们可以大致了解了协程的玩法,如果一个协程中使用子协程,那么该协程会等待子协程执行结束后才真正退出,而达

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • Kotlin协程操作之创建启动挂起恢复详解

    目录 一.协程的创建 1.start方法 2.CoroutineStart类 3.startCoroutineCancellable方法 4.createCoroutineUnintercepted方法 5.createCoroutineFromSuspendFunction方法 二.协程的启动 1.ContinuationImpl类 2.resumeCancellableWith方法 3.BaseContinuationImpl类 4.invokeSuspend方法 三.协程的挂起与恢复 下面

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程之Flow基础原理示例解析

    目录 引言 一.Flow的创建 二.Flow的消费 1.SafeFlow类 2.AbstractFlow类 3. SafeCollector类 4.消费过程中的挂起 引言 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d(&qu

  • Kotlin协程之Flow异常示例处理

    目录 示例 一.catch方法 catchImpl方法 二. onCompletion方法 1.unsafeFlow方法 2.ThrowingCollector类 三. retryWhen方法 示例 代码如下: launch(Dispatchers.Main) { // 第一部分 flow { emit(1) throw NullPointerException("e") }.catch { Log.d("liduo", "onCreate1: $it&q

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • go sync Once实现原理示例解析

    目录 正文 Once 的实现 使用示例 Once 的一些工作机制 Once 详解 hotpath atomic.LoadUint32 atomic.StoreUint32 Mutex 总结 正文 在很多情况下,我们可能需要控制某一段代码只执行一次,比如做某些初始化操作,如初始化数据库连接等. 对于这种场景,go 为我们提供了 sync.Once 对象,它保证了某个动作只被执行一次. 当然我们也是可以自己通过 Mutex 实现 sync.Once 的功能,但是相比来说繁琐了那么一点, 因为我们不仅

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • flutter Bloc 实现原理示例解析

    目录 序言 1. 事件流 > 状态流 (中转) 2. 使用 BlocBuilder 实时监听状态变更, 如何实现的呢? 总结 扩展 序言 在flutter开发中,我们使用 bloc 框架,基于状态变更进行响应式开发.本篇文章,小轰将 bloc 核心业务块进行拆解简化,聊一聊它的实现思想,bloc 核心能力分为如下两点: 添加事件 event,将 '事件流' 转换为 '状态流' state 监听 bloc 流,每次 state 状态变更,通知 widget 更新 下面,用自定义Bloc的方式,来给

  • java LockSupport实现原理示例解析

    目录 引言 LockSupport常见函数 LockSupport.park LockSupport.unpark 引言 前文中了解到AQS借助LockSupport.park和LockSupport.unpark完成线程的阻塞和唤醒,那么LockSupport内部又是怎么实现的?这是一个什么类? LockSupport是用于使用锁阻塞线程的基础实现,是其他同步类的基础,这个类为每个使用它的线程关联一个许可证(有点类似于Semaphore),如果许可证可用,线程调用park方法时会立即返回,线程

  • 网页资源阻塞浏览器加载的原理示例解析

    目录 正文 测试前环境准备 图片会造成阻塞吗? CSS 加载阻塞 CSS 会阻塞后面 JS 的执行吗? JS 加载阻塞 defer 和 async 动态脚本会造成阻塞吗? DOMContentLoaded 和 onload DOMContentLoaded 遇到脚本 DOMContentLoaded 遇到样式 正文 一个页面允许加载的外部资源有很多,常见的有脚本.样式.字体.图片和视频等,对于这些外部资源究竟是如何影响整个页面的加载和渲染的呢?今天来一探究竟. 如何用 Chrome 定制网络加载

  • go语言csrf库使用实现原理示例解析

    目录 引言 csrf小档案 一.CSRF及其实现原理 CSRF攻击示例 二.如何预防 三.CSRF包的使用及实现原理 csrf包的安装 基本使用 使用net/http包启动的服务 echo框架下使用csrf包 gin框架下使用csrf包 beego框架下使用csrf包 实现原理 csrf结构体 csrf包的工作流程 为什么GET.HEAD.OPTIONS.TRACE的请求方法不需要token验证 总结 引言 今天给大家推荐的是web应用安全防护方面的一个包:csrf.该包为Go web应用中常见

  • react fiber执行原理示例解析

    目录 为什么要使用fiber,要解决什么问题? fiber是什么? 数据结构 执行单元 浏览器工作: Fiber执行原理 workInProgress tree: currentFiber tree: Effects list: render阶段: 遍历节点过程: 收集effect list: commit阶段: 为什么commit必须是同步的操作的? 为什么要使用fiber,要解决什么问题? 在 react16 引入 Fiber 架构之前,react 会采用递归方法对比两颗虚拟DOM树,找出需

随机推荐