Kotlin协程Flow生命周期及异常处理浅析

目录
  • 正文
    • Flow基本概念
    • Flow生命周期
    • 处理异常
    • 上游或者中间异常使用catch
    • 下游使用try-catch
  • 切换执行线程
  • 终止操作符
  • "冷的数据流"从何而来

正文

Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大。

Flow基本概念

Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理“了一下,又流淌到了下游。

示例代码如下

flow {         // 发送方、上游
    emit(1)    // 挂起函数,发送数据
    emit(2)
    emit(3)
    emit(4)
    emit(5)
}
.filter { it > 2 }  // 中转站,处理数据
.map { it * 2 }
.take(2)
.collect{           // 接收方,下游
    println(it)
}
输出内容:
6
8

通过上面代码我们可以看到,基于一种链式调用api的方式,流式的进行处理数据还是很棒的,接下来具体看一下上面的组成:

  • flow{},是个高阶函数,主要用于创建一个新的Flow。在其Lambda函数内部使用了emit()挂起函数进行发送数据。
  • filter{}、map{}、take{},属于中间处理层,也是中间数据处理的操作符,Flow最大的优势,就是它的操作符跟集合操作符高度一致。只要会用List、Sequence,那么就可以快速上手 Flow 的操作符。
  • collect{},下游接收方,也成为终止操作符,它的作用其实只有一个:终止Flow数据流,并且接收这些数据。

其他创建Flow的方式还是flowOf()函数,示例代码如下

fun main() = runBlocking{aassssssssaaaaaaaas
    flowOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println("flowof: $it")
    }
}

我们在看一下list集合的操作示例

listOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println("listof: $it")
        }

通过以上对比发现,两者的基本操作几乎一致,Kotlin也提供了两者相互转换的API,Flow.toList()、List.asFlow()这两个扩展函数,让数据在 List、Flow 之间来回转换,示例代码如下:

//flow 转list
    flowOf(1,2,3)
        .toList()
        .filter { it > 1 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println(it)
        }
    // list 转 flow
    listOf(1,2,3).asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }

Flow生命周期

虽然从上面操作看和集合类型,但是Flow还是有些特殊操作符的,毕竟它是协程的一部分,和Channel不同,Flow是有生命周期的,只是以操作符的形式回调而已,比如onStart、onCompletion这两个中间操作符。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .collect{
            println("collect: $it")
        }
输出内容:
onStart
filter: 1
filter: 2
filter: 3
filter: 4
map: 4
collect: 8
filter: 5
map: 5
collect: 10

我们可以看到onStart,它的作用是注册一个监听事件:当 flow 启动以后,它就会被回调。

和filter、map、take这些中间操作符不同,他们的顺序会影响数据的处理结果,这也很好理解;onStart和位置没有关系,它本质上是一个回调,不是一个数据处理的中间站。同样的还有数据处理完成的回调onCompletion。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .onCompletion { println("onCompletion") }
        .collect{
            println("collect: $it")
        }

Flow中onCompletion{} 在面对以下三种情况时都会进行回调:

  • 1,Flow 正常执行完毕
  • 2,Flow 当中出现异常
  • 3,Flow 被取消。

处理异常

在数据流的处理过程中,很难保证不出现问题,那么出现异常之后再该怎么处理呢?

  • 对于发生在上游、中间操作这两个阶段的异常,我们可以直接使用 catch 这个操作符来进行捕获和进一步处理。
  • 对于发生在下游,使用try-catch,把collect{}当中可能出现问题的代码包裹起来进行捕获处理。

上游或者中间异常使用catch

fun main() = runBlocking{
    val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }
    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .collect{
            println("collect: $it")
        }
}
输出:
collect: 2
collect: 4
catch: java.lang.IllegalStateException

catch 这个操作符的作用是和它的位置强相关的,catch 的作用域,仅限于catch的上游。换句话说,发生在 catch 上游的异常,才会被捕获,发生在 catch 下游的异常,则不会被捕获。

val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }
    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .filter { it / 0 > 1 } // catch之后发生异常
        .collect{
            println("collect: $it")
    }
输出内容:
Exception in thread "main" java.lang.ArithmeticException: / by zero

下游使用try-catch

flowOf(1,2,3)
        .onCompletion { println("onCompletion $it") }
        .collect{
            try {
                println("collect: $it")
                throw IllegalStateException();
            }catch (e: Exception){
                println("catch $e")
            }
        }
输出:
collect: 1
catch java.lang.IllegalStateException
collect: 2
catch java.lang.IllegalStateException
collect: 3
catch java.lang.IllegalStateException
onCompletion null

切换执行线程

Flow适合处理复杂的异步任务,大多数情况下耗时任务放在子线程或线程池中处理,对于UI任务放在主线程中进行。

在Flow中可以使用flowOn操作符实现上述场景中的线程切换。

flowOf(1,2,3,4,5)
        .filter {
            logX("filter: $it")
            it > 2 }
        .flowOn(Dispatchers.IO) // 切换线程
        .collect{
            logX("collect: $it")
        }
输出内容:
================================
filter: 1
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 2
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 3
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 4
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 5
Thread:DefaultDispatcher-worker-1
================================
================================
collect: 3
Thread:main
================================
================================
collect: 4
Thread:main
================================
================================
collect: 5
Thread:main
================================

flowOn 操作符也是和它的位置强相关的。作用域限于它的上游。在上面的代码中,flowOn 的上游,就是 flowOf{}、filter{} 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有collect{}当中的代码是运行在 main 线程当中的。

终止操作符

Flow 里面,最常见的终止操作符就是collect。除此之外,还有一些从集合中借鉴过来的操作符,也是Flow的终止操作符。比如 first()、single()、fold{}、reduce{},本质上来说说当我们尝试将 Flow 转换成集合的时候,已经不属于Flow的API,也不属于协程的范畴了,它本身也就意味着 Flow 数据流的终止。

"冷的数据流"从何而来

在上面文章《Kotlin协程Channel浅析》中,我们认识到Channel是”热数据流“,随时准备好,随用随取,就像海底捞里的服务员。

现在我们看下Flow和Channel的区别

val flow = flow {
        (1..4).forEach{
            println("Flow发送前:$it")
            emit(it)
            println("Flow发送后: $it")
        }
    }
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach{
            println("Channel发送前: $it")
            send(it)
            println("Channel发送后: $it")
        }
    }
输出内容:
Channel发送前: 1

Flow中的逻辑并未执行,因此我们可以这样类比,Channel之所以被认为是“热”的原因,是因为不管有没有接收方,发送方都会工作。那么对应的,Flow被认为是“冷”的原因,就是因为只有调用终止操作符之后,Flow才会开始工作。

除此之外,Flow一次处理一条数据,是个”懒家伙“。

    val flow = flow {
        (3..6).forEach {
            println("Flow发送前:$it")
            emit(it)
            println("Flow发送后: $it")
        }
    }.filter {
        println("filter: $it")
        it > 3
    }.map {
        println("map: $it")
        it * 2
    }.collect {
        println("结果collect: $it")
    }
输出内容:
Flow发送前:3
filter: 3
Flow发送后: 3
Flow发送前:4
filter: 4
map: 4
结果collect: 8
Flow发送后: 4
Flow发送前:5
filter: 5
map: 5
结果collect: 10
Flow发送后: 5
Flow发送前:6
filter: 6
map: 6
结果collect: 12
Flow发送后: 6

相比于满面春风,热情服务的Channel,Flow更像个冷漠的家伙,你不找他,他不搭理你。

  • Channel,响应速度快,但数据可能是旧的,占用资源
  • Flow,响应速度慢,但数据是最新的,节省资源

Flow也可以是”热“的,你知道吗?

更多关于Kotlin协程Flow生命周期异常处理的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin Flow封装类SharedFlow StateFlow LiveData使用对比

    目录 Kotlin中SharedFlow的使用 VS StateFlow SharedFlow的特点 一.SharedFlow的使用 二.SharedFlow.StateFlow.LiveData的对比 三.SharedFlow 的粘性设置与事件总线 总结 Kotlin中SharedFlow的使用 VS StateFlow SharedFlow 是继承于 Flow ,同时它是 StateFlow 的父类,它们都是是热流,先说一下冷流与热流的概念. 冷流 :只有订阅者订阅时,才开始执行发射数据流的

  • 图解 Kotlin SharedFlow 缓存系统及示例详解

    目录 前言 replay extraBufferCapacity onBufferOverflow SharedFlow Buffer 前言 Kotlin 为我们提供了两种创建“热流”的工具:StateFlow 和 SharedFlow.StateFlow 经常被用来替代 LiveData 充当架构组件使用,所以大家相对熟悉.其实 StateFlow 只是 SharedFlow 的一种特化形式,SharedFlow 的功能更强大.使用场景更多,这得益于其自带的缓存系统,本文用图解的方式,带大家更

  • Kotlin协程之Flow基础原理示例解析

    目录 引言 一.Flow的创建 二.Flow的消费 1.SafeFlow类 2.AbstractFlow类 3. SafeCollector类 4.消费过程中的挂起 引言 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d(&qu

  • Kotlin Flow常用封装类StateFlow使用详解

    目录 Kotlin中StateFlow的使用 一.StateFlow的使用 二.替代LiveData 总结 Kotlin中StateFlow的使用 StateFlow 是 Flow 的实现,是一个特殊的流,默认的 Flow 是冷流,而StateFlow 是热流,和 LiveData 比较类似.关于冷热流后面一期 SharedFlow 会详细说明. 使用 StateFlow 替代 LiveData 应该是目前很多开发者的呼吁了,确实 LiveData 的功能 StateFlow 都能实现,可以说是

  • Kotlin Flow操作符及基本使用详解

    目录 一.Flow的基本概念 二.Flow的生命周期与异常处理 2.1 开始与结束 2.2 异常的处理 2.3 retry的处理 2.4 超时的处理 2.5 Flow的取消 三.Flow的创建方式 四.Flow的接收方式 五.Flow的转换操作符 5.1 基本操作符 5.2 特殊操作符 5.3 组合与展平操作符 5.4 切换线程 总结 一.Flow的基本概念 Kotlin 的 Flow 相信大家都或多或少使用过,毕竟目前比较火,目前我把Flow的使用整理了一下.希望和大家所学对照一下,能有所启发

  • Kotlin Flow常见场景下的使用实例

    目录 Kotlin Flow在开发中的常用场景使用 一.网络请求搭载Retrofit 1.1 LiveDataCallAdapterFactory 1.2 suspend 二.协程与Flow的选择与差异 三.StateFlow与SharedFlow的选择 总结 Kotlin Flow在开发中的常用场景使用 大家了解了 Flow 的创建与接收流程,了解 SharedFlow 创建的几种方式,各个参数的用途,了解了SharedFlow的 "青春版" StateFlow 的创建与接收,已经他

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • Kotlin协程Job生命周期结构化并发详解

    目录 1.Job的生命周期 2.Deffered 3.Job与结构化并发 4.launch和async的使用场景 前面在学习协程启动方式的时候在launch的源码中有一个返回值是Job,async的返回Deferred也是实现了Job,那么而也就是说launch和async在创建一个协程的时候也会创建一个对应的Job对象.还提到过Job是协程的句柄,那么Job到底是什么?它有什么用? 1.Job的生命周期 先看一下Job的源码,这里只保留了跟标题相关的内容 public interface Jo

  • kotlin 协程上下文异常处理详解

    目录 引言 一.协程上下文 1.CoroutineContext 2.CorountineScope 3.子协程继承父协程 二.协程的异常传递 1.协程的异常传播 2.不同上下文(没有继承关系)之间协程异常会怎么样? 3.向用户暴露异常 三.协程的异常处理 使用SupervisorJob 异常捕获器CoroutineExceptionHandler Android中全局异常的处理 引言 从前面我们可以大致了解了协程的玩法,如果一个协程中使用子协程,那么该协程会等待子协程执行结束后才真正退出,而达

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • Kotlin协程开发之Flow的融合与Channel容量及溢出策略介绍

    目录 一.协程间的通信 1.通道容量 2.溢出策略 二.FusibleFlow接口 三.ChannelFlow类 一.协程间的通信 当需要进行协程间的通信时,可以调用Channel方法,创建一个Channel接口指向的对象,通过调用该对象的send方法和receive方法实现消息的发送与接收.协程对Channel接口的实现,本质上与阻塞队列类似,这里不再赘述. 1.通道容量 事实上,send方法与receive方法并没有定义在Channel接口中,而是分别定义在SendChannel接口和Rec

  • 使用kotlin协程提高app性能(译)

    协程是一种并发设计模式,您可以在Android上使用它来简化异步执行的代码.Kotlin1.3版本添加了 Coroutines,并基于其他语言的既定概念. 在Android上,协程有助于解决两个主要问题: 管理长时间运行的任务,否则可能会阻止主线程并导致应用冻结. 提供主安全性,或从主线程安全地调用网络或磁盘操作. 本主题描述了如何使用Kotlin协程解决这些问题,使您能够编写更清晰,更简洁的应用程序代码. 管理长时间运行的任务 在Android上,每个应用程序都有一个主线程来处理用户界面并管理

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • Kotlin协程之Flow异常示例处理

    目录 示例 一.catch方法 catchImpl方法 二. onCompletion方法 1.unsafeFlow方法 2.ThrowingCollector类 三. retryWhen方法 示例 代码如下: launch(Dispatchers.Main) { // 第一部分 flow { emit(1) throw NullPointerException("e") }.catch { Log.d("liduo", "onCreate1: $it&q

随机推荐