Kotlin Coroutines (協程)
並行程式設計 (Concurrency) 一直是痛點。從早期的 Thread,到 Callback Hell,再到 RxJava。 Kotlin 推出了 Coroutines,核心理念是:用寫同步程式碼的方式,來寫非同步程式。
這篇文章將帶你深入了解 Coroutines 的運作機制,而不只是皮毛。
核心觀念:什麼是協程?
你可以把它想像成輕量級的 Thread (Lightweight Thread)。 一個 Thread 可以跑成千上萬個 Coroutine。
- Thread (執行緒):作業系統層級,建立與切換成本高。
- Coroutine (協程):語言層級 (Kotlin Runtime),成本極低,可以在執行緒之間「暫停 (Suspend)」和「恢復 (Resume)」。
非阻塞 (Non-blocking) 是關鍵: 當協程執行到網路請求時,它可以暫停自己,讓底下的 Thread 去做別的事(例如處理 UI)。等到網路資料回來了,協程再恢復執行。
在 Kotlin 中,定義並啟動一個 Coroutine 主要涉及三個核心概念:CoroutineScope(作用域)、Builder(構建器) 以及 Dispatcher(調度器)。簡單來說,你不能直接「定義」一個協程像定義變數一樣,而是要透過「啟動」的方式來建立它。
協程作用域 (CoroutineScope):在哪裡執行
協程必須運行在一個作用域(Scope)內,Scope 定義了協程的生命週期,這樣當作用域銷毀時,裡面的協程才會自動停止,避免記憶體洩漏。
常見的 Scope
- GlobalScope: 全域 Scope,生命週期跟隨整個 App。它不受控,強烈建議不要在專案中使用,因為容易造成 Memory Leak。
- CoroutineScope (Dispatchers.XX): 自定義 Scope。通常我們會繼承它或使用委派。
- MainScope(): 預設在 UI Thread 的 Scope。
Android 專用 Scope
- ViewModelScope: 跟隨
ViewModel生命週期。ViewModel 清除時自動取消所有協程。 - LifecycleScope: 在 Activity/Fragment 中使用,跟隨 Activity/Fragment 生命週期。
// Android 範例
viewModelScope.launch {
// 當使用者離開畫面,ViewModel 被 cleared,這裡的任務會自動被取消
// 不會因為網路請求回來但畫面不在了而 Crash
val data = repository.fetch()
}
調度器 (Dispatchers):決定在哪個執行緒執行
你可以指定協程要在哪個執行緒池執行 (Context Switching)。
| Dispatcher | 用途 | 底層機制 |
|---|---|---|
| Dispatchers.Main | UI 操作 | Android 的 Main Thread (如 Swing/JavaFx 的 Event Loop),處理 UI 互動 |
| Dispatchers.IO | I/O 密集型 | 共用的 Thread Pool (最多 64 個),適合讀寫檔案、網路請求、DB |
| Dispatchers.Default | CPU 密集型 | 共用的 Thread Pool (核心數),適合大量數學運算、影像處理 |
| Dispatchers.Unconfined | 不限制 | 呼叫在哪就在哪,直到第一次掛起。通常不建議使用。 |
最佳實踐:在合適的地方切換 Context
使用 withContext 來切換執行緒。
fun loadData() {
viewModelScope.launch(Dispatchers.Main) { // 1. 在 UI Thread 啟動
showLoading()
val result = withContext(Dispatchers.IO) { // 2. 切換到 IO Thread 跑耗時任務
api.getData() // 這行會卡住 IO Thread,但 Main Thread 是順暢的
}
// 3. 執行完自動切回 Main Thread
showResult(result)
}
}
構建器 (Builders):啟動方式
要進入協程的世界,我們需要 Coroutine Builders,最常見的啟動方式是使用 launch 或 async:
launch (射後不理,不回傳結果)
不會回傳結果,回傳一個 Job 物件用來控制(取消)協程。適合不需要回傳值的操作 (如更新 Cache)。
scope.launch {
// 執行任務
println("Task started")
}
async (需要取得結果)
會回傳結果一個 Deferred<T> 物件,之後可以透過 .await() 取得結果。適合並行處理多個任務。
scope.launch {
val deferred1 = async { fetchPart1() }
val deferred2 = async { fetchPart2() }
// 這裡會等待兩個都做完才繼續 (並行執行)
val result = deferred1.await() + deferred2.await()
}
runBlocking (會阻塞當前執行緒)
通常除非是測試或 main function 才會用到,它會真的卡住 (Block) 當前的 Thread 直到裡面的協程跑完。千萬不要在 UI Thread (Android) 或 Backend 的 Request Thread 中使用它。
fun main() = runBlocking {
launch {
delay(1000)
println("World!")
}
println("Hello,")
}
關鍵字:suspend
在定義協程中使用的函式時,你會頻繁用到 suspend 關鍵字。被 suspend 的函式稱為掛起函式 (Suspending Function),它具有神奇的能力:它可以暫停執行,但不會阻塞執行緒。它告訴編譯器:「這個函數有能力掛起協程,它執行時可能會需要等待」。
- 掛起(Suspend): 當協程執行到一個掛起函數時,它會暫時停止在當前執行點,並釋放它所佔用的執行緒(Thread)。這意味著執行緒可以去處理其他任務,而不會被阻塞(Block)。
- 恢復(Resume): 當掛起函數內部的耗時任務(如網路請求、資料庫查詢)完成後,協程會從剛才停止的地方繼續執行。
宣告了 suspend fun 並不會自動開啟新任務,它只是宣告這個函數「可以在協程裡面跑」。地位就像 val、class 一樣,是語言的基礎構造。
使用 suspend 的規則:suspend 函式只能被「另一個 suspend 函式」或「協程作用域 (Coroutine Scope)」呼叫。
suspend fun fetchData(): String {
delay(1000) // delay 是一個掛起函數
return "Data from network"
}
fun main() {
// fetchData() // 錯誤!不能在普通函數直接調用
runBlocking { // 建立一個協程作用域
val result = fetchData() // 正確
println(result)
}
}
底層原理:CPS 轉換
你可能會好奇,為什麼程式碼可以「停下來」又「活過來」?
Kotlin 編譯器會對 suspend 函數進行 CPS(Continuation Passing Style) 轉換。 當你寫一個掛起函數時,編譯器會在後台幫你多傳遞一個參數:Continuation。
- Continuation 就像是一個「書籤」,記錄了程式執行到哪裡,以及當時的狀態。
- 當函數掛起時,這個書籤被保存起來;當任務完成,系統調用 continuation.resumeWith(),程式就根據書籤內容恢復執行。
為什麼要使用 Suspend?
避免「回呼地獄」(Callback Hell),傳統異步開發常長這樣:
fetchUser { user ->
fetchOrders(user) { orders ->
showOrders(orders)
}
}
使用 suspend 後,程式碼變得非常直觀:
val user = fetchUser() // 掛起直到取得 user
val orders = fetchOrders(user) // 掛起直到取得 orders
showOrders(orders) // 恢復後直接顯示
協程上下文 (CoroutineContext)
CoroutineScope 裡都有一個 CoroutineContext,它就像是協程的「環境設定」,它決定了協程該在哪裡執行、如何處理錯誤,以及它的名字和生命週期。
這套「環境設定」主要包含以下四種關鍵元素,你可以透過 + 將它們組合成一個 Context:
- Job: 控制協程的生命週期(啟動、取消、狀態)。
- CoroutineDispatcher: 決定在哪個執行緒執行。
- CoroutineName: 給協程取個名字,方便除錯用。
- CoroutineExceptionHandler: 處理未被捕獲的例外。
val myContext = Dispatchers.IO + CoroutineName("DownloadTask")
scope.launch(myContext) {
// 這裡的協程會在 IO 執行緒跑,且名字叫做 DownloadTask
println("執行緒: ${Thread.currentThread().name}")
}
繼承關係與合併規則
這是 CoroutineContext 最核心的邏輯。當你啟動一個協程時,Context 是透過「繼承」與「覆蓋」產生的:
- 繼承父級: 預設情況下,子協程會繼承父協程或 Scope 的所有 Context 元素。
- 局部覆蓋: 如果你在啟動(如 launch)時傳入了新的元素,它會覆蓋繼承來的同類元素。
- Job 永遠是新的: 每個新啟動的協程都會生成一個新的 Job 實例,用來管理自己的生命週期,但它會與父 Job 建立父子關聯。
結構化並發 (Structured Concurrency)
為什麼 context 它很重要?關鍵在於結構化並發。這是 Kotlin Coroutines 最重要的設計哲學:協程必須有父子關係。
- 父協程取消,子協程會全部被取消。
- 父協程會等待所有子協程完成,才會視為完成。
- 子協程拋出例外,會向上傳遞導致父協程取消 (除非使用
SupervisorJob)。
這避免了「孤兒協程 (Dangling Coroutines)」在背景偷跑或是吞掉錯誤的問題。
如何在協程中查看 Context?
在協程內部,你可以直接存取一個內建的變數 coroutineContext:
scope.launch {
println(coroutineContext) // 可以看到 Job、Dispatcher 等信息
}
例外處理 (Exception Handling)
在協程中,例外處理機制有分兩類:自動傳播 (launch) 與 暴露给使用者 (async)。
1. 傳播機制
預設情況下,如果子協程發生例外且沒被 Catch,它會取消父協程,進而取消所有兄弟協程。
launch {
launch { throw Exception("Oops!") } // 子協程拋出例外
launch { println("Create sibling") } // 這裡會被連帶取消,不會印出
}
2. 使用 try-catch
這是最直觀的處理方式。
launch:在 Block 裡面 try-catch。async:在await()的時候 try-catch。
launch {
try {
doSomething()
} catch (e: Exception) {
println("Caught: $e")
}
}
val deferred = async { throw Exception("Async Error") }
try {
deferred.await() // 這裡才會拋出例外
} catch (e: Exception) {
println("Caught from async: $e")
}
3. CoroutineExceptionHandler (CEH)
這是一個 Context 元素,用來當作「全域例外捕捉器」。但注意:它只對根協程 (Root Coroutine) 有效。
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
// 必須放在 Root launch 的 Context 中
scope.launch(handler) {
throw Exception("Error!")
}
監督機制 (Supervision)
標準的結構化並發是「一人出事,全家連坐」。但在 UI 開發中,我們不希望一個 Request 失敗導致整個畫面崩潰。這時就需要 SupervisorJob。
SupervisorJob & supervisorScope
使用 SupervisorJob 時,子協程的失敗不會傳播給父協程,也不會取消其他兄弟協程。
val scope = CoroutineScope(SupervisorJob())
scope.launch {
launch {
throw Exception("Task 1 failed") // 只有自己掛掉
}
launch {
delay(100)
println("Task 2 is still alive!") // 這會正常執行
}
}
viewModelScope 中,預設就使用了 SupervisorJob,所以一個操作失敗不會導致 ViewModel 裡的其他操作也被取消。取消與超時 (Cancellation & Timeouts)
所有 launch 都會回傳一個 Job。你可以呼叫 job.cancel() 來取消任務。
val job = scope.launch {
while (isActive) { // 檢查是否還活著
// 做事...
}
}
job.cancel()
isActive 或沒有呼叫 suspend 函式 (如 yield() 或 delay()),它是無法被取消的。處理超時 (withTimeout)
如果你希望某個任務執行太久就取消,可以使用 withTimeout 或 withTimeoutOrNull。
try {
withTimeout(1000) { // 限時 1 秒
delay(2000) // 模擬耗時操作
}
} catch (e: TimeoutCancellationException) {
println("Time's up!")
}
// withTimeoutOrNull 會回傳 null 而不是拋出例外,更優雅
val result = withTimeoutOrNull(1000) { "Success" } ?: "Fallback"
執行緒安全 (Concurrency Safety)
當多個協程同時修改同一個變數時,也會發生 Race Condition。Kotlin 提供了 Mutex (Mutual Exclusion) 來解決這個問題,它是 synchronized 或 Lock 的協程版本 (非阻塞)。
val mutex = Mutex()
var counter = 0
repeat(100) {
launch {
// 使用 withLock 來確保 critical section 的安全
mutex.withLock {
counter++
}
}
}
異步串流:Flow
如果你需要回傳「多個值」怎麼辦?(例如下載進度條、即時股票報價)。
List<T> 是一次給全部,Sequence<T> 是同步的。
非同步的多個值,就是 Flow (冷串流)。
建立 Flow
fun countdown(): Flow<Int> = flow {
for (i in 5 downTo 1) {
emit(i) // 發送(推送)資料給收集者
delay(1000)
}
}
emit 關鍵字必須與 flow { ... } 構建器(或是類似的發送環境)搭配使用。在 Flow 的模型中,flow { ... } 區塊定義了一個生產者。當你想從這個生產者「推」出一個資料給訂閱者(消費者)時,就會呼叫 emit。
收集 Flow (Collect)
Flow 是 冷 (Cold) 的,你不 collect 它,裡面的 code 根本不會跑。
launch {
countdown()
.map { "剩餘 $it 秒" } // 支援各種操作元
.collect { value ->
println(value)
}
}
Flow 的強大之處在於它也支援 Backpressure (背壓) 處理與豐富的運算子,完全可以取代 RxJava。
常用運算子 (Common Operators)
Flow 的操作分為「中間操作」與「終端操作」。
1. 中間操作 (Intermediate Operators)
這些函式不會馬上執行,而是回傳一個新的 Flow (Cold Stream)。
map:轉換資料型態。filter:過濾資料。onEach:對每個元素執行操作(不改變資料流),常用於 logging。onCompletion:Flow 結束時執行 (無論成功或失敗)。flowOn:非常重要! 用來切換「上游」執行的 Dispatcher。
flow {
emit(1) // 這裡會在 IO Thread 執行 (因為下面的 flowOn)
}
.map { it * 2 }
.onEach { println("Processing $it") }
.onCompletion { cause -> if (cause == null) println("Done") }
.flowOn(Dispatchers.IO) // 影響它上面的操作
.collect {
// 這裡依舊在原本的 Thread (e.g. Main)
println("Result: $it")
}
2. 終端操作 (Terminal Operators)
這些函式會 觸發 Flow 開始運作,並等待結果 (Suspend Function)。
collect:最基本的收集,處理每一個值。first:只取第一個值,然後直接取消 Flow。single:預期 Flow 只有一個值,如果有更多值會拋出 Exception。toList:將所有資料收集成一個 List。
val value = flow.first() // 取得第一個就結束
val list = flow.toList() // 全部跑完存成 List
熱流 (Hot Stream):StateFlow 與 SharedFlow
前面介紹的 Flow 是 冷流 (Cold Stream),只有被訂閱 (Collect) 時才會開始運作,且每個訂閱者都會拿到一份獨立的資料流。
但在 App 開發 (特別是 UI 層) 中,我們常需要 熱流 (Hot Stream):
- 狀態保持 (State Holding):無論有無訂閱者,資料都存在 (如 UI 顯示的當前 User)。
- 事件廣播 (Multicasting):一個事件發生,多個頁面同時收到。
- 持續活躍:不管有沒有人聽,資料流都在運作。
1. StateFlow (狀態容器)
它是用來取代 LiveData 的最佳方案。它永遠會有一個初始值,且只會發送 有變化 (Distinct) 的值。
- 用途:UI 狀態 (Loading, Success, Error)。
- 特性:
- Hot:立即活躍。
- Sticky:新的訂閱者會馬上收到「當前最新」的值。
- Distinct:如果寫入相同的值,不會觸發更新。
通常我們會遵循 Backing Property 模式:內部使用 MutableStateFlow 來改變狀態 (讀寫),對外暴露 StateFlow (唯讀) 以確保狀態封裝。
class MainViewModel : ViewModel() {
// Backing Property: 內部可變 (Mutable),負責更新資料
private val _uiState = MutableStateFlow("Loading")
// Public Property: 對外唯讀 (Immutable),保護資料不被外部亂改
val uiState = _uiState.asStateFlow()
fun updateData(newData: String) {
// 更新狀態:賦予新值,StateFlow 會自動通知所有訂閱者
_uiState.value = newData
}
}
// 在 Activity/Fragment 中收集
lifecycleScope.launch {
// repeatOnLifecycle 確保在背景時暫停收集,節省資源
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
println("Current State: $state")
}
}
}
2. SharedFlow (事件流)
它是更底層、配置更靈活的熱流。
- 用途:單次事件 (One-shot Events),如顯示 Toast、導航事件、Server Push 廣播。
- 特性:
- No Initial Value:預設沒有初始值。
- Configurable Replay:可以設定
replay參數來決定新訂閱者能收到幾筆「過去」的資料 (預設 0)。
class EventViewModel : ViewModel() {
private val _events = MutableSharedFlow<String>()
val events = _events.asSharedFlow()
fun triggerEvent() {
viewModelScope.launch {
_events.emit("Show Toast!")
}
}
}
3. 比較總結
| 特性 | Flow | StateFlow | SharedFlow |
|---|---|---|---|
| 類型 | Cold (冷流) | Hot (熱流) | Hot (熱流) |
| 初始值 | 無 | 必須有 | 無 |
| 資料重播 | 無 (每次重跑) | 1 (Sticky) | 可設定 (預設 0) |
| 用途 | 耗時任務、資料流 | UI 狀態 (State) | 單次事件 (Event) |
| LiveData 替代 | 否 | 是 (完全替代) | 是 (替代 SingleLiveEvent) |
補充:安全更新 MutableStateFlow
雖然可以直接設定 _uiState.value = newData,但如果你的更新依賴於「舊值」 (例如計數器 +1),建議使用 update 函式來確保 原子性 (Atomicity) 與 執行緒安全。
// 不安全,在多執行緒下可能會有 Race Condition
_count.value = _count.value + 1
// 安全,保證原子性
_count.update { it + 1 }
通道 (Channels):協程間的溝通
Channel 就像是協程之間的 管線 (Pipe) 或 隊列 (Queue)。
一個協程負責 send 資料,另一個協程負責 receive 資料。
它與 Flow 最大的不同在於:
- Channel 是 Hot 的:寄件者送出資料後,如果沒人收,它可能會暫停 (Suspend) 等待,直到有人收走 (視 Buffer 設定而定)。
- 點對點 (Point-to-Point):一個資料只會被一個接收者拿走 (Flow 則是每個 Collector 都會收到一份)。
val channel = Channel<Int>()
launch {
// Producer
for (x in 1..5) {
channel.send(x * x)
println("Sent: ${x * x}")
}
channel.close() // 結束發送
}
launch {
// Consumer
// 使用 for loop 可以持續接收直到 close
for (y in channel) {
println("Received: $y")
}
println("Done!")
}
Channel 的類型 (Buffer Capacity)
建立 Channel 時可以指定 capacity:
- Rendezvous (預設):容量為 0。
send會暫停直到有人receive(不見不散)。 - Buffered:有固定容量。
send只有在滿了的時候才會暫停。 - Unlimited:無限容量 (
LinkedList)。send永遠不會暫停 (小心 OutOfMemory)。 - Conflated:容量為 1,但策略是 舊的資料會被覆蓋,接收者永遠只拿到最新的。
什麼時候用 Channel vs Flow?
- Flow:適用於 資料流 (Stream) 的分發,例如感測器數據、資料庫更新。通常是一對多 (Multicast)。
- Channel:適用於 生產者-消費者 (Producer-Consumer) 模式,或者兩個協程之間單純的訊息傳遞。通常是一對一 (Unicast)。