dbt Sources 管理與引用原始資料表

在 dbt 中,Sources 指的是那些「不是由 dbt 建立」的原始資料表。這些資料通常是由 ETL 工具 (如 Fivetran, Airbyte) 或腳本載入到你的資料庫中。

雖然 dbt 無法控制這些資料如何產生,但我們可以使用 Sources 功能來:

  1. 定義與文檔化:在 YAML 檔中記錄有哪些原始表、它們的欄位描述。
  2. 依賴管理:在 model 中使用 source() 函數引用,建立完整的資料血緣 (Lineage)。
  3. 新鮮度檢查 (Freshness):監控原始資料是否太久沒更新。

定義 Sources

Source 通常定義在 models/ 目錄下的 .yml 檔案中 (檔名不限,通常取名為 _sources.ymlsrc_xxx.yml)。

範例 (models/_sources.yml):

version: 2

sources:
  - name: stripe # Source 的名稱 (邏輯名稱)
    database: raw # 實際資料庫名稱 (選填,預設為 profile target)
    schema: stripe_api # 實際 schema 名稱

    tables:
      - name: payments # 實際 table 名稱
        description: '從 Stripe API 同步過來的付款紀錄'

      - name: customers
        description: 'Stripe 客戶資料'

使用 source() 函數

定義好 Source 後,我們在 model 中就不應該直接寫 FROM raw.stripe_api.payments,而是使用 source() 函數:

-- models/stg_stripe_payments.sql

select
    id as payment_id,
    order_id,
    amount
from {{ source('stripe', 'payments') }}

語法:{{ source('source_name', 'table_name') }}

這樣做的好處是,如果未來原始 table 的名稱或 schema 變了,你只需要修改 YAML 檔,不需要修改所有用到該 table 的 SQL。

Source Freshness (新鮮度檢查)

資料工程中最常見的問題之一就是:「ETL 是不是掛了?資料怎麼沒更新?」

dbt 內建了 Freshness 檢查功能,可以幫你自動監控資料的新鮮度。我們可以在 YAML 中設定 freshness 區塊:

version: 2

sources:
  - name: stripe
    schema: stripe_api

    # 全域設定:檢查 loaded_at 欄位
    loaded_at_field: _etl_loaded_at

    freshness:
      warn_after: { count: 12, period: hour } # 超過 12 小時沒更新 -> 警告
      error_after: { count: 24, period: hour } # 超過 24 小時沒更新 -> 錯誤

    tables:
      - name: payments

      - name: customers
        # 也可以針對個別 table 覆寫設定
        freshness:
          warn_after: { count: 6, period: hour }

設定好後,執行以下指令進行檢查:

dbt source freshness

dbt 會查詢 loaded_at_field 的最大值 (max),並與現在時間比較。如果資料過期,CLI 會顯示警告或錯誤,這在 CI/CD 流程中非常有用。

使用 Source 的最佳實踐

  1. 盡量只在 Staging 層引用 Source:通常我們會建立一層 stg_ models (如 stg_stripe_payments.sql) 專門對接 source,進行基本的改名與型別轉換。
  2. 下游 Model 只引用 Staging Model:其他的 model 應該 ref('stg_stripe_payments'),而不是直接 source('stripe', 'payments')。這樣當 source 結構變動時,你只需要修改 Staging model,不影響下游。