Kotlin Coroutines (協程)

並行程式設計 (Concurrency) 一直是痛點。從早期的 Thread,到 Callback Hell,再到 RxJava。 Kotlin 推出了 Coroutines,核心理念是:用寫同步程式碼的方式,來寫非同步程式

這篇文章將帶你深入了解 Coroutines 的運作機制,而不只是皮毛。

核心觀念:什麼是協程?

你可以把它想像成輕量級的 Thread (Lightweight Thread)。 一個 Thread 可以跑成千上萬個 Coroutine。

  • Thread (執行緒):作業系統層級,建立與切換成本高。
  • Coroutine (協程):語言層級 (Kotlin Runtime),成本極低,可以在執行緒之間「暫停 (Suspend)」和「恢復 (Resume)」。

非阻塞 (Non-blocking) 是關鍵: 當協程執行到網路請求時,它可以暫停自己,讓底下的 Thread 去做別的事(例如處理 UI)。等到網路資料回來了,協程再恢復執行。

在 Kotlin 中,定義並啟動一個 Coroutine 主要涉及三個核心概念:CoroutineScope(作用域)、Builder(構建器) 以及 Dispatcher(調度器)。簡單來說,你不能直接「定義」一個協程像定義變數一樣,而是要透過「啟動」的方式來建立它。

協程作用域 (CoroutineScope):在哪裡執行

協程必須運行在一個作用域(Scope)內,Scope 定義了協程的生命週期,這樣當作用域銷毀時,裡面的協程才會自動停止,避免記憶體洩漏。

常見的 Scope

  • GlobalScope: 全域 Scope,生命週期跟隨整個 App。它不受控,強烈建議不要在專案中使用,因為容易造成 Memory Leak。
  • CoroutineScope (Dispatchers.XX): 自定義 Scope。通常我們會繼承它或使用委派。
  • MainScope(): 預設在 UI Thread 的 Scope。

Android 專用 Scope

  • ViewModelScope: 跟隨 ViewModel 生命週期。ViewModel 清除時自動取消所有協程。
  • LifecycleScope: 在 Activity/Fragment 中使用,跟隨 Activity/Fragment 生命週期。
// Android 範例
viewModelScope.launch {
    // 當使用者離開畫面,ViewModel 被 cleared,這裡的任務會自動被取消
    // 不會因為網路請求回來但畫面不在了而 Crash
    val data = repository.fetch()
}

調度器 (Dispatchers):決定在哪個執行緒執行

你可以指定協程要在哪個執行緒池執行 (Context Switching)。

Dispatcher用途底層機制
Dispatchers.MainUI 操作Android 的 Main Thread (如 Swing/JavaFx 的 Event Loop),處理 UI 互動
Dispatchers.IOI/O 密集型共用的 Thread Pool (最多 64 個),適合讀寫檔案、網路請求、DB
Dispatchers.DefaultCPU 密集型共用的 Thread Pool (核心數),適合大量數學運算、影像處理
Dispatchers.Unconfined不限制呼叫在哪就在哪,直到第一次掛起。通常不建議使用。

最佳實踐:在合適的地方切換 Context 使用 withContext 來切換執行緒。

fun loadData() {
    viewModelScope.launch(Dispatchers.Main) { // 1. 在 UI Thread 啟動
        showLoading()
        
        val result = withContext(Dispatchers.IO) { // 2. 切換到 IO Thread 跑耗時任務
            api.getData() // 這行會卡住 IO Thread,但 Main Thread 是順暢的
        }
        
        // 3. 執行完自動切回 Main Thread
        showResult(result) 
    }
}

構建器 (Builders):啟動方式

要進入協程的世界,我們需要 Coroutine Builders,最常見的啟動方式是使用 launchasync

launch (射後不理,不回傳結果)

不會回傳結果,回傳一個 Job 物件用來控制(取消)協程。適合不需要回傳值的操作 (如更新 Cache)。

scope.launch {
    // 執行任務
    println("Task started")
}

async (需要取得結果)

會回傳結果一個 Deferred<T> 物件,之後可以透過 .await() 取得結果。適合並行處理多個任務。

scope.launch {
    val deferred1 = async { fetchPart1() }
    val deferred2 = async { fetchPart2() }
    
    // 這裡會等待兩個都做完才繼續 (並行執行)
    val result = deferred1.await() + deferred2.await()
}

runBlocking (會阻塞當前執行緒)

通常除非是測試或 main function 才會用到,它會真的卡住 (Block) 當前的 Thread 直到裡面的協程跑完。千萬不要在 UI Thread (Android) 或 Backend 的 Request Thread 中使用它。

fun main() = runBlocking {
    launch {
        delay(1000)
        println("World!")
    }
    println("Hello,")
}

關鍵字:suspend

在定義協程中使用的函式時,你會頻繁用到 suspend 關鍵字。被 suspend 的函式稱為掛起函式 (Suspending Function),它具有神奇的能力:它可以暫停執行,但不會阻塞執行緒。它告訴編譯器:「這個函數有能力掛起協程,它執行時可能會需要等待」。

  • 掛起(Suspend): 當協程執行到一個掛起函數時,它會暫時停止在當前執行點,並釋放它所佔用的執行緒(Thread)。這意味著執行緒可以去處理其他任務,而不會被阻塞(Block)。
  • 恢復(Resume): 當掛起函數內部的耗時任務(如網路請求、資料庫查詢)完成後,協程會從剛才停止的地方繼續執行。

宣告了 suspend fun 並不會自動開啟新任務,它只是宣告這個函數「可以在協程裡面跑」。地位就像 val、class 一樣,是語言的基礎構造。

使用 suspend 的規則:suspend 函式只能被「另一個 suspend 函式」或「協程作用域 (Coroutine Scope)」呼叫。

suspend fun fetchData(): String {
    delay(1000) // delay 是一個掛起函數
    return "Data from network"
}

fun main() {
    // fetchData() // 錯誤!不能在普通函數直接調用
    
    runBlocking { // 建立一個協程作用域
        val result = fetchData() // 正確
        println(result)
    }
}

底層原理:CPS 轉換

你可能會好奇,為什麼程式碼可以「停下來」又「活過來」?

Kotlin 編譯器會對 suspend 函數進行 CPS(Continuation Passing Style) 轉換。 當你寫一個掛起函數時,編譯器會在後台幫你多傳遞一個參數:Continuation。

  • Continuation 就像是一個「書籤」,記錄了程式執行到哪裡,以及當時的狀態。
  • 當函數掛起時,這個書籤被保存起來;當任務完成,系統調用 continuation.resumeWith(),程式就根據書籤內容恢復執行。

為什麼要使用 Suspend?

避免「回呼地獄」(Callback Hell),傳統異步開發常長這樣:

fetchUser { user ->
    fetchOrders(user) { orders ->
        showOrders(orders)
    }
}

使用 suspend 後,程式碼變得非常直觀:

val user = fetchUser() // 掛起直到取得 user
val orders = fetchOrders(user) // 掛起直到取得 orders
showOrders(orders) // 恢復後直接顯示

協程上下文 (CoroutineContext)

CoroutineScope 裡都有一個 CoroutineContext,它就像是協程的「環境設定」,它決定了協程該在哪裡執行、如何處理錯誤,以及它的名字和生命週期。

這套「環境設定」主要包含以下四種關鍵元素,你可以透過 + 將它們組合成一個 Context:

  1. Job: 控制協程的生命週期(啟動、取消、狀態)。
  2. CoroutineDispatcher: 決定在哪個執行緒執行。
  3. CoroutineName: 給協程取個名字,方便除錯用。
  4. CoroutineExceptionHandler: 處理未被捕獲的例外。
val myContext = Dispatchers.IO + CoroutineName("DownloadTask")

scope.launch(myContext) {
    // 這裡的協程會在 IO 執行緒跑,且名字叫做 DownloadTask
    println("執行緒: ${Thread.currentThread().name}")
}

繼承關係與合併規則

這是 CoroutineContext 最核心的邏輯。當你啟動一個協程時,Context 是透過「繼承」與「覆蓋」產生的:

  1. 繼承父級: 預設情況下,子協程會繼承父協程或 Scope 的所有 Context 元素。
  2. 局部覆蓋: 如果你在啟動(如 launch)時傳入了新的元素,它會覆蓋繼承來的同類元素。
  3. Job 永遠是新的: 每個新啟動的協程都會生成一個新的 Job 實例,用來管理自己的生命週期,但它會與父 Job 建立父子關聯。

結構化並發 (Structured Concurrency)

為什麼 context 它很重要?關鍵在於結構化並發。這是 Kotlin Coroutines 最重要的設計哲學:協程必須有父子關係

  1. 父協程取消,子協程會全部被取消
  2. 父協程會等待所有子協程完成,才會視為完成。
  3. 子協程拋出例外,會向上傳遞導致父協程取消 (除非使用 SupervisorJob)。

這避免了「孤兒協程 (Dangling Coroutines)」在背景偷跑或是吞掉錯誤的問題。

如何在協程中查看 Context?

在協程內部,你可以直接存取一個內建的變數 coroutineContext:

scope.launch {
    println(coroutineContext) // 可以看到 Job、Dispatcher 等信息
}

例外處理 (Exception Handling)

在協程中,例外處理機制有分兩類:自動傳播 (launch)暴露给使用者 (async)

1. 傳播機制

預設情況下,如果子協程發生例外且沒被 Catch,它會取消父協程,進而取消所有兄弟協程

launch {
    launch { throw Exception("Oops!") } // 子協程拋出例外
    launch { println("Create sibling") } // 這裡會被連帶取消,不會印出
}

2. 使用 try-catch

這是最直觀的處理方式。

  • launch:在 Block 裡面 try-catch。
  • async:在 await() 的時候 try-catch。
launch {
    try {
        doSomething()
    } catch (e: Exception) {
        println("Caught: $e")
    }
}

val deferred = async { throw Exception("Async Error") }
try {
    deferred.await() // 這裡才會拋出例外
} catch (e: Exception) {
    println("Caught from async: $e")
}

3. CoroutineExceptionHandler (CEH)

這是一個 Context 元素,用來當作「全域例外捕捉器」。但注意:它只對根協程 (Root Coroutine) 有效

val handler = CoroutineExceptionHandler { _, exception ->
    println("Caught $exception")
}

// 必須放在 Root launch 的 Context 中
scope.launch(handler) {
    throw Exception("Error!")
}

監督機制 (Supervision)

標準的結構化並發是「一人出事,全家連坐」。但在 UI 開發中,我們不希望一個 Request 失敗導致整個畫面崩潰。這時就需要 SupervisorJob

SupervisorJob & supervisorScope

使用 SupervisorJob 時,子協程的失敗不會傳播給父協程,也不會取消其他兄弟協程。

val scope = CoroutineScope(SupervisorJob())

scope.launch {
    launch { 
        throw Exception("Task 1 failed") // 只有自己掛掉
    }
    
    launch { 
        delay(100)
        println("Task 2 is still alive!") // 這會正常執行
    }
}
viewModelScope 中,預設就使用了 SupervisorJob,所以一個操作失敗不會導致 ViewModel 裡的其他操作也被取消。

取消與超時 (Cancellation & Timeouts)

所有 launch 都會回傳一個 Job。你可以呼叫 job.cancel() 來取消任務。

val job = scope.launch {
    while (isActive) { // 檢查是否還活著
        // 做事...
    }
}

job.cancel()
注意:協程的取消是「協作式 (Cooperative)」的! 如果你的協程正在跑一個死迴圈且沒有檢查 isActive 或沒有呼叫 suspend 函式 (如 yield()delay()),它是無法被取消的。

處理超時 (withTimeout)

如果你希望某個任務執行太久就取消,可以使用 withTimeoutwithTimeoutOrNull

try {
    withTimeout(1000) { // 限時 1 秒
        delay(2000) // 模擬耗時操作
    }
} catch (e: TimeoutCancellationException) {
    println("Time's up!")
}

// withTimeoutOrNull 會回傳 null 而不是拋出例外,更優雅
val result = withTimeoutOrNull(1000) { "Success" } ?: "Fallback"

執行緒安全 (Concurrency Safety)

當多個協程同時修改同一個變數時,也會發生 Race Condition。Kotlin 提供了 Mutex (Mutual Exclusion) 來解決這個問題,它是 synchronizedLock 的協程版本 (非阻塞)。

val mutex = Mutex()
var counter = 0

repeat(100) {
    launch {
        // 使用 withLock 來確保 critical section 的安全
        mutex.withLock {
            counter++
        }
    }
}

異步串流:Flow

如果你需要回傳「多個值」怎麼辦?(例如下載進度條、即時股票報價)。 List<T> 是一次給全部,Sequence<T> 是同步的。 非同步的多個值,就是 Flow (冷串流)

建立 Flow

fun countdown(): Flow<Int> = flow {
    for (i in 5 downTo 1) {
        emit(i) // 發送(推送)資料給收集者
        delay(1000)
    }
}

emit 關鍵字必須與 flow { ... } 構建器(或是類似的發送環境)搭配使用。在 Flow 的模型中,flow { ... } 區塊定義了一個生產者。當你想從這個生產者「推」出一個資料給訂閱者(消費者)時,就會呼叫 emit

收集 Flow (Collect)

Flow 是 冷 (Cold) 的,你不 collect 它,裡面的 code 根本不會跑。

launch {
    countdown()
        .map { "剩餘 $it 秒" } // 支援各種操作元
        .collect { value ->
            println(value)
        }
}

Flow 的強大之處在於它也支援 Backpressure (背壓) 處理與豐富的運算子,完全可以取代 RxJava。

常用運算子 (Common Operators)

Flow 的操作分為「中間操作」與「終端操作」。

1. 中間操作 (Intermediate Operators)

這些函式不會馬上執行,而是回傳一個新的 Flow (Cold Stream)。

  • map:轉換資料型態。
  • filter:過濾資料。
  • onEach:對每個元素執行操作(不改變資料流),常用於 logging。
  • onCompletion:Flow 結束時執行 (無論成功或失敗)。
  • flowOn非常重要! 用來切換「上游」執行的 Dispatcher。
flow {
    emit(1) // 這裡會在 IO Thread 執行 (因為下面的 flowOn)
}
.map { it * 2 }
.onEach { println("Processing $it") } 
.onCompletion { cause -> if (cause == null) println("Done") }
.flowOn(Dispatchers.IO) // 影響它上面的操作
.collect { 
    // 這裡依舊在原本的 Thread (e.g. Main)
    println("Result: $it") 
}

2. 終端操作 (Terminal Operators)

這些函式會 觸發 Flow 開始運作,並等待結果 (Suspend Function)。

  • collect:最基本的收集,處理每一個值。
  • first:只取第一個值,然後直接取消 Flow。
  • single:預期 Flow 只有一個值,如果有更多值會拋出 Exception。
  • toList:將所有資料收集成一個 List。
val value = flow.first() // 取得第一個就結束
val list = flow.toList() // 全部跑完存成 List

熱流 (Hot Stream):StateFlow 與 SharedFlow

前面介紹的 Flow 是 冷流 (Cold Stream),只有被訂閱 (Collect) 時才會開始運作,且每個訂閱者都會拿到一份獨立的資料流。

但在 App 開發 (特別是 UI 層) 中,我們常需要 熱流 (Hot Stream)

  1. 狀態保持 (State Holding):無論有無訂閱者,資料都存在 (如 UI 顯示的當前 User)。
  2. 事件廣播 (Multicasting):一個事件發生,多個頁面同時收到。
  3. 持續活躍:不管有沒有人聽,資料流都在運作。

1. StateFlow (狀態容器)

它是用來取代 LiveData 的最佳方案。它永遠會有一個初始值,且只會發送 有變化 (Distinct) 的值。

  • 用途:UI 狀態 (Loading, Success, Error)。
  • 特性
    • Hot:立即活躍。
    • Sticky:新的訂閱者會馬上收到「當前最新」的值。
    • Distinct:如果寫入相同的值,不會觸發更新。

通常我們會遵循 Backing Property 模式:內部使用 MutableStateFlow 來改變狀態 (讀寫),對外暴露 StateFlow (唯讀) 以確保狀態封裝。

class MainViewModel : ViewModel() {
    // Backing Property: 內部可變 (Mutable),負責更新資料
    private val _uiState = MutableStateFlow("Loading")
    
    // Public Property: 對外唯讀 (Immutable),保護資料不被外部亂改
    val uiState = _uiState.asStateFlow()
    
    fun updateData(newData: String) {
        // 更新狀態:賦予新值,StateFlow 會自動通知所有訂閱者
        _uiState.value = newData
    }
}

// 在 Activity/Fragment 中收集
lifecycleScope.launch {
    // repeatOnLifecycle 確保在背景時暫停收集,節省資源
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.uiState.collect { state ->
            println("Current State: $state")
        }
    }
}

2. SharedFlow (事件流)

它是更底層、配置更靈活的熱流。

  • 用途:單次事件 (One-shot Events),如顯示 Toast、導航事件、Server Push 廣播。
  • 特性
    • No Initial Value:預設沒有初始值。
    • Configurable Replay:可以設定 replay 參數來決定新訂閱者能收到幾筆「過去」的資料 (預設 0)。
class EventViewModel : ViewModel() {
    private val _events = MutableSharedFlow<String>()
    val events = _events.asSharedFlow()
    
    fun triggerEvent() {
        viewModelScope.launch {
            _events.emit("Show Toast!")
        }
    }
}

3. 比較總結

特性FlowStateFlowSharedFlow
類型Cold (冷流)Hot (熱流)Hot (熱流)
初始值必須有
資料重播無 (每次重跑)1 (Sticky)可設定 (預設 0)
用途耗時任務、資料流UI 狀態 (State)單次事件 (Event)
LiveData 替代是 (完全替代)是 (替代 SingleLiveEvent)

補充:安全更新 MutableStateFlow

雖然可以直接設定 _uiState.value = newData,但如果你的更新依賴於「舊值」 (例如計數器 +1),建議使用 update 函式來確保 原子性 (Atomicity)執行緒安全

// 不安全,在多執行緒下可能會有 Race Condition
_count.value = _count.value + 1

// 安全,保證原子性
_count.update { it + 1 }

通道 (Channels):協程間的溝通

Channel 就像是協程之間的 管線 (Pipe)隊列 (Queue)。 一個協程負責 send 資料,另一個協程負責 receive 資料。

它與 Flow 最大的不同在於:

  1. Channel 是 Hot 的:寄件者送出資料後,如果沒人收,它可能會暫停 (Suspend) 等待,直到有人收走 (視 Buffer 設定而定)。
  2. 點對點 (Point-to-Point):一個資料只會被一個接收者拿走 (Flow 則是每個 Collector 都會收到一份)。
val channel = Channel<Int>()

launch {
    // Producer
    for (x in 1..5) {
        channel.send(x * x)
        println("Sent: ${x * x}")
    }
    channel.close() // 結束發送
}

launch {
    // Consumer
    // 使用 for loop 可以持續接收直到 close
    for (y in channel) {
        println("Received: $y")
    }
    println("Done!")
}

Channel 的類型 (Buffer Capacity)

建立 Channel 時可以指定 capacity

  • Rendezvous (預設):容量為 0。send 會暫停直到有人 receive (不見不散)。
  • Buffered:有固定容量。send 只有在滿了的時候才會暫停。
  • Unlimited:無限容量 (LinkedList)。send 永遠不會暫停 (小心 OutOfMemory)。
  • Conflated:容量為 1,但策略是 舊的資料會被覆蓋,接收者永遠只拿到最新的。

什麼時候用 Channel vs Flow?

  • Flow:適用於 資料流 (Stream) 的分發,例如感測器數據、資料庫更新。通常是一對多 (Multicast)。
  • Channel:適用於 生產者-消費者 (Producer-Consumer) 模式,或者兩個協程之間單純的訊息傳遞。通常是一對一 (Unicast)。