Kotlin协程的线程调度示例详解

目录
  • 引言
  • 一、协程的分发器作用
    • 1.1 测试代码
    • 1.2 CoroutineScope.launch
      • 1.2.1 newCoroutineContext
    • 1.3 startCoroutineCancellable
      • 1.3.1 intercepted()
      • 1.3.2 CoroutineDispatcher
      • 1.3.3 小结
    • 1.4 DispatchedContinuation
    • 1.5 DefaultScheduler
      • 1.5.1 SchedulerCoroutineDispatcher
      • 1.5.2 CoroutineScheduler
    • 1.6 DispatchedTask.run()
    • 1.7 总结
  • 二、协程中的线程切换
    • 2.1 反编译代码
      • 2.1.1 MainActivityonCreateonCreateonCreate1
      • 2.1.2 AnonymousClass1
    • 2.2 withContext
      • 2.2.1 startCoroutineCancellable
    • 2.3 resumeWith
    • 2.4 DispatchedCoroutine
      • 2.4.1 DispatchedCoroutine 的继承关系
    • 2.5 协程线程的恢复
      • 2.5.1 AbstractCoroutine.resumeWith()
      • 2.5.2 afterResume
    • 2.6 总结
    • 2.7 Dispatchers.Main
      • 2.7.1 MainDispatcherLoader
      • 2.7.2 AndroidDispatcherFactory
      • 2.7.3 HandlerContext

引言

在第一篇文章中我们分析了协程启动创建过程启动过程,在本文中,我们将着重剖析协程中协程调度的逻辑流程。主要是分析解答如下2个问题:

  • 涉及到协程方法器是如何将协程代码调度到特定的线程执行?
  • 子协程执行完又是如何切换0回父协程的线程环境?

一、协程的分发器作用

1.1 测试代码

GlobalScope.launch {
    //协程体1
    Log.d(TAG, "before suspend job.")
    withContext(Dispatchers.Main) {
        //协程体2
        Log.d(TAG, "print in Main thread.")
    }
    Log.d(TAG, "after suspend job.")
}
  • 此次的协程测试用例中,我们默认的launch一个协程,我们简单的将launch需要执行的这外层逻辑为协程体1
  • 在协程体1中,我们使用withContext将协程切换到主线程执行,打印日志。我们将这里面执行的协程逻辑为协程体2
  • 协程体2执行完成后,切回协程体1中执行并打印Log。
  • 注意,根据我们之前《协程的创建与启动》文章中分析的,Kotlin编译器针对协程体1和协程体2分别生成一个继承与SuspenLamabda的类型,比如:class MainActivity#onCreate$1 : SuspenLambda{...}。我们在讲协程体时,也同时代指这个类实例。

继续跟踪launch()函数执行逻辑,这次跟踪过程不同与《协程的创建与启动》篇章,我们会将侧重点放在启动过程中协程调度器是如何起作用的?接下来见1.2

1.2 CoroutineScope.launch

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //1. 见1.2.1
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //2. 详见1.3
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • 这里会新建一个CoroutineContext,详见1.2.1
  • 根据之前的分析,这个里最终会调用到startCoroutineCancellable()方法,详见1.3流程。

1.2.1 newCoroutineContext

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = foldCopies(coroutineContext, context, true)
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return
    if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default
    else
    	debug
}

coroutineContextcoroutineContextCoroutineScope的成员变量,当此时为GlobalScope.coroutineContext==EmptyCoroutineContext

context:由于调用launch时没有指定Context,所以传到此处也是EmptyCoroutineContextfoldCopies()函数将2个context相加并拷贝,最终combied==EmptyCoroutineContext

而在return这最后判断返回的是debug+Dispatchers.Defatult,所以此时默认的分发器为Dispatchers.Defatult

这里涉及到的协程Context运算不做深入剖析,简单可以认为协程重写了“+”运算,使得Context之间可以使用“+”来叠加,没有的Element类型会被添加到Element集合,集合中已有的Element类型会被覆盖。

1.3 startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
    	//1. 创建SuspendLambda协程体
        createCoroutineUnintercepted(receiver, completion)
            //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
            .intercepted()
            //3. 调用方法器Continuation的resume方法,详见1.4
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
  • 这里的构建协程体在《协程的创建与启动》一节中已经剖析,不再赘述。
  • 进行拦截,注意:这里其实会根据方法器再构建出一个DispatchedContinuation对象,它也是一个续体类型,这是对协程体的一次包装。详见1.3.1小节。
  • 调用拦截器续体的resumeCancellableWith()开始状态机流转,执行分发流程详见1.4小节。

1.3.1 intercepted()

 public fun intercepted(): Continuation<Any?> =
        intercepted?: (
                //1. 取出拦截器
                context[ContinuationInterceptor]?
                    //2.构建拦截器续体
                    .interceptContinuation(this)?: this)
                .also { intercepted = it }
  • 取出当前上下文中的拦截器类型,根据之前1.2.1小节的分析,这里取出来的是Dispatchers.Defatult
  • interceptContinuation(this)为构建拦截器续体,注意这里传入的this协程体1。 详见1.3.2。

1.3.2 CoroutineDispatcher

//Base class to be extended by all coroutine dispatcher implementations.
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>):
        //详见1.4
        Continuation<T> = DispatchedContinuation(this, continuation)
}

直接新建了一个DispatchedContinuation对象实例这里需要注意传入的构建参数:

  • this:当前Dispatcher,也就是Dispatchers.Defatult
  • continuation:协程体1。

1.3.3 小结

自此Continuation.intercepted()方法就分析结束,最终的结果是:用上下文中的Dispatcher和当前Contination对象也就是协程体1,共同作为构建参数,新建了一个DispatchedContinuation对象。

接下来接着1.3中的第三点,调用DispatchedContinuation.resumeCancellableWith()方法开始分析。

1.4 DispatchedContinuation

internal class DispatchedContinuation<in T>(
    //1. 分发器
    @JvmField val dispatcher: CoroutineDispatcher,
	//2. 注意这里将Continuation的实现委托给了continuation成员变量。
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED)
, CoroutineStackFrame,
Continuation<T> by continuation {
    	//3. 复写属性delegate为自己
	    override val delegate: Continuation<T>
        get() = this
    ...
    // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
    // It is used only in Continuation<T>.resumeCancellableWith
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        //默认为true
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            //4. 详细见
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
}

这里的dispatcher==Dispatchers.Defatult,所以接下来需要解析Dispatchers.Defatult到底是什么东西。详见1.5

  • 成员变量dispatcher==Dispatchers.Default
  • 成员变量continucation==协程体1(SuspenLambda类型实例)。同时DispatchedContinuation继承于Continuation接口,它将Continuation接口的实现委托给了成员变量continuation
  • deleagte为复写了DispatchedTask.delegate属性,将其返回自己。
  • 调用分发器也就是Dispatchers.Defatultdispatch()方法,注意这里传入的参数:

context:来自Continuation接口的属性,由于委托给了成员变量continuation,所以此context==continuation.context

this:分发器本身Dispatchers.Defatult

自此这个方法的分析结束:调用分发器的进行分发,接下来分析就开始分析协程方法器CoroutineDispatcher

1.5 DefaultScheduler

//Dispathcer.kt
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
//Dispathcer.kt
// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    ...
}

实际上是继承 SchedulerCoroutineDispatcher类型。详见1.5.1

1.5.1 SchedulerCoroutineDispatcher

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    override val executor: Executor
        get() = coroutineScheduler
    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()
    private fun createScheduler() =
        //1. 详见1.5.2
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    //2. 详见1.5.2
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit
    = coroutineScheduler.dispatch(block)
    ...
}
//Executors.kt
//2. 实际上是继承ExecutorCoroutineDispatcher
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    ...
}
  • 可以看到实际上调用了CoroutineScheduler.dispatch方法。此时发现,第二个参数是Runnable类型的,而在1.4小节中,我们知道传入的是this也就是DispatchedContinuation,所以DispatchedContinuation继承的父类中,必定有继承了Runnable接口,而他的run方法的实现也在父类中,这块我们暂时按下不表,接着看继续跟踪coroutineScheduler.dispatch(block)

1.5.2 CoroutineScheduler

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
	...
    override fun execute(command: Runnable) = dispatch(command)
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}
  • 该类继承了Executor类,而且它的构建参数可看到是线程池的参数,所以可以知道这个其实是Kotlin协程实现的一个线程池,具体就不跟进去了。
  • execute()过程也是dispatch过程:将任务投递到任务队列,然后通知线程去取任务执行,自此完成了线程切换动作。
  • 而在新线程里执行的Runnable为1.4中的调用代码:dispatcher.dispatch(context, this)中的this,也就是DispatchedContinuationDispatchedContinuation.kt并没有实现run方法,那么一定是他继承的父类实现了Runnable接口并实现,所以需要接着看它继承的父类:DispatchedTask类。

1.6 DispatchedTask.run()

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
	...
    internal abstract val delegate: Continuation<T>
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        (state as? CompletedExceptionally)?.cause
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            //1. 取出代理商的续体
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        //1. 被包装的续体的resume方法,真正的开始出发其协程状态机代码。
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}
  • delegate转为DispatchedContinuation,应该注意1.4 小节中DispatchedContinuation继承DispatchTask时,便对此delegate进行了复写:

override val delegate: Continuation

get() = this

而此delegate.continucation便是当初newDispatchedContinuation(this)时传入的this,此this就是Kotlin编译器一开始为协程体生成的SuspendLambda类型对象。具体可以回看1.3小节。

  • 调用了continuation.resume()方法触发了协程的状态机进而开始执行协程业务逻辑代码,结合之前1.5.2的分析可以知道,这个方法的调用已经是被dispatch到特定线程,完成线程切换后执行的。所以协程状态机的代码也是跑在新线程上的。

1.7 总结

至此,协程的线程调度分析结束,关键有如下几个要点:

  • 创建SuspendLambda时,他的协程上下文对象来自于comletion.context,默认就是Dispatcher.Default
  • SuspendLambda启动时调用了intercept()进行一层包装,得到DispatchedContinuation,后续协程启动是启动的DispatchedContinuation协程。
  • DispatchedContinuation继承于Runnable接口,协程启动时将自己投递到分发器dispatcher执行run方法,从而达到了线程切换效果。
  • DispatchedContinuationrun方法中,调用SuspendLambda.resume()启动状态机。在新线程执行协程状态机代码。

这一小节中,介绍了如何将协程调度到目的线程执行,接下来分析如何做到随意切换线程后,然后再恢复到原来线程的。

二、协程中的线程切换

在第一小节中,我们搞清楚了协程启动时,协程调度器是如何在其中起作用的。这一小节旨在剖析在协程用分发器切换线程执行新的挂起函数后,是如何切换会原来线程继续执行剩下的逻辑的。

为此,我们需要将1.1的测试代码反编译出来实际代码进而分析。

2.1 反编译代码

2.1.1 MainActivityonCreateonCreateonCreate1

final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    ...
    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04());
                this.label = 1;
                //1. 新建编译器自动生成的继承于SuspendLambda的类型。
                AnonymousClass1 anonymousClass1 = new AnonymousClass1(null);
                //2. 调用withContext
            	Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this);
                if (res != coroutine_suspended) {
                    break;
                } else {
                    //挂起
                    return coroutine_suspended;
                }
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328());
        return Unit.INSTANCE;
    }
}

根据之前的文章分析,这里suspend lambda 的类型都自动生成继承于SuspendLambda的类型。详见2.1.2。

anonymousClass1传入withContext,而且注意这里传入了this==MainActivity$onCreate$1,详见2.2。

2.1.2 AnonymousClass1

/* compiled from: MainActivity.kt */
public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> {
    int label
    ...
    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object obj) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(obj);
                return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f()));
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}

2.2 withContext

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    //1. 获取当前协程, 注意这里的uCont就是当前续体,也就是MainActivity$onCreate$1
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        //2. 计算获的新的协程上下文
        val oldContext = uCont.context
        val newContext = oldContext + context
        //3. 快速判断:新上下文和旧上下文一致的情况快速处理。
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        //4. 新建一个DispatchedCoroutine
        val coroutine = DispatchedCoroutine(newContext, uCont)
        //5. 启动协程
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}
  • suspendCoroutineUninterceptedOrReturn这个函数直接步进是看不到实现的,它的实现是由Kotlin编译器生成的,它的作用是用来获取当前续体的,并且通过uCont返回,这里就是MainActivity$onCreate$1
  • 将旧协程上下文和新的上下文一起。计算得到最终的上下文。这里的context==Dispatchers.getIO()
  • 快速判断,不用看。
  • 新建一个DispatchedCoroutine,注意这里传入了新的协程上下文和当前续体对象。
  • 调用startCoroutineCancellable()启动协程。这里的同1.3.2小节分析一样,详见 2.2.1

2.2.1 startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
    	//1. 创建SuspendLambda协程体
        createCoroutineUnintercepted(receiver, completion)
            //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
            .intercepted()
            //3. 调用方法器Continuation的resume方法,详见1.4
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }

此方法在之前1.3小节已经分析过,针对此此次调用,其中的改变是协程上下文中的分发器已经被设置为Dispatchers.Main

  • 创建了SuspendLambda对象,此对象的CoroutineContextcompletion.context。而其中的ContinuationInterceptor类型Element就是我们之前传入的Dispatchers.Main
  • 创建一个DispatchedContinuation
  • 将协程SuspendLambda的状态机逻辑通过Dispatcher.Main调度到主线程执行,调度过程参考第一下节。分发逻辑详见2.7小节。
  • SuspendLambda的状态机invokeSuspend()逻辑执行完成后,会返回到BaseContinuationImpl.resumeWith(),我们需要接此方法分析,来得到协程在切换到主线程执行后,又是怎么切回协程体1的执行线程的,详见2.3。

2.3 resumeWith

public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    //1. 进入此判断
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

当状态机执行完后, 后进入到completion的类型判断,由2.2和2.2.1可以知道,当初传入的completion是DispatchedCoroutine类型,所以加入到else分支,调用了DispatchedCoroutine.resumeWith(),接下来分析此方法。

在此之前,我们需要看下DispatchedCoroutine的继承关系,详见2.4.1。如果想直接跟踪流程,可以直接看2.4.2。

2.4 DispatchedCoroutine

2.4.1 DispatchedCoroutine 的继承关系

internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
}

继承于ScopeCoroutine

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
}

继承于AbstractCoroutine

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
}

2.5 协程线程的恢复

2.5.1 AbstractCoroutine.resumeWith()

    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

调用了afterResume方法,此方法在DispatchedCoroutine类型有具体实现。见2.5.2

2.5.2 afterResume

//DispatchedCoroutine
override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
  • 取出当前续体uCont,这个续体根据之前的分析:2.2小节,可以知道它等于MainActivity$onCreate$1
  • intercepted():取出其分发拦截器
  • resumeCancellableWith:使用方法拦截器协程体,将uCont续体的状态机逻辑调度到相对应的线程环境执行,这里就是之前的Dispatcher.Default。注意其注释:“将其切换到原先的分发器”。而这一过程其实和1.3小节的过程一致。
  • 恢复到Dispatcher.Default继续执行状态机时,由于label已经被更新,所以会往下继续执行,打印最后一句log。

2.6 总结

withContext(Dispatcher.Main)启动的协程时,取得当前协程续体uCount也就是MainActivity$onCreate$1,会计算出新的协程context,然后用它们创建一个DispatchedCoroutine

AnonymousClass1协程启动时,用DispatchedCoroutine作为completion参数,然后启动,此时会调度主线程执行协程。

当协程执行完成后,AnonymousClass1.resumeWith()方法会调用completion.resumeWith()

DispatchedCoroutine.resumeWith()方法会调用uCount.intercepted().resumeCancellableWith(),使得父协程进行调度并接着执行状态机逻辑。

2.7 Dispatchers.Main

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get()
= MainDispatcherLoader.dispatcher

直接详见2.7.1

2.7.1 MainDispatcherLoader

internal object MainDispatcherLoader {
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                // 1.获得MainDispatcherFactory的实现类
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
}
  • 通过ServiceLoad机制获取MainDispatcherFactory的实现类,而在源码里面,其实现类为AndroidDispatcherFactory
  • 调用tryCreateDispatcher()创建分发器,详见2.7.2。

2.7.2 AndroidDispatcherFactory

internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

根据createDispatcher分发,主线程分发器的实现类为HandlerContext类型,传入用MainLooper构建的Handler。详见2.7.3。

2.7.3 HandlerContext

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null
    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
            continuation.invokeOnCancellation { handler.removeCallbacks(block) }
        } else {
            cancelOnRejection(continuation.context, block)
        }
    }
   ...
}

HandlerContext继承于HandlerDispatcher,而他的dispatch方法,可以看到,就是将block丢到设置MainLooperhandler执行。所以续体将会在主线程执行状态机,达到切换到主线程执行协程的目的。

以上就是Kotlin协程的线程调度示例详解的详细内容,更多关于Kotlin协程的线程调度的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin启动协程的三种方式示例详解

    目录 1.launch启动协程 2.runBlocking启动协程 3.async启动协程 1.launch启动协程 fun main() = runBlocking { launch { delay(1000L) println("World!") } println("Hello") } fun main() { GlobalScope.launch { delay(1000L) println("World!") } println(&qu

  • Room Kotlin API的使用入门教程

    Room 是 SQLite 的封装,它使 Android 对数据库的操作变得非常简单,也是迄今为止我最喜欢的 Jetpack 库.在本文中我会告诉大家如何使用并且测试 Room Kotlin API,同时在介绍过程中,我也会为大家分享其工作原理. 我们将基于 Room with a view codelab 为大家讲解.这里我们会创建一个存储在数据库的词汇表,然后将它们显示到屏幕上,同时用户还可以向列表中添加单词. 定义数据库表 在我们的数据库中仅有一个表,就是保存词汇的表.Word 类代表表中

  • Kotlin协程Context应用使用示例详解

    目录 1.Context的应用 2.万物皆有 Context 1.CoroutineScope 2.Job 3.Dispatcher 4.CoroutineExceptionHandler 1.Context的应用 Context在启动协程模式中就已经遇到过叫CoroutineContext,它的意思就是协程上下文,线程的切换离不开它. 在启动协程模式中也说明过为什么不用传递Context,因为它有一个默认值EmptyCoroutineContext,需要注意的是这个Context是不可以切换线

  • Android使用Kotlin API实践WorkManager

    WorkManager 提供了一系列 API 可以更加便捷地规划异步任务,即使在应用被关闭之后或者设备重启之后,仍然需要保证立即执行的或者推迟执行的任务被正常处理.对于 Kotlin 开发者,WorkManager 为协程提供了最佳的支持.在本文中,我将通过实践 WorkManager codelab 为大家展示 WorkManager 中与协程相关的基本操作.那么让我们开始吧! WorkManager 基础 当您需要某个任务保持运行状态,即使用户切换到别的界面或者用户将应用切换到后台,甚至设备

  • Kotlin协程低级api startCoroutine与ContinuationInterceptor

    目录 聊一聊kotlin协程“低级”api startCoroutine ContinuationInterceptor 实战 kotlin协程api中的 async await 通过startCoroutine与ContinuationInterceptor实现自定义的 async await 本章代码 最后 聊一聊kotlin协程“低级”api Kotlin协程已经出来很久了,相信大家都有不同程度的用上了,由于最近处理的需求有遇到协程相关,因此今天来聊一Kotlin协程的“低级”api,首先

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • Python并发编程协程(Coroutine)之Gevent详解

    Gevent官网文档地址:http://www.gevent.org/contents.html 基本概念 我们通常所说的协程Coroutine其实是corporateroutine的缩写,直接翻译为协同的例程,一般我们都简称为协程. 在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程. 进程和协程 下面对比一下进程和协程的相同点和不同点: 相同点: 我们都可以把他们看做是一种执行流,执行流可以挂起,并且后面可以在你挂起的地方恢复执行,这实际上都可以看做是con

  • 图解 Kotlin SharedFlow 缓存系统及示例详解

    目录 前言 replay extraBufferCapacity onBufferOverflow SharedFlow Buffer 前言 Kotlin 为我们提供了两种创建“热流”的工具:StateFlow 和 SharedFlow.StateFlow 经常被用来替代 LiveData 充当架构组件使用,所以大家相对熟悉.其实 StateFlow 只是 SharedFlow 的一种特化形式,SharedFlow 的功能更强大.使用场景更多,这得益于其自带的缓存系统,本文用图解的方式,带大家更

  • Golang控制协程执行顺序方法详解

    目录 循环控制 通道控制 互斥锁 async.Mutex 在 Go 里面的协程执行实际上默认是没有严格的先后顺序的.由于 Go 语言 GPM 模型的设计理念,真正执行实际工作的实际上是 GPM 中的 M(machine) 执行器,而我们的协程任务 G(goroutine) 协程需要被 P(produce) 关联到某个 M 上才能被执行.而每一个 P 都有一个私有队列,除此之外所有的 P 还共用一个公共队列.因此当我们创建了一个协程之后,并不是立即执行,而是进入队列等待被分配,且不同队列之间没有顺

  • kotlin android extensions 插件实现示例详解

    目录 前言 原理浅析 总体结构 源码分析 插件入口 配置编译器插件传参 编译器插件接收参数 注册各种Extension IrGenerationExtension ExpressionCodegenExtension StorageComponentContainerContributor ClassBuilderInterceptorExtension PackageFragmentProviderExtension 总结 前言 kotlin-android-extensions 插件是 Ko

  • PHP7下协程的实现方法详解

    前言 相信大家都听说过『协程』这个概念吧. 但是有些同学对这个概念似懂非懂,不知道怎么实现,怎么用,用在哪,甚至有些人认为yield就是协程! 我始终相信,如果你无法准确地表达出一个知识点的话,我可以认为你就是不懂. 如果你之前了解过利用PHP实现协程的话,你肯定看过鸟哥的那篇文章:在PHP中使用协程实现多任务调度| 风雪之隅 鸟哥这篇文章是从国外的作者翻译来的,翻译的简洁明了,也给出了具体的例子了. 我写这篇文章的目的,是想对鸟哥文章做更加充足的补充,毕竟有部分同学的基础还是不够好,看得也是云

  • PHP生成器(generator)和协程的实现方法详解

    本文实例讲述了PHP生成器(generator)和协程的实现方法.分享给大家供大家参考,具体如下: 先说一些废话 PHP 5.5 以来,新的诸多特性又一次令 PHP 焕发新的光彩,虽然在本文写的时候已是 PHP 7 alpha 2 发布后的一段时间,但此时国内依旧是 php 5.3 的天下.不过我认为新的特性迟早会因为旧的版本的逐渐消失而变得越发重要,尤其是 PHP 7 的正式版出来后,因此本文的目的就是为了在这之前,帮助一些 PHPer 了解一些他们从没有了解的东西.所以打算将以本篇作为博客中

  • Kotlin的枚举与异常示例详解

    一.kotlin中枚举的定义 枚举需要用到两个关键字 enum class,譬如这样 enum class Color(val r: Int,val g: Int,val b: Int){ //彩虹色也是一个典故:韦克菲尔德战役 RED(255,0,0),ORANGE(255,165,0),YELLOW(255,255,0), GREEN(0,255,0),BLUE(0,0,255),INDIGO(75,0,130),VIOLET(238,130,238); fun rgb() = (r * 2

  • Kotlin对象比较注意点示例详解

    目录 背景 原因 另一个问题 解决办法 结论 背景 现有一个StateFlow及其监听 private val stateFlow = MutableStateFlow(kotlin.Pair<String, ArrayList<String>>("abc", ArrayList())) GlobalScope.launch { stateFlow.collect { // do something } } 更新ArrayList并尝试emit GlobalSc

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

随机推荐