Node.js Readable Stream:從資料源讀取數據

Readable Stream (可讀取串流) 代表一個可以從中「提取數據」的來源。在 Node.js 中,最常見的實例包括 fs.createReadStream()、HTTP 伺服器中的 req (IncomingMessage) 物件,以及標準輸入 process.stdin

兩種讀取模式:流動 vs 暫停

理解 Readable Stream 的關鍵在於它如何交付數據:

1. 流動模式 (Flowing Mode)

數據會盡可能快地從底層系統讀取,並透過事件自動傳遞給應用程式。一旦你監聽了 data 事件,串流就會切換到此模式。

const fs = require('fs');
const stream = fs.createReadStream('logs.txt', 'utf8');

stream.on('data', (chunk) => {
  console.log('收到一塊數據,長度:', chunk.length);
});

stream.on('end', () => {
  console.log('讀取完成');
});

2. 暫停模式 (Paused Mode)

數據會停留在串流的內部緩衝區中,必須由應用程式主動呼叫 stream.read() 來提取。這是串流建立時的預設狀態。

如何手動控制流向?

  • stream.pause():停止觸發 data 事件,數據會堆積在內部緩衝區。這常用於接收端處理太慢時的自我調節。
  • stream.resume():恢復數據傳輸,重新將串流切換回流動模式。

設定快大小:highWaterMark

當我們建立串流時,可以透過 highWaterMark 選項來控制每次讀取塊 (Chunk) 的緩衝上限。對於檔案串流,預設為 64 KB

const stream = fs.createReadStream('movie.mp4', {
  highWaterMark: 128 * 1024, // 調整為 128 KB
});

現代化寫法:Async Iteration (推薦)

在現代 Node.js (12+) 中,我們不再需要手動監聽 dataend 事件。你可以直接使用 for await...of 語法,這讓邏輯更簡單、更接近同步流程。

async function readLargeFile(readable) {
  try {
    for await (const chunk of readable) {
      // 處理資料塊
      processData(chunk);
    }
    console.log('檔案已全部處理完畢');
  } catch (err) {
    console.error('讀取過程中出錯:', err);
  }
}
優點:使用 for await...of 會自動幫你處理錯誤補獲(透過 try...catch)以及串流的正當關閉。

總結

  1. Readable 是資料的出口,你可以監聽事件或使用非同步迭代器來消費它。
  2. 記住 dataenderror 這三個核心事件。
  3. 如果處理邏輯複雜,推薦優先使用 Async Iteration