JavaScript Streams API 串流處理

在早期的 Web 開發中,當我們要處理資源(例如文字檔、影片、JSON 資料)時,通常必須等待 「整個檔案」 下載完畢後,才能開始進行處理或顯示給使用者看。如果檔案很大,這意味著使用者需要等待很長的時間,而且瀏覽器需要消耗大量的記憶體來暫存這些資料。

Streams API 的出現改變了這一切。它允許我們以 「片段 (Chunks)」 的方式,一點一點地處理資料,而不需要等待全部資料都到達。

什麼是 Stream (串流)?

想像一下你在線上看影片。你不需要等到整部電影(例如 2GB)都下載到硬碟後才能開始播放,而是下載了一小段緩衝區後就開始播放,邊看邊下載。這就是「串流」的概念。

在程式開發中,Stream 讓我們可以:

  1. 節省記憶體:不需要一次把巨大的檔案載入記憶體。
  2. 提高反應速度:當第一批資料到達時,就可以立刻開始處理(例如顯示在螢幕上),不用空轉等待。
  3. 管線化 (Piping):可以像接水管一樣,把資料從 A 點流向 B 點,中間經過 C 濾水器(轉換處理),流程非常直觀。

Streams API 的核心角色

Streams API 主要由三種物件組成:

  1. ReadableStream (可讀串流):資料的來源。例如:網路請求的回應 (fetch response body)、讀取檔案。
  2. WritableStream (可寫串流):資料的目的地。例如:儲存檔案、寫入特定緩衝區。
  3. TransformStream (轉換串流):介於中間,負責修改資料。例如:解壓縮、加密、編碼轉換。

ReadableStream (可讀串流)

這是開發中最常接觸到的部分,特別是在使用 Fetch API 時。fetch 回應的 body 屬性其實就是一個 ReadableStream

如何讀取 Stream?

要讀取一個 Stream,我們需要先取得一個 Reader (讀取器),然後透過迴圈不斷地讀取,直到資料流結束。

async function consumeStream() {
  const response = await fetch('https://example.com/large-text.txt');

  // 1. 取得 ReadableStream
  const stream = response.body;

  // 2. 取得 Reader (鎖定這個串流,準備讀取)
  const reader = stream.getReader();

  while (true) {
    // 3. 讀取下一個 Chunk
    // value: 這次讀到的資料片段 (通常是 Uint8Array)
    // done: 布林值,如果為 true 代表傳輸結束
    const { done, value } = await reader.read();

    if (done) {
      console.log('傳輸完成!');
      break;
    }

    console.log(`收到資料片段,大小:${value.length} bytes`);
    // 在這裡處理 value (例如解碼文字、顯示圖片等)
  }
}

處理文字資料 (TextDecoder)

由於 Stream 預設傳輸的是 二進位資料 (Uint8Array),如果我們讀取的是文字檔,直接印出 value 會是一堆數字。我們需要使用 TextDecoder 來將其轉回 readable 的文字。

async function readTextStream() {
  const response = await fetch('/api/log.txt');
  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');

  let result = '';

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      break;
    }

    // 將二進位片段解碼為文字,並設定 stream: true 處理邊界字元
    // (有些中文字可能會被切斷在兩個 chunk 之間,stream: true 會自動處理暫存)
    const textChunk = decoder.decode(value, { stream: true });

    console.log('新的文字片段:', textChunk);
    result += textChunk;
  }

  console.log('完整內容:', result);
}
TextDecoderStream: 現代瀏覽器還支援 TextDecoderStream,可以直接透過 pipeThrough 轉換,讓程式碼更簡潔。

建立自定義的 ReadableStream

除了讀取 fetch 的結果,我們也可以自己創造一個 Stream 來發送資料。

// 建立一個每秒發送一個數字的 Stream
const countStream = new ReadableStream({
  start(controller) {
    let count = 0;
    const max = 5;

    const interval = setInterval(() => {
      count++;

      // 將資料放入佇列 (Enqueue)
      controller.enqueue(`Count: ${count}\n`);
      console.log(`已發送: ${count}`);

      if (count >= max) {
        clearInterval(interval);
        // 關閉 Stream
        controller.close();
      }
    }, 1000);
  },

  cancel(reason) {
    // 當 Stream 被取消讀取時觸發
    console.log('Stream 被取消了', reason);
    clearInterval(interval);
  },
});

// 讀取這個自定義 Stream
async function readCount() {
  const reader = countStream.getReader();
  // ... 使用 reader.read() 讀取
}

WritableStream (可寫串流)

WritableStream 代表資料的「終點」。當我們想要將 Stream 的資料寫入某個地方(例如:檔案、自定義的緩衝區、或是傳送到另一個 API)時,就需要使用它。

建立自定義的 WritableStream

透過 new WritableStream({ write, close, abort }) 可以定義寫入的行為:

const myWritable = new WritableStream({
  // 當有資料(chunk)寫入時觸發
  write(chunk) {
    return new Promise((resolve, reject) => {
      console.log('正在寫入資料:', chunk);

      // 模擬寫入耗時 (例如寫入資料庫或磁碟)
      setTimeout(() => {
        console.log('資料寫入完成!');
        resolve(); // 通知 Stream 可以繼續寫入下一筆
      }, 1000);
    });
  },

  // 當 Stream 被正常關閉時觸發 (例如 pipeTo 完成)
  close() {
    console.log('寫入串流已關閉');
  },

  // 當發生錯誤而被中斷時觸發
  abort(reason) {
    console.error('寫入串流發生錯誤:', reason);
  },
});

如何寫入資料?

ReadableStream 使用 getReader() 類似,WritableStream 使用 getWriter() 來取得一個 Writer (寫入器)

async function writeData() {
  const writer = myWritable.getWriter();

  await writer.write('第一筆資料');
  await writer.write('第二筆資料');

  // 寫入完成,關閉串流
  await writer.close();
}
實際應用: 在瀏覽器環境中,最常見的 WritableStream 應用是 File System Access API,它允許你將網路下載的 Stream 直接 pipeTo 到使用者的本機檔案中,實現大檔案下載與儲存。

TransformStream (轉換串流)

TransformStream 允許我們自定義資料的處理邏輯,通常用於資料清洗、格式轉換(例如 ZIP 壓縮、加密)。它由一組 writable (輸入端) 和 readable (輸出端) 組成。

// 建立一個將文字轉為大寫的轉換器
const upperCaseStream = new TransformStream({
  transform(chunk, controller) {
    // chunk: 輸入的資料
    // controller: 用來將處理後的資料送到下一站

    // 假設輸入是字串,轉大寫後送出
    controller.enqueue(chunk.toUpperCase());
  },
});

Pipe (管線處理) 與 Teeing (分流)

Streams API 最強大的功能之一就是 Pipe Chain (管線鏈)Teeing (分流)

pipeThrough (轉換)

readableStream.pipeThrough(transformStream) 會將資料通過轉換器,並回傳一個新的 ReadableStream (轉換後的結果)。

fetch('/story.txt')
  .then(response => response.body)
  .then(rb => rb.pipeThrough(new TextDecoderStream())) // 1. 轉成文字
  .then(textStream => textStream.pipeThrough(upperCaseStream)) // 2. 轉大寫
  .then(upperStream => /* ... */);

pipeTo (輸出)

readableStream.pipeTo(writableStream) 會負責將資料「倒」進去目的地。它會自動處理:

  1. 讀取與寫入的迴圈。
  2. Backpressure (背壓):如果寫入速度慢,它會自動暫停讀取,防止記憶體塞爆。
  3. 錯誤傳遞與關閉訊號。
// 範例:讀取 -> 轉大寫 -> 寫入自定義目的地
readable.pipeThrough(upperCaseStream).pipeTo(myWritable);

Teeing (分流)

有時候我們想要對「同一個」Stream 做兩件事(例如:一邊顯示在螢幕上,一邊計算總大小)。但 Stream 預設只能被讀取一次 (Locked)。

這時可以使用 tee() 方法,將一個 Stream 複製成兩個獨立的 Stream:

const response = await fetch('/api/data');
// 將原始 stream 分叉成兩個
const [stream1, stream2] = response.body.tee();

// 分頭進行處理
const promise1 = stream1.pipeTo(writable1); // 例如:顯示
const promise2 = stream2.pipeTo(writable2); // 例如:計算 Hash

await Promise.all([promise1, promise2]);
使用 tee() 後,原始的 Stream 就會被鎖定,不能再直接讀取。你必須使用分出來的 stream1stream2

背壓 (Backpressure) 與 Queuing Strategies

當「輸入的速度」快過「處理或輸出的速度」時,就會發生積壓。例如,網路下載速度極快 (100MB/s),但使用者硬碟寫入速度很慢 (10MB/s)。

Streams API 內建處理了 Backpressure。當 WritableStream 的內部佇列 (Queue) 超過一定大小 (High Water Mark) 時,它會發出信號通知 ReadableStream 暫停讀取。

開發者可以在建立 Stream 時設定 strategy 來控制緩衝區行為:

// 設定緩衝區最多只能有 1024 個位元組,避免記憶體佔用過多
const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });

const stream = new ReadableStream(
  {
    /* ... */
  },
  strategy,
);

在使用 pipeTo 時,瀏覽器會自動幫你處理好所有的 Backpressure 邏輯,這是手寫 while 迴圈很難做到的優勢。

瀏覽器支援度

現代瀏覽器 (Chrome, Edge, Firefox, Safari) 對 Streams API 的主要標準(ReadableStream, WritableStream, TransformStream)支援度都已經非常好。

但是一些特定的輔助 Stream,例如 TextDecoderStream, CompressionStream (壓縮用), DecompressionStream (解壓縮用),在舊一點的瀏覽器版本可能需要 Polyfill。

總結

  1. ReadableStream: 資料來源,使用 getReader() 讀取。
  2. WritableStream: 資料終點,使用 getWriter() 寫入,支援 Backpressure。
  3. TransformStream: 資料加工站,使用 pipeThrough 串聯。
  4. Teeing: 使用 tee() 將資料流一分為二。

掌握 Streams API,這對於處理大型檔案、即時影音、或是需要高效能 I/O 的 Web App 來說是必備的技能。