Kotlin 协程异步热数据流的设计与使用讲解

目录
  • 一.异步冷数据流
  • 二.异步热数据流
    • 1.异步热数据流的设计
      • 1)SharedFlow接口
      • 2)MutableSharedFlow接口
    • 2.异步热数据流的使用
      • 1)MutableSharedFlow方法
      • 2)使用示例

一.异步冷数据流

在Kotlin协程:协程的基础与使用中,通过使用协程中提供的flow方法可以创建一个Flow对象。这种方法得到的Flow对象实际上是一个异步冷数据流,代码如下:

private suspend fun test() {
    val flow = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }
    GlobalScope.launch {
        // 触发flow执行
        flow.collect {
            Log.d("liduo", "test1: $it")
        }
    }
    GlobalScope.launch {
        // 再次触发flow执行
        flow.collect {
            Log.d("liduo", "test2: $it")
        }
    }
}

在上面的代码中,通过调用flow方法,构建了一个名为flow对象,并对flow对象异步执行了两次。每次都会打印出1、2、3、4,然后结束执行。无论谁在前谁在后,无论执行多少次,得到的结果都是相同的,这就是异步冷数据流的一个特点。

二.异步热数据流

既然有冷数据流,那就一定有热数据流。在协程中提供了MutableSharedFlow方法来创建异步热数据流。相比于异步冷数据流,异步热数据流一般在类似广播订阅的场景中使用。

1.异步热数据流的设计

在异步热数据流中,核心接口的继承关系如下图所示:

1)SharedFlow接口

SharedFlow接口继承自Flow接口,代码如下:

public interface SharedFlow<out T> : Flow<T> {
    // 用于保存最近的已经发送的数据
    public val replayCache: List<T>
}
  • replay缓存:每个SharedFlow类型的对象会将最新发射的数据保存到replayCache中,每一个新的订阅者会先从replayCache中获取数据,然后再获取最新发射的数据。
  • 订阅过程:在SharedFlow中,每个FlowCollecter类型的对象都被称为订阅者。调用SharedFlow类型对象的collect方法会触发订阅。正常情况下,订阅不会自动结束,但订阅者可以取消订阅,当订阅者所在的协程被取消时,订阅过程就会取消。
  • 操作符使用:对于大部分终端操作符,比如:toList方法,当对SharedFlow类型的对象使用这些操作符将永远不会结束或完成变换(toList用于将上游发射的所有数据保存到列表中,并返回列表)。对于部分用于截断流的操作符,比如:take方法,当对SharedFlow类型的对象使用这些操作符可以完成变换(take用于截取指定数量的上游流发射的数据)。当对SharedFlow类型的对象使用flowOn操作符、cancellable操作符,或使用指定参数为RENDEZVOUS的buffer操作符是无效的。
  • SharedFlow并发: SharedFlow中所有的方法都是线程安全的,并且可以在多协程并发的场景中使用且不必额外加锁。
  • 冷流转换热流:对于一个冷流,可以通过调用shareIn方法,转换为一个热流。
  • SharedFlow与BroadcastChannel的区别:从概念上讲,SharedFlow与BroadcastChannel很相似,但二者也有很大的差别,推荐使用SharedFlow,SharedFlow设计的目的就是要在未来替代BroadcastChannel:
    • SharedFlow更简单,不需要实现一堆与Channel相关的接口。
    • SharedFlow支持配置replay缓存与缓存溢出策略。
    • SharedFlow清楚地划分了只读的SharedFlow和可读可写的SharedFlow。
    • SharedFlow不能关闭,也不能表示失败,因此如果需要,所有的错误与完成信号都应该具体化。

2)MutableSharedFlow接口

MutableSharedFlow接口继承自SharedFlow接口与FlowCollector接口,并在此基础上定义了两个方法与一个常量,代码如下:

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    // 该方法用于尝试发射一个数据,
    // 当返回true时表示发射成功,返回false时,表示缓存空间不足,需要挂起。
    public fun tryEmit(value: T): Boolean
    // 该常量表示当前SharedFlow的订阅者的数量,
    // 该常量是一个状态流StateFlow,也是一个热流,当其中数值发生变化时会进行回调通知
    public val subscriptionCount: StateFlow<Int>
    // 用于清空replayCache
    // 在调用该方法之前老的订阅者,可以继续收到replaycache中的缓存数据,
    // 在调用该方法之后的新的订阅者,只能收到emit方法发射的新数据
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

2.异步热数据流的使用

1)MutableSharedFlow方法

在协程中,可以通过调用MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
}

其中构造方法中三个参数的含义如下:

  • replay:表示新订阅的接收者可以收到的最近已经发射的数据的数量,默认为0。
  • extraBufferCapacity:表示除replay外,当发射速度大于接收速度时数据可缓存的数量,默认为0。
  • onBufferOverflow:表示当缓存已满,数据即将溢出时的数据的处理策略,默认为SUSPEND。

当创建MutableSharedFlow类型的对象时,可以通过参数replay确定SharedFlow接口中定义的replayCache的最大容量,通过参数extraBufferCapacity设置一个不包括replay大小的缓存数量。replayCache本质上也是缓存的一部分,因此extraBufferCapacity与replay共同决定了缓存的大小。

对于处理数据慢的订阅者,可以通过从缓存中获取数据,以此来避免发射者的挂起。缓存的数量大小决定了数据处理快的订阅者与数据处理慢的订阅者之间的延迟程度。

当使用默认的构造方法创建MutableSharedFlow类型的对象时,它的缓存数量为0。当调用它的emit方法时会直接挂起,直到所有的订阅者都处理完当前emit方法发送的数据,才会恢复emit方法的挂起。如果MutableSharedFlow类型的对象没有订阅者,则调用emit方法会直接返回。

2)使用示例

代码如下:

private suspend fun test() {
    // 创建一个热流
    val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)
    // 启动一个协程,发射数据:1
    // 由于有缓存,因此会被添加到缓存中,不会挂起
    GlobalScope.launch {
        flow.emit(1)
    }
    // 将MutableSharedFlow对象转换为SharedFlow对象
    // SharedFlow对象不能调用emit方法,因此只能用于接收
    val onlyReadFlow = flow.asSharedFlow()
    // 接收者1
    // 启动一个新协程
    GlobalScope.launch {
        // 订阅监听,当collect方法触发订阅时,会首先会调onSubscription方法
        onlyReadFlow.onSubscription {
            Log.d("liduozuishuai", "test0: ")
            // 发射数据:3
            // 向下游发射数据:3,其他接收者收不到
            emit(3)
        }.onEach {
            // 处理接收的数据
            Log.d("liduozuishuai", "test1: $it")
        }.collect()
    }
    // 接收者2
    // 启动一个新的协程
    GlobalScope.launch {
        // 触发并处理接收的数据
        onlyReadFlow.collect {
            Log.d("liduozuishuai", "test2: $it")
        }
    }
    // 发送数据:2
    GlobalScope.launch {
        flow.emit(2)
    }
}

对于上面的代码,接收者1会依次打印出:3、1、2,接收者2会依次打印出1、2。

以上就是Koltin 协程异步热数据流的设计与使用讲解的详细内容,更多关于Koltin 协程异步热数据流的资料请关注我们其它相关文章!

(0)

相关推荐

  • 利用Kotlin的协程实现简单的异步加载详解

    前言 众所周知在android中当执行程序的耗时超过5秒时就会引发ANR而导致程序崩溃.由于UI的更新操作是在UI主线程进行的,理想状态下每秒展示60帧时人眼感受不到卡顿,1000ms/60帧,即每帧绘制时间不应超过16.67ms.如果某项操作的耗时超过这一数值就会导致UI卡顿.因此在实际的开发中我通常把耗时操作放在一个新的线程中(比如从网络获取数据,从SD卡读取图片等操作),但是呢在android中UI的更新只能在UI主线程中进行更新,因此当我们在非UI线程中执行某些操作的时候想要更新UI就需

  • Kotlin Coroutines执行异步加载示例详解

    前言 Kotlin Coroutines是Kotlin推出的新的异步API.并不是解决所有问题的最优方案,但是希望在许多情况下它会使事情变得更容易一些.这里只简单的展示一下这个库在安卓中的具体使用方案.下面话不多说了,来一起看看详细的介绍吧. 引入Coroutines //在application的build.gradle文件中的android节点添加如下的代码 kotlin { experimental { coroutines 'enable' } } //添加下面两行到依赖中 implem

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • Kotlin协程Channel特点及使用细节详解

    目录 正文 1.认识Channel 2.Channel使用中的细节 3.Channe的特点 正文 在协程启动模式中已经知道async是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel就出现了. 1.认识Channel Channel的意思是管道.通道,用图表示如下: Channel的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样: fun main() { channelTest() } fun channelTest() = runBlock

  • 一篇文章揭开Kotlin协程的神秘面纱

    前言 Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱. 理论 它是什么 这是别人翻译: 协程把异步编程放入库中来简化这类操作.程序逻辑在协程中顺序表述,而底层的库会将其转换为异步操作.库会将相关的用户代码打包成回调,订阅相关事件,调度其执行到不同的线程(甚至不同的机器),而代码依然想顺序执行那么简单. 我的理解:子任务程协作运行,优雅的处理异步问题解决方案. 它能干什么? 我在做安卓开发,它能替换掉Handler,AsyncTask 甚至

  • 使用kotlin协程提高app性能(译)

    协程是一种并发设计模式,您可以在Android上使用它来简化异步执行的代码.Kotlin1.3版本添加了 Coroutines,并基于其他语言的既定概念. 在Android上,协程有助于解决两个主要问题: 管理长时间运行的任务,否则可能会阻止主线程并导致应用冻结. 提供主安全性,或从主线程安全地调用网络或磁盘操作. 本主题描述了如何使用Kotlin协程解决这些问题,使您能够编写更清晰,更简洁的应用程序代码. 管理长时间运行的任务 在Android上,每个应用程序都有一个主线程来处理用户界面并管理

  • Kotlin协程到底是如何切换线程的

    随着kotlin在Android开发领域越来越火,协程在各个项目中的应用也逐渐变得广泛 但是协程到底是什么呢? 协程其实是个古老的概念,已经非常成熟了,但大家对它的概念一直存在各种疑问,众说纷纷 有人说协程是轻量级的线程,也有人说kotlin协程其实本质是一套线程切换方案 显然这对初学者不太友好,当不清楚一个东西是什么的时候,就很难进入为什么和怎么办的阶段了 本文主要就是回答这个问题,主要包括以下内容 1.关于协程的一些前置知识 2.协程到底是什么? 3.kotlin协程的一些基本概念,挂起函数

  • Kotlin协程上下文与上下文元素深入理解

    目录 一.EmptyCoroutineContext 二.CombinedContext 三.Key与Element 四.CoroutineContext 五.AbstractCoroutineContextKey与AbstractCoroutineContextElement 一.EmptyCoroutineContext EmptyCoroutineContext代表空上下文,由于自身为空,因此get方法的返回值是空的,fold方法直接返回传入的初始值,plus方法也是直接返回传入的cont

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • Kotlin协程基础元素梳理分析

    Kotlin 协程的基础元素:Continuation.SafeContinuation.CoroutineContext.CombinedContext.CancellationException.intrinsics.CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类. Continuation 是什么? class Call<T>(callBack: Call.CallBack<T

随机推荐