dbt Sources 管理與引用原始資料表
在 dbt 中,Sources 指的是那些「不是由 dbt 建立」的原始資料表。這些資料通常是由 ETL 工具 (如 Fivetran, Airbyte) 或腳本載入到你的資料庫中。
雖然 dbt 無法控制這些資料如何產生,但我們可以使用 Sources 功能來:
- 定義與文檔化:在 YAML 檔中記錄有哪些原始表、它們的欄位描述。
- 依賴管理:在 model 中使用
source()函數引用,建立完整的資料血緣 (Lineage)。 - 新鮮度檢查 (Freshness):監控原始資料是否太久沒更新。
定義 Sources
Source 通常定義在 models/ 目錄下的 .yml 檔案中 (檔名不限,通常取名為 _sources.yml 或 src_xxx.yml)。
範例 (models/_sources.yml):
version: 2
sources:
- name: stripe # Source 的名稱 (邏輯名稱)
database: raw # 實際資料庫名稱 (選填,預設為 profile target)
schema: stripe_api # 實際 schema 名稱
tables:
- name: payments # 實際 table 名稱
description: '從 Stripe API 同步過來的付款紀錄'
- name: customers
description: 'Stripe 客戶資料'
使用 source() 函數
定義好 Source 後,我們在 model 中就不應該直接寫 FROM raw.stripe_api.payments,而是使用 source() 函數:
-- models/stg_stripe_payments.sql
select
id as payment_id,
order_id,
amount
from {{ source('stripe', 'payments') }}
語法:{{ source('source_name', 'table_name') }}
這樣做的好處是,如果未來原始 table 的名稱或 schema 變了,你只需要修改 YAML 檔,不需要修改所有用到該 table 的 SQL。
Source Freshness (新鮮度檢查)
資料工程中最常見的問題之一就是:「ETL 是不是掛了?資料怎麼沒更新?」
dbt 內建了 Freshness 檢查功能,可以幫你自動監控資料的新鮮度。我們可以在 YAML 中設定 freshness 區塊:
version: 2
sources:
- name: stripe
schema: stripe_api
# 全域設定:檢查 loaded_at 欄位
loaded_at_field: _etl_loaded_at
freshness:
warn_after: { count: 12, period: hour } # 超過 12 小時沒更新 -> 警告
error_after: { count: 24, period: hour } # 超過 24 小時沒更新 -> 錯誤
tables:
- name: payments
- name: customers
# 也可以針對個別 table 覆寫設定
freshness:
warn_after: { count: 6, period: hour }
設定好後,執行以下指令進行檢查:
dbt source freshness
dbt 會查詢 loaded_at_field 的最大值 (max),並與現在時間比較。如果資料過期,CLI 會顯示警告或錯誤,這在 CI/CD 流程中非常有用。
使用 Source 的最佳實踐
- 盡量只在 Staging 層引用 Source:通常我們會建立一層
stg_models (如stg_stripe_payments.sql) 專門對接 source,進行基本的改名與型別轉換。 - 下游 Model 只引用 Staging Model:其他的 model 應該
ref('stg_stripe_payments'),而不是直接source('stripe', 'payments')。這樣當 source 結構變動時,你只需要修改 Staging model,不影響下游。