Python AsyncIO 非同步程式設計
在 Python 3.4 之後,Python 引入了標準庫 asyncio,正式支援了 協程 (Coroutines) 與 非同步 I/O (Asynchronous I/O)。這讓 Python 在處理高併發的網路請求(如爬蟲、Web Server)時,效能有了質的飛躍。
本文將從基礎觀念一路深入到底層的 Event Loop 機制。
基礎觀念:同步 vs 非同步
- 同步 (Synchronous):程式一行一行執行。如果遇到 I/O (如請求網頁),CPU 會「卡住」等待,直到資料回來。
- 非同步 (Asynchronous):遇到 I/O 時,程式會掛起該任務,讓 CPU 去執行其他任務。等到 I/O 完成,再回來繼續執行。
實際效能對比
讓我們用一個簡單的例子來模擬 I/O 操作:
import time
import asyncio
def sync_sleep():
time.sleep(1) # 卡住 1 秒
async def async_sleep():
await asyncio.sleep(1) # 讓出控制權 1 秒
def run_sync():
start = time.time()
for _ in range(3):
sync_sleep() # 總共卡 3 次
print(f"Sync Total: {time.time() - start:.2f}s") # 約 3.0s
async def run_async():
start = time.time()
# 同時並發執行 3 個任務
await asyncio.gather(async_sleep(), async_sleep(), async_sleep())
print(f"Async Total: {time.time() - start:.2f}s") # 約 1.0s
if __name__ == "__main__":
run_sync()
asyncio.run(run_async())
結論:非同步程式在 I/O 密集型任務中,能避免 CPU 閒置,大幅提升吞吐量。
Event Loop 機制詳解
Event Loop (事件迴圈) 是 AsyncIO 的心臟。它是一個無限迴圈,負責監聽 I/O 事件(如 Socket 可讀寫)並排程執行 Coroutine。
Python 提供了高階與低階兩種方式來管理 Event Loop。
現代寫法 (Python 3.7+)
推薦使用 asyncio.run()。它會自動建立新的 Loop,執行傳入的 Coroutine,最後關閉 Loop。
async def main():
await asyncio.sleep(1)
print("Done")
if __name__ == "__main__":
# 自動管理 Loop 的生命週期
asyncio.run(main())
底層寫法 (Legacy / Advanced)
在某些舊程式碼或與 GUI 框架整合時,你可能需要手動控制 Loop。
loop.run_until_complete(future):執行直到該 Future/Coroutine 完成。loop.run_forever():一直執行,直到呼叫loop.stop()。常用於伺服器。
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close() # 務必手動關閉
伺服器模式 (Run Forever)
如果你在寫一個 TCP Server,通常會用到 run_forever。
# 伺服器啟動範例 (概念)
try:
server = loop.run_until_complete(start_server())
print("Server running...")
loop.run_forever() # 阻塞在這裡,直到 loop.stop() 被呼叫
except KeyboardInterrupt:
pass
finally:
# 優雅關閉 (Graceful Shutdown)
loop.run_until_complete(server.wait_closed())
loop.close()
解決 Blocking:使用 Executor
這是 AsyncIO 開發中最重要的一點。
千萬不要在 async 函式中呼叫會「卡住」的同步函式 (如 time.sleep, requests.get)。 因為這會卡死單一執行緒的 Event Loop,導致所有其他任務都暫停。
當你不得不使用同步函式時,應使用 loop.run_in_executor() 將任務交給執行緒池(Threads)或進程池(Processes)執行。
ThreadPoolExecutor (I/O 密集型)
適合處理網路請求、檔案讀寫等 I/O 密集型任務。
import asyncio
import concurrent.futures
import requests
def blocking_io():
# 這是同步的 Blocking Call
return requests.get("https://www.google.com").status_code
async def main():
loop = asyncio.get_running_loop()
# 建立 ThreadPool (預設也會由 asyncio 管理)
with concurrent.futures.ThreadPoolExecutor() as pool:
# 第一個參數通常是 pool
result = await loop.run_in_executor(pool, blocking_io)
print(f"Status Code: {result}")
asyncio.run(main())
ProcessPoolExecutor (CPU 密集型)
如果你需要進行大量的數學計算、圖像處理或加密解密(CPU 密集型任務),Python 的 GIL 會限制執行緒的效能。這時應改用進程池,讓任務在獨立的 CPU 核心上執行。
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_task(n):
# 模擬耗時的 CPU 計算
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_running_loop()
# 使用 ProcessPoolExecutor 避開 GIL
with ProcessPoolExecutor() as pool:
# 將計算任務丟到另一個進程跑
result = await loop.run_in_executor(pool, cpu_bound_task, 10**7)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
ProcessPoolExecutor 時,必須確保啟動程式碼放在 if __name__ == "__main__": 區塊內,以避免在 Windows 或 macOS 上產生多進程啟動的循環報錯。Task 與並發管理
Coroutine vs Task
- Coroutine:只是一個產生器函式,呼叫它不會馬上執行。
- Task:將 Coroutine 包裝後丟入 Event Loop 排程。一旦建立 Task,它就會開始跑。
建立 Task
# 建立後會立即被排程
task = asyncio.create_task(my_coro())
await task # 等待結果
現代化的任務管理:TaskGroup (Python 3.11+)
在 Python 3.11 之後,官方推薦使用 asyncio.TaskGroup 來管理多個併發任務。它提供了「結構化併發」(Structured Concurrency),如果其中一個任務失敗掛掉,它會確保其他任務也能被正確取消且異常會統一拋出。
async def main():
async with asyncio.TaskGroup() as tg:
# 使用 tg.create_task 建立任務
t1 = tg.create_task(fetch_data(1))
t2 = tg.create_task(fetch_data(2))
# 離開 async with 塊時,會自動等待所有任務完成
# 這裡可以安全地取得結果
print(f"Finished: {t1.result()}, {t2.result()}")
asyncio.run(main())
保護任務 (Shield)
有時候我們取消了父任務(例如按了 Ctrl+C),但不希望某個關鍵的子任務被中斷(例如寫入 Log)。
try:
# 如果 main() 被取消,important_task 依然會繼續執行
await asyncio.shield(important_task())
except asyncio.CancelledError:
print("Main task cancelled, but shield protected the inner task")
收集多個任務 (Gather vs As Completed)
gather:全部做完才回傳。如果傳入return_exceptions=True,某個任務報錯時不會立即中斷,而是將異常當作結果回傳。as_completed:做完一個回傳一個 (適合串流處理)。
tasks = [download(url1), download(url2)]
# Gather (進階用法:包容異常)
results = await asyncio.gather(*tasks, return_exceptions=True)
# As Completed
for coro in asyncio.as_completed(tasks):
result = await coro
print("One download finished:", result)
等待特定條件:asyncio.wait
如果你需要更細緻的控制,例如「只要第一個任務完成就往下走」,可以使用 asyncio.wait。
tasks = {asyncio.create_task(work()) for _ in range(3)}
# return_when=asyncio.FIRST_COMPLETED (第一個完成即返回)
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# 取消剩餘還在跑的任務 (Good Practice)
for t in pending:
t.cancel()
設定超時 (Timeout)
Python 3.11 引入了更優雅的 asyncio.timeout 上下文管理器。
try:
async with asyncio.timeout(2.0): # 超過 2 秒會拋出 TimeoutError
await long_running_task()
except TimeoutError:
print("任務超時了!")
實戰範例:生產者-消費者 (Queue)
這是處理大量並費用最穩健的模式。
import asyncio
import random
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(random.random())
item = f"Data-{i}"
await queue.put(item)
print(f"Produced: {item}")
await queue.put(None) # 結束信號
async def consumer(queue, id):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break # 收到結束信號
print(f"Consumer-{id} got: {item}")
await asyncio.sleep(0.5)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 啟動 1 個生產者,2 個消費者
await asyncio.gather(
producer(queue, 5),
consumer(queue, 1),
consumer(queue, 2)
)
asyncio.run(main())
ContextVars (上下文變數)
在 AsyncIO 中,傳統的 threading.local() 無法正常工作,因為多個協程 (Coroutine) 可能在同一個執行緒中切換執行,導致變數會混在一起。
Python 3.7 引入了 contextvars 來解決這個問題,它提供了對協程友善的 Thread Local 替代方案。
import asyncio
import contextvars
# 宣告一個 ContextVar
request_id = contextvars.ContextVar("request_id")
async def process_request(id):
# 設定變數 (只對當前 Context 有效)
request_id.set(id)
await perform_db_query()
async def perform_db_query():
# 讀取變數
# 即使在同一個執行緒,也能讀到正確的 id
print(f"Processing Request ID: {request_id.get()}")
async def main():
# 併發執行,彼此的 contextvars 是獨立的
await asyncio.gather(
process_request("REQ-001"),
process_request("REQ-002")
)
asyncio.run(main())
# Output (順序可能不同):
# Processing Request ID: REQ-001
# Processing Request ID: REQ-002
非同步上下文管理器 (Async Context Manager) 與迭代器 (Async Iterator)
類比於 with 和 for,AsyncIO 也有對應的非同步版本:async with 和 async for。
Async Context Manager
定義了 __aenter__ 和 __aexit__ 方法。
class AsyncResource:
async def __aenter__(self):
print("Async entering...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc, tb):
print("Async exiting...")
await asyncio.sleep(0.1)
async def main():
async with AsyncResource() as r:
print("Inside block")
Async Iterator
定義了 __aiter__ 和 __anext__ 方法。
class AsyncCounter:
def __init__(self, stop):
self.stop = stop
self.count = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.count >= self.stop:
raise StopAsyncIteration
self.count += 1
await asyncio.sleep(0.1) # 模擬 IO
return self.count
async def main():
async for num in AsyncCounter(3):
print(num)
限制併發數 (Semaphore)
asyncio.gather 會同時啟動所有任務。如果你同時發起 1000 個 HTTP 請求,可能會因為開啟太多 Socket 而報錯。這時可以使用 asyncio.Semaphore 來限制同時執行的任務數量。
import asyncio
import random
async def worker(sem, i):
async with sem: # 只有拿到鎖的才能進入
print(f"Worker {i} is working...")
await asyncio.sleep(random.uniform(0.5, 1.5))
print(f"Worker {i} finished")
async def main():
# 限制同時只能有 3 個 Worker 執行
sem = asyncio.Semaphore(3)
tasks = [worker(sem, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
除錯技巧 (Debugging)
AsyncIO 程式有時候很難除錯(例如 Deadlock 或忘記 await)。可以開啟 Debug 模式:
# 這樣會啟用額外的檢查 (如 thread safety, slow callback 警告)
asyncio.run(main(), debug=True)
或是設定環境變數 PYTHONASYNCIODEBUG=1。