Pydantic AI Graph Beta API:平行執行與合併 (Parallel Execution & Joins)
在設計進階的 AI 代理人工作流時,如果所有任務都是循序漸進的,那執行效率會非常低落。例如,當你需要分析一整份文件中的 10 個段落時,與其一個一個段落依序分析,不如同時啟動 10 個 AI Agent 來分別處理。
Pydantic Graph 的 Beta API 最強大的特色之一,就是它將平行處理變得極度簡單且具備宣告式的風格。
Map:平行處理列表資料
當你有一個陣列 (List) 或是可迭代 (Iterable) 的資料,且你希望陣列中的每一個元素都能觸發一次獨立的步驟執行時,你可以使用 .map() 語法。
在下面的範例中,我們會收到一個數字陣列,並平行地將每一個數字計算平方,最後把結果收集起來。
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append
@dataclass
class ProcessingState:
items_processed: int = 0
# 宣告 Builder:初始輸入為整數陣列,輸出也是整數陣列
g = GraphBuilder(
state_type=ProcessingState,
input_type=list[int],
output_type=list[int],
)
# 定義一個計算平方的步驟。注意:這裡預期收到的 inputs 是一個單一的整數 (int)
@g.step
async def square(ctx: StepContext[ProcessingState, None, int]) -> int:
# 修改共用狀態(框架內部會安全地處理非同步狀態的變更)
ctx.state.items_processed += 1
return ctx.inputs * ctx.inputs
Joins & Reducers:收集平行任務的結果
當你發散 (Fan-out) 了多個平行任務後,你需要一個地方來等待所有人完成,並把資料聚合起來 (Fan-in)。在 Beta API 中,這被稱為 Join 節點 與 Reducer。
Pydantic Graph 內建了一些實用的 Reducer,例如 reduce_list_append 可以幫你把所有平行執行的回傳值塞進一個新的 List 裡面。
# 建立一個 Join 節點,使用 reduce_list_append 來收集結果
# initial_factory=list[int] 表示收集容器的初始狀態是一個空的 int list
collect_results = g.join(reduce_list_append, initial_factory=list[int])
# 設定流程連線
g.add(
# 關鍵點 1:從起點出發時,遇到陣列,使用 .map() 將陣列拆解並平行派發給 square 步驟
g.edge_from(g.start_node).map().to(square),
# 關鍵點 2:所有的 square 步驟完成後,將結果送往 collect_results 這個 Join 節點
g.edge_from(square).to(collect_results),
# 從 Join 節點走向終點
g.edge_from(collect_results).to(g.end_node),
)
執行這個 Graph:
import asyncio
async def main():
graph = g.build()
state = ProcessingState()
# 傳入包含 5 個數字的陣列作為初始 inputs
result = await graph.run(state=state, inputs=[1, 2, 3, 4, 5])
# 因為是非同步平行執行,回傳結果的順序可能不一定,所以我們使用 sorted 排序
print(f'處理結果: {sorted(result)}') # 輸出: [1, 4, 9, 16, 25]
print(f'總共處理了 {state.items_processed} 個項目') # 輸出: 5
if __name__ == "__main__":
asyncio.run(main())
簡化寫法:add_mapping_edge
對於單純將一個節點的輸出陣列 mapping 給下一個節點的情況,Pydantic Graph 提供了一個語法糖 add_mapping_edge(),可以讓連線宣告更為簡潔:
# 原始的串接寫法
# g.edge_from(generate_list).map().to(square)
# 使用語法糖的等價寫法
g.add_mapping_edge(generate_list, square)
處理非同步資料流 (AsyncIterables)
更棒的是,.map() 不只支援一般的 List,還支援 Python 的非同步產生器 (AsyncIterable)。當你需要處理 Streaming 資料(例如一邊從資料庫撈取資料,一邊進行 AI 摘要分析)時,.map() 會在資料被 yield 出來的瞬間就動態建立一個平行任務,不必等所有資料都收集完才開始處理,這對於需要低延遲回應的串流應用非常實用。
你可以使用 @g.stream 裝飾器來建立一個非同步產生器節點:
import asyncio
# 使用 @g.stream 裝飾器定義一個會連續 yield 資料的節點
@g.stream
async def stream_data(ctx: StepContext[ProcessingState, None, None]):
# 模擬每 0.5 秒從外部系統串流取得一筆資料
for i in range(1, 4):
await asyncio.sleep(0.5)
yield i
@g.step
async def process_data(ctx: StepContext[ProcessingState, None, int]) -> str:
return f"處理完成: {ctx.inputs}"
g.add(
g.edge_from(g.start_node).to(stream_data),
# 這裡的 .map() 會在 stream_data 每次 yield 出一筆資料的瞬間,
# 立即平行啟動一個 process_data 任務,無需等待全部資料載入完畢!
g.edge_from(stream_data).map().to(process_data),
g.edge_from(process_data).to(collect_results),
)
Broadcast:將單一資料廣播給多個步驟
與 .map() 拆解陣列的行為不同,當你需要將「同一筆」資料同時送給多條平行的獨立路徑去執行不同的任務時,這就是「廣播 (Broadcast)」的概念。
在 Beta API 中,廣播的語法非常直觀,你只需要在 .to() 裡面傳入多個目標步驟,它就會自動將相同的輸入資料,同時複製發送給這些步驟平行執行:
# 定義兩個會接收相同輸入 (字串) 的平行步驟
@g.step
async def analyze_sentiment(ctx: StepContext[ProcessingState, None, str]) -> str:
return f"情緒分析: {ctx.inputs}"
@g.step
async def extract_keywords(ctx: StepContext[ProcessingState, None, str]) -> str:
return f"關鍵字萃取: {ctx.inputs}"
# 將單一資料廣播給多個步驟
g.add(
# 起點送出的一份文件字串,會被同時廣播給 analyze_sentiment 和 extract_keywords 兩個步驟
g.edge_from(g.start_node).to(analyze_sentiment, extract_keywords),
# 平行任務完成後,同樣可以透過 Join 節點將來自兩條分支的結果聚合起來
g.edge_from(analyze_sentiment, extract_keywords).to(collect_results),
)
廣播功能非常適合應用在「同時呼叫不同 AI 模型來分析同一份文件」,或是「將使用者的同一個問題同時交給多個專家 Agent 思考」的實務場景。
連線轉換 (Edge Transformations)
有時候,兩個節點之間的資料型別並不完全吻合,或者你只想萃取前一個步驟輸出的某個特定欄位,但你又不想為此專門寫一個只做簡單計算的 @g.step。
這時你可以使用連線上的 .transform() 語法來進行即時的資料轉換:
# 在 edge 上直接將數字乘以 2 並轉成字串,再傳遞給 format_output 步驟
g.add(
g.edge_from(generate_number)
.transform(lambda ctx: str(ctx.inputs * 2))
.to(format_output)
)
transform() 會收到一個包含目前 inputs、state 與依賴 deps 的 StepContext 物件。你甚至可以將它與 .map() 結合:先使用 transform 萃取出複雜物件中的陣列欄位,接著再利用 .map() 拆解並平行派發給下一個步驟。這為資料流的操作提供了極大的彈性。
平行執行的狀態共用注意事項 (State Sharing)
在進行 Map 或 Broadcast 等平行處理時,所有產生的平行任務都會「共用」同一個 Graph State 實例。如果你的平行步驟會去修改 (Mutate) ctx.state(例如:對一個陣列執行 .append()),請務必小心處理非同步環境下的資料覆寫問題。在實務上,通常會建議讓平行任務只專注於回傳 (Return) 處理結果,最後再交由 Join 節點與 Reducer 來統一負責最終資料的聚合與狀態變更,以確保安全性。
透過組合使用 Steps (基本任務)、Map 與 Broadcast (平行派發),以及 Join (結果聚合) 等功能,你就能夠使用精簡且具備強大型別安全的宣告式語法,打造出效能極佳的平行化企業級 AI 代理人工作流!