Pydantic AI Graph 平行執行與條件分支
真實世界的 AI 工作流很少是單一的一直線。我們經常需要根據模型輸出的結果來決定下一步該往左走還是往右走(條件分支),有時為了節省時間,我們甚至需要同時啟動多個任務(平行執行)。
Pydantic Graph 提供了靈活的 API 來處理這些複雜的流程控制。
條件分支 (Decisions)
要在流程中加入判斷邏輯(if-else),你只需要在節點的 run 方法中,根據不同的條件回傳不同的節點實例即可。
Pydantic Graph 的型別系統支援你使用 Union (或 | 符號) 來宣告多種可能的回傳節點,這讓 IDE 可以清楚知道未來的可能走向。
from pydantic_graph import BaseNode, End
from dataclasses import dataclass
@dataclass
class SupportState:
issue_type: str = ""
# 準備兩個負責處理不同問題的分支節點
class HardwareSupportNode(BaseNode[SupportState]):
async def run(self, ctx) -> End[str]:
return End("正在轉接給硬體維修團隊...")
class SoftwareSupportNode(BaseNode[SupportState]):
async def run(self, ctx) -> End[str]:
return End("正在轉接給軟體工程團隊...")
# 路由節點:負責做決定 (Decision)
class RouterNode(BaseNode[SupportState]):
# 宣告這個節點可能會回傳硬體節點,或是軟體節點
async def run(self, ctx) -> HardwareSupportNode | SoftwareSupportNode:
print(f"收到客戶問題類型:{ctx.state.issue_type}")
# 條件判斷分支
if ctx.state.issue_type == "螢幕破裂":
return HardwareSupportNode()
else:
return SoftwareSupportNode()
在這個範例中,RouterNode 扮演了分發任務的角色。實務上,這個 if-else 的條件通常是由 Pydantic AI Agent 去分析使用者的自然語言提問後所得出的結果。
平行執行 (Parallel Execution)
有些任務是可以同時進行的,例如:當使用者上傳一份財報時,我們可以同時呼叫一個 Agent 去分析營收,另一個 Agent 去分析風險,藉此大幅縮短等待時間。
Pydantic Graph 支援節點回傳一個包含多個節點實例的群組 (Iterable),框架就會自動以平行、併發 (Concurrent) 的方式去執行這些節點。
Joins & Reducers:將分支重新合併
當你平行分派出去了多個任務後,你通常會需要一個地方來「等待所有人都完成」,然後將大家的結果統整起來進行下一步。這在 Pydantic Graph 中被稱為 Joins。
同時,由於平行執行的節點可能會同時嘗試修改共用的 State(狀態),為避免競爭條件 (Race Conditions),框架提供了 Reducers 的機制來安全地合併狀態。
以下是平行執行與合併的概略邏輯範例:
from typing import Iterable
from pydantic_graph import BaseNode, End
# 假設狀態中有一個列表用來收集分析報告
# @dataclass
# class AnalysisState:
# reports: list[str]
# 兩個可以平行執行的分析節點
class RevenueAnalysisNode(BaseNode):
async def run(self, ctx):
# 假設這裡使用 Agent 產生了營收報告
# 為了安全,我們不直接修改 ctx.state,而是把結果「傳遞」下去
return JoinNode(report="營收表現良好")
class RiskAnalysisNode(BaseNode):
async def run(self, ctx):
return JoinNode(report="發現一項潛在風險")
# 分發節點:同時啟動兩個分析
class DispatchNode(BaseNode):
async def run(self, ctx) -> Iterable[RevenueAnalysisNode | RiskAnalysisNode]:
print("開始平行分析...")
# 回傳一個 tuple 或 list,框架就會平行執行它們
return (RevenueAnalysisNode(), RiskAnalysisNode())
# 合併節點:等待所有平行節點完成
# 這個節點會被呼叫多次,每次收到一個平行節點的結果 (Reducer 的概念)
class JoinNode(BaseNode):
report: str # 透過實例屬性接收上一個節點傳來的資料
async def run(self, ctx) -> End[str]:
# 將收到的報告安全地寫入共用狀態中
ctx.state.reports.append(self.report)
# 檢查是否所有的報告都收齊了?
if len(ctx.state.reports) >= 2:
final_summary = "綜合分析結果:\n" + "\n".join(ctx.state.reports)
return End(final_summary)
else:
# 如果還沒收齊,回傳特殊的訊號讓框架繼續等待其他的 Join
# (具體語法請參考官方 Pydantic Graph 最新文件關於 Joins 的實作細節)
pass
透過 Decisions 控制流程走向,並結合 Parallel Execution 與 Joins 來提升效率與匯整資料,你可以使用 Pydantic Graph 打造出具有企業級水準的高級 AI 工作流程。