
/ 今日科技快訊 /
近日,有訊息稱中國移動正在考慮A股上市。此前,美國前總統唐納德·特朗普政府曾釋出投資禁令,促使中國移動被從紐約證券交易所摘牌。知情人士表示,中國移動正在尋找新的渠道為其5G網路發展提供資金,已與顧問討論了潛在的股票發行事宜。不過磋商還處於早期階段,中國移動尚未決定上市規模和時間表。
/ 作者簡介 /
本篇文章來自rain9155同學投稿,講解了Kotlin協程中的CoroutineContext的相關內容,相信會對大家有所幫助!同時也感謝作者貢獻的精彩文章!
rain9155的部落格地址:
https://juejin.cn/user/1556564196215464
/ 前言 /
從kotlin1.1開始,協程就被新增到kotlin中作為實驗性功能,直到kotlin1.3,協程在kotlin中的api已經基本穩定下來了,現在kotlin已經釋出到了1.4,為協程新增了更多的功能並進一步完善了它,所以我們現在在kotlin程式碼中可以放心的引入kotlin協程並使用它,其實協程並不是kotlin獨有的功能,它是一個廣泛的概念。
協作式多任務的實現,除了kotlin外,很多語言如Go、Python等都透過自己的方式實現了協程,本文閱讀前希望你已經知道如何使用kotlin協程,如果不熟悉可以閱讀一下官方檔案。
kotlin coroutines guide
https://kotlinlang.org/docs/coroutines-guide.html
其實入門協程我還是非常推薦透過官方檔案來進行學習,因為官方檔案的範例是很全面的,跟著它的範例敲一遍程式碼,你也基本掌握了協程的使用。
kotlinx.coroutines地址
https://github.com/Kotlin/kotlinx.coroutines
/ Coroutine的簡單理解 /
提到協程,很對人會把它和執行緒進行比較,就像提到執行緒,很多人會把它和行程進行比較,執行緒和行程分別是作業系統中的CPU排程單位和資源劃分單位,它們在作業系統中有專門的資料結構代表,而協程在作業系統中沒有專門的資料結構代表,所以協程並不是由作業系統建立和排程,它而是由程式自己建立和排程,由於不需要作業系統排程,所以協程比執行緒更加的輕量,切換協程比切換執行緒的開銷更小,即它的上下文切換比執行緒更快,因為作業系統切換執行緒時一般都會涉及到使用者態內核態的轉換,這是一個開銷相對較大的操作。
協程的實現依賴於執行緒,它不能脫離執行緒而存在,因為執行緒才是CPU排程的基本單位,協程透過程式的排程可以執行在一個或多個執行緒之中,所以協程需要執行於執行緒之中,由於協程是由程式自己排程的,所以程式就需要實現排程邏輯,不同語言的排程的實現不一樣,在kotlin中,透過Dispatcher來排程協程,而Dispatcher它通常是一個執行緒池的實現或者基於特定平台(例如Android)主執行緒的實現,透過排程讓協程執行於一個或多個執行緒之中,這些協程可以在同一執行緒的不同時刻被執行,也可以在不同執行緒上的不同時刻被執行。
協程可以說是程式設計語言的能力, 是上層的能力,它並不需要作業系統和硬體的支援, 是程式設計語言為了讓開發者更容易寫出協作式任務的程式碼,而封裝的一種任務排程能力,所以協程通常是包含一段特定邏輯的程式碼塊,多個協程之間就組合成一段具有特定邏輯的程式碼流程,這些程式設計語言為了讓開發者更方便的使用協程,它通常會提供一些關鍵字, 而這些關鍵字會透過編譯器自動生成了一些支援型程式碼,例如kotlin中的suspend關鍵字,對於suspend修飾的方法,編譯器會方法生成一些額外的程式碼。
上面就是我對協程的簡單理解,總的來說:協程需要執行緒的承載執行,協程需要程式自己完成排程,協程讓你更容易寫出協作式任務。
/ Coroutine的簡單使用 /
1 2 3 4 5 6 7 8 | fun main(){ val scope = CoroutineScope(CoroutineName("Coroutine-Name") + Dispatchers.IO) val job = scope.launch(start = CoroutineStart.DEFAULT){ println("hello world") } //行程保活1s,衹有行程存活的前提下,協程才能會啟動和執行 Thread.sleep(1000) } |
上面首先建構了一個CoroutineScope,它是協程的作用域,用於控制協程的生命週期,建構CoroutineScope需要一個CoroutineContext,它是協程的上下文,用於提供協程啟動和執行期需要的訊息,這是我們後面需要重點介紹的,最後透過CoroutineScope的launch方法啟動協程並輸出hello world,其中啟動協程時可以透過CoroutineStart指定協程的啟動模式,它是一個列舉值,預設是立即啟動,也透過指定CoroutineStart.LAZY變為延遲啟動,延遲啟動需要你主動呼叫回傳的Job物件的start方法後協程才會啟動,如果我們想取消掉這個協程的執行就可以呼叫CoroutineScope的cancel方法,或者呼叫launch方法回傳的Job物件的cancel方法,其實CoroutineScope的cancel方法內部也是呼叫回傳的Job物件的cancel方法來結束這個協程。
上面就是啟動一個協程的簡單步驟,需要用到CoroutineScope、CoroutineContext、CoroutineStart。
透過自定義CoroutineScope,可以在應用程式的某一個層次開啟或者控制協程的生命週期,例如Android,在ViewModel和Lifecycle類的生命週期里提供了CoroutineScope,分別是ViewModelScope和LifecycleScope.
/ CoroutineContext的元素 /
建構CoroutineScope使用到的CoroutineContext是一個特殊的集合,這個集合它既有Map的特點,也有Set的特點,集合的每一個元素都是Element,每個Element都有一個Key與之對應,對於相同Key的Element是不可以重複存在的,Element之間可以透過 + 號組合起來,後面我會詳細介紹CoroutineContext這個特殊集合的結構,接下來我先簡單講解一下組成CoroutineContext的各個Element的作用,CoroutineContext主要由以下4個Element組成:
-
Job:協程的唯一標識,用來控制協程的生命週期(new、active、completing、completed、cancelling、cancelled);
-
CoroutineDispatcher:指定協程執行的執行緒(IO、Default、Main、Unconfined);
-
CoroutineName: 指定協程的名稱,預設為coroutine;
-
CoroutineExceptionHandler: 指定協程的異常處理器,用來處理未捕獲的異常.
它們之間的關係如下:

下面分別介紹一下4個Element各自的作用:
Job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public interface Job : CoroutineContext.Element { public companion object Key : CoroutineContext.Key<Job> { init { CoroutineExceptionHandler } } public val isActive: Boolean public val isCompleted: Boolean public val isCancelled: Boolean public fun start(): Boolean public fun cancel(cause: CancellationException? = null) public suspend fun join() public val children: Sequence<Job> //... } |
透過CoroutineScope的擴展方法launch啟動一個協程後,它會回傳一個Job物件,它是協程的唯一標識,這個Job物件包含了這個協程任務的一系列狀態,如下:

當一個協程建立後它就處於新建(New)狀態,當呼叫Job的start/join方法後協程就處於活躍(Active)狀態,這是執行狀態,協程執行出錯或者呼叫Job的cancel方法都會將當前協程置為取消中(Cancelling)狀態, 處於取消中狀態的協程會等所有子協程都完成後才進入取消 (Cancelled)狀態,當協程執行完成後或者呼叫CompletableJob(CompletableJob是Job的一個子介面)的complete方法都會讓當前協程進入完成中(Completing)狀態, 處於完成中狀態的協程會等所有子協程都完成後才進入完成(Completed)狀態。
雖然協程有New、Cancelling、Completing狀態,但是外部是無法感知這三個狀態的,Job只提供了isActive、isCancelled、isCompleted屬性來供外部判斷協程是否處於Active、Cancelled、Completed狀態,當協程處於Active狀態時,isActive為true,isCancelled和isCompleted為false,當協程處於Cancelled狀態時,isCancelled和isCompleted為true,isActive為false,當協程處於Completed狀態時,isCompleted為true,isActive和isCancelled為false。
協程中有兩種型別的Job,如果我們平時啟動協程時沒有特意地透過CoroutineContext指定一個Job,那麼使用launch/async方法啟動協程時回傳的Job它會產生異常傳播,我們知道協程有一個父子的概念,例如啟動一個協程1,在協程中繼續啟動協程2、協程3,那麼協程1就是協程2、協程3的父協程,協程2、協程3就是協程1的子協程,每個協程都會有一個對應的Job,協程之間的父子關係是透過Job物件維持的,像一顆樹一樣:

所以異常傳播就是這個Job因為除了CancellationException以外的異常而失敗時,那麼父Job就會感知到並丟擲異常,在丟擲異常之前,父Job會取消所有子Job的執行,這也是結構化程式設計的一個特點,如果要抑制這種異常傳播的行為,那麼可以用到另外一種型別的Job - SupervisorJob,SupervisorJob它不是一個類,它是一個建構方法:
1 | public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent) |
SupervisorJob方法會回傳CompletableJob的一個supervisor實現,CompletableJob是Job的一個子介面,它比Job介面多了一個complete方法,這意味著它可以呼叫complete方法讓協程任務進入完成狀態,supervisor實現的意思是這個Job它不會產生異常傳播,每個Job可以單獨被管理,當SupervisorJob因為除了CancellationException以外的異常而失敗時,並不會影響到父Job和其他子Job,下面是SupervisorJob的一個使用範例:
1 2 3 4 5 6 7 8 9 10 11 | fun main(){ val parentJob = GlobalScope.launch { //childJob是一個SupervisorJob val childJob = launch(SupervisorJob()){ throw NullPointerException() } childJob.join() println("parent complete") } Thread.sleep(1000) } |
childJob丟擲異常並不會影響parentJob的執行,parentJob會繼續執行並輸出parent complete。
CoroutineDispatcher
1 2 3 4 5 6 7 8 9 10 11 12 13 | public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>( ContinuationInterceptor, { it as? CoroutineDispatcher } ) public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true public abstract fun dispatch(context: CoroutineContext, block: Runnable) //... } |
CoroutineDispatcher可以指定協程的執行執行緒,CoroutineDispatcher裡面有一個dispatch方法,這個dispatch方法用於把協程任務分派到特定執行緒執行,kotlin已經內建了CoroutineDispatcher的4個實現,可以透過Dispatchers的Default、IO、Main、Unconfined欄位分別回傳使用,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 | public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = createDefaultDispatcher() @JvmStatic public val IO: CoroutineDispatcher = DefaultScheduler.IO @JvmStatic public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined @JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher } |
Default、IO
Dispatchers.Default和Dispatchers.IO內部都是執行緒池實現,它們的含義是把協程執行在共享的執行緒池中,我們先看Dispatchers.Default的實現,看createDefaultDispatcher方法:
1 | internal actual fun createDefaultDispatcher(): CoroutineDispatcher = if (useCoroutinesScheduler) DefaultScheduler else CommonPool |
DefaultScheduler和CommonPool都是CoroutineDispatcher的子類別,不同的是DefaultScheduler內部依賴的是kotlin自己實現的執行緒池邏輯,而CommonPool內部依賴的是java類別庫中的Executor,預設情況下useCoroutinesScheduler為true,所以createDefaultDispatcher方法回傳的是DefaultScheduler例項,我們看一下這個DefaultScheduler:
1 2 3 4 5 6 7 8 9 10 | internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { val IO: CoroutineDispatcher = LimitingDispatcher( this,//DefaultScheduler例項被傳進了LimitingDispatcher中 systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) //... } |
DefaultScheduler中的IO欄位就是Dispatchers.IO,它是LimitingDispatcher例項,所以Dispatchers.IO的實現是LimitingDispatcher,同時我們要注意到DefaultScheduler是用object欄位修飾,這說明它是一個單例,並且DefaultScheduler例項被傳進了LimitingDispatcher的建構方法中,所以LimitingDispatcher就會持有DefaultScheduler例項,而DefaultScheduler它的主要實現都在它的父類ExperimentalCoroutineDispatcher中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | @InternalCoroutinesApi public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { public constructor( corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE, schedulerName: String = DEFAULT_SCHEDULER_NAME ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) private var coroutineScheduler = createScheduler() //回傳CoroutineScheduler例項 private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { //dispatch方法委派給了CoroutineScheduler的dispatch方法 coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { //... } internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { try { //dispatchWithContext方法委派給了CoroutineScheduler的dispatch方法 coroutineScheduler.dispatch(block, context, tailDispatch) } catch (e: RejectedExecutionException) { //... } } //... } |
我們再看Dispatchers.IO對應的LimitingDispatcher實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | private class LimitingDispatcher( private val dispatcher: ExperimentalCoroutineDispatcher,//外部傳進的DefaultScheduler例項 private val parallelism: Int, private val name: String?, override val taskMode: Int ) : ExecutorCoroutineDispatcher(), TaskContext, Executor { private val queue = ConcurrentLinkedQueue<Runnable>() private val inFlightTasks = atomic(0) override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) private fun dispatch(block: Runnable, tailDispatch: Boolean) { var taskToSchedule = block while (true) { val inFlight = inFlightTasks.incrementAndGet() if (inFlight <= parallelism) { //LimitingDispatcher的dispatch方法委派給了DefaultScheduler的dispatchWithContext方法 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) return } queue.add(taskToSchedule) if (inFlightTasks.decrementAndGet() >= parallelism) { return } taskToSchedule = queue.poll() ?: return } } //... } |
從上面分析得知,Dispatchers.Default的實現是DefaultScheduler,Dispatchers.IO的實現是LimitingDispatcher,而LimitingDispatcher持有DefaultScheduler例項,把dispatch操作委派給DefaultScheduler,DefaultScheduler內部持有CoroutineScheduler例項,把dispatch操作委派給CoroutineScheduler,而DefaultScheduler又是一個單例,所以Dispatchers.Default和Dispatchers.IO它們共用同一個CoroutineScheduler例項,它們之間的關係如下:

CoroutineScheduler就是kotlin自己實現的共享執行緒池,是Dispatchers.Default和Dispatchers.IO內部的共同實現,Dispatchers.Default和Dispatchers.IO共享CoroutineScheduler中的執行緒,DefaultScheduler和LimitingDispatcher的主要作用是對CoroutineScheduler進行執行緒數、任務數等配置,CoroutineScheduler使用工作竊取演算法(Work Stealing)重新實現了一套執行緒池的任務排程邏輯,它的效能、擴展性對協程的任務排程更友好,具體的邏輯可以檢視這個類的dispatch方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { override fun execute(command: Runnable) = dispatch(command) fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { val task = createTask(block, taskContext) //... if (task.mode == TASK_NON_BLOCKING) { //... } else { //... } } //... } |
所以這個執行緒池既可以執行兩種型別的任務:CPU密集型任務和IO密集型任務,用一個mode來區別,當你為協程指定Dispatchers.Default時,Dispatcher會把協程的任務指定為CPU密集型任務,對應mode為TASK_NON_BLOCKING,當你為協程指定Dispatchers.IO時,Dispatcher會把協程的任務指定為IO密集型任務,對應mode為TASK_PROBABLY_BLOCKING,所以這時CoroutineScheduler就可以根據task mode作出不同的執行緒建立、排程、喚醒策略,當啟動協程時沒有指定Dispatcher,預設會使用Dispatchers.Default。
當執行CPU密集型任務時,CoroutineScheduler最多有corePoolSize個執行緒被建立,corePoolSize它的取值為max(2, CPU核心數),即它會盡量的等於CPU核心數,當執行IO密集型任務時,它可以建立比corePoolSize更多的執行緒來執行IO型任務,但不能大於maxPoolSize,maxPoolSize會取一個很大的值,預設為max(corePoolSize, min(CPU核心數 * 128, 2^21 - 2)),即大於corePoolSize,小於2^21 - 2,而2^21 - 2是一個很大的數約為2M,但是CoroutineScheduler是不可能建立這麼多執行緒的,所以就需要外部限制提交的任務數,而Dispatchers.IO建構時就透過LimitingDispatcher預設限制了最大執行緒併發數parallelism為max(64, CPU核心數),即Dispatchers.IO最多只能提交parallelism個任務到CoroutineScheduler中執行,剩餘的任務被放進一個佇列中等待。
CPU密集型任務:CPU密集型任務的特點是執行任務時CPU會處於忙碌狀態,任務會耗用大量的CPU資源,例如計算複雜的算術、影片解碼等,如果此時執行緒數太多,超過了CPU核心數,那麼這些超出來的執行緒是得不到CPU的執行的,只會浪費記憶體資源,因為執行緒本身也有棧等空間,同時執行緒過多,頻繁的執行緒切換帶來的耗用也會影響執行緒池的效能,所以對於CPU密集型任務,執行緒池併發執行緒數等於CPU核心數才能讓CPU的執行效率最大化;
IO密集型任務:IO密集型任務的特點是執行任務時CPU會處於閑置狀態,任務不會耗用大量的CPU資源,例如網路請求、IO操作等,執行緒執行IO密集型任務時大多數處於阻塞狀態,處於阻塞狀態的執行緒是不佔用CPU的執行時間,這時CPU就處於閑置狀態,為了讓CPU忙起來,執行IO密集型任務時理應讓執行緒的建立數量更多一點,理想情況下執行緒數應該等於提交的任務數,對於這些多建立出來的執行緒,當它們閑置時,執行緒池一般會有一個超時回收策略,所以大部分情況下並不會佔用大量的記憶體資源,但也會有極端情況,所以對於IO密集型任務,執行緒池併發執行緒數應儘可能地多才能提高CPU的吞吐量,這個儘可能地多的程度並不是無限大,而是根據業務情況設定,但肯定要大於CPU核心數。
Unconfined
Dispatchers.Unconfined的含義是不給協程指定執行的執行緒,在第一次被掛起(suspend)之前,由啟動協程的執行緒執行它,但被掛起後, 會由恢復協程的執行緒繼續執行, 如果一個協程會被掛起多次, 那麼每次被恢復後, 都有可能被不同執行緒繼續執行,看下面的一個範例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | fun main(){ GlobalScope.launch(Dispatchers.Unconfined){ println(Thread.currentThread().name) //掛起 withContext(Dispatchers.IO){ println(Thread.currentThread().name) } //恢復 println(Thread.currentThread().name) //掛起 withContext(Dispatchers.Default){ println(Thread.currentThread().name) } //恢復 println(Thread.currentThread().name) } //行程保活 Thread.sleep(1000) } 執行輸出: main DefaultDispatcher-worker-1 DefaultDispatcher-worker-1 DefaultDispatcher-worker-3 DefaultDispatcher-worker-3 |
協程啟動時指定了Dispatchers.Unconfined,所以第一次執行時是由啟動協程的執行緒執行,上面在主執行緒中啟動了協程,所以第一次輸出主執行緒main,withContext方法是一個suspend方法,它可以掛起當前協程,並把指定的程式碼塊執行到給定的上下文中,直到程式碼塊執行完成並回傳結果,第一個程式碼塊透過withContext方法把它執行在Dispatchers.IO中,所以第二次輸出了執行緒池中的某一個執行緒DefaultDispatcher-worker-1,第一個程式碼塊執行完畢後,協程在DefaultDispatcher-worker-1執行緒中恢復,所以協程恢復後執行在DefaultDispatcher-worker-1執行緒中,所以第三次繼續輸出DefaultDispatcher-worker-1,第二個程式碼塊同理。
那麼Dispatchers.Unconfined是怎麼做到的呢,我們看下Unconfined對應的CoroutineDispatcher實現 - kotlinx.coroutines.Unconfined:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | internal object Unconfined : CoroutineDispatcher() { override fun isDispatchNeeded(context: CoroutineContext): Boolean = false override fun dispatch(context: CoroutineContext, block: Runnable) { // It can only be called by the "yield" function. See also code of "yield" function. val yieldContext = context[YieldContext] if (yieldContext != null) { // report to "yield" that it is an unconfined dispatcher and don't call "block.run()" yieldContext.dispatcherWasUnconfined = true return } throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " + "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " + "isDispatchNeeded and dispatch calls.") } } |
Unconfined他重寫了CoroutineDispatcher的isDispatchNeeded方法和dispatch方法,isDispatchNeeded方法回傳了false,表示不需要dispatch,而預設CoroutineDispatcher的isDispatchNeeded方法是回傳true的,Dispatchers.Default和Dispatchers.IO都沒有重寫這個方法,Unconfined的dispatch方法沒有任何任務排程的邏輯,只是寫明了衹有當呼叫yield方法時,Unconfined的dispatch方法才會被呼叫,yield方法是一個suspend方法,當在協程中呼叫這個方法時表示當前協程讓出自己所在的執行緒給其他協程執行,所以正常情況下是不會呼叫Unconfined的dispatch方法的。
在kotlin中每個協程都有一個Continuation例項與之對應,當協程恢復時會呼叫Continuation的resumeWith方法,它的實現在DispatchedContinuation中,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher,//協程的的CoroutineDispatcher例項 @JvmField val continuation: Continuation<T>//代表協程的Continuation例項 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { //... override fun resumeWith(result: Result<T>) { val context = continuation.context val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC dispatcher.dispatch(context, this) } else {//Unconfined走這裡的邏輯 //呼叫executeUnconfined方法 executeUnconfined(state, MODE_ATOMIC) { withCoroutineContext(this.context, countOrElement) { //呼叫Continuation的resumeWith方法 continuation.resumeWith(result) } } } } } |
我們注意到by關鍵字,這是kotlin中的委派實現,DispatchedContinuation透過類委派加強了Continuation的resumeWith方法,即在呼叫Continuation的resumeWith方法之前增加了一些自己的邏輯,我們可以看到DispatchedContinuation的resumeWith方法中會根據CoroutineDispatcher的isDispatchNeeded方法回傳值做出不同處理,當isDispatchNeeded方法回傳true時,會呼叫協程的CoroutineDispatcher的dispatch方法,而當isDispatchNeeded方法回傳false時,不會呼叫CoroutineDispatcher的dispatch方法而是呼叫executeUnconfined方法,上面講到Unconfined的isDispatchNeeded方法回傳了false,我們看executeUnconfined方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit ): Boolean { assert { mode != MODE_UNINITIALIZED } //從ThreadLocal中取出EventLoop val eventLoop = ThreadLocalEventLoop.eventLoop if (doYield && eventLoop.isUnconfinedQueueEmpty) return false //判斷是否在執行Unconfined任務 return if (eventLoop.isUnconfinedLoopActive) { _state = contState resumeMode = mode //呼叫EventLoop的dispatchUnconfined方法把Unconfined任務放進EventLoop中 eventLoop.dispatchUnconfined(this) true } else { //執行Unconfined任務 runUnconfinedEventLoop(eventLoop, block = block) false } } internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { eventLoop.incrementUseCount(unconfined = true) try { //先執行block程式碼塊,block()就是executeUnconfined方法傳進的程式碼塊, block()裡面會呼叫Continuation的resumeWith方法 block() while (true) { //再呼叫EventLoop的processUnconfinedEvent方法執行EventLoop中的Unconfined任務,直到EventLoop中的所有Unconfined任務執行完才跳出迴圈 if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { //... } finally { eventLoop.decrementUseCount(unconfined = true) } } |
可以看到對於Unconfined任務,是在當前執行緒馬上執行或者透過當前執行緒的EventLoop來執行的,EventLoop是存放在ThreadLocal中的,所以EventLoop它是跟當前執行緒相關聯的,而EventLoop也是CoroutineDispatcher的一個子類別:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | internal abstract class EventLoop : CoroutineDispatcher() { //... private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null public fun dispatchUnconfined(task: DispatchedTask<*>) { val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } queue.addLast(task) } public fun processUnconfinedEvent(): Boolean { val queue = unconfinedQueue ?: return false val task = queue.removeFirstOrNull() ?: return false task.run() return true } } |
EventLoop中有一個雙端佇列用於存放Unconfined任務,Unconfined任務是指指定了Dispatchers.Unconfined的協程任務,EventLoop的dispatchUnconfined方法用於把Unconfined任務放進佇列的尾部,processUnconfinedEvent方法用於從佇列的頭部移出Unconfined任務執行。
所以executeUnconfined方法裡面的策略就是:在當前執行緒立即執行Unconfined任務,如果當前執行緒已經在執行Unconfined任務,就暫時把它放進跟當前執行緒關聯的EventLoop中,等待執行,同時Unconfined任務裡面會呼叫Continuation的resumeWith方法恢復協程執行,這也是為什麼指定了Dispatchers.Unconfined後協程恢復能夠被恢復協程的執行緒執行的原因。
Main
Dispatchers.Main的含義是把協程執行在平台相關的只能操作UI物件的Main執行緒,所以它根據不同的平台有不同的實現,kotlin它支援下面三種平台:
-
kotlin/js:kotlin/js是kotlin對JavaScript的支援,提供了轉換kotlin程式碼,kotlin標準庫的能力,npm包管理能力,在kotlin/js上Dispatchers.Main等效於Dispatchers.Default;
-
kotlin/native:kotlin/native是一種將kotlin程式碼編譯為無需虛擬機就可執行的原生二進位制檔案的技術, 它的主要目的是允許對不需要或不可能使用虛擬機的平台進行編譯,例如嵌入式裝置或iOS,在kotlin/native上Dispatchers.Main等效於Dispatchers.Default;
-
kotlin/JVM:kotlin/JVM就是需要虛擬機才能編譯的平台,例如Android就是屬於kotlin/JVM,對於kotlin/JVM我們需要引入對應的dispatcher,例如Android就需要引入kotlinx-coroutines-android庫,它裡面有Android對應的Dispatchers.Main實現,其實就是把任務透過Handler執行在Android的主執行緒.
我們再看Dispatchers.Main的實現 - MainDispatcherLoader.dispatcher:
1 2 3 4 5 6 7 8 9 | internal object MainDispatcherLoader { @JvmField val dispatcher: MainCoroutineDispatcher = loadMainDispatcher() private fun loadMainDispatcher(): MainCoroutineDispatcher { //...主要是透過反射載入實現了MainCoroutineDispatcher的類 } } |
所以Dispatchers.Main的CoroutineDispatcher實現是MainCoroutineDispatcher,MainCoroutineDispatcher的具體實現就因平台的不同而不同了,如果你直接使用Dispatchers.Main而沒有引入對應的庫就會引發IllegalStateException異常。
CoroutineName
1 2 3 4 5 6 7 8 | public data class CoroutineName( val name: String ) : AbstractCoroutineContextElement(CoroutineName) { public companion object Key : CoroutineContext.Key<CoroutineName> override fun toString(): String = "CoroutineName($name)" } |
CoroutineName就是協程的名字,它的結構很簡單, 我們平時開發一般是不會去指定一個CoroutineName的,因為CoroutineName只在kotlin的除錯模式下才會被用的, 它在debug模式下被用於設定協程執行執行緒的名字:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | internal data class CoroutineId( val id: Long ) : ThreadContextElement<String>, AbstractCoroutineContextElement(CoroutineId) { override fun updateThreadContext(context: CoroutineContext): String { val coroutineName = context[CoroutineName]?.name ?: "coroutine" val currentThread = Thread.currentThread() val oldName = currentThread.name var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR) if (lastIndex < 0) lastIndex = oldName.length currentThread.name = buildString(lastIndex + coroutineName.length + 10) { append(oldName.substring(0, lastIndex)) append(DEBUG_THREAD_NAME_SEPARATOR) append(coroutineName) append('#') append(id) } return oldName } //... } |
CoroutineExceptionHandler
1 2 3 4 5 6 | public interface CoroutineExceptionHandler : CoroutineContext.Element { public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler> public fun handleException(context: CoroutineContext, exception: Throwable) } |
CoroutineExceptionHandler就是協程的異常處理器,用來處理協程執行中未捕獲的異常,每一個建立的協程預設都會有一個異常處理器,我們可以在啟動協程時透過CoroutineContext指定我們自定義的異常處理器,我們可以透過CoroutineExceptionHandler方法建立一個CoroutineExceptionHandler,它會回傳一個CoroutineExceptionHandler的預設實現,預設實現的handleException方法中呼叫了我們傳進的handler方法:
1 2 3 4 5 | public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler = object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler { override fun handleException(context: CoroutineContext, exception: Throwable) = handler.invoke(context, exception) } |
CoroutineExceptionHandler只對launch方法啟動的根協程有效,而對async啟動的根協程無效,因為async啟動的根協程預設會捕獲所有未捕獲異常並把它放在Deferred中,等到使用者呼叫Deferred的await方法才丟擲,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | fun main(){ //自定義CoroutineExceptionHandler val handler = CoroutineExceptionHandler{ coroutineContext, throwable -> println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}") } //handler有效 val job = GlobalScope.launch(handler){ throw IndexOutOfBoundsException("exception thrown from launch") } job.start() //handler無效 val deferred = GlobalScope.async(handler){ throw NullPointerException("exception thrown from async") } deferred.start() Thread.sleep(1000) } 輸出: my coroutineExceptionHandler catch exception, msg = exception thrown from launch |
其中衹有launch啟動的根協程丟擲的異常才被CoroutineExceptionHandler處理,而對於async啟動的根協程丟擲的異常CoroutineExceptionHandler無效,需要我們呼叫Deferred的await方法時try catch。
還有子協程丟擲的未捕獲異常會委派父協程的CoroutineExceptionHandler處理,子協程設定的CoroutineExceptionHandler永遠不會生效(SupervisorJob 除外),如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | fun main(){ //根協程的Handler val parentHandler = CoroutineExceptionHandler{coroutineContext, throwable -> println("parent coroutineExceptionHandler catch exception, msg = ${throwable.message}") } //啟動根協程 val parentJob = GlobalScope.launch(parentHandler){ //子協程的Handler val childHandler = CoroutineExceptionHandler{coroutineContext, throwable -> println("child coroutineExceptionHandler catch exception, msg = ${throwable.message}") } //啟動子協程 val childJob = launch(childHandler){ throw IndexOutOfBoundsException("exception thrown from child launch") } childJob.start() } parentJob.start() Thread.sleep(1000) } 輸出: parent coroutineExceptionHandler catch exception, msg = exception thrown from child launch |
可以看到子協程設定CoroutineExceptionHandler沒有輸出,衹有根協程的CoroutineExceptionHandler輸出了,但是也有例外,如果子協程是SupervisorJob,那麼它設定的CoroutineExceptionHandler是生效的,前面也說過SupervisorJob不會產生異常傳播。
當父協程的子協程同時丟擲多個異常時,CoroutineExceptionHandler只會捕獲第一個協程丟擲的異常,後續協程丟擲的異常被儲存在第一個異常的suppressed陣列中,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | fun main(){ val handler = CoroutineExceptionHandler{coroutineContext, throwable -> println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}, suppressed = ${throwable.suppressed.contentToString()}") } val parentJob = GlobalScope.launch(handler){ launch { try { delay(200) }finally { //第二個丟擲的異常 throw IndexOutOfBoundsException("exception thrown from first child launch") } }.start() launch { delay(100) //第一個丟擲的異常 throw NullPointerException("exception thrown from second child launch") }.start() } parentJob.start() Thread.sleep(1000) } 輸出: my coroutineExceptionHandler catch exception, msg = exception thrown from second child launch, suppressed = [java.lang.IndexOutOfBoundsException: exception thrown from first child launch] |
可以看到CoroutineExceptionHandler只處理了第一個子協程丟擲的異常,後續異常都放在了第一個丟擲異常的suppressed陣列中。
還有取消協程時會丟擲一個CancellationException,它會被所有CoroutineExceptionHandler省略,但可以try catch它,同時當子協程丟擲CancellationException時,並不會終止當前父協程的執行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | fun main(){ val handler = CoroutineExceptionHandler{coroutineContext, throwable -> println("my coroutineExceptionHandler catch exception, msg = ${throwable.message}") } val parentJob = GlobalScope.launch(handler){ val childJob = launch { try { delay(Long.MAX_VALUE) }catch (e: CancellationException){ println("catch cancellationException thrown from child launch") println("rethrow cancellationException") throw CancellationException() }finally { println("child was canceled") } } //取消子協程 childJob.cancelAndJoin() println("parent is still running") } parentJob.start() Thread.sleep(1000) } 輸出: catch cancellationException thrown from child launch rethrow cancellationException child was canceled parent is still running |
可以看到當丟擲CancellationException時,我們可以try catch住它,同時當我們再次丟擲它時,協程的CoroutineExceptionHandler並沒有處理它,同時父協程不受影響,繼續執行。
上面就是CoroutineExceptionHandler處理協程異常時的特點。
/ CoroutineContext的結構 /
我們再次看一下CoroutineContext的全家福:

上面講解了組成CoroutineContext的Element,每一個Element都繼承自CoroutineContext,而每一個Element都可以透過 + 號來組合,也可以透過類似map的 [key] 來取值,這和CoroutineContext的運算子過載邏輯和它的結構實現CombinedContext有關,我們先來看一下CoroutineContext類:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public interface CoroutineContext { //運算子[]過載,可以透過CoroutineContext[Key]這種形式來取得與Key關聯的Element public operator fun <E : Element> get(key: Key<E>): E? //它是一個聚集函式,提供了從left到right遍歷CoroutineContext中每一個Element的能力,並對每一個Element做operation操作 public fun <R> fold(initial: R, operation: (R, Element) -> R): R //運算子+過載,可以CoroutineContext + CoroutineContext這種形式把兩個CoroutineContext合併成一個 public operator fun plus(context: CoroutineContext): CoroutineContext //回傳一個新的CoroutineContext,這個CoroutineContext刪除了Key對應的Element public fun minusKey(key: Key<*>): CoroutineContext //Key定義,空實現,僅僅做一個標識 public interface Key<E : Element> //Element定義,每個Element都是一個CoroutineContext public interface Element : CoroutineContext { //每個Element都有一個Key例項 public val key: Key<*> //... } } |
除了plus方法,CoroutineContext中的其他三個方法都被CombinedContext、Element、EmptyCoroutineContext重寫,CombinedContext就是CoroutineContext集合結構的實現,它裡面是一個遞迴定義,Element就是CombinedContext中的元素,而EmptyCoroutineContext就表示一個空的CoroutineContext,它裡面是空實現。
CombinedContext
我們先看CombinedContext類:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | //CombinedContext只包含left和element兩個成員:left可能為CombinedContext或Element例項,而element就是Element例項 internal class CombinedContext( private val left: CoroutineContext, private val element: Element ) : CoroutineContext, Serializable { //CombinedContext的get操作的邏輯是: //1、先看element是否是匹配,如果匹配,那麼element就是需要找的元素,回傳element,否則說明要找的元素在left中,繼續從left開始找,根據left是CombinedContext還是Element轉到2或3 //2、如果left又是一個CombinedContext,那麼重複1 //3、如果left是Element,那麼呼叫它的get方法回傳 override fun <E : Element> get(key: Key<E>): E? { var cur = this while (true) { //1 cur.element[key]?.let { return it } val next = cur.left if (next is CombinedContext) {//2 cur = next } else {//3 return next[key] } } } //CombinedContext的fold操作的邏輯是:先對left做fold操作,把left做完fold操作的的回傳結果和element做operation操作 public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = operation(left.fold(initial, operation), element) //CombinedContext的minusKey操作的邏輯是: //1、先看element是否是匹配,如果匹配,那麼element就是需要刪除的元素,回傳left,否則說明要刪除的元素在left中,繼續從left中刪除對應的元素,根據left是否刪除了要刪除的元素轉到2或3或4 //2、如果left中不存在要刪除的元素,那麼當前CombinedContext就不存在要刪除的元素,直接回傳當前CombinedContext例項就行 //3、如果left中存在要刪除的元素,刪除了這個元素後,left變為了空,那麼直接回傳當前CombinedContext的element就行 //4、如果left中存在要刪除的元素,刪除了這個元素後,left不為空,那麼組合一個新的CombinedContext回傳 public override fun minusKey(key: Key<*>): CoroutineContext { //1 element[key]?.let { return left } val newLeft = left.minusKey(key) return when { newLeft === left -> this//2 newLeft === EmptyCoroutineContext -> element//3 else -> CombinedContext(newLeft, element)//4 } } //... } |
可以發現CombinedContext中的get、fold、minusKey操作都是遞迴形式的操作,遞迴的終點就是當這個left是一個Element,我們再看Element類:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public interface Element : CoroutineContext { public val key: Key<*> //Element的get方法邏輯:如果key和自己的key匹配,那麼自己就是要找的Element,回傳自己,否則回傳null public override operator fun <E : Element> get(key: Key<E>): E? = if (this.key == key) this as E else null //Element的fold方法邏輯:對傳入的initial和自己做operation操作 public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = operation(initial, this) //Element的minusKey方法邏輯:如果key和自己的key匹配,那麼自己就是要刪除的Element,回傳EmptyCoroutineContext(表示刪除了自己),否則說明自己不需要被刪除,回傳自己 public override fun minusKey(key: Key<*>): CoroutineContext = if (this.key == key) EmptyCoroutineContext else this } |
現在我們把CombinedContext和Element結合來看,那麼CombinedContext的整體結構如下:

有點像是一個連結串列,left就是指向下一個結點的指標,有了這個圖我們再從整體看當呼叫CombinedContext的get、fold、minusKey操作時的訪問順序:get、minusKey操作大體邏輯都是先訪問當前element,不滿足,再訪問left的element,順序都是從right到left,而fold的操作大體邏輯是先訪問left,直到遞迴到最後的element,然後再從left到right的回傳,從而訪問了所有的element。
CoroutineContext的plus操作
現在我們來看CoroutineContext唯一沒有被重寫的方法 - plus方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | public interface CoroutineContext { //... public operator fun plus(context: CoroutineContext): CoroutineContext = if (context === EmptyCoroutineContext) this else context.fold(this) { acc, element -> val removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else { val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } } } |
這個方法看起來有點複雜,為了方便我們理解,我把它簡化一下,我把對ContinuationInterceptor的處理去掉,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | public interface CoroutineContext { //... public operator fun plus(context: CoroutineContext): CoroutineContext = //如果要相加的CoroutineContext為空,那麼不做任何處理,直接回傳 if (context === EmptyCoroutineContext) this else //如果要相加的CoroutineContext不為空,那麼對它進行fold操作 context.fold(this) { acc, element -> //我們可以把acc理解成+號左邊的CoroutineContext,element理解成+號右邊的CoroutineContext的某一個element //首先從左邊CoroutineContext中刪除右邊的這個element val removed = acc.minusKey(element.key) //如果removed為空,說明左邊CoroutineContext刪除了和element相同的元素後為空,那麼回傳右邊的element即可 if (removed === EmptyCoroutineContext) element else { //如果removed不為空,說明左邊CoroutineContext刪除了和element相同的元素後還有其他元素,那麼建構一個新的CombinedContext回傳 return CombinedContext(removed, element) } } } |
plus方法大部分情況最終下回傳一個CombinedContext,即我們把兩個CoroutineContext相加後,回傳一個CombinedContext,在組合成CombinedContext時,+號右邊的CoroutineContext中的元素會覆蓋+號左邊的CoroutineContext中的含有相同key的元素,如下:
1 | (Dispatchers.Main, "name") + (Dispatchers.IO) = (Dispatchers.IO, "name") |
這個覆蓋操作就在fold方法的引數operation程式碼塊中完成,透過minusKey方法刪除掉重複元素,前面講過當呼叫CombinedContext的fold方法時,會從left到right到訪問所有的element,即會從left到right的把每一個element傳入operation方法中,作為operation方法的第二個引數,而operation方法第一個引數acc的初始值為fold方法傳入的initial值,然後它會不斷的更新,每次更新的值為上一次呼叫operation方法的回傳值,所以當兩個CoroutineContext相加時,puls方法可以理解為下面的虛擬程式碼:
1 2 3 4 5 | val acc = 左邊的CoroutineContext for(var element in 右邊的CoroutineContext){ acc = operation(acc, element)//operation操作中會讓element覆蓋掉acc中與element相同的元素 } return acc//所以plus方法最終回傳的CoroutineContext是不存在key相同的element的 |
所以puls方法最終回傳的CoroutineContext是不存在key相同的element的,+號右邊的CoroutineContext中的元素會覆蓋+號左邊的CoroutineContext中的含有相同key的元素,這像是Set的屬性。
現在我們再看回簡化前的plus方法,它裡面有個對ContinuationInterceptor的處理,目的是讓ContinuationInterceptor在每次相加後都能變成CoroutineContext中的最後一個元素, ContinuationInterceptor它也是繼承自Element,通常叫做協程上下文攔截器,它的主要作用是在協程執行前攔截它,從而在協程執行前做出一些其他的操作,前面我們講到CoroutineDispatcher它本身也繼承自ContinuationInterceptor,ContinuationInterceptor有一個interceptContinuation方法用於回傳攔截協程的行為,而這個行為就是前面我們所講到Dispatchers.Unconfined時的DispatchedContinuation,DispatchedContinuation在恢復協程前根據協程的CoroutineDispatcher型別做出不同的協程分派行為,透過把ContinuationInterceptor放在最後面,協程在搜尋上下文的element時,總能最快找到攔截器,避免了遞迴搜尋,從而讓攔截行為前置執行。
/ 結語 /
本文主要介紹了CoroutineContext的元素組成和結構,理解CoroutineContext對於理解協程使用有很大的幫助,因為協程的啟動時就離不開CoroutineContext,同時如果你以後想要更深入的學習協程,例如協程的建立過程,Continuation概念、suspend關鍵字等,本篇文章也能給你一個拋磚引玉的效果。
推薦閱讀:
我的新書,《第一行程式碼 第3版》已出版!
用Jetpack Compose寫一個玩AndroidApp
Android中指紋識別的使用
歡迎關注我的公眾號
學習技術或投稿


長按上圖,識別圖中二維碼即可關注