Pydantic AI Graph 節點 (Nodes) 與步驟 (Steps) 實作

在了解了 Pydantic Graph 的基本概念後,我們現在來實際動手用程式碼打造一個最基礎的圖形工作流。在這個流程中,我們將學習如何定義「狀態 (State)」、如何建立「節點 (Nodes)」,以及如何讓流程在節點之間轉移。

1. 定義共用狀態 (State)

狀態是貫穿整個 Graph 的靈魂。所有的節點都會接收到這個狀態物件,並能夠讀取或修改它。我們通常使用標準的 Python @dataclass 或是 Pydantic 的 BaseModel 來定義它。

from dataclasses import dataclass

# 定義一個簡單的狀態物件,用來儲存整個流程中搜集到的資訊
@dataclass
class ConversationState:
    user_name: str = ""
    user_age: int = 0
    analysis_result: str = ""

2. 建立節點 (Nodes)

在 Pydantic Graph 的 Beta API 中,一個節點通常是透過繼承 BaseNode 類別並實作其方法來定義的。每一個節點都需要宣告它會接受什麼樣的狀態型別,以及它執行完畢後會產生什麼結果(通常是決定下一個要前往的節點)。

我們來建立兩個節點:AskNameNodeAskAgeNode

from pydantic_graph import BaseNode, End

# 第一個節點:負責獲取名字
class AskNameNode(BaseNode[ConversationState]):
    async def run(self, ctx) -> 'AskAgeNode':
        # 在實際應用中,這裡可能是透過 Pydantic AI Agent 與使用者對話
        # 為了示範,我們直接硬編碼修改狀態
        print("執行節點:詢問名字...")
        ctx.state.user_name = "小明"

        # 回傳下一個節點的實例
        return AskAgeNode()

# 第二個節點:負責獲取年齡,並結束流程
class AskAgeNode(BaseNode[ConversationState]):
    # 注意回傳型別宣告為 End,這表示這個節點結束後,整個 Graph 就結束了
    async def run(self, ctx) -> End[str]:
        print(f"執行節點:知道名字是 {ctx.state.user_name} 了,現在詢問年齡...")
        ctx.state.user_age = 25

        # 進行一些分析
        ctx.state.analysis_result = f"{ctx.state.user_name} 今年 {ctx.state.user_age} 歲,是個年輕人!"

        # 流程結束,並可以選擇回傳一個最終的資料給呼叫端
        return End(ctx.state.analysis_result)

3. 建立並執行 Graph

當所有的狀態和節點都定義好之後,我們就可以將它們組合起來建立一個 Graph 實例,並開始執行流程。

import asyncio
from pydantic_graph import Graph

async def main():
    # 建立 Graph 實例,我們需要告訴它:
    # 1. 包含哪些節點 (nodes)
    # 2. 狀態的初始型別為何
    my_workflow = Graph(
        nodes=(AskNameNode, AskAgeNode),
        state_type=ConversationState
    )

    # 建立初始狀態
    initial_state = ConversationState()

    # 開始執行流程!我們需要指定:
    # 1. 第一個要執行的節點實例
    # 2. 初始的狀態物件
    print("--- 流程開始 ---")
    final_output, history = await my_workflow.run(
        AskNameNode(),
        state=initial_state
    )
    print("--- 流程結束 ---")

    print(f"最終輸出的結果: {final_output}")
    print(f"最終狀態物件: {initial_state.user_name}, {initial_state.user_age}")

if __name__ == "__main__":
    asyncio.run(main())

執行過程解析

當你呼叫 my_workflow.run(...) 時,底層發生的事情如下:

  1. 框架將 initial_state 包裝到 Context 中,交給 AskNameNoderun 方法。
  2. AskNameNode 執行完畢,回傳了 AskAgeNode()
  3. 框架接到回傳值,知道流程該往下走,於是把同一個被修改過的 state 交給 AskAgeNoderun 方法。
  4. AskAgeNode 執行完畢,回傳了 End("...")
  5. 框架看到 End,終止整個循環迴圈,並將字串作為 final_output 回傳。

這種明確的 return NextNode() 設計,讓狀態的轉移變得非常容易理解,你只需要看程式碼就能清楚知道流程是怎麼走的。在下一個章節中,我們將探討如何加入更複雜的條件判斷分支。