Python 多行程 (Multiprocessing) 與平行運算

Threading 中我們提到,由於 GIL 的存在,Python 的多執行緒無法利用多核 CPU 進行平行運算。如果你的任務是 CPU 密集型 (CPU-bound) 的(例如:影像處理、數據分析、複雜計算),那麼 多行程 (Multiprocessing) 才是正確的解決方案。

什麼是行程 (Process)?

行程是作業系統資源分配的最小單位。

  • 獨立記憶體:每個 Process 都有自己獨立的記憶體空間。這意味著變數不共享(這點與 Thread 不同)。
  • 避開 GIL:因為每個 Process 都有自己獨立的 Python 解譯器實例,所以每個 Process 都有自己的 GIL。這讓它們可以真正的同時在多個 CPU 核心上執行。

使用 multiprocessing.Process

基本的用法與 threading.Thread 非常相似:

import multiprocessing
import time
import os

def worker(num):
    # os.getpid() 可以取得當前行程的 ID
    print(f"Worker {num} (PID: {os.getpid()}) 開始工作")
    time.sleep(1)
    print(f"Worker {num} 結束")

if __name__ == '__main__':
    # 注意:在 Windows 上,使用 multiprocessing 必須將程式碼放在 if __name__ == '__main__': 區塊內
    # 否則會造成無限遞迴建立 Process 的錯誤
    
    p1 = multiprocessing.Process(target=worker, args=(1,))
    p2 = multiprocessing.Process(target=worker, args=(2,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    print("主程序結束")

使用 Process Pool (進程池)

如果你需要同時執行大量的任務,手動建立和管理 Process 會很麻煩。這時可以使用 Pool 來管理一組固定的 Worker。

from multiprocessing import Pool
import time

def heavy_calculation(x):
    return x * x

if __name__ == '__main__':
    # 建立一個包含 4 個 Worker 的 Pool
    # 如果不指定 processes 參數,預設會使用 CPU 核心數 (os.cpu_count())
    with Pool(processes=4) as pool:
        
        # 1. 使用 map (同步方式,會等待所有結果回傳)
        # 就像內建的 map(),但會自動分配給不同的 Process 執行
        results = pool.map(heavy_calculation, [1, 2, 3, 4, 5])
        print(f"Map results: {results}") 
        # [1, 4, 9, 16, 25]

        # 2. 使用 apply_async (非同步方式)
        res = pool.apply_async(heavy_calculation, (10,))
        print(f"Async result: {res.get()}") # get() 會阻塞直到結果出來
        # 100

行程間通訊 (IPC)

由於 Process 之間記憶體不共享,如果要在它們之間傳遞資料(例如 Worker 算完的結果要傳回主程式,或是 Worker 之間要溝通),需要透過特殊的 IPC (Inter-Process Communication) 機制。

最常用的是 Queue (佇列) 和 Pipe (管道)。

使用 Queue

multiprocessing.Queue 是一個執行緒與行程安全的佇列,可以用來在多個 Process 間傳遞訊息。

from multiprocessing import Process, Queue

def producer(q):
    q.put("Hello")
    q.put("World")
    q.put(None) # 結束信號

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"收到: {item}")

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

使用 Pipe (管道)

multiprocessing.Pipe() 會回傳一對連接物件 (conn1, conn2),這兩端預設是雙向的 (Duplex)。Pipe 通常比 Queue 速度快,但如果是多個行程同時讀寫同一個端點,需要自己處理 Lock,而 Queue 則是已經處理好的。

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from Pipe")
    conn.close()

if __name__ == '__main__':
    # 建立管道,回傳兩個連接物件
    parent_conn, child_conn = Pipe()
    
    p = Process(target=sender, args=(child_conn,))
    p.start()
    
    # 主程式從 parent_conn 接收資料
    print(f"收到: {parent_conn.recv()}") 
    # 收到: Hello from Pipe
    
    p.join()

行程同步與鎖 (Lock)

雖然 Process 擁有獨立的記憶體空間,但它們有時候仍需要存取共享的資源(例如:螢幕輸出、檔案、資料庫)。這時候如果沒有適當的同步機制,輸出的內容可能會混雜在一起。

使用 multiprocessing.Lock 可以確保一段程式碼在同一時間只有一個 Process 能執行。

from multiprocessing import Process, Lock
import time

def printer(item, lock):
    lock.acquire()
    try:
        print(f"列印: {item}")
        time.sleep(0.5) # 模擬耗時操作
    finally:
        lock.release()

# 比較簡潔的寫法是使用 with
def printer_with_context(item, lock):
    with lock:
        print(f"列印: {item}")
        time.sleep(0.5)

if __name__ == '__main__':
    lock = Lock()
    items = ['A', 'B', 'C', 'D']
    processes = []
    
    for item in items:
        p = Process(target=printer_with_context, args=(item, lock))
        processes.append(p)
        p.start()
        
    for p in processes:
        p.join()

共享記憶體 (Shared Memory)

雖然我們說 Process 的記憶體是獨立的,但 multiprocessing 模組提供了一些特殊的物件,允許在 Process 之間共享數據。這通常比使用 Queue 或 Pipe 更快,但也更危險(因為需要處理 Race Condition)。

主要有兩種:Value (單一值) 和 Array (陣列)。

注意:操作共享記憶體通常不是原子操作 (Atomic Operation),所以務必配合 Lock 使用。

from multiprocessing import Process, Value, Array, Lock

def worker(n, a, lock):
    with lock:
        n.value += 1
        for i in range(len(a)):
            a[i] = -a[i]

if __name__ == '__main__':
    lock = Lock()
    
    # Value('i', 0): 建立一個整數 (integer),初始值為 0
    # 'i' 是 type code,代表 signed int
    # 'd' 代表 double float
    num = Value('i', 0)
    
    # Array('i', ...): 建立一個整數陣列
    arr = Array('i', range(10))

    p = Process(target=worker, args=(num, arr, lock))
    p.start()
    p.join()

    print(f"Num: {num.value}")
    # Num: 1
    
    print(f"Arr: {arr[:]}")
    # Arr: [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Threading vs Multiprocessing 比較

特性Threading (多執行緒)Multiprocessing (多行程)
記憶體共享 (需注意 Race Condition)獨立 (需透過 IPC 溝通)
GIL 影響受 GIL 限制,無法多核平行避開 GIL,可多核平行
建立成本高 (需複製記憶體空間)
適用場景I/O 密集型 (爬蟲、Web Request)CPU 密集型 (運算、轉檔)
通訊難度簡單 (直接讀寫變數)較難 (Queue, Pipe)

總結

  • 如果你的程式很慢是因為一直在算數學 (CPU 高附載),請用 multiprocessing
  • 如果你的程式很慢是因為一直在等網路/硬碟 (CPU 閒置),請用 Threadingasyncio