JavaScript Streams API 串流處理
在早期的 Web 開發中,當我們要處理資源(例如文字檔、影片、JSON 資料)時,通常必須等待 「整個檔案」 下載完畢後,才能開始進行處理或顯示給使用者看。如果檔案很大,這意味著使用者需要等待很長的時間,而且瀏覽器需要消耗大量的記憶體來暫存這些資料。
Streams API 的出現改變了這一切。它允許我們以 「片段 (Chunks)」 的方式,一點一點地處理資料,而不需要等待全部資料都到達。
什麼是 Stream (串流)?
想像一下你在線上看影片。你不需要等到整部電影(例如 2GB)都下載到硬碟後才能開始播放,而是下載了一小段緩衝區後就開始播放,邊看邊下載。這就是「串流」的概念。
在程式開發中,Stream 讓我們可以:
- 節省記憶體:不需要一次把巨大的檔案載入記憶體。
- 提高反應速度:當第一批資料到達時,就可以立刻開始處理(例如顯示在螢幕上),不用空轉等待。
- 管線化 (Piping):可以像接水管一樣,把資料從 A 點流向 B 點,中間經過 C 濾水器(轉換處理),流程非常直觀。
Streams API 的核心角色
Streams API 主要由三種物件組成:
- ReadableStream (可讀串流):資料的來源。例如:網路請求的回應 (fetch response body)、讀取檔案。
- WritableStream (可寫串流):資料的目的地。例如:儲存檔案、寫入特定緩衝區。
- TransformStream (轉換串流):介於中間,負責修改資料。例如:解壓縮、加密、編碼轉換。
ReadableStream (可讀串流)
這是開發中最常接觸到的部分,特別是在使用 Fetch API 時。fetch 回應的 body 屬性其實就是一個 ReadableStream。
如何讀取 Stream?
要讀取一個 Stream,我們需要先取得一個 Reader (讀取器),然後透過迴圈不斷地讀取,直到資料流結束。
async function consumeStream() {
const response = await fetch('https://example.com/large-text.txt');
// 1. 取得 ReadableStream
const stream = response.body;
// 2. 取得 Reader (鎖定這個串流,準備讀取)
const reader = stream.getReader();
while (true) {
// 3. 讀取下一個 Chunk
// value: 這次讀到的資料片段 (通常是 Uint8Array)
// done: 布林值,如果為 true 代表傳輸結束
const { done, value } = await reader.read();
if (done) {
console.log('傳輸完成!');
break;
}
console.log(`收到資料片段,大小:${value.length} bytes`);
// 在這裡處理 value (例如解碼文字、顯示圖片等)
}
}
處理文字資料 (TextDecoder)
由於 Stream 預設傳輸的是 二進位資料 (Uint8Array),如果我們讀取的是文字檔,直接印出 value 會是一堆數字。我們需要使用 TextDecoder 來將其轉回 readable 的文字。
async function readTextStream() {
const response = await fetch('/api/log.txt');
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let result = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// 將二進位片段解碼為文字,並設定 stream: true 處理邊界字元
// (有些中文字可能會被切斷在兩個 chunk 之間,stream: true 會自動處理暫存)
const textChunk = decoder.decode(value, { stream: true });
console.log('新的文字片段:', textChunk);
result += textChunk;
}
console.log('完整內容:', result);
}
TextDecoderStream,可以直接透過 pipeThrough 轉換,讓程式碼更簡潔。建立自定義的 ReadableStream
除了讀取 fetch 的結果,我們也可以自己創造一個 Stream 來發送資料。
// 建立一個每秒發送一個數字的 Stream
const countStream = new ReadableStream({
start(controller) {
let count = 0;
const max = 5;
const interval = setInterval(() => {
count++;
// 將資料放入佇列 (Enqueue)
controller.enqueue(`Count: ${count}\n`);
console.log(`已發送: ${count}`);
if (count >= max) {
clearInterval(interval);
// 關閉 Stream
controller.close();
}
}, 1000);
},
cancel(reason) {
// 當 Stream 被取消讀取時觸發
console.log('Stream 被取消了', reason);
clearInterval(interval);
},
});
// 讀取這個自定義 Stream
async function readCount() {
const reader = countStream.getReader();
// ... 使用 reader.read() 讀取
}
WritableStream (可寫串流)
WritableStream 代表資料的「終點」。當我們想要將 Stream 的資料寫入某個地方(例如:檔案、自定義的緩衝區、或是傳送到另一個 API)時,就需要使用它。
建立自定義的 WritableStream
透過 new WritableStream({ write, close, abort }) 可以定義寫入的行為:
const myWritable = new WritableStream({
// 當有資料(chunk)寫入時觸發
write(chunk) {
return new Promise((resolve, reject) => {
console.log('正在寫入資料:', chunk);
// 模擬寫入耗時 (例如寫入資料庫或磁碟)
setTimeout(() => {
console.log('資料寫入完成!');
resolve(); // 通知 Stream 可以繼續寫入下一筆
}, 1000);
});
},
// 當 Stream 被正常關閉時觸發 (例如 pipeTo 完成)
close() {
console.log('寫入串流已關閉');
},
// 當發生錯誤而被中斷時觸發
abort(reason) {
console.error('寫入串流發生錯誤:', reason);
},
});
如何寫入資料?
與 ReadableStream 使用 getReader() 類似,WritableStream 使用 getWriter() 來取得一個 Writer (寫入器)。
async function writeData() {
const writer = myWritable.getWriter();
await writer.write('第一筆資料');
await writer.write('第二筆資料');
// 寫入完成,關閉串流
await writer.close();
}
WritableStream 應用是 File System Access API,它允許你將網路下載的 Stream 直接 pipeTo 到使用者的本機檔案中,實現大檔案下載與儲存。TransformStream (轉換串流)
TransformStream 允許我們自定義資料的處理邏輯,通常用於資料清洗、格式轉換(例如 ZIP 壓縮、加密)。它由一組 writable (輸入端) 和 readable (輸出端) 組成。
// 建立一個將文字轉為大寫的轉換器
const upperCaseStream = new TransformStream({
transform(chunk, controller) {
// chunk: 輸入的資料
// controller: 用來將處理後的資料送到下一站
// 假設輸入是字串,轉大寫後送出
controller.enqueue(chunk.toUpperCase());
},
});
Pipe (管線處理) 與 Teeing (分流)
Streams API 最強大的功能之一就是 Pipe Chain (管線鏈) 和 Teeing (分流)。
pipeThrough (轉換)
readableStream.pipeThrough(transformStream) 會將資料通過轉換器,並回傳一個新的 ReadableStream (轉換後的結果)。
fetch('/story.txt')
.then(response => response.body)
.then(rb => rb.pipeThrough(new TextDecoderStream())) // 1. 轉成文字
.then(textStream => textStream.pipeThrough(upperCaseStream)) // 2. 轉大寫
.then(upperStream => /* ... */);
pipeTo (輸出)
readableStream.pipeTo(writableStream) 會負責將資料「倒」進去目的地。它會自動處理:
- 讀取與寫入的迴圈。
- Backpressure (背壓):如果寫入速度慢,它會自動暫停讀取,防止記憶體塞爆。
- 錯誤傳遞與關閉訊號。
// 範例:讀取 -> 轉大寫 -> 寫入自定義目的地
readable.pipeThrough(upperCaseStream).pipeTo(myWritable);
Teeing (分流)
有時候我們想要對「同一個」Stream 做兩件事(例如:一邊顯示在螢幕上,一邊計算總大小)。但 Stream 預設只能被讀取一次 (Locked)。
這時可以使用 tee() 方法,將一個 Stream 複製成兩個獨立的 Stream:
const response = await fetch('/api/data');
// 將原始 stream 分叉成兩個
const [stream1, stream2] = response.body.tee();
// 分頭進行處理
const promise1 = stream1.pipeTo(writable1); // 例如:顯示
const promise2 = stream2.pipeTo(writable2); // 例如:計算 Hash
await Promise.all([promise1, promise2]);
tee() 後,原始的 Stream 就會被鎖定,不能再直接讀取。你必須使用分出來的 stream1 和 stream2。背壓 (Backpressure) 與 Queuing Strategies
當「輸入的速度」快過「處理或輸出的速度」時,就會發生積壓。例如,網路下載速度極快 (100MB/s),但使用者硬碟寫入速度很慢 (10MB/s)。
Streams API 內建處理了 Backpressure。當 WritableStream 的內部佇列 (Queue) 超過一定大小 (High Water Mark) 時,它會發出信號通知 ReadableStream 暫停讀取。
開發者可以在建立 Stream 時設定 strategy 來控制緩衝區行為:
// 設定緩衝區最多只能有 1024 個位元組,避免記憶體佔用過多
const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });
const stream = new ReadableStream(
{
/* ... */
},
strategy,
);
在使用 pipeTo 時,瀏覽器會自動幫你處理好所有的 Backpressure 邏輯,這是手寫 while 迴圈很難做到的優勢。
瀏覽器支援度
現代瀏覽器 (Chrome, Edge, Firefox, Safari) 對 Streams API 的主要標準(ReadableStream, WritableStream, TransformStream)支援度都已經非常好。
但是一些特定的輔助 Stream,例如 TextDecoderStream, CompressionStream (壓縮用), DecompressionStream (解壓縮用),在舊一點的瀏覽器版本可能需要 Polyfill。
總結
- ReadableStream: 資料來源,使用
getReader()讀取。 - WritableStream: 資料終點,使用
getWriter()寫入,支援 Backpressure。 - TransformStream: 資料加工站,使用
pipeThrough串聯。 - Teeing: 使用
tee()將資料流一分為二。
掌握 Streams API,這對於處理大型檔案、即時影音、或是需要高效能 I/O 的 Web App 來說是必備的技能。