图解 Kotlin SharedFlow 缓存系统及示例详解

目录
  • 前言
  • replay
  • extraBufferCapacity
  • onBufferOverflow
  • SharedFlow Buffer

前言

Kotlin 为我们提供了两种创建“热流”的工具:StateFlowSharedFlow。StateFlow 经常被用来替代 LiveData 充当架构组件使用,所以大家相对熟悉。其实 StateFlow 只是 SharedFlow 的一种特化形式,SharedFlow 的功能更强大、使用场景更多,这得益于其自带的缓存系统,本文用图解的方式,带大家更形象地理解 SharedFlow 的缓存系统。

创建 SharedFlow 需要使用到 MutableSharedFlow() 方法,我们通过方法的三个参数配置缓存:

fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

接下来,我们通过时序图的形式介绍这三个关键参数对缓存的影响。正文之前让我们先统一一下用语:

  • Emitter:Flow 数据的生产者,从上游发射数据
  • Subcriber:Flow 数据的消费者,在下游接收数据

replay

当 Subscriber 订阅 SharedFlow 时,有机会接收到之前已发送过的数据,replay 指定了可以收到 subscribe 之前数据的数量。replay 不能为负数,默认值为 0 表示 Subscriber 只能接收到 subscribe 之后 emit 的数据:

上图展示的是 replay = 0 的情况,Subscriber 无法收到 subscribe 之前 emit 的 ❶,只能接收到 ❷ 和 ❸。

当 replay = n ( n > 0)时,SharedFlow 会启用缓存,此时 BufferSize 为 n,意味着可以缓存发射过的最近 n 个数据,并发送给新增的 Subscriber。

上图以 n = 1 为例 :

  • Emitter 发送 ❶ ,并被 Buffer 缓存
  • Subscriber 订阅 SharedFlow 后,接收到缓存的 ❶
  • Emitter 相继发送 ❷ ❸ ,Buffer 缓存的数据相继依次被更新

在生产者消费者模型中,有时消费的速度赶不及生产,此时要加以控制,要么停止生产,要么丢弃数据。SharedFlow 也同样如此。有时 Subscriber 的处理速度较慢,Buffer 缓存的数据得不到及时处理,当 Buffer 为空时,emit 默认将会被挂起 ( onBufferOverflow = SUSPEND)

上面的图展示了 replay = 1 时 emit 发生 suspend 场景:

  • Emitter 发送 ❶ 并被缓存
  • Subscriber 订阅 SharedFlow ,接收 replay 的 ❶ 开始处理
  • Emitter 发送 ❷ ,缓存数据更新为 ❷ ,由于 Subscriber 对 ❶ 的处理尚未结束,❷ 在缓存中没有及时被消费
  • Emitter 发送 ❸,由于缓存的 ❷ 尚未被 Subscriber 消费,emit 发生挂起
  • Subscriber 开始消费 ❷ ,Buffer 缓存 ❸ , Emitter 可以继续 emit 新数据

注意 SharedFlow 作为一个多播可以有多个 Subscriber,所以上面例子中,❷ 被消费的时间点,取决于最后一个开始处理的 Subscriber。

extraBufferCapacity

extraBufferCapacity 中的 extra 表示 replay-cache 之外为 Buffer 还可以额外追加的缓存。

若 replay = n, extraBufferCapacity = m,则 BufferSize = m + n。

extraBufferCapacity 默认为 0,设置 extraBufferCapacity 有助于提升 Emitter 的吞吐量

在上图的基础之上,我们再设置 extraBufferCapacity = 1,效果如下图:

上图中 BufferSize = 1 + 1 = 2 :

  • Emitter 发送 ❶ 并得到 Subscriber1 的处理 ,❶ 作为 replay 的一个数据被缓存,
  • Emitter 发送 ❷,Buffer 中 replay-cache 的数据更新为 ❷
  • Emitter 发送 ❸,Buffer 在存储了 replay 数据 ❷ 之上,作为 extra 又存储了 ❸
  • Emitter 发送 ❹,此时 Buffer 已没有空余位置,emit 挂起
  • Subscriber2 订阅 SharedFlow。虽然此时 Buffer 中存有 ❷ ❸ 两个数据,但是由于 replay = 1,所以 Subscriber2 只能收到最近的一个数据 ❸
  • Subscriber1 处理完 ❶ 后,依次处理 Buffer 中的下一个数据,开始消费 ❷
  • 对于 SharedFlow 来说,已经不存在没有消费 ❷ 的 Subscriber,❷ 移除缓存,❹ 的 emit 继续,并进入缓存,此时 Buffer 又有两个数据 ❸ ❹ ,
  • Subscriber1 处理完 ❷ ,开始消费 ❸
  • 不存在没有消费 ❸ 的 Subscriber, ❸ 移除缓存。

onBufferOverflow

前面的例子中,当 Buffer 被填满时,emit 会被挂起,这都是建立在 onBufferOverflow 为 SUSPEND 的前提下的。onBufferOverflow 用来指定缓存移除时的策略,除了默认的 SUSPEND,还有两个数据丢弃策略:

  • DROP_LATEST:丢弃最新的数据
  • DROP_OLDEST:丢弃最老的数据

需要特别注意的是,当 BufferSize = 0 时,extraBufferCapacity 只支持 SUSPEND,其他丢弃策略是无效的。这很好理解,因为 Buffer 中没有数据,所以丢弃无从下手,所以启动丢弃策略的前提是 Buffer 至少有一个缓冲区,且数据被填满

上图展示 DROP_LATEST 的效果。假设 replay = 2,extra = 0

  • Emitter 发送 ❸ 时,由于 ❶ 已经被消费,所以 Buffer 数据从 ❶❷ 变为 ❷❸
  • Emitter 发送 ❹ 时,由于 ❷ 还未被消费,Buffer 处于填满状态, ❹ 直接被丢弃
  • Emitter 发送 ❺ 时,由于 ❷ 已经被费,可以移除缓存,Buffer 数据变为 ❸❺

上图展示了 DROP_OLDEST 的效果,与 DROP_LATEST 比较后非常明显,缓存中永远会储存最新的两个数据,但是较老的数据不管有没有被消费,都可能会从 Buffer 移除,所以 Subscriber 可以消费当前最新的数据,但是有可能漏掉中间的数据,比如图中漏掉了 ❷

注意:当 extraBufferCapacity 设为 SUSPEND 可以保证 Subscriber 一个不漏的消费掉所有数据,但是会影响 Emitter 的速度;当设置为 DROP_XXX 时,可以保证 emit 调用后立即返回,但是 Subscriber 可能会漏掉部分数据。

如果我们不想让 emit 发生挂起,除了设置 DROP_XXX 之外,还有一个方法就是调用 tryEmit,这是一个非 suspend 版本的 emit

abstract suspend override fun emit(value: T)
abstract fun tryEmit(value: T): Boolean

tryEmit 返回一个 boolean 值,你可以这样判断返回值,当使用 emit 会挂起时,使用 tryEmit 会返回 false,其余情况都是 true。这意味着 tryEmit 返回 false 的前提是 extraBufferCapacity 必须设为 SUSPEND,且 Buffer 中空余位置为 0 。此时使用 tryEmit 的效果等同于 DROP_LATEST。

SharedFlow Buffer

前面介绍的 MutableSharedFlow 的三个参数,其本质都是围绕 SharedFlow 的 Buffer 进行工作的。那么这个 Buffer 具体结构是怎样的呢?

上面这个图是 SharedFlow 源码中关于 Buffer 的注释,这个图形象地告诉了我们 Buffer 是一个线性数据结构(就是一个普通的数组 Array<Any?>),但是这个图不能直观反应 Buffer 运行机制。下面通过一个例子,看一下 Buffer 在运行时的具体更新过程:

val sharedFlow = MutableSharedFlow<Int>(
    replay = 2,
    extraBufferCapacity = 2,
    onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1
fun main() {
    runBlocking {
        launch {
            sharedFlow.onEach {
                delay(200) // simulate the consume of data
            }.collect()
        }
        repeat(12) {
            sharedFlow.emit(emitValue)
            emitValue++
            delay(50)
        }
    }
}

上面的代码很简单,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 生产的速度大于 Subscriber 消费的速度,所以过程中会出现 Buffer 的填充和更新,下面依旧用图的方式展示 Buffer 的变化

先看一下代码对应的时序图:

有前面的介绍,相信这个时序图很容易理解,这里就不再赘述了,下面重点图解一下 Buffer 的内存变化。SharedFlow 的 Buffer 本质上是一个基于 Array 实现的 queue,通过指针移动从往队列增删元素,避免了元素在实际数组中的移动。这里关键的指针有三个:

  • head:队列的 head 指向 Buffer 的第一个有效数据,这是时间上最早进入缓存的数据,在数据被所有的 Subscriber 消费之前不会移除缓存。因此 head 也代表了最慢的 Subscriber 的处理进度
  • replay:Buffer 为 replay-cache 预留空间的其实位置,当有新的 Subscriber 订阅发生时,从此位置开始处理数据。
  • end:新数据进入缓存时的位置,end 这也代表了最快的 Subscriber 的处理进度。

如果 bufferSize 表示当前 Buffer 中存储数据的个数,则我们可知三指针 index 符合如下关系:

  • replay <= head + bufferSize
  • end = head + bufferSize

了解了三指针的含义后,我们再来看上图中的 Buffer 是如何工作的:

最后,总结一下 Buffer 的特点:

  • 基于数组实现,当数组空间不够时进行 2n 的扩容
  • 元素进入数组后的位置保持不变,通过移动指针,决定数据的消费起点
  • 指针移动到数组尾部后,会重新指向头部,数组空间可循环使用

以上就是图解 Kotlin SharedFlow 缓存系统及示例详解的详细内容,更多关于Kotlin SharedFlow 缓存的资料请关注我们其它相关文章!

(0)

相关推荐

  • 绘制flowable 流程图的Vue 库使用详解

    目录 引言 workflow-bpmn-modeler 注册 bpmnModeler 组件 muheflow-bpmn-modeler 引言 之前松哥发了一篇文章和小伙伴们介绍了前端的 bpmn.js 这个库,利用这个库我们可以自己将绘制流程图的功能嵌入到我们的项目中. 然而,这个库默认是给 Camunda 设计的,所以画出来的流程图导出来的 XML 文件无法直接使用,必须要做一些深度定制,才能将 XML 文件转为 Flowable 流程引擎可用的 XML 文件.这个深度定制太太太麻烦了. 所以

  • Flowable 设置流程变量的四种方式详解

    目录 引言 1. 为什么需要流程变量 2. 流程变量的分类 3. 全局流程变量 3.1 启动时设置 3.2 通过 Task 设置 3.3 完成任务时设置 3.4 通过流程设置 4. 本地流程变量 4.1 通过 Task 设置 5. 临时流程变量 引言 在之前的文章中,松哥也有和小伙伴们使用过流程变量,然而没有和大家系统的梳理过流程变量的具体玩法以及它对应的数据表详情,今天我们就来看看 Flowable 中流程变量的详细玩法. 1. 为什么需要流程变量 首先我们来看看为什么需要流程变量. 举一个简

  • Flowable执行完毕的流程查找方法

    目录 正文 1. 历史流程信息 2. 历史任务查询 3. 历史活动查询 4. 历史变量查询 5. 历史日志查询 6. 历史权限查询 7. 自定义查询 SQL 8. 历史数据记录级别 正文 @[toc]在之前的文章中松哥和小伙伴们聊过,正在执行的流程信息是保存在以 ACT_RU_ 为前缀的表中,执行完毕的流程信息则保存在以 ACT_HI_ 为前缀的表中,也就是流程历史信息表,当然这个历史信息表继续细分的话,还有好多种,今天我们就来聊一聊这个话题. 假设我有如下一个流程: 当这个流程执行完毕后,以 

  • Tensorflow高性能数据优化增强工具Pipeline使用详解

    目录 安装方法 功能 高级用户部分 用例1,为训练创建数据Pipeline 用例2,为验证创建数据Pipeline 初学者部分 Keras 兼容性 配置 增强: GridMask MixUp RandomErase CutMix Mosaic CutMix , CutOut, MixUp Mosaic Grid Mask 安装方法 给大家介绍一个非常好用的TensorFlow数据pipeline工具. 高性能的Tensorflow Data Pipeline,使用SOTA的增强和底层优化. pi

  • Flow如何解决背压问题的方法详解

    目录 前言 关于背压(BackPressure) 背压问题是什么 定义背压策略 Flow的背压机制 模拟背压问题 背压处理方式 使用buffer进行缓存收集 使用conflate解决 使用collectLatest解决 小结 前言 随着时间的推移,越来越多的主流应用已经开始全面拥抱Kotlin,协程的引入,Flow的诞生,给予了开发很多便捷,作为协程与响应式编程结合的流式处理框架,一方面它简单的数据转换与操作符,没有繁琐的操作符处理,广受大部分开发的青睐,另一方面它并没有响应式编程带来的背压问题

  • 图解 Kotlin SharedFlow 缓存系统及示例详解

    目录 前言 replay extraBufferCapacity onBufferOverflow SharedFlow Buffer 前言 Kotlin 为我们提供了两种创建“热流”的工具:StateFlow 和 SharedFlow.StateFlow 经常被用来替代 LiveData 充当架构组件使用,所以大家相对熟悉.其实 StateFlow 只是 SharedFlow 的一种特化形式,SharedFlow 的功能更强大.使用场景更多,这得益于其自带的缓存系统,本文用图解的方式,带大家更

  • kotlin android extensions 插件实现示例详解

    目录 前言 原理浅析 总体结构 源码分析 插件入口 配置编译器插件传参 编译器插件接收参数 注册各种Extension IrGenerationExtension ExpressionCodegenExtension StorageComponentContainerContributor ClassBuilderInterceptorExtension PackageFragmentProviderExtension 总结 前言 kotlin-android-extensions 插件是 Ko

  • Kotlin实现Android系统悬浮窗详解

    目录 Android 弹窗浅谈 系统悬浮窗具体实现 权限申请 代码设计 具体实现 FloatWindowService 类 FloatWindowManager 类 FloatWindowManager 类代码 FloatLayout 类及其 Layout HomeKeyObserverReceiver 类 FloatWindowUtils 类 总结 Android 弹窗浅谈 我们知道 Android 弹窗中,有一类弹窗会在应用之外也显示,这是因为他被申明成了系统弹窗,除此之外还有2类弹窗分别是

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • php项目接入xxl-job调度系统的示例详解

    目录 1. 部署xxl-job调度中心 2. 整合xxl-job调度系统 2.1 创建执行器项目 2.2 新增执行器  2.3 部署执行器项目 2.4 新增GLUE模式任务 2.5 编写php代码片段 1. 部署xxl-job调度中心 此处略,请自行百度.下面重点介绍如何将php项目接入xxl-job调度. 2. 整合xxl-job调度系统 核心是使用xxl-job的GLUE运行模式,通过一段php代码片段,调用远程的http资源. 2.1 创建执行器项目 参考执行器示例项目, xxl-job-

  • linux获取系统启动时间示例详解

    1.前言 时间对操作系统来说非常重要,从内核级到应用层,时间的表达方式及精度各部相同.linux内核里面用一个名为jiffes的常量来计算时间戳.应用层有time.getdaytime等函数.今天需要在应用程序获取系统的启动时间,百度了一下,通过sysinfo中的uptime可以计算出系统的启动时间. 2.sysinfo结构 sysinfo结构保持了系统启动后的信息,主要包括启动到现在的时间,可用内存空间.共享内存空间.进程的数目等.man sysinfo得到结果如下所示: 复制代码 代码如下:

  • Kotlin的枚举与异常示例详解

    一.kotlin中枚举的定义 枚举需要用到两个关键字 enum class,譬如这样 enum class Color(val r: Int,val g: Int,val b: Int){ //彩虹色也是一个典故:韦克菲尔德战役 RED(255,0,0),ORANGE(255,165,0),YELLOW(255,255,0), GREEN(0,255,0),BLUE(0,0,255),INDIGO(75,0,130),VIOLET(238,130,238); fun rgb() = (r * 2

  • ASP.NET Core缓存静态资源示例详解

    背景 缓存样式表,JavaScript或图像文件等静态资源可以提高您网站的性能.在客户端,总是从缓存中加载一个静态文件,这样可以减少对服务器的请求数量,从而减少获取页面及其资源的时间.在服务器端,由于它们的请求较少,服务器可以处理更多的客户端而无需升级硬件. 虽然缓存是一件好事,但您必须确保客户端始终运行最新版本的应用程序.当您部署下一个版本的网站时,您不希望客户端使用过时的缓存版本的文件. 方案: 为确保用户始终使用最新版本的文件,我们必须为每个文件版本提供一个唯一的URL.有很多策略: 使用

  • Kotlin对象比较注意点示例详解

    目录 背景 原因 另一个问题 解决办法 结论 背景 现有一个StateFlow及其监听 private val stateFlow = MutableStateFlow(kotlin.Pair<String, ArrayList<String>>("abc", ArrayList())) GlobalScope.launch { stateFlow.collect { // do something } } 更新ArrayList并尝试emit GlobalSc

  • Kotlin下Rxjava的基础用法及流式调用示例详解

    目录 前言 基础用法 fromXXX create interval & timer 指定线程 observeOn & subscribeOn Flowable 流式调用 背压 前言 万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的

随机推荐