Kotlin协程Channel特点及使用细节详解

目录
  • 正文
    • 1.认识Channel
    • 2.Channel使用中的细节
    • 3.Channe的特点

正文

在协程启动模式中已经知道async是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel就出现了。

1.认识Channel

Channel的意思是管道、通道,用图表示如下:

Channel的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样:

fun main() {
    channelTest()
}
fun channelTest() = runBlocking {
    val channel = Channel<Int>()            //关键点①
    launch {
        for (i in 1..3) {
            channel.send(i)                 //关键点②
            logX("send: $i")
        }
    }
    launch {
        for (i in channel) {                //关键点③
            logX("receiver: $i")
        }
    }
    logX("end")
}
//输出结果:
//================================
//end
//Thread:main @coroutine#1
//================================
//================================
//receiver: 1
//Thread:main @coroutine#3
//================================
//================================
//send: 1
//Thread:main @coroutine#2
//================================
//================================
//send: 2
//Thread:main @coroutine#2
//================================
//================================
//receiver: 2
//Thread:main @coroutine#3
//================================
//================================
//receiver: 3
//Thread:main @coroutine#3
//================================
//================================
//send: 3
//Thread:main @coroutine#2
//================================

上面的代码中启动了两个协程,一个发送,一个接收,还有几个关键点:

  • 关键点①:通过Channel创建一个管道,其中泛型Int表示发送的数据类型;
  • 关键点②:启动一个协程通过send发送数据,send是一个挂起函数;
  • 关键点③:启动一个协程遍历channel打印出接收到的消息。

那么这里还有一个问题,在执行完上述代码后程序并没有终止,那要如何终止程序呢?

很简单,在发送完所有消息后调用close方法即可。

launch {
        for (i in 1..3) {
            channel.send(i)                 //关键点②
            logX("send: $i")
        }
//			修改点
//			  ↓
        channel.close()
    }

Channel也是一种协程资源,用完后如果不关闭那么这个资源就会一直被占用。

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
             ...
        }
        CONFLATED -> {
           ...
        }
        UNLIMITED -> {
            ...
        }
        BUFFERED -> {
            ...
        }
        else -> {
            ...
        }
    }

Channel中有三个参数:

  • capacity 代表管道的容量,默认值为RENDEZVOUS,代表容量为0,除此之外还有三个类型:
  • CONFLATED:代表容量为1,新的数据会替代旧的数据;
  • UNLIMITED:代表无限容量;
  • BUFFERED:代表具备一定缓存的容量,默认情况下是64,具体容量由VM参数kotlinx.coroutines.channels.defaultBuffer决定。
  • onBufferOverflow 代表缓冲策略,也就是当缓冲的容量满了之后要怎么做。默认值为SUSPEND,表示在缓冲区溢出时挂起。除此之外还有两个类型:
  • DROP_OLDEST:在缓冲区溢出时删除最旧的值,向缓冲区添加新值,不要挂起;

  • DROP_LATEST:在缓冲区溢出时,立即删除正在添加到缓冲区的最新值(以便缓冲区内容保持不变),不要挂起。

  • onUndeliveredElement 它相当于一个异常处理回调。当管道中的某些数据没有被成功接收的时候,这个回调就会被调用

现在写个案例看一下capacity在其他类型下的区别

/**
 * Channel.CONFLATED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 4
/**
 * Channel.UNLIMITED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * Channel.BUFFERED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4

再看一下onBufferOverflow在其他类型下的区别

/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST
 * 缓冲区设置为3,缓冲区溢出时删除最旧的值,向缓冲区添加新值
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST
 * 缓冲区设置为3,缓冲区溢出时立即删除正在添加到缓冲区的最新值
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3

再看一下onUndeliveredElement要如何使用

/**
 * capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement
 * 缓冲区设置为2,缓冲区溢出时立即删除正在添加到缓冲区的最新值
 * 接收一个数据后取消接收其他数据
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_LATEST,
        onUndeliveredElement = {
            println("onUndeliveredElement: $it")
        }
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
    }
    println("receive:${channel.receive()}")
    channel.cancel()
}
//输出结果:
//send: 1
//send: 2
//send: 3
//send: 4
//receive:1
//onUndeliveredElement: 2
//onUndeliveredElement: 3

上面的代码容量设置为2,缓冲策略是删除正在添加到缓冲区的最新值,接收一个数据后立即取消接收其他数据,也就是说接收到了【send: 1】的数据【receive:1】,【send: 4】的数据被缓冲策略删除了,由于接收消息的同道已经被取消了那么【send: 2】和【send: 3】的数据就只能在异常中被处理,从输出结果就可以看到。

从上面的代码示例可以总结出它的应用场景:接收方很关心数据是否被消费,例如企业微信、钉钉的消息是否已读的状态,对于异常处理那块的场景就像是发送消息过程中消息没有被发送出去,那么接收方就无法接受到这个消息。

2.Channel使用中的细节

前面在使用Channel时为了让程序终止在发送完成后调用了channel.close(),但是这个很容易被忘记,忘记添加就会造成程序无法终止的问题,那么Produce就诞生了,它是一个高阶函数。

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        for (i in 1..4) {
            send(i)
        }
    }
    launch {
        for (i in channel) {
            println("receive: $i")
        }
    }
    println("end")
}
//输出结果:
//end
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//Process finished with exit code 0

可以看到没有加入close代码就可以正常结束,上面发送了4条数据,那么我要是接收5条数据会不会有什么问题?

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        for (i in 1..4) {
            send(i)
        }
    }
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("end")
}
//输出结果:
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//ClosedReceiveChannelException: Channel was closed

可以看到当我接收第5条数据的时候报出channel被关闭的提示,也就是说produce确实会在消息发送完毕后关闭通道。

业务开发中有可能我们确实需要对channel发送的消息进行单独处理,那么也许并不知道具体发送了几条数据,如果接收数据数量超过发送数据数量就会出现错误,那有没有像isClose这类的方法可以在接收前判断是否被关闭呢?有的,在Channel中还有两个变量:

//如果该通道已通过调用[close]关闭,则返回' true '。这意味着调用[send]将导致异常。
public val isClosedForSend: Boolean
//如果通过在SendChannel端调用close关闭了此通道,
//并且已经接收到以前发送的所有项目,则返回true。
public val isClosedForReceive: Boolean

那么安全的调用channel.receive()接收就可以这么写

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce(capacity = 3) {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    while (!channel.isClosedForReceive) {
        println("receive: ${channel.receive()}")
    }
    println("end")
}
//输出结果:
//Send 1
//Send 2
//Send 3
//Send 4
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//end

但是这里会有一个问题,不定义capacity的数量

fun produceTest() = runBlocking {
    //										变化在这里
    //											↓
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    while (!channel.isClosedForReceive) {
        println("receive: ${channel.receive()}")
    }
    println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//
//ClosedReceiveChannelException: Channel was closed

可以看到send发送的数据全部都被接收了,但是还是报出channel被关闭的错误,原因在注释中已经写明:如果通过在SendChannel端调用close关闭了此通道,并且已经接收到以前发送的所有项目,则返回true。

这意味着调用receive将导致closereceivechannelexception。 所以channel.receive()要慎用。可以用channel.consumeEach代替

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    //变化在这里
    channel.consumeEach {
        println("receive: $it")
    }
    println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//end

3.Channe的特点

Channel主要你用来传递数据流的,这个数据流指的是多个数据组合形成别的流,与它形成鲜明对比的是async、挂起函数。

数据流的传输,有发送就有接收,而Channel是完全符合这一点的。发送与接收存在两种情况:

  • 数据流的发送了但是还没有被接收,没有接收则不再进行发送消息,例如文件的传输;
  • 数据流的发送了不管有没有被接收,都要继续发送消息,例如微信聊天。

Channel符合第二个结论,无论发送的数据是否被消费或者说被接收,Channel都会进行工作。我们来证明一下这个结论。

/**
 * 消息容量为10,发送4条数据
 * 无论消息是否被接收都会吧消息发送完毕
 */
fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce(capacity = 10) {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    println("end")
}
//输出结果:
//end
//Send 1
//Send 2
//Send 3
//Send 4
/**
 * 消息容量改为默认,默认值时0,发送4条数据
 * Channel依旧是在工作的,只是说在调用send方法的时候
 * 接收方还没有准备完毕且容量为0,所以会被挂起,程序一直无法退出
 */
fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    println("end")
}
//输出结果:
//end
//程序没有结束

通过上面的代码引出一个结论:Channel是“热” 的。不管接收方是否存在,Channel是一定会工作的。类似于自来水厂向像居民提供水源,发电厂向居民提供电能。

以上就是Kotlin协程Channel特点及使用细节详解的详细内容,更多关于Kotlin协程Channel特点的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin 协程思维模型的引入使用建立

    目录 1.协程 2.Kotlin协程 1.引入Kotlin协程 2.Kotlin协程的使用 3.Kotlin协程的轻量(总结的还不够清晰) 4.协程的“非阻塞式” 5.建立思维模型 1.协程 协程不是进程或线程,它的执行过程更类似于子例程或者说不带返回值的函数调用. 一个程序可以包含多个协程,类似于一个进程包含多个线程.线程有自己的上下文多个线程存在时它们相对独立,切换受系统控制,而协程也相对独立,也有自己的上下文,但是切换是由自己控制的,当需要切换到其他协程时是由当前协程控制的. 线程 协程

  • Kotlin 协程的取消机制详细解读

    目录 引言 协程的状态 取消协程的用法 协程取消的有效性 如何写出可以取消的代码 在 finally 中释放资源 使用不可取消的 block CancellationException 超时取消 异步的超时和资源 取消检查的底层原理 引言 在 Java 语言中提供了线程中断的能力,但并不是所有的线程都可以中断的,因为 interrupt 方法并不是真正的终止线程,而是将一个标志位标记为中断状态,当运行到下一次中断标志位检查时,才能触发终止线程. 但无论如何,终止线程是一个糟糕的方案,因为在线程的

  • Kotlin协程Job生命周期结构化并发详解

    目录 1.Job的生命周期 2.Deffered 3.Job与结构化并发 4.launch和async的使用场景 前面在学习协程启动方式的时候在launch的源码中有一个返回值是Job,async的返回Deferred也是实现了Job,那么而也就是说launch和async在创建一个协程的时候也会创建一个对应的Job对象.还提到过Job是协程的句柄,那么Job到底是什么?它有什么用? 1.Job的生命周期 先看一下Job的源码,这里只保留了跟标题相关的内容 public interface Jo

  • Kotlin Channel处理多个数据组合的流

    目录 结论先行 Channel使用示例 Channel的源码 安全的从Channel中取数据 热的数据流从何而来 Channel能力的来源 结论先行 Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了. Channel使用示例 fun main() = runBlocking { logX("开始") val channel = Channel<Int> { } launch { (1..3).forEach{

  • Kotlin协程Flow生命周期及异常处理浅析

    目录 正文 Flow基本概念 Flow生命周期 处理异常 上游或者中间异常使用catch 下游使用try-catch 切换执行线程 终止操作符 "冷的数据流"从何而来 正文 Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大. Flow基本概念 Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • Kotlin协程launch启动流程原理详解

    目录 1.launch启动流程 反编译后的Java代码 2.协程是如何被启动的 1.launch启动流程 已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的呢,直接进入launch的源码开始看起. fun main() { coroutineTest() Thread.sleep(2000L) } val block = suspend { println("Hello") delay(1000L) println(&q

  • Kotlin协程开发之Flow的融合与Channel容量及溢出策略介绍

    目录 一.协程间的通信 1.通道容量 2.溢出策略 二.FusibleFlow接口 三.ChannelFlow类 一.协程间的通信 当需要进行协程间的通信时,可以调用Channel方法,创建一个Channel接口指向的对象,通过调用该对象的send方法和receive方法实现消息的发送与接收.协程对Channel接口的实现,本质上与阻塞队列类似,这里不再赘述. 1.通道容量 事实上,send方法与receive方法并没有定义在Channel接口中,而是分别定义在SendChannel接口和Rec

  • Kotlin协程Channel特点及使用细节详解

    目录 正文 1.认识Channel 2.Channel使用中的细节 3.Channe的特点 正文 在协程启动模式中已经知道async是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel就出现了. 1.认识Channel Channel的意思是管道.通道,用图表示如下: Channel的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样: fun main() { channelTest() } fun channelTest() = runBlock

  • Unity中协程IEnumerator的使用方法介绍详解

    在Unity中,一般的方法都是顺序执行的,一般的方法也都是在一帧中执行完毕的,当我们所写的方法需要耗费一定时间时,便会出现帧率下降,画面卡顿的现象.当我们调用一个方法想要让一个物体缓慢消失时,除了在Update中执行相关操作外,Unity还提供了更加便利的方法,这便是协程. 在通常情况下,如果我们想要让一个物体逐渐消失,我们希望方法可以一次调用便可在程序后续执行中实现我们想要的效果. 我们希望代码可以写成如下所示: void Fade() { for (float f = 1f; f >= 0;

  • python使用协程实现并发操作的方法详解

    本文实例讲述了python使用协程实现并发操作的方法.分享给大家供大家参考,具体如下: 协程 协程是一种用户态的轻量级线程,又称微线程. 协程拥有自己的寄存器上下文和栈,调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 优点: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控制

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • Kotlin协程与并发深入全面讲解

    目录 协程与并发 1.协程并发问题 2.协程处理并发的手段 协程与并发 Kotlin协程是基于线程执行的.经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同. 在java的世界里,并发往往是多个线程一起工作,存在共享的变量.需要处理好同步问题.要避免把协程与线程的概念混淆. runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) } L

  • 一篇文章揭开Kotlin协程的神秘面纱

    前言 Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱. 理论 它是什么 这是别人翻译: 协程把异步编程放入库中来简化这类操作.程序逻辑在协程中顺序表述,而底层的库会将其转换为异步操作.库会将相关的用户代码打包成回调,订阅相关事件,调度其执行到不同的线程(甚至不同的机器),而代码依然想顺序执行那么简单. 我的理解:子任务程协作运行,优雅的处理异步问题解决方案. 它能干什么? 我在做安卓开发,它能替换掉Handler,AsyncTask 甚至

随机推荐