Kotlin Channel处理多个数据组合的流

目录
  • 结论先行
  • Channel使用示例
  • Channel的源码
  • 安全的从Channel中取数据
  • 热的数据流从何而来
  • Channel能力的来源

结论先行

Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。

Channel使用示例

fun main() = runBlocking {
    logX("开始")
    val channel = Channel<Int> {  }
    launch {
        (1..3).forEach{
            channel.send(it)
            logX("发送数据: $it")
        }
        // 关闭channel, 节省资源
        channel.close()
    }
    launch {
        for (i in channel){
            logX("接收数据: $i")
        }
    }
    logX("结束")
}

示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。

Channel的源码

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}
    }

可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,没有限量
  • CONFLATED: 容量为1,新的覆盖旧的值
  • BUFFERED: 添加缓冲容量,默认值是64,可以通过修改VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修改

接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是:

SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,当管道的容量满了以后,如果发送方还要继续发送,我们就会挂起当前的 send() 方法。由于它是一个挂起函数,所以我们可以以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复,有点像生产者-消费者模型
  • DROP_OLDEST,顾名思义,就是丢弃最旧的那条数据,然后发送新的数据,有点像LRU算法。
  • DROP_LATEST,丢弃最新的那条数据。这里要注意,这个动作的含义是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变。

最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。

综合这个参数使用一下

fun main() = runBlocking {
    println("开始")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
        // 关闭channel, 节省资源
        channel.close()
    }
    launch {
        for (i in channel){
            println("接收数据: $i")
        }
    }
    println("结束")
}

输出结果如下:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3
接收数据: 2
接收数据: 3

安全的从Channel中取数据

先看一个例子

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
while (!channel.isClosedForReceive){
    val i = channel.receive();
    println("接收: $i")
}

输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。

推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
channel.consumeEach {
    println("接收:$it")
}

所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。

热的数据流从何而来

先看一下代码

    println("开始")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
    }
    println("结束")
}

输出:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3

可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种“不管有没有接收方,发送方都会工作”的模式,就是我们将其认定为“热”的原因。

举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。

总的来说,不管接收方是否存在,Channel 的发送方一定会工作。

Channel能力的来源

通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。

这也是一种良好的设计思想,“对读取开放,对写入封闭”的开闭原则。

到此这篇关于Kotlin Channel处理多个数据组合的流的文章就介绍到这了,更多相关Kotlin Channel内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kotlin协程开发之Flow的融合与Channel容量及溢出策略介绍

    目录 一.协程间的通信 1.通道容量 2.溢出策略 二.FusibleFlow接口 三.ChannelFlow类 一.协程间的通信 当需要进行协程间的通信时,可以调用Channel方法,创建一个Channel接口指向的对象,通过调用该对象的send方法和receive方法实现消息的发送与接收.协程对Channel接口的实现,本质上与阻塞队列类似,这里不再赘述. 1.通道容量 事实上,send方法与receive方法并没有定义在Channel接口中,而是分别定义在SendChannel接口和Rec

  • Kotlin Channel处理多个数据组合的流

    目录 结论先行 Channel使用示例 Channel的源码 安全的从Channel中取数据 热的数据流从何而来 Channel能力的来源 结论先行 Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了. Channel使用示例 fun main() = runBlocking { logX("开始") val channel = Channel<Int> { } launch { (1..3).forEach{

  • Kotlin协程Channel特点及使用细节详解

    目录 正文 1.认识Channel 2.Channel使用中的细节 3.Channe的特点 正文 在协程启动模式中已经知道async是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel就出现了. 1.认识Channel Channel的意思是管道.通道,用图表示如下: Channel的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样: fun main() { channelTest() } fun channelTest() = runBlock

  • 针对Sqlserver大数据量插入速度慢或丢失数据的解决方法

    我的设备上每秒将2000条数据插入数据库,2个设备总共4000条,当在程序里面直接用insert语句插入时,两个设备同时插入大概总共能插入约2800条左右,数据丢失约1200条左右,测试了很多方法,整理出了两种效果比较明显的解决办法: 方法一:使用Sql Server函数: 1.将数据组合成字串,使用函数将数据插入内存表,后将内存表数据复制到要插入的表. 2.组合成的字符换格式:'111|222|333|456,7894,7458|0|1|2014-01-01 12:15:16;1111|222

  • JavaScript如何实现组合列表框中元素移动效果

    首先给大家说下组合框和列表框的区别: 组合框包括列表框和文本框的功能 文本框:只能输入数据 列表框:只能选择数据 组合框:既能输入数据,又能选择`` 应用背景:在页面中有两个列表框,需要把其中一个列表框的元素移动到另一个列表框 . 实现的基本思想: (1)编写init方法对两个列表框进行初始化: (2)为body添加onload事件调用init方法: (3)编写move(s1,s2)把s1中选中的选项移到s2; (4)编写moveAll(s1,s2)把s1中所有的选项都移到s2. (5)为按钮添

  • 详解java nio中的select和channel

    什么是NIO? 线程在处理数据时,如果线程还处于将数据从channel读到buffer的这段时间内,线程可以去做别的事情,等数据都读到buffer了,线程再回来处理读到的数据 channel是什么? 类比流的概念.与流的区别在于 1.channel是可读可写的,但是一个流要么写要么读 2.chanel可以异步的读和写 3.数据总是从channel中读到buffer,或者从buffer中写到channel 流的读取或写一般是一次性的操作,数据在读取过程中不会有缓存,这也就意味着没有办法自己随便移动

  • Java获得一个数组的指定长度排列组合算法示例

    本文实例讲述了Java获得一个数组的指定长度排列组合算法.分享给大家供大家参考,具体如下: package demo; import java.util.Stack; /** * JAVA获得一个数组的指定长度的排列组合.<br> * * @author JAVA世纪网(java2000.net, laozizhu.com) */ public class TestSequenceAll { public static void main(String[] args) { TestSequen

  • Go语言中 Channel 详解

    Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication). 它的操作符是箭头 <- . ch <- v    // 发送值v到Channel ch中 v := <-ch  // 从Channel ch中接收数据,并将数据赋值给v (箭头的指向就是数据的流向) 就像 map 和 slice 数据类型一样, channel必须先创建再使用: ch := make(chan int) Channel类型 Cha

  • 在Java与Kotlin之间如何进行互操作详解

    前言 目前kotlin是谷歌首推的开发Android的语言,但由于历史原因,我们绝大部分项目依旧还是以Java为主的,也就是说存在Java和Kotlin两种语言同时开发的情况. 有人会说把老项目全部翻译成Kotlin,的确可以怎么做,但是成本还是挺大的.我们只能一点一点慢慢的向kotlin语言迁移. 那么在迁移的过程中就避免不了Java和Kotlin相互调用的情况.即Kotlin调用Java或者Java调用Kotlin.下面我们就来具体看下两者之间相互操作的一些解决方案. kotlin调用jav

  • C#导出数据到excel如何提升性能

    一,要提升性能,我们先要知道耗时的地方在哪里 1,数据库查询, 2,把数据组合成新集合循环嵌套太多 二,那我们怎么优化呢? 一,数据库查询, 1>,数据库查询:如果数据量小,我们可以用临时datatable,连表查询,,可是如果是连表都是千万级上亿数据,就不建议用连表 那这个时候该怎么办呢? 2>这个时候我们可以选择先单表查询,然后再循环体查询自己所要的其他关联数据,这个时候我们需要注意的点是什么? 3>减少数据库查询!!!!!!!!!这个是重点,那怎么减少呢?正常逻辑如下代码,可是数据

  • golang开发中channel使用

    channel[通道]是golang的一种重要特性,正是因为channel的存在才使得golang不同于其它语言.channel使得并发编程变得简单容易有趣. channel的概念和语法 一个channel可以理解为一个先进先出的消息队列.channel用来在协程[goroutine]之前传递数据,准确的说,是用来传递数据的所有权.一个设计良好的程序应该确保同一时刻channel里面的数据只会被同一个协程拥有,这样就可以避免并发带来的数据不安全问题[data races]. 正文 channel

随机推荐