Kotlin协程flowOn与线程切换超详细示例介绍

目录
  • 示例代码
  • 一.flowOn方法
    • 1.ChannelFlowOperatorImpl类
  • 二.collect方法
    • 1.ChannelFlowOperator类的collect方法
    • 2.ChannelFlow类的collect方法
    • 3.flow方法中代码的执行
    • 4.接收flow方法发出的值
  • 三.flowOn方法与流的融合
  • 四.总结

示例代码

本文分析示例代码如下:

launch(Dispatchers.Main) {
  flow {
    emit(1)
    emit(2)
  }.flowOn(Dispatchers.IO).collect {
    delay(1000)
    withContext(Dispatchers.IO) {
      Log.d("liduo", "$it")
    }
    Log.d("liduo", "$it")
  }
}

一.flowOn方法

flowOn方法用于将上游的流切换到指定协程上下文的调度器中执行,同时不会把协程上下文暴露给下游的流,即flowOn方法中协程上下文的调度器不会对下游的流生效。如下面这段代码所示:

launch(Dispatchers.Main) {
  flow {
    emit(2) // 执行在IO线程池
  }.flowOn(Dispatchers.IO).map {
    it + 1 // 执行在Default线程池
  }.flowOn(Dispatchers.Default).collect {
    Log.d("liduo", "$it") //执行在主线程
  }
}

接下来,分析一下flowOn方法,代码如下:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // 检查当前协程没有执行结束
    checkFlowContext(context)
    return when {
        // 为空,则返回自身
        context == EmptyCoroutineContext -> this
        // 如果是可融合的Flow,则尝试融合操作,获取新的流
        this is FusibleFlow -> fuse(context = context)
        // 其他情况,包装成可融合的Flow
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}
// 确保Job不为空
private fun checkFlowContext(context: CoroutineContext) {
    require(context[Job] == null) {
        "Flow context cannot contain job in it. Had $context"
    }
}

在flowOn方法中,首先会检查方法所在的协程是否执行结束。如果没有结束,则会执行判断语句,这里flowOn方法传入的上下文不是空上下文,且通过flow方法构建出的Flow对象也不是FusibleFlow类型的对象,因此这里会走到else分支,将上游flow方法创建的Flow对象和上下文包装成ChannelFlowOperatorImpl类型的对象。

1.ChannelFlowOperatorImpl类

ChannelFlowOperatorImpl类继承自ChannelFlowOperator类,用于将上游的流包装成一个ChannelFlow对象,它的继承关系如下图所示:

通过上图可以知道,ChannelFlowOperatorImpl类最终继承了ChannelFlow类,代码如下:

internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    // 用于流融合时创建新的流
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    // 若当前的流不需要通过Channel即可实现正常工作时,会调用此方法
    override fun dropChannelOperators(): Flow<T>? = flow
    // 触发对下一级流进行收集
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)
}

二.collect方法

在Kotlin协程:Flow基础原理中讲到,当执行collect方法时,内部会调用最后产生的Flow对象的collect方法,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> override suspend fun emit(value: T) = action(value) })public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

这个最后产生的Flow对象就是ChannelFlowOperatorImpl类对象。

1.ChannelFlowOperator类的collect方法

ChannelFlowOperatorImpl类没有重写collect方法,因此调用的是它的父类ChannelFlowOperator类的collect方法,代码如下:

override suspend fun collect(collector: FlowCollector<T>) {
    // OPTIONAL_CHANNEL为默认值,这里满足条件,之后会详细讲解
    if (capacity == Channel.OPTIONAL_CHANNEL) {
        // 获取当前协程的上下文
        val collectContext = coroutineContext
        // 计算新的上下文
        val newContext = collectContext + context
        // 如果前后上下文没有发生变化
        if (newContext == collectContext)
            // 直接触发对下一级流的收集
            return flowCollect(collector)
        // 如果上下文发生变化,但不需要切换线程
        if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
            // 切换协程上下文,调用flowCollect方法触发下一级流的收集
            return collectWithContextUndispatched(collector, newContext)
    }
    // 调用父类的collect方法
    super.collect(collector)
}
// 获取当前协程的上下文,该方法会被编译器处理
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }

ChannelFlowOperator类的collect方法在设计上与协程的withContext方法设计思路是一致的:在方法内根据上下文的不同情况进行判断,在必要时才会切换线程去执行任务。

通过flowOn方法创建的ChannelFlowOperatorImpl类对象,参数capacity为默认值OPTIONAL_CHANNEL。因此代码在执行时会进入到判断中,但因为我们指定了上下文为Dispatchers.IO,因此上下文发生了变化,同时拦截器也发生了变化,所以最后会调用ChannelFlowOperator类的父类的collect方法,也就是ChannelFlow类的collect方法。

2.ChannelFlow类的collect方法

ChannelFlow类的代码如下:

override suspend fun collect(collector: FlowCollector<T>): Unit =
    coroutineScope {
        collector.emitAll(produceImpl(this))
    }

在ChannelFlow类的collect方法中,首先通过coroutineScope方法创建了一个作用域协程,接着调用了produceImpl方法,代码如下:

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
    scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produceImpl方法内部调用了produce方法,并且传入了待执行的任务collectToFun。

produce方法在Kotlin协程:协程的基础与使用中曾提到过,它是官方提供的启动协程的四个方法之一,另外三个方法为launch方法、async方法、actor方法。代码如下:

internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // 根据容量与溢出策略创建Channel对象
    val channel = Channel<E>(capacity, onBufferOverflow)
    // 计算新的上下文
    val newContext = newCoroutineContext(context)
    // 创建协程
    val coroutine = ProducerCoroutine(newContext, channel)
    // 监听完成事件
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    // 启动协程
    coroutine.start(start, coroutine, block)
    return coroutine
}

在produce方法内部,首先创建了一个Channel类型的对象,接着创建了类型为ProducerCoroutine的协程,并且传入Channel对象作为参数。最后,produce方法返回了一个ReceiveChannel接口指向的对象,当协程执行完毕后,会通过Channel对象将结果通过send方法发送出来。

至此,可以知道flowOn方法的实现实际上是利用了协程拦截器的拦截功能。

在这里之后,代码逻辑分成了两部分,一部分是block在ProducerCoroutine协程中的执行,另一部分是通过ReceiveChannel对象获取执行的结果。

3.flow方法中代码的执行

在produceImpl方法中,调用了produce方法,并且传入了collectToFun对象,这个对象将会在produce方法创建的协程中执行,代码如下:

internal val collectToFun: suspend (ProducerScope<T>) -> Unit
    get() = { collectTo(it) }

当调用collectToFun对象的invoke方法时,会触发collectTo方法的执行,该方法在ChannelFlowOperator类中被重写,代码如下:

protected override suspend fun collectTo(scope: ProducerScope<T>) =
    flowCollect(SendingCollector(scope))

在collectTo方法中,首先将参数scope封装成SendingCollector类型的对象,接着调用了flowCollect方法,该方法在ChannelFlowOperatorImpl类中被重写,代码如下:

override suspend fun flowCollect(collector: FlowCollector<T>) =
    flow.collect(collector)

ChannelFlowOperatorImpl类的flowCollect方法内部调用了flow对象的collect方法,这个flow对象就是最初通过flow方法构建的对象。根据Kotlin协程:Flow基础原理的分析,这个flow对象类型为SafeFlow,最后会通过collectSafely方法,触发flow方法中的block执行。代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        // 触发执行
        collector.block()
    }
}

当flow方法在执行过程中需要向下游发出值时,会调用emit方法。根据上面flowCollect方法和collectTo方法可以知道,collectSafely方法的collector对象就是collectTo方法中创建的SendingCollector类型的对象,代码如下:

@InternalCoroutinesApi
public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    // 通过Channel类对象发送值
    override suspend fun emit(value: T): Unit = channel.send(value)
}

当调用SendingCollector类型的对象的emit方法时,会通过调用类型为Channel的对象的send方法,将值发送出去。

接下来,将分析下游如何接收上游发出的值。

4.接收flow方法发出的值

回到ChannelFlow类的collect方法,之前提到collect方法中调用produceImpl方法,开启了一个新的协程去执行任务,并且返回了一个ReceiveChannel接口指向的对象。代码如下:

override suspend fun collect(collector: FlowCollector<T>): Unit =
    coroutineScope {
        collector.emitAll(produceImpl(this))
    }

在调用完produceImpl方法后,接着调用了emitAll方法,将ReceiveChannel接口指向的对象作为emitAll方法的参数,代码如下:

public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
    emitAllImpl(channel, consume = true)

emitAll方法是FlowCollector接口的扩展方法,内部调用了emitAllImpl方法对参数channel进行封装,代码如下:

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    // 用于保存异常
    var cause: Throwable? = null
    try {
        // 死循环
        while (true) {
            // 挂起,等待接收Channel结果或Channel关闭
            val result = run { channel.receiveOrClosed() }
            // 如果Channel关闭了
            if (result.isClosed) {
                // 如果有异常,则抛出
                result.closeCause?.let { throw it }
                // 没有异常,则跳出循环
                break
            }
            // 获取并发送值
            emit(result.value)
        }
    } catch (e: Throwable) {
        // 捕获到异常时抛出
        cause = e
        throw e
    } finally {
         // 执行结束关闭Channel
        if (consume) channel.cancelConsumed(cause)
    }
}

emitAllImpl方法是FlowCollector接口的扩展方法,而这里的FlowCollector接口指向的对象,就是collect方法中创建的匿名对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

在emitAllImpl方法中,当通过receiveOrClosed方法获取到上游发出的值时,会调用emit方法通知下游,这时就会触发collect方法中block的执行,最终实现值从流的上游传递到了下游。

三.flowOn方法与流的融合

假设对一个流连续调用两次flowOn方法,那么流最终会在哪个flowOn方法指定的调度器中执行呢?代码如下:

launch(Dispatchers.Main) {
    flow {
        emit(2)
      // emit方法是在IO线程执行还是在主线程执行呢?
    }.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect {
        Log.d("liduo", "$it")
    }
}

答案是在IO线程执行,为什么呢?

根据本篇上面的分析,当第一次调用flowOn方法时,上游的流会被包裹成ChannelFlowOperatorImpl对象,代码如下:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // 检查当前协程没有执行结束
    checkFlowContext(context)
    return when {
        // 为空,则返回自身
        context == EmptyCoroutineContext -> this
        // 如果是可融合的Flow,则尝试融合操作,获取新的流
        this is FusibleFlow -> fuse(context = context)
        // 其他情况,包装成可融合的Flow
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

而当第二次调用flowOn方法时,由于此时上游的流——ChannelFlowOperatorImpl类型的对象,实现了FusibleFlow接口,因此,这里会触发流的融合,直接调用上游的流的fuse方法,并传入新的上下文。这里容量和溢出策略均为默认值。

根据Kotlin协程:Flow的融合、Channel容量、溢出策略的分析,这里会调用ChannelFlow类的fuse方法。相关代码如下:

public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
    ...
    // 计算融合后流的上下文
    // context为下游的上下文,this.context为上游的上下文
    val newContext = context + this.context
    ...
}

再根据之前在Kotlin协程:协程上下文与上下文元素中的分析,当两个上下文进行相加时,后一个上下文中的拦截器会覆盖前一个上下文中的拦截器。在上面的代码中,后一个上下文为上游的流的上下文,因此会优先使用上游的拦截器。代码如下:

public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other

四.总结

粉线为使用时代码编写顺序,绿线为下游触发上游的调用顺序,红线为上游向下游发送值的调用顺序,蓝线为线程切换的位置。

到此这篇关于Kotlin协程flowOn与线程切换超详细示例介绍的文章就介绍到这了,更多相关Kotlin flowOn与线程切换内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kotlin协程开发之Flow的融合与Channel容量及溢出策略介绍

    目录 一.协程间的通信 1.通道容量 2.溢出策略 二.FusibleFlow接口 三.ChannelFlow类 一.协程间的通信 当需要进行协程间的通信时,可以调用Channel方法,创建一个Channel接口指向的对象,通过调用该对象的send方法和receive方法实现消息的发送与接收.协程对Channel接口的实现,本质上与阻塞队列类似,这里不再赘述. 1.通道容量 事实上,send方法与receive方法并没有定义在Channel接口中,而是分别定义在SendChannel接口和Rec

  • Kotlin协程启动createCoroutine及创建startCoroutine原理

    目录 createCoroutine 和 startCoroutine startCoroutine调用 createCoroutineUnintercepted intercepted resume 结语 createCoroutine 和 startCoroutine 协程到底是怎么创建和启动的?本篇文章带你揭晓. 在Continuation.kt文件中,有2个基础API,这里单独提出来说一下,方便后面我们理解launch. public fun <T> (suspend () ->

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程操作之创建启动挂起恢复详解

    目录 一.协程的创建 1.start方法 2.CoroutineStart类 3.startCoroutineCancellable方法 4.createCoroutineUnintercepted方法 5.createCoroutineFromSuspendFunction方法 二.协程的启动 1.ContinuationImpl类 2.resumeCancellableWith方法 3.BaseContinuationImpl类 4.invokeSuspend方法 三.协程的挂起与恢复 下面

  • Kotlin协程之Flow基础原理示例解析

    目录 引言 一.Flow的创建 二.Flow的消费 1.SafeFlow类 2.AbstractFlow类 3. SafeCollector类 4.消费过程中的挂起 引言 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d(&qu

  • kotlin 协程上下文异常处理详解

    目录 引言 一.协程上下文 1.CoroutineContext 2.CorountineScope 3.子协程继承父协程 二.协程的异常传递 1.协程的异常传播 2.不同上下文(没有继承关系)之间协程异常会怎么样? 3.向用户暴露异常 三.协程的异常处理 使用SupervisorJob 异常捕获器CoroutineExceptionHandler Android中全局异常的处理 引言 从前面我们可以大致了解了协程的玩法,如果一个协程中使用子协程,那么该协程会等待子协程执行结束后才真正退出,而达

  • Kotlin协程的启动方式介绍

    目录 1.GlobalScope.launch 2.runBlocking 启动协程 3.async启动协程 启动协程的基本方式 1.GlobalScope.launch 代码示例: fun testGlobalScope() { GlobalScope.launch { println("Coroutinue started!") delay(1000L) println("Hello World!") } println("After launch!&

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • Kotlin协程到底是如何切换线程的

    随着kotlin在Android开发领域越来越火,协程在各个项目中的应用也逐渐变得广泛 但是协程到底是什么呢? 协程其实是个古老的概念,已经非常成熟了,但大家对它的概念一直存在各种疑问,众说纷纷 有人说协程是轻量级的线程,也有人说kotlin协程其实本质是一套线程切换方案 显然这对初学者不太友好,当不清楚一个东西是什么的时候,就很难进入为什么和怎么办的阶段了 本文主要就是回答这个问题,主要包括以下内容 1.关于协程的一些前置知识 2.协程到底是什么? 3.kotlin协程的一些基本概念,挂起函数

  • Kotlin协程与并发深入全面讲解

    目录 协程与并发 1.协程并发问题 2.协程处理并发的手段 协程与并发 Kotlin协程是基于线程执行的.经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同. 在java的世界里,并发往往是多个线程一起工作,存在共享的变量.需要处理好同步问题.要避免把协程与线程的概念混淆. runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) } L

  • Kotlin协程基础元素梳理分析

    Kotlin 协程的基础元素:Continuation.SafeContinuation.CoroutineContext.CombinedContext.CancellationException.intrinsics.CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类. Continuation 是什么? class Call<T>(callBack: Call.CallBack<T

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • 使用kotlin协程提高app性能(译)

    协程是一种并发设计模式,您可以在Android上使用它来简化异步执行的代码.Kotlin1.3版本添加了 Coroutines,并基于其他语言的既定概念. 在Android上,协程有助于解决两个主要问题: 管理长时间运行的任务,否则可能会阻止主线程并导致应用冻结. 提供主安全性,或从主线程安全地调用网络或磁盘操作. 本主题描述了如何使用Kotlin协程解决这些问题,使您能够编写更清晰,更简洁的应用程序代码. 管理长时间运行的任务 在Android上,每个应用程序都有一个主线程来处理用户界面并管理

随机推荐