Python AsyncIO 非同步程式設計

在 Python 3.4 之後,Python 引入了標準庫 asyncio,正式支援了 協程 (Coroutines)非同步 I/O (Asynchronous I/O)。這讓 Python 在處理高併發的網路請求(如爬蟲、Web Server)時,效能有了質的飛躍。

本文將從基礎觀念一路深入到底層的 Event Loop 機制。

基礎觀念:同步 vs 非同步

  • 同步 (Synchronous):程式一行一行執行。如果遇到 I/O (如請求網頁),CPU 會「卡住」等待,直到資料回來。
  • 非同步 (Asynchronous):遇到 I/O 時,程式會掛起該任務,讓 CPU 去執行其他任務。等到 I/O 完成,再回來繼續執行。

實際效能對比

讓我們用一個簡單的例子來模擬 I/O 操作:

import time
import asyncio

def sync_sleep():
    time.sleep(1) # 卡住 1 秒

async def async_sleep():
    await asyncio.sleep(1) # 讓出控制權 1 秒

def run_sync():
    start = time.time()
    for _ in range(3):
        sync_sleep() # 總共卡 3 次
    print(f"Sync Total: {time.time() - start:.2f}s") # 約 3.0s

async def run_async():
    start = time.time()
    # 同時並發執行 3 個任務
    await asyncio.gather(async_sleep(), async_sleep(), async_sleep())
    print(f"Async Total: {time.time() - start:.2f}s") # 約 1.0s

if __name__ == "__main__":
    run_sync()
    asyncio.run(run_async())

結論:非同步程式在 I/O 密集型任務中,能避免 CPU 閒置,大幅提升吞吐量。

Event Loop 機制詳解

Event Loop (事件迴圈) 是 AsyncIO 的心臟。它是一個無限迴圈,負責監聽 I/O 事件(如 Socket 可讀寫)並排程執行 Coroutine。

Python 提供了高階與低階兩種方式來管理 Event Loop。

現代寫法 (Python 3.7+)

推薦使用 asyncio.run()。它會自動建立新的 Loop,執行傳入的 Coroutine,最後關閉 Loop。

async def main():
    await asyncio.sleep(1)
    print("Done")

if __name__ == "__main__":
    # 自動管理 Loop 的生命週期
    asyncio.run(main())

底層寫法 (Legacy / Advanced)

在某些舊程式碼或與 GUI 框架整合時,你可能需要手動控制 Loop。

  1. loop.run_until_complete(future):執行直到該 Future/Coroutine 完成。
  2. loop.run_forever():一直執行,直到呼叫 loop.stop()。常用於伺服器。
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    loop.run_until_complete(main())
finally:
    loop.close() # 務必手動關閉

伺服器模式 (Run Forever)

如果你在寫一個 TCP Server,通常會用到 run_forever

# 伺服器啟動範例 (概念)
try:
    server = loop.run_until_complete(start_server())
    print("Server running...")
    loop.run_forever() # 阻塞在這裡,直到 loop.stop() 被呼叫
except KeyboardInterrupt:
    pass
finally:
    # 優雅關閉 (Graceful Shutdown)
    loop.run_until_complete(server.wait_closed())
    loop.close()

解決 Blocking:使用 Executor

這是 AsyncIO 開發中最重要的一點。 千萬不要在 async 函式中呼叫會「卡住」的同步函式 (如 time.sleep, requests.get)。 因為這會卡死單一執行緒的 Event Loop,導致所有其他任務都暫停。

當你不得不使用同步函式時,應使用 loop.run_in_executor() 將任務交給執行緒池(Threads)或進程池(Processes)執行。

ThreadPoolExecutor (I/O 密集型)

適合處理網路請求、檔案讀寫等 I/O 密集型任務。

import asyncio
import concurrent.futures
import requests

def blocking_io():
    # 這是同步的 Blocking Call
    return requests.get("https://www.google.com").status_code

async def main():
    loop = asyncio.get_running_loop()

    # 建立 ThreadPool (預設也會由 asyncio 管理)
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # 第一個參數通常是 pool
        result = await loop.run_in_executor(pool, blocking_io)
        print(f"Status Code: {result}")

asyncio.run(main())

ProcessPoolExecutor (CPU 密集型)

如果你需要進行大量的數學計算、圖像處理或加密解密(CPU 密集型任務),Python 的 GIL 會限制執行緒的效能。這時應改用進程池,讓任務在獨立的 CPU 核心上執行。

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(n):
    # 模擬耗時的 CPU 計算
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()

    # 使用 ProcessPoolExecutor 避開 GIL
    with ProcessPoolExecutor() as pool:
        # 將計算任務丟到另一個進程跑
        result = await loop.run_in_executor(pool, cpu_bound_task, 10**7)
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
使用 ProcessPoolExecutor 時,必須確保啟動程式碼放在 if __name__ == "__main__": 區塊內,以避免在 Windows 或 macOS 上產生多進程啟動的循環報錯。

Task 與並發管理

Coroutine vs Task

  • Coroutine:只是一個產生器函式,呼叫它不會馬上執行。
  • Task:將 Coroutine 包裝後丟入 Event Loop 排程。一旦建立 Task,它就會開始跑。

建立 Task

# 建立後會立即被排程
task = asyncio.create_task(my_coro())
await task # 等待結果

現代化的任務管理:TaskGroup (Python 3.11+)

在 Python 3.11 之後,官方推薦使用 asyncio.TaskGroup 來管理多個併發任務。它提供了「結構化併發」(Structured Concurrency),如果其中一個任務失敗掛掉,它會確保其他任務也能被正確取消且異常會統一拋出。

async def main():
    async with asyncio.TaskGroup() as tg:
        # 使用 tg.create_task 建立任務
        t1 = tg.create_task(fetch_data(1))
        t2 = tg.create_task(fetch_data(2))
        # 離開 async with 塊時,會自動等待所有任務完成

    # 這裡可以安全地取得結果
    print(f"Finished: {t1.result()}, {t2.result()}")

asyncio.run(main())

保護任務 (Shield)

有時候我們取消了父任務(例如按了 Ctrl+C),但不希望某個關鍵的子任務被中斷(例如寫入 Log)。

try:
    # 如果 main() 被取消,important_task 依然會繼續執行
    await asyncio.shield(important_task())
except asyncio.CancelledError:
    print("Main task cancelled, but shield protected the inner task")

收集多個任務 (Gather vs As Completed)

  • gather:全部做完才回傳。如果傳入 return_exceptions=True,某個任務報錯時不會立即中斷,而是將異常當作結果回傳。
  • as_completed:做完一個回傳一個 (適合串流處理)。
tasks = [download(url1), download(url2)]

# Gather (進階用法:包容異常)
results = await asyncio.gather(*tasks, return_exceptions=True)

# As Completed
for coro in asyncio.as_completed(tasks):
    result = await coro
    print("One download finished:", result)

等待特定條件:asyncio.wait

如果你需要更細緻的控制,例如「只要第一個任務完成就往下走」,可以使用 asyncio.wait

tasks = {asyncio.create_task(work()) for _ in range(3)}

# return_when=asyncio.FIRST_COMPLETED (第一個完成即返回)
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

# 取消剩餘還在跑的任務 (Good Practice)
for t in pending:
    t.cancel()

設定超時 (Timeout)

Python 3.11 引入了更優雅的 asyncio.timeout 上下文管理器。

try:
    async with asyncio.timeout(2.0): # 超過 2 秒會拋出 TimeoutError
        await long_running_task()
except TimeoutError:
    print("任務超時了!")

實戰範例:生產者-消費者 (Queue)

這是處理大量並費用最穩健的模式。

import asyncio
import random

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(random.random())
        item = f"Data-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None) # 結束信號

async def consumer(queue, id):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break # 收到結束信號

        print(f"Consumer-{id} got: {item}")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # 啟動 1 個生產者,2 個消費者
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue, 1),
        consumer(queue, 2)
    )

asyncio.run(main())

ContextVars (上下文變數)

在 AsyncIO 中,傳統的 threading.local() 無法正常工作,因為多個協程 (Coroutine) 可能在同一個執行緒中切換執行,導致變數會混在一起。

Python 3.7 引入了 contextvars 來解決這個問題,它提供了對協程友善的 Thread Local 替代方案。

import asyncio
import contextvars

# 宣告一個 ContextVar
request_id = contextvars.ContextVar("request_id")

async def process_request(id):
    # 設定變數 (只對當前 Context 有效)
    request_id.set(id)
    await perform_db_query()

async def perform_db_query():
    # 讀取變數
    # 即使在同一個執行緒,也能讀到正確的 id
    print(f"Processing Request ID: {request_id.get()}")

async def main():
    # 併發執行,彼此的 contextvars 是獨立的
    await asyncio.gather(
        process_request("REQ-001"),
        process_request("REQ-002")
    )

asyncio.run(main())
# Output (順序可能不同):
# Processing Request ID: REQ-001
# Processing Request ID: REQ-002

非同步上下文管理器 (Async Context Manager) 與迭代器 (Async Iterator)

類比於 withfor,AsyncIO 也有對應的非同步版本:async withasync for

Async Context Manager

定義了 __aenter____aexit__ 方法。

class AsyncResource:
    async def __aenter__(self):
        print("Async entering...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Async exiting...")
        await asyncio.sleep(0.1)

async def main():
    async with AsyncResource() as r:
        print("Inside block")

Async Iterator

定義了 __aiter____anext__ 方法。

class AsyncCounter:
    def __init__(self, stop):
        self.stop = stop
        self.count = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.count >= self.stop:
            raise StopAsyncIteration
        self.count += 1
        await asyncio.sleep(0.1) # 模擬 IO
        return self.count

async def main():
    async for num in AsyncCounter(3):
        print(num)

限制併發數 (Semaphore)

asyncio.gather 會同時啟動所有任務。如果你同時發起 1000 個 HTTP 請求,可能會因為開啟太多 Socket 而報錯。這時可以使用 asyncio.Semaphore 來限制同時執行的任務數量。

import asyncio
import random

async def worker(sem, i):
    async with sem:  # 只有拿到鎖的才能進入
        print(f"Worker {i} is working...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
    print(f"Worker {i} finished")

async def main():
    # 限制同時只能有 3 個 Worker 執行
    sem = asyncio.Semaphore(3)

    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

除錯技巧 (Debugging)

AsyncIO 程式有時候很難除錯(例如 Deadlock 或忘記 await)。可以開啟 Debug 模式:

# 這樣會啟用額外的檢查 (如 thread safety, slow callback 警告)
asyncio.run(main(), debug=True)

或是設定環境變數 PYTHONASYNCIODEBUG=1