Kotlin协程与并发深入全面讲解

目录
  • 协程与并发
    • 1.协程并发问题
    • 2.协程处理并发的手段

协程与并发

Kotlin协程是基于线程执行的。经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同。

在java的世界里,并发往往是多个线程一起工作,存在共享的变量。需要处理好同步问题。要避免把协程与线程的概念混淆。

runBlocking {
        var i = 0
        launch(Dispatchers.Default) {
            repeat(1000) {
                i++
            }
        }
        delay(1000L)
        println(i)
    }

Log
1000
 
Process finished with exit code 0

上述代码中没有任何并发任务,launch创建了一个协程,所有的计算都发生在协程中。所以不需要考虑同步问题。

1.协程并发问题

多个协程并发执行的例子:

runBlocking {
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    i++
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

9933
 
Process finished with exit code 0

上述代码中,创建了10个协程任务,每个协程任务都会工作在Default线程池中,这10个协程任务对i进行1000次自增操作,但是因为10个协程分别运行在不同的线程之前,且共享一个变量,所以会产生同步问题。

2.协程处理并发的手段

在Java中的同步手段有:synchronized、Atomic、Lock等;

使用@Synchronized注解或者synchronized(){}代码块

 runBlocking {
        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    synchronized(lock) {
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

如何在上面的synchronized代码块中加入挂起函数,则发现会报错。

如下:

  runBlocking {
        suspend fun prepare() {
        }
        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    synchronized(lock) {
                        prepare()
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

所以可以发现不能在synchronized{}当中调用挂起函数,编译器会报错。因为挂起函数会被翻译成带有Continuation的异步函数,造成synchronized代码块无法同步处理。

协程并发思路

单线程并发

在Kotlin协程中可以实现单线程并发。

runBlocking {
        suspend fun getResult1(): String {
            printlnCoroutine("Start getResult1")
            delay(1000L)
            printlnCoroutine("End getResult1")
            return "Result1"
        }
        suspend fun getResult2(): String {
            printlnCoroutine("Start getResult2")
            delay(1000L)
            printlnCoroutine("End getResult2")
            return "Result2"
        }
        suspend fun getResult3(): String {
            printlnCoroutine("Start getResult3")
            delay(1000L)
            printlnCoroutine("End getResult3")
            return "Result3"
        }
        val results = mutableListOf<String>()
        val time = measureTimeMillis {
            val result1 = async {
                getResult1()
            }
            val result2 = async {
                getResult2()
            }
            val result3 = async {
                getResult3()
            }
            results.add(result1.await())
            results.add(result2.await())
            results.add(result3.await())
        }
        println("Time:$time")
        println(results)
    }
fun printlnCoroutine(any: Any?) {
    println("" + any + ";Thread:" + Thread.currentThread().name)
}

Log
 
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
 
Process finished with exit code 0

上面代码启动了三个协程,它们之间是并发执行的,每个协程耗时1000ms,总耗时1000多毫秒,而且这几个协程都运行在main线程上。

所以 可以考虑将i++逻辑分发到单线程之上。

 runBlocking {
        val coroutineDispatcher = Executors.newSingleThreadExecutor {
            Thread(it, "MySingleThread").apply {
                isDaemon = true
            }
        }.asCoroutineDispatcher()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(coroutineDispatcher) {
                repeat(1000) {
                        i++
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

上述代码把所有协程任务分发到单独的线程中执行,但这10个协程是并发执行的。

Mutex

在java中,Lock之类的同步锁是阻塞式的,而Kotlin提供了非阻塞式的锁:Mutex。

   runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.lock()
                    i++
                    mutex.unlock()
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

Log
10000
 
Process finished with exit code 0

上述代码使用mutex.lock()、 mutex.unlock()包裹同步计算逻辑,实现多线程同步。Mutex 对比 JDK 当中的锁,最大的优势就在于支持挂起和恢复。

public interface Mutex {
    public val isLocked: Boolean
    public fun tryLock(owner: Any? = null): Boolean
    public suspend fun lock(owner: Any? = null;
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>
    public fun holdsLock(owner: Any): Boolean
    public fun unlock(owner: Any? = null)
}

Mutex 是一个接口,它的 lock() 方法其实是一个挂起函数。而这就是实现非阻塞式同步锁的根本原因。

但是上述代码中对于 Mutex 的使用其实是错误的,会存在问题。如果代码在 mutex.lock()、mutex.unlock() 之间发生异常,从而导致 mutex.unlock() 无法被调用。这个时候,整个程序的执行流程就会一直卡住,无法结束。看下面代码:

runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.lock()
                    i++
                    i/0
                    mutex.unlock()
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

如何解决?使用mutex.withLock{}。

代码入下:

 runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.withLock {
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

withLock{} 的本质,其实是在 finally{} 当中调用了 unlock()。

Actor

Actor,它本质上是基于 Channel 管道消息实现的。

sealed class Msg
object AddMsg : Msg()
class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
fun testCoroutinueConcurrent10() {
    runBlocking {
        suspend fun addActor() = actor<Msg> {
            var counter = 0
            for (msg in channel) {
                when (msg) {
                    is AddMsg -> counter++
                    is ResultMsg -> msg.result.complete(counter)
                }
            }
        }
        val actor = addActor()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    actor.send(AddMsg)
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        val deferred = CompletableDeferred<Int>()
        actor.send(ResultMsg(deferred))
        val result = deferred.await()
        actor.close()
        println(result)
    }
}

Log
10000
 
Process finished with exit code 0

addActor() 挂起函数,它其实调用了 actor() 这个高阶函数。而这个函数的返回值类型其实是 SendChannel。由此可见,Kotlin 当中的 Actor 其实就是 Channel 的简单封装。Actor 的多线程同步能力都源自于 Channel。这里,我们借助密封类定义了两种消息类型,AddMsg、ResultMsg,然后在 actor{} 内部,我们处理这两种消息类型,如果我们收到了 AddMsg,则计算“i++”;如果收到了 ResultMsg,则返回计算结果。而在 actor{} 的外部,我们则只需要发送 10000 次的 AddMsg 消息,最后再发送一次 ResultMsg,取回计算结果即可。Actor 本质上是基于 Channel 管道消息实现的。

避免共享可变状态

 runBlocking {
        val deferreds = mutableListOf<Deferred<Int>>()
        repeat(10) {
            val deferred = async(Dispatchers.Default) {
                var i = 0
                repeat(1000) {
                    i++
                }
                return@async i
            }
            deferreds.add(deferred)
        }
        var result = 0
        deferreds.forEach {
            result += it.await()
        }
        println(result)
    }

Log
 
10000
 
Process finished with exit code 0

在每一个协程当中,都有一个局部的变量 i,同时将 launch 都改为了 async,让每一个协程都可以返回计算结果。这种方式,相当于将 10000 次计算,平均分配给了 10 个协程,让它们各自计算 1000 次。这样一来,每个协程都可以进行独立的计算,然后我们将 10 个协程的结果汇总起来,最后累加在一起。

 runBlocking {
        val result = (1..10).map {
            async(Dispatchers.Default) {
                var i = 0
                repeat(1000) {
                    i++
                }
                return@async i
            }
        }.awaitAll()
            .sum()
        println(result)
    }

Log
10000
 
Process finished with exit code 0

到此这篇关于Kotlin协程与并发深入全面讲解的文章就介绍到这了,更多相关Kotlin协程与并发内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kotlin协程基础元素梳理分析

    Kotlin 协程的基础元素:Continuation.SafeContinuation.CoroutineContext.CombinedContext.CancellationException.intrinsics.CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类. Continuation 是什么? class Call<T>(callBack: Call.CallBack<T

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • Kotlin 协程的取消机制详细解读

    目录 引言 协程的状态 取消协程的用法 协程取消的有效性 如何写出可以取消的代码 在 finally 中释放资源 使用不可取消的 block CancellationException 超时取消 异步的超时和资源 取消检查的底层原理 引言 在 Java 语言中提供了线程中断的能力,但并不是所有的线程都可以中断的,因为 interrupt 方法并不是真正的终止线程,而是将一个标志位标记为中断状态,当运行到下一次中断标志位检查时,才能触发终止线程. 但无论如何,终止线程是一个糟糕的方案,因为在线程的

  • Kotlin协程之Flow触发与消费示例解析

    目录 示例 一.Flow的触发与消费 1.onEach方法 2.transform方法 3.collect方法 二.多消费过程的执行 三.总结 示例 代码如下: launch(Dispatchers.Main) { val task = flow { emit(2) emit(3) }.onEach { Log.d("liduo", "$it") } task.collect() } 一.Flow的触发与消费 在Kotlin协程:Flow基础原理的分析中,流的触发与

  • Kotlin协程之Flow异常示例处理

    目录 示例 一.catch方法 catchImpl方法 二. onCompletion方法 1.unsafeFlow方法 2.ThrowingCollector类 三. retryWhen方法 示例 代码如下: launch(Dispatchers.Main) { // 第一部分 flow { emit(1) throw NullPointerException("e") }.catch { Log.d("liduo", "onCreate1: $it&q

  • Kotlin协程与并发深入全面讲解

    目录 协程与并发 1.协程并发问题 2.协程处理并发的手段 协程与并发 Kotlin协程是基于线程执行的.经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同. 在java的世界里,并发往往是多个线程一起工作,存在共享的变量.需要处理好同步问题.要避免把协程与线程的概念混淆. runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) } L

  • Kotlin协程Job生命周期结构化并发详解

    目录 1.Job的生命周期 2.Deffered 3.Job与结构化并发 4.launch和async的使用场景 前面在学习协程启动方式的时候在launch的源码中有一个返回值是Job,async的返回Deferred也是实现了Job,那么而也就是说launch和async在创建一个协程的时候也会创建一个对应的Job对象.还提到过Job是协程的句柄,那么Job到底是什么?它有什么用? 1.Job的生命周期 先看一下Job的源码,这里只保留了跟标题相关的内容 public interface Jo

  • 使用kotlin协程提高app性能(译)

    协程是一种并发设计模式,您可以在Android上使用它来简化异步执行的代码.Kotlin1.3版本添加了 Coroutines,并基于其他语言的既定概念. 在Android上,协程有助于解决两个主要问题: 管理长时间运行的任务,否则可能会阻止主线程并导致应用冻结. 提供主安全性,或从主线程安全地调用网络或磁盘操作. 本主题描述了如何使用Kotlin协程解决这些问题,使您能够编写更清晰,更简洁的应用程序代码. 管理长时间运行的任务 在Android上,每个应用程序都有一个主线程来处理用户界面并管理

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程flowOn与线程切换超详细示例介绍

    目录 示例代码 一.flowOn方法 1.ChannelFlowOperatorImpl类 二.collect方法 1.ChannelFlowOperator类的collect方法 2.ChannelFlow类的collect方法 3.flow方法中代码的执行 4.接收flow方法发出的值 三.flowOn方法与流的融合 四.总结 示例代码 本文分析示例代码如下: launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatc

  • 一篇文章揭开Kotlin协程的神秘面纱

    前言 Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱. 理论 它是什么 这是别人翻译: 协程把异步编程放入库中来简化这类操作.程序逻辑在协程中顺序表述,而底层的库会将其转换为异步操作.库会将相关的用户代码打包成回调,订阅相关事件,调度其执行到不同的线程(甚至不同的机器),而代码依然想顺序执行那么简单. 我的理解:子任务程协作运行,优雅的处理异步问题解决方案. 它能干什么? 我在做安卓开发,它能替换掉Handler,AsyncTask 甚至

  • python使用协程实现并发操作的方法详解

    本文实例讲述了python使用协程实现并发操作的方法.分享给大家供大家参考,具体如下: 协程 协程是一种用户态的轻量级线程,又称微线程. 协程拥有自己的寄存器上下文和栈,调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 优点: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 方便切换控制

  • Android kotlin+协程+Room数据库的简单使用

    Room Room是Google为了简化旧版的SQLite操作专门提供的 1.拥有了SQLite的所有操作功能 2.使用简单(类似于Retrofit),通过注解的方式实现相关功能.编译时自动生成实现类impl 3.LiveData,LifeCycle,Paging天然融合支持 导入 ... plugins { id 'com.android.application' id 'kotlin-android' id 'kotlin-android-extensions' id 'kotlin-kap

随机推荐