Pydantic AI Graph Beta API:平行執行與合併 (Parallel Execution & Joins)

在設計進階的 AI 代理人工作流時,如果所有任務都是循序漸進的,那執行效率會非常低落。例如,當你需要分析一整份文件中的 10 個段落時,與其一個一個段落依序分析,不如同時啟動 10 個 AI Agent 來分別處理。

Pydantic Graph 的 Beta API 最強大的特色之一,就是它將平行處理變得極度簡單且具備宣告式的風格。

Map:平行處理列表資料

當你有一個陣列 (List) 或是可迭代 (Iterable) 的資料,且你希望陣列中的每一個元素都能觸發一次獨立的步驟執行時,你可以使用 .map() 語法。

在下面的範例中,我們會收到一個數字陣列,並平行地將每一個數字計算平方,最後把結果收集起來。

from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append

@dataclass
class ProcessingState:
    items_processed: int = 0

# 宣告 Builder:初始輸入為整數陣列,輸出也是整數陣列
g = GraphBuilder(
    state_type=ProcessingState,
    input_type=list[int],
    output_type=list[int],
)

# 定義一個計算平方的步驟。注意:這裡預期收到的 inputs 是一個單一的整數 (int)
@g.step
async def square(ctx: StepContext[ProcessingState, None, int]) -> int:
    # 修改共用狀態(框架內部會安全地處理非同步狀態的變更)
    ctx.state.items_processed += 1
    return ctx.inputs * ctx.inputs

Joins & Reducers:收集平行任務的結果

當你發散 (Fan-out) 了多個平行任務後,你需要一個地方來等待所有人完成,並把資料聚合起來 (Fan-in)。在 Beta API 中,這被稱為 Join 節點Reducer

Pydantic Graph 內建了一些實用的 Reducer,例如 reduce_list_append 可以幫你把所有平行執行的回傳值塞進一個新的 List 裡面。

# 建立一個 Join 節點,使用 reduce_list_append 來收集結果
# initial_factory=list[int] 表示收集容器的初始狀態是一個空的 int list
collect_results = g.join(reduce_list_append, initial_factory=list[int])

# 設定流程連線
g.add(
    # 關鍵點 1:從起點出發時,遇到陣列,使用 .map() 將陣列拆解並平行派發給 square 步驟
    g.edge_from(g.start_node).map().to(square),

    # 關鍵點 2:所有的 square 步驟完成後,將結果送往 collect_results 這個 Join 節點
    g.edge_from(square).to(collect_results),

    # 從 Join 節點走向終點
    g.edge_from(collect_results).to(g.end_node),
)

執行這個 Graph:

import asyncio

async def main():
    graph = g.build()
    state = ProcessingState()

    # 傳入包含 5 個數字的陣列作為初始 inputs
    result = await graph.run(state=state, inputs=[1, 2, 3, 4, 5])

    # 因為是非同步平行執行,回傳結果的順序可能不一定,所以我們使用 sorted 排序
    print(f'處理結果: {sorted(result)}') # 輸出: [1, 4, 9, 16, 25]
    print(f'總共處理了 {state.items_processed} 個項目') # 輸出: 5

if __name__ == "__main__":
    asyncio.run(main())

簡化寫法:add_mapping_edge

對於單純將一個節點的輸出陣列 mapping 給下一個節點的情況,Pydantic Graph 提供了一個語法糖 add_mapping_edge(),可以讓連線宣告更為簡潔:

# 原始的串接寫法
# g.edge_from(generate_list).map().to(square)

# 使用語法糖的等價寫法
g.add_mapping_edge(generate_list, square)

處理非同步資料流 (AsyncIterables)

更棒的是,.map() 不只支援一般的 List,還支援 Python 的非同步產生器 (AsyncIterable)。當你需要處理 Streaming 資料(例如一邊從資料庫撈取資料,一邊進行 AI 摘要分析)時,.map() 會在資料被 yield 出來的瞬間就動態建立一個平行任務,不必等所有資料都收集完才開始處理,這對於需要低延遲回應的串流應用非常實用。

你可以使用 @g.stream 裝飾器來建立一個非同步產生器節點:

import asyncio

# 使用 @g.stream 裝飾器定義一個會連續 yield 資料的節點
@g.stream
async def stream_data(ctx: StepContext[ProcessingState, None, None]):
    # 模擬每 0.5 秒從外部系統串流取得一筆資料
    for i in range(1, 4):
        await asyncio.sleep(0.5)
        yield i

@g.step
async def process_data(ctx: StepContext[ProcessingState, None, int]) -> str:
    return f"處理完成: {ctx.inputs}"

g.add(
    g.edge_from(g.start_node).to(stream_data),

    # 這裡的 .map() 會在 stream_data 每次 yield 出一筆資料的瞬間,
    # 立即平行啟動一個 process_data 任務,無需等待全部資料載入完畢!
    g.edge_from(stream_data).map().to(process_data),

    g.edge_from(process_data).to(collect_results),
)

Broadcast:將單一資料廣播給多個步驟

.map() 拆解陣列的行為不同,當你需要將「同一筆」資料同時送給多條平行的獨立路徑去執行不同的任務時,這就是「廣播 (Broadcast)」的概念。

在 Beta API 中,廣播的語法非常直觀,你只需要在 .to() 裡面傳入多個目標步驟,它就會自動將相同的輸入資料,同時複製發送給這些步驟平行執行:

# 定義兩個會接收相同輸入 (字串) 的平行步驟
@g.step
async def analyze_sentiment(ctx: StepContext[ProcessingState, None, str]) -> str:
    return f"情緒分析: {ctx.inputs}"

@g.step
async def extract_keywords(ctx: StepContext[ProcessingState, None, str]) -> str:
    return f"關鍵字萃取: {ctx.inputs}"

# 將單一資料廣播給多個步驟
g.add(
    # 起點送出的一份文件字串,會被同時廣播給 analyze_sentiment 和 extract_keywords 兩個步驟
    g.edge_from(g.start_node).to(analyze_sentiment, extract_keywords),

    # 平行任務完成後,同樣可以透過 Join 節點將來自兩條分支的結果聚合起來
    g.edge_from(analyze_sentiment, extract_keywords).to(collect_results),
)

廣播功能非常適合應用在「同時呼叫不同 AI 模型來分析同一份文件」,或是「將使用者的同一個問題同時交給多個專家 Agent 思考」的實務場景。

連線轉換 (Edge Transformations)

有時候,兩個節點之間的資料型別並不完全吻合,或者你只想萃取前一個步驟輸出的某個特定欄位,但你又不想為此專門寫一個只做簡單計算的 @g.step

這時你可以使用連線上的 .transform() 語法來進行即時的資料轉換:

# 在 edge 上直接將數字乘以 2 並轉成字串,再傳遞給 format_output 步驟
g.add(
    g.edge_from(generate_number)
    .transform(lambda ctx: str(ctx.inputs * 2))
    .to(format_output)
)

transform() 會收到一個包含目前 inputsstate 與依賴 depsStepContext 物件。你甚至可以將它與 .map() 結合:先使用 transform 萃取出複雜物件中的陣列欄位,接著再利用 .map() 拆解並平行派發給下一個步驟。這為資料流的操作提供了極大的彈性。

平行執行的狀態共用注意事項 (State Sharing)

在進行 Map 或 Broadcast 等平行處理時,所有產生的平行任務都會「共用」同一個 Graph State 實例。如果你的平行步驟會去修改 (Mutate) ctx.state(例如:對一個陣列執行 .append()),請務必小心處理非同步環境下的資料覆寫問題。在實務上,通常會建議讓平行任務只專注於回傳 (Return) 處理結果,最後再交由 Join 節點與 Reducer 來統一負責最終資料的聚合與狀態變更,以確保安全性。

透過組合使用 Steps (基本任務)、Map 與 Broadcast (平行派發),以及 Join (結果聚合) 等功能,你就能夠使用精簡且具備強大型別安全的宣告式語法,打造出效能極佳的平行化企業級 AI 代理人工作流!