訊息串:這份指南

瞭解如何透過 Streams API 使用可讀取、可寫入和轉換的串流。

透過 Streams API,您可以透過程式輔助方式存取透過網路接收或以任何方式在本機建立的資料串流,並使用 JavaScript 處理這些資料。串流是指將要接收、傳送或轉換的資源分解成小區塊,然後逐一處理這些區塊。瀏覽器本來就會在收到 HTML 或影片等要在網頁上顯示的資產時進行串流,但這項功能在 2015 年推出 fetch 串流之前,從未開放 JavaScript 使用。

先前,如要處理某種資源 (無論是影片或文字檔等),您必須下載整個檔案,等待檔案反序列化為合適的格式,然後再進行處理。JavaScript 可使用串流後,上述情況就會有所改變。現在,您可以在用戶端取得原始資料後,立即以 JavaScript 逐步處理,不必產生緩衝區、字串或 Blob。這項功能可解鎖多種用途,以下列舉幾項:

  • 影片效果:透過轉換串流傳輸可讀取的影片串流,即時套用效果。
  • 資料 (解)壓縮:透過轉換串流將檔案串流導向,並選擇性地(解)壓縮檔案。
  • 圖片解碼:透過轉換串流將 HTTP 回應串流傳送至管道,將位元組解碼為點陣圖資料,然後透過另一個轉換串流將點陣圖轉換為 PNG。如果安裝在服務工作人員的 fetch 處理常式中,您就能以透明方式填補 AVIF 等新圖片格式。

瀏覽器支援

ReadableStream 和 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

核心概念

在詳細說明各種串流類型之前,請先瞭解一些核心概念。

區塊

區塊是寫入或讀取串流的「單一資料片段」。可以是任何型別,串流甚至可以包含不同型別的區塊。在大多數情況下,資料塊不會是特定串流最細微的資料單位。舉例來說,位元組串流可能包含由 16 KiB Uint8Array 單位組成的區塊,而非單一位元組。

可讀取的串流

可讀取的串流代表可供讀取的資料來源。換句話說,資料會從可讀取的串流中輸出。具體來說,可讀取的串流是 ReadableStream 類別的執行個體。

可寫入的串流

可寫入的串流代表資料可寫入的目的地。換句話說,資料會進入可寫入的串流。具體來說,可寫入的串流是 WritableStream 類別的執行個體。

轉換串流

轉換串流由一對串流組成:可寫入串流 (稱為可寫入端) 和可讀取串流 (稱為可讀取端)。這就像即時口譯員,能將一種語言即時翻譯成另一種語言。以轉換串流專屬的方式,寫入可寫入端會導致新資料可從可讀取端讀取。具體來說,任何含有 writable 屬性和 readable 屬性的物件,都可以做為轉換串流。不過,標準 TransformStream 類別可簡化這類正確糾纏的配對建立作業。

管鏈

串流主要用於彼此管道傳輸。可讀取的串流可以直接使用可讀取串流的 pipeTo() 方法,透過管道傳輸至可寫入的串流,也可以先使用可讀取串流的 pipeThrough() 方法,透過一或多個轉換串流傳輸。以這種方式串連在一起的一組串流稱為管道鏈。

背壓

建構管道鏈後,系統會傳播有關區塊應以多快速度流經管道的信號。如果鏈結中的任何步驟尚無法接受區塊,系統會透過管道鏈向後傳播訊號,直到原始來源收到通知,停止快速產生區塊為止。這個正規化流程的過程稱為「背壓」。

開球

可讀取的串流可以使用 tee() 方法分流 (以大寫「T」的形狀命名)。這會鎖定串流,也就是讓串流無法再直接使用,但會建立兩個新串流 (稱為分支),可獨立取用。此外,由於串流無法倒轉或重新啟動,因此 Teeing 也很重要,稍後會詳細說明。

這張圖表顯示管道鏈,其中包含來自 Fetch API 呼叫的可讀取串流,該串流會透過轉換串流管道傳輸,其輸出內容會分叉,然後傳送至瀏覽器 (第一個產生的可讀取串流) 和 Service Worker 快取 (第二個產生的可讀取串流)。
管鏈。

可讀取串流的機制

可讀取的串流是 JavaScript 中以 ReadableStream 物件表示的資料來源,可從基礎來源流動。ReadableStream() 建構函式會從指定處理常式建立並傳回可讀取的串流物件。基礎來源分為兩種類型:

  • 推送來源會在您存取時不斷推送資料,您可以自行開始、暫停或取消存取串流。例如即時串流影片、伺服器傳送事件或 WebSocket。
  • 提取來源:連線後,您必須明確要求提取來源的資料。例如透過 fetch()XMLHttpRequest 呼叫執行的 HTTP 作業。

系統會以稱為「區塊」的小片段,依序讀取串流資料。放置在串流中的區塊稱為已加入佇列。這表示這些訊息正在佇列中等待讀取。內部佇列會追蹤尚未讀取的區塊。

佇列策略是物件,可根據內部佇列的狀態,決定串流應如何發出背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定數字 (稱為高水位線) 進行比較。

串流中的區塊由「讀取器」讀取。這個讀取器一次擷取一個區塊的資料,讓您對資料執行任何想執行的作業。讀取器加上隨附的其他處理代碼,稱為「消費者」

這個背景資訊中的下一個建構函式稱為「控制器」。每個可讀取的串流都有相關聯的控制器,顧名思義,這個控制器可讓您控制串流。

一次只能有一個讀取器讀取串流;建立讀取器並開始讀取串流時 (即成為有效讀取器),該串流會鎖定讀取器。如要讓其他讀取器接管串流讀取作業,通常需要先「釋放」第一個讀取器,才能執行其他任何操作 (不過您可以「分接」串流)。

建立可讀取的串流

您可以呼叫建構函式 ReadableStream() 來建立可讀取的串流。建構函式具有選用引數 underlyingSource,代表具有方法和屬性的物件,可定義建構的串流例項行為。

underlyingSource

這項作業可使用下列開發人員定義的選用方法:

  • start(controller):物件建構完成後,系統會立即呼叫這個方法。這個方法可以存取串流來源,並執行設定串流功能所需的任何其他作業。如要以非同步方式執行這項程序,方法可以傳回 Promise,指出成功或失敗。傳遞至這個方法的 controller 參數是 ReadableStreamDefaultController
  • pull(controller):可用於在擷取更多區塊時控制串流。只要資料流的內部區塊佇列未滿,系統就會重複呼叫這個函式,直到佇列達到高水位標記為止。如果呼叫 pull() 的結果是 Promise,則在該 Promise 履行前,系統不會再次呼叫 pull()。如果 Promise 遭到拒絕,串流就會發生錯誤。
  • cancel(reason):在串流消費者取消串流時呼叫。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支援下列方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 建構函式的第二個引數同樣是選用引數,即 queuingStrategy。 這個物件可選擇性地定義串流的佇列策略,並採用兩個參數:

  • highWaterMark:非負數,表示使用這項佇列策略的串流高水位線。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負大小。 結果會用來判斷背壓,並透過適當的 ReadableStreamDefaultController.desiredSize 屬性呈現。此外,這個方法也會控管何時呼叫基礎來源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如要從可讀取的串流讀取資料,您需要讀取器,也就是 ReadableStreamDefaultReaderReadableStream 介面的 getReader() 方法會建立讀取器,並將串流鎖定至該讀取器。在串流鎖定期間,除非這個讀取器已釋出,否則無法取得其他讀取器。

ReadableStreamDefaultReader 介面的 read() 方法會傳回 Promise,提供串流內部佇列中下一個區塊的存取權。視串流狀態而定,系統會完成或拒絕要求,並傳回結果。可能原因如下:

  • 如有可用的區塊,系統會以
    { value: chunk, done: false } 形式的物件履行 Promise。
  • 如果串流關閉,系統會以
    { value: undefined, done: true } 形式的物件履行 Promise。
  • 如果串流發生錯誤,系統會拒絕該 Promise,並提供相關錯誤。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 屬性

您可以存取可讀取串流的 ReadableStream.locked 屬性,檢查串流是否已鎖定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可讀取串流程式碼範例

下列程式碼範例會顯示所有步驟的運作情形。首先,請建立 ReadableStream,並在 underlyingSource 引數 (即 TimestampSource 類別) 中定義 start() 方法。這個方法會告知串流的 controller 在十秒內每秒 enqueue() 一個時間戳記。最後,它會告知控制器 close() 串流。如要使用這個串流,請透過 getReader() 方法建立讀取器,並呼叫 read(),直到串流為 done 為止。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同步疊代

在每個 read() 迴圈疊代中檢查串流是否為 done,可能不是最方便的 API。幸好,我們即將推出更完善的解決方案:非同步疊代。

for await (const chunk of stream) {
  console.log(chunk);
}

如要使用非同步疊代,目前的解決方法是透過 Polyfill 實作行為。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Teeing a readable stream

ReadableStream 介面的 tee() 方法會分叉目前的可讀取串流,並傳回含有兩個結果分支的雙元素陣列,做為新的 ReadableStream 執行個體。這樣一來,兩位讀者就能同時讀取串流。舉例來說,如果您想從伺服器擷取回應並串流至瀏覽器,同時串流至 Service Worker 快取,您可能會在 Service Worker 中執行這項操作。由於回應本文只能取用一次,因此您需要兩個副本才能執行這項操作。如要取消串流,您必須取消這兩個產生的分支。一般來說,串流會鎖定一段時間,防止其他讀者鎖定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可讀取的位元組串流

如果是代表位元組的串流,系統會提供可讀取串流的擴充版本,以有效處理位元組,特別是盡量減少複製作業。位元組串流可讓您取得自備緩衝區 (BYOB) 讀取器。預設實作方式可能會產生各種不同的輸出內容,例如 WebSocket 的字串或陣列緩衝區,而位元組串流則保證輸出位元組。此外,BYOB 讀取器還具有穩定性優勢。這是因為緩衝區分離後,可確保不會重複寫入同一緩衝區,因此可避免競爭條件。BYOB 讀取器可重複使用緩衝區,因此能減少瀏覽器執行垃圾收集的次數。

建立可讀取的位元組串流

您可以將額外的 type 參數傳遞至 ReadableStream() 建構函式,建立可讀取的位元組串流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可讀取位元組串流的基礎來源會取得 ReadableByteStreamController 以供操控。其 ReadableByteStreamController.enqueue() 方法會採用 chunk 引數,該引數的值為 ArrayBufferViewReadableByteStreamController.byobRequest 屬性會傳回目前的 BYOB 提取要求,如果沒有,則傳回空值。最後,ReadableByteStreamController.desiredSize 屬性會傳回所需大小,以填滿受控串流的內部佇列。

queuingStrategy

ReadableStream() 建構函式的第二個引數同樣是選用引數,即 queuingStrategy。 這個物件可選擇性地定義串流的佇列策略,並採用一個參數:

  • highWaterMark:非負數的位元組數,表示使用這項佇列策略的串流高水位線。這項屬性用於判斷背壓,並透過適當的 ReadableByteStreamController.desiredSize 屬性呈現。此外,這個方法也會控管何時呼叫基礎來源的 pull() 方法。

getReader()read() 方法

然後,您可以視需要設定 mode 參數,存取 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" }). 這樣一來,您就能更精確地控制緩衝區分配,避免複製作業。如要從位元組串流讀取資料,您需要呼叫 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可讀取的位元組串流程式碼範例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

下列函式會傳回可讀取的位元組串流,以便有效率地以零複製方式讀取隨機產生的陣列。這個方法不會使用預先決定的 1,024 區塊大小,而是嘗試填滿開發人員提供的緩衝區,讓您完全掌控。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可寫入串流的機制

可寫入的串流是您可以寫入資料的目的地,在 JavaScript 中以 WritableStream 物件表示。這項功能可做為基礎接收器頂端的抽象層,也就是寫入原始資料的低階 I/O 接收器。

資料會透過寫入器寫入串流,一次寫入一個區塊。就像讀者中的區塊一樣,區塊可以採用多種形式。您可以使用任何喜歡的程式碼來產生可供撰寫的區塊;撰寫者和相關聯的程式碼稱為「製作人」

建立寫入器並開始寫入串流 (有效寫入器) 時,系統會將其鎖定至該串流。一次只能有一個寫入器寫入可寫入的串流。如要讓其他作家開始寫入串流,通常需要先發布串流,然後再將其他作家附加至串流。

內部佇列會追蹤已寫入串流但尚未由基礎接收器處理的區塊。

佇列策略是物件,可根據內部佇列的狀態,決定串流應如何發出背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定數字 (稱為高水位線) 進行比較。

最終建構項目稱為「控制器」。每個可寫入的串流都有相關聯的控制器,可供您控制串流 (例如中止串流)。

建立可寫入的串流

Streams API 的 WritableStream 介面提供標準抽象化功能,可將串流資料寫入目的地 (也稱為接收器)。這個物件內建背壓和佇列功能。您可以呼叫建構函式 WritableStream() 建立可寫入的串流。這個函式具有選用的 underlyingSink 參數,代表具有方法和屬性的物件,可定義建構的串流例項行為。

underlyingSink

underlyingSink 可包含下列開發人員定義的選用方法。傳遞至部分方法的 controller 參數是 WritableStreamDefaultController

  • start(controller):建構物件時,系統會立即呼叫這個方法。這個方法的內容應以取得基礎接收器的存取權為目標。如要以非同步方式執行這項程序,可以傳回 Promise,指出成功或失敗。
  • write(chunk, controller):當新資料區塊 (在 chunk 參數中指定) 準備好寫入基礎接收器時,就會呼叫這個方法。它可以傳回 Promise,指出寫入作業是否成功。只有在先前的寫入作業成功後,才會呼叫這個方法,且絕不會在串流關閉或中止後呼叫。
  • close(controller):如果應用程式發出信號,表示已完成將區塊寫入串流,系統就會呼叫這個方法。內容應盡一切必要措施,完成對基礎接收器的寫入作業,並釋出存取權。如果這個程序是非同步,可以傳回 Promise,指出成功或失敗。只有在所有已排入佇列的寫入作業都成功後,系統才會呼叫這個方法。
  • abort(reason):如果應用程式發出信號,表示要突然關閉串流並將其設為錯誤狀態,系統就會呼叫這個方法。這項函式可以清除所有保留的資源,與 close() 類似,但即使寫入作業已排入佇列,系統仍會呼叫 abort()。這些區塊將會遭到捨棄。如果是非同步程序,則可傳回 Promise,指出成功或失敗。reason 參數包含 DOMString,說明串流中斷的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 介面代表控制器,可控制 WritableStream 的狀態,例如在設定期間、提交更多區塊以供寫入時,或在寫入結束時。建構 WritableStream 時,系統會為基礎接收器提供對應的 WritableStreamDefaultController 例項,以供操控。WritableStreamDefaultController 只有一個方法:WritableStreamDefaultController.error(),這個方法會導致日後與相關聯串流的所有互動發生錯誤。WritableStreamDefaultController 也支援 signal 屬性,可傳回 AbortSignal 的執行個體,以便在需要時停止 WritableStream 作業。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 建構函式的第二個引數同樣是選用引數,即 queuingStrategy。 這個物件可選擇性地定義串流的佇列策略,並採用兩個參數:

  • highWaterMark:非負數,表示使用這項佇列策略的串流高水位線。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負大小。 結果會用來判斷背壓,並透過適當的 WritableStreamDefaultWriter.desiredSize 屬性呈現。

getWriter()write() 方法

如要寫入可寫入的串流,您需要一個寫入器,也就是 WritableStreamDefaultWriterWritableStream 介面的 getWriter() 方法會傳回 WritableStreamDefaultWriter 的新執行個體,並將串流鎖定至該執行個體。串流鎖定時,其他寫入器都無法取得,直到目前的寫入器釋出為止。

WritableStreamDefaultWriter 介面的 write() 方法會將傳遞的資料區塊寫入 WritableStream 和其基礎接收器,然後傳回會解析的 Promise,指出寫入作業成功或失敗。請注意,「成功」的意義取決於基礎接收器,可能表示區塊已接受,但不一定會安全地儲存至最終目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 屬性

您可以存取可寫入串流的 WritableStream.locked 屬性,檢查該串流是否已鎖定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可寫入的串流程式碼範例

下列程式碼範例會顯示所有步驟的實際運作情形。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

將可讀取的串流管道傳輸至可寫入的串流管道

可讀取的串流可以透過可讀取串流的 pipeTo() 方法,傳輸至可寫入的串流。ReadableStream.pipeTo() 會將目前的 ReadableStream 導向指定的 WritableStream,並傳回在導向程序順利完成時會完成的 Promise,或是在發生任何錯誤時拒絕。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

建立轉換串流

Streams API 的 TransformStream 介面代表一組可轉換的資料。您可以呼叫建構函式 TransformStream() 建立轉換串流,該函式會根據指定的處理常式建立並傳回轉換串流物件。TransformStream() 建構函式會接受選用的 JavaScript 物件做為第一個引數,代表 transformer。這類物件可包含下列任一方法:

transformer

  • start(controller):建構物件時,系統會立即呼叫這個方法。通常會使用 controller.enqueue() 將前置字串區塊加入佇列。這些區塊會從可讀取端讀取,但不會依附於任何寫入可寫入端的作業。如果這個初始程序是非同步,例如需要一些時間才能取得前置字串區塊,函式可以傳回 Promise 來表示成功或失敗;遭拒的 Promise 會導致串流發生錯誤。TransformStream() 建構函式會重新擲回任何擲回的例外狀況。
  • transform(chunk, controller):當原本寫入可寫入端的全新區塊準備好轉換時,系統會呼叫這個方法。串流實作可確保只有在先前的轉換作業成功後,才會呼叫這個函式,且絕不會在 start() 完成前或 flush() 呼叫後呼叫。這個函式會執行轉換串流的實際轉換工作。它可以透過 controller.enqueue() 將結果加入佇列。這允許寫入可寫入端的一個區塊,在可讀取端產生零或多個區塊,視 controller.enqueue() 的呼叫次數而定。如果轉換程序是非同步,這個函式可以傳回 Promise,表示轉換成功或失敗。遭拒絕的 Promise 會導致可讀取和可寫入的轉換串流兩端都發生錯誤。如果未提供任何 transform() 方法,系統會使用身分轉換,將可寫入端未變更的區塊排入可讀取端。
  • flush(controller):所有寫入可寫入端的區塊都已成功通過 transform() 轉換,且可寫入端即將關閉時,系統會呼叫這個方法。通常用於將後置字串區塊加入可讀取端,在此之前也會關閉。如果清除程序是非同步,函式可以傳回 Promise,表示成功或失敗;結果會傳達給 stream.writable.write() 的呼叫端。此外,遭拒的 Promise 會導致串流的可讀取和可寫入端發生錯誤。擲回例外狀況與傳回遭拒的 Promise 相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy排隊策略

TransformStream() 建構函式的第二個和第三個選用參數是選用的 writableStrategyreadableStrategy 排隊策略。如「可讀取」和「可寫入」串流章節所述。

轉換串流程式碼範例

下列程式碼範例顯示運作中的簡易轉換串流。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

透過轉換串流管道傳送可讀取的串流

ReadableStream 介面的 pipeThrough() 方法提供可鏈結的方式,透過轉換串流或任何其他可寫入/可讀取的配對,將目前的串流導向管道。一般來說,管道會鎖定串流,防止其他讀取器鎖定串流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一個程式碼範例 (有點牽強) 說明如何實作「大喊」版本的 fetch(),方法是將傳回的回應 Promise 視為串流,並逐一將區塊轉換為大寫。這種做法的優點是不必等待下載完整文件,處理大型檔案時,這點非常重要。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

示範

下方展示的範例實際運作了可讀取、可寫入和轉換的串流。此外,本範例也包含 pipeThrough()pipeTo() 管道鏈的範例,並示範 tee()。您可以選擇在專屬視窗中執行試用版,或查看原始碼

瀏覽器中可用的實用串流

瀏覽器內建多個實用串流。您可以輕鬆從 Blob 建立 ReadableStreamBlob 介面的 stream() 方法會傳回 ReadableStream,讀取該物件時會傳回 Blob 中包含的資料。此外,請回想一下,File 物件是 Blob 的特定類型,可用於任何可使用 Blob 的情境。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的串流變體分別稱為 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

使用 CompressionStreamDecompressionStream 轉換串流,即可輕鬆壓縮或解壓縮檔案。以下程式碼範例說明如何下載 Streams 規格、在瀏覽器中壓縮 (gzip) 規格,以及將壓縮檔直接寫入磁碟。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

檔案系統存取 APIFileSystemWritableFileStream 和實驗性 fetch() 要求串流,都是實際環境中可寫入的串流範例。

Serial API 大量使用可讀取和可寫入的串流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後,WebSocketStream API 會將串流與 WebSocket API 整合。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

實用資源

特別銘謝

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 審查。 Jake Archibald 的網誌文章有助於我瞭解串流。部分程式碼範例的靈感來自 GitHub 使用者 @bellbind 的探索,而部分散文則大量參考 MDN Web Docs on StreamsStreams Standard作者在撰寫這項規格時,做了非常出色的工作。主頁橫幅圖片由 Ryan Lara 提供,刊登於 Unsplash