Node.js Readable Stream:從資料源讀取數據
Readable Stream (可讀取串流) 代表一個可以從中「提取數據」的來源。在 Node.js 中,最常見的實例包括 fs.createReadStream()、HTTP 伺服器中的 req (IncomingMessage) 物件,以及標準輸入 process.stdin。
兩種讀取模式:流動 vs 暫停
理解 Readable Stream 的關鍵在於它如何交付數據:
1. 流動模式 (Flowing Mode)
數據會盡可能快地從底層系統讀取,並透過事件自動傳遞給應用程式。一旦你監聽了 data 事件,串流就會切換到此模式。
const fs = require('fs');
const stream = fs.createReadStream('logs.txt', 'utf8');
stream.on('data', (chunk) => {
console.log('收到一塊數據,長度:', chunk.length);
});
stream.on('end', () => {
console.log('讀取完成');
});
2. 暫停模式 (Paused Mode)
數據會停留在串流的內部緩衝區中,必須由應用程式主動呼叫 stream.read() 來提取。這是串流建立時的預設狀態。
如何手動控制流向?
stream.pause():停止觸發data事件,數據會堆積在內部緩衝區。這常用於接收端處理太慢時的自我調節。stream.resume():恢復數據傳輸,重新將串流切換回流動模式。
設定快大小:highWaterMark
當我們建立串流時,可以透過 highWaterMark 選項來控制每次讀取塊 (Chunk) 的緩衝上限。對於檔案串流,預設為 64 KB。
const stream = fs.createReadStream('movie.mp4', {
highWaterMark: 128 * 1024, // 調整為 128 KB
});
現代化寫法:Async Iteration (推薦)
在現代 Node.js (12+) 中,我們不再需要手動監聽 data 或 end 事件。你可以直接使用 for await...of 語法,這讓邏輯更簡單、更接近同步流程。
async function readLargeFile(readable) {
try {
for await (const chunk of readable) {
// 處理資料塊
processData(chunk);
}
console.log('檔案已全部處理完畢');
} catch (err) {
console.error('讀取過程中出錯:', err);
}
}
優點:使用
for await...of 會自動幫你處理錯誤補獲(透過 try...catch)以及串流的正當關閉。總結
- Readable 是資料的出口,你可以監聽事件或使用非同步迭代器來消費它。
- 記住
data、end、error這三個核心事件。 - 如果處理邏輯複雜,推薦優先使用 Async Iteration。