Python 多行程 (Multiprocessing) 與平行運算
在 Threading 中我們提到,由於 GIL 的存在,Python 的多執行緒無法利用多核 CPU 進行平行運算。如果你的任務是 CPU 密集型 (CPU-bound) 的(例如:影像處理、數據分析、複雜計算),那麼 多行程 (Multiprocessing) 才是正確的解決方案。
什麼是行程 (Process)?
行程是作業系統資源分配的最小單位。
- 獨立記憶體:每個 Process 都有自己獨立的記憶體空間。這意味著變數不共享(這點與 Thread 不同)。
- 避開 GIL:因為每個 Process 都有自己獨立的 Python 解譯器實例,所以每個 Process 都有自己的 GIL。這讓它們可以真正的同時在多個 CPU 核心上執行。
使用 multiprocessing.Process
基本的用法與 threading.Thread 非常相似:
import multiprocessing
import time
import os
def worker(num):
# os.getpid() 可以取得當前行程的 ID
print(f"Worker {num} (PID: {os.getpid()}) 開始工作")
time.sleep(1)
print(f"Worker {num} 結束")
if __name__ == '__main__':
# 注意:在 Windows 上,使用 multiprocessing 必須將程式碼放在 if __name__ == '__main__': 區塊內
# 否則會造成無限遞迴建立 Process 的錯誤
p1 = multiprocessing.Process(target=worker, args=(1,))
p2 = multiprocessing.Process(target=worker, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
print("主程序結束")
使用 Process Pool (進程池)
如果你需要同時執行大量的任務,手動建立和管理 Process 會很麻煩。這時可以使用 Pool 來管理一組固定的 Worker。
from multiprocessing import Pool
import time
def heavy_calculation(x):
return x * x
if __name__ == '__main__':
# 建立一個包含 4 個 Worker 的 Pool
# 如果不指定 processes 參數,預設會使用 CPU 核心數 (os.cpu_count())
with Pool(processes=4) as pool:
# 1. 使用 map (同步方式,會等待所有結果回傳)
# 就像內建的 map(),但會自動分配給不同的 Process 執行
results = pool.map(heavy_calculation, [1, 2, 3, 4, 5])
print(f"Map results: {results}")
# [1, 4, 9, 16, 25]
# 2. 使用 apply_async (非同步方式)
res = pool.apply_async(heavy_calculation, (10,))
print(f"Async result: {res.get()}") # get() 會阻塞直到結果出來
# 100
行程間通訊 (IPC)
由於 Process 之間記憶體不共享,如果要在它們之間傳遞資料(例如 Worker 算完的結果要傳回主程式,或是 Worker 之間要溝通),需要透過特殊的 IPC (Inter-Process Communication) 機制。
最常用的是 Queue (佇列) 和 Pipe (管道)。
使用 Queue
multiprocessing.Queue 是一個執行緒與行程安全的佇列,可以用來在多個 Process 間傳遞訊息。
from multiprocessing import Process, Queue
def producer(q):
q.put("Hello")
q.put("World")
q.put(None) # 結束信號
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"收到: {item}")
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
使用 Pipe (管道)
multiprocessing.Pipe() 會回傳一對連接物件 (conn1, conn2),這兩端預設是雙向的 (Duplex)。Pipe 通常比 Queue 速度快,但如果是多個行程同時讀寫同一個端點,需要自己處理 Lock,而 Queue 則是已經處理好的。
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from Pipe")
conn.close()
if __name__ == '__main__':
# 建立管道,回傳兩個連接物件
parent_conn, child_conn = Pipe()
p = Process(target=sender, args=(child_conn,))
p.start()
# 主程式從 parent_conn 接收資料
print(f"收到: {parent_conn.recv()}")
# 收到: Hello from Pipe
p.join()
行程同步與鎖 (Lock)
雖然 Process 擁有獨立的記憶體空間,但它們有時候仍需要存取共享的資源(例如:螢幕輸出、檔案、資料庫)。這時候如果沒有適當的同步機制,輸出的內容可能會混雜在一起。
使用 multiprocessing.Lock 可以確保一段程式碼在同一時間只有一個 Process 能執行。
from multiprocessing import Process, Lock
import time
def printer(item, lock):
lock.acquire()
try:
print(f"列印: {item}")
time.sleep(0.5) # 模擬耗時操作
finally:
lock.release()
# 比較簡潔的寫法是使用 with
def printer_with_context(item, lock):
with lock:
print(f"列印: {item}")
time.sleep(0.5)
if __name__ == '__main__':
lock = Lock()
items = ['A', 'B', 'C', 'D']
processes = []
for item in items:
p = Process(target=printer_with_context, args=(item, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
共享記憶體 (Shared Memory)
雖然我們說 Process 的記憶體是獨立的,但 multiprocessing 模組提供了一些特殊的物件,允許在 Process 之間共享數據。這通常比使用 Queue 或 Pipe 更快,但也更危險(因為需要處理 Race Condition)。
主要有兩種:Value (單一值) 和 Array (陣列)。
注意:操作共享記憶體通常不是原子操作 (Atomic Operation),所以務必配合 Lock 使用。
from multiprocessing import Process, Value, Array, Lock
def worker(n, a, lock):
with lock:
n.value += 1
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
lock = Lock()
# Value('i', 0): 建立一個整數 (integer),初始值為 0
# 'i' 是 type code,代表 signed int
# 'd' 代表 double float
num = Value('i', 0)
# Array('i', ...): 建立一個整數陣列
arr = Array('i', range(10))
p = Process(target=worker, args=(num, arr, lock))
p.start()
p.join()
print(f"Num: {num.value}")
# Num: 1
print(f"Arr: {arr[:]}")
# Arr: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Threading vs Multiprocessing 比較
| 特性 | Threading (多執行緒) | Multiprocessing (多行程) |
|---|---|---|
| 記憶體 | 共享 (需注意 Race Condition) | 獨立 (需透過 IPC 溝通) |
| GIL 影響 | 受 GIL 限制,無法多核平行 | 避開 GIL,可多核平行 |
| 建立成本 | 低 | 高 (需複製記憶體空間) |
| 適用場景 | I/O 密集型 (爬蟲、Web Request) | CPU 密集型 (運算、轉檔) |
| 通訊難度 | 簡單 (直接讀寫變數) | 較難 (Queue, Pipe) |