WebSocketStream: ストリームと WebSocket API の統合

バックプレッシャーを適用して、アプリが WebSocket メッセージに埋もれたり、WebSocket サーバーにメッセージを送らせたりすることを防ぎます。

背景

WebSocket API

WebSocket API は、WebSocket プロトコルへの JavaScript インターフェースを提供し、ユーザーのブラウザとサーバーの間で双方向のインタラクティブな通信セッションを開始できるようにします。この API を使用すると、サーバーに返信をポーリングすることなく、サーバーにメッセージを送信し、イベント ドリブン レスポンスを受信できます。

Streams API

Streams API を使用すると、JavaScript はネットワーク経由で受信したデータチャンクのストリームにプログラムでアクセスし、必要に応じて処理できます。ストリームのコンテキストで重要なコンセプトは、バックプレッシャーです。これは、単一のストリームまたはパイプチェーンが読み取りまたは書き込みの速度を調整するプロセスです。ストリーム自体またはパイプライン チェーンの後半のストリームがまだビジー状態で、チャンクを受け入れる準備ができていない場合は、必要に応じてチェーンを逆方向に信号を送信し、配信を遅くします。

現在の WebSocket API の問題

受信したメッセージにバックプレッシャーを適用できない

現在の WebSocket API では、メッセージへの対応は WebSocket.onmessage で発生します。これは、サーバーからメッセージを受信したときに EventHandler が呼び出されます。

たとえば、新しいメッセージを受信するたびに負荷の高いデータ処理オペレーションを実行する必要があるアプリケーションがあるとします。以下のコードのようなフローをセットアップしますが、process() 呼び出しの結果を await しているので、問題ありません。

// A heavy data crunching operation.
const process = async (data) => {
  return new Promise((resolve) => {
    window.setTimeout(() => {
      console.log('WebSocket message processed:', data);
      return resolve('done');
    }, 1000);
  });
};

webSocket.onmessage = async (event) => {
  const data = event.data;
  // Await the result of the processing step in the message handler.
  await process(data);
};

不正解です。現在の WebSocket API の問題は、バックプレッシャーを適用する方法がないことです。メッセージが process() メソッドで処理できるよりも速く到着した場合、レンダリング プロセスはそれらのメッセージをバッファリングしてメモリをいっぱいにするか、100% の CPU 使用率で応答しなくなるか、またはその両方を行います。

送信メッセージへのバックプレッシャーの適用が人間工学的に非効率的

送信されたメッセージにバックプレッシャーを適用することは可能ですが、WebSocket.bufferedAmount プロパティのポーリングが必要になりますが、これは非効率的で、人間工学的ではありません。この読み取り専用プロパティは、WebSocket.send() の呼び出しによってキューに追加されたものの、まだネットワークに送信されていないデータのバイト数を返します。この値は、キュー内のすべてのデータが送信されるとゼロにリセットされますが、WebSocket.send() を呼び出し続けた場合、この値は増え続けます。

WebSocketStream API とは

WebSocketStream API は、ストリームを WebSocket API と統合することで、存在しない、または人間工学的にないバックプレッシャーの問題に対処します。つまり、追加費用なしでバックプレッシャーを「無料で」適用できます。

WebSocketStream API の推奨ユースケース

この API を使用できるサイトの例:

  • インタラクティビティ(特に動画や画面共有)を維持する必要がある高帯域幅の WebSocket アプリケーション。
  • 同様に、ブラウザで大量のデータを生成する動画キャプチャやその他のアプリケーションも、サーバーにアップロードする必要があります。バックプレッシャーを使うと、クライアントはメモリにデータを蓄積するのではなく、データの生成を停止できます。

現在のステータス

| ステップ | ステータス | | ------------------------------------------ | ---------------------------- | | 1. 説明メッセージを作成する | [完了][explainer] | | 2. 仕様の最初のドラフトを作成する | [進行中][spec] | | 3. フィードバックを収集して設計を繰り返す | [進行中](#feedback) | | 4. オリジン トライアル | [完了][ot] | | 5. リリース | 開始前 |

WebSocketStream API の使用方法

基本的な例

WebSocketStream API は Promise ベースであるため、最新の JavaScript 環境での処理が自然に感じられます。まず、新しい WebSocketStream を作成し、WebSocket サーバーの URL を渡します。次に、接続が opened になるまで待ちます。これにより、ReadableStreamWritableStream が発生します。

ReadableStream.getReader() メソッドを呼び出して最終的に ReadableStreamDefaultReader を取得し、ストリームが完了するまで、つまり {value: undefined, done: true} という形式のオブジェクトを返すまで、このデータから read() のデータを取得できます。

したがって、WritableStream.getWriter() メソッドを呼び出すことで最終的に WritableStreamDefaultWriter を取得し、これに write() データを取得できます。

  const wss = new WebSocketStream(WSS_URL);
  const {readable, writable} = await wss.opened;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      break;
    }
    const result = await process(value);
    await writer.write(result);
  }

バックプレッシャー

では、先ほど説明したバックプレッシャー機能はどうでしょうか。前述のとおり、「無料」で利用できるため、追加の手順は必要ありません。process() に時間がかかる場合、次のメッセージはパイプラインの準備ができた場合にのみ使用されます。同様に、WritableStreamDefaultWriter.write() ステップは、安全に行える場合にのみ続行されます。

高度な例

WebSocketStream の 2 番目の引数は、将来の拡張を可能にするオプション バッグです。現在、唯一のオプションは protocols です。これは、WebSocket コンストラクタの 2 番目の引数と同じように動作します。

const chatWSS = new WebSocketStream(CHAT_URL, {protocols: ['chat', 'chatv2']});
const {protocol} = await chatWSS.opened;

選択された protocol と潜在的な extensions は、WebSocketStream.opened Promise を通じて利用できる辞書の一部です。ライブ接続に関する情報はすべて、この Promise によって提供されます。接続が失敗した場合は関連性がないためです。

const {readable, writable, protocol, extensions} = await chatWSS.opened;

閉じた WebSocketStream 接続に関する情報

WebSocket API の WebSocket.onclose イベントと WebSocket.onerror イベントから入手できた情報は、WebSocketStream.closed Promise を介して利用できるようになりました。クリーンでないクローズが行われた場合、Promise は拒否され、それ以外の場合は、サーバーから送信されたコードと理由に解決されます。

すべてのステータス コードとその意味については、CloseEvent ステータス コードのリストをご覧ください。

const {code, reason} = await chatWSS.closed;

WebSocketStream 接続を閉じる

WebSocketStream は AbortController で閉じることができます。そのため、AbortSignalWebSocketStream コンストラクタに渡します。

const controller = new AbortController();
const wss = new WebSocketStream(URL, {signal: controller.signal});
setTimeout(() => controller.abort(), 1000);

代わりに WebSocketStream.close() メソッドを使用することもできますが、その主な目的は、サーバーに送信されるコードと理由を指定することです。

wss.close({code: 4000, reason: 'Game over'});

段階的な拡張と相互運用性

現在、Chrome は WebSocketStream API を実装している唯一のブラウザです。従来の WebSocket API との相互運用性のため、受信したメッセージにバックプレッシャーを適用することはできません。送信されたメッセージにバックプレッシャーを適用することは可能ですが、WebSocket.bufferedAmount プロパティのポーリングが必要になりますが、これは非効率的で、人間工学的ではありません。

特徴検出

WebSocketStream API がサポートされているかどうかを確認するには、次のコマンドを使用します。

if ('WebSocketStream' in window) {
  // `WebSocketStream` is supported!
}

デモ

対応ブラウザでは、埋め込み iframe で WebSocketStream API の動作を確認できます。または、Glitch で直接確認することもできます。

フィードバック

Chrome チームでは、WebSocketStream API の使用経験をぜひお聞かせください。

API 設計についてお聞かせください

API について、想定どおりに機能しない部分はありますか。 あるいは、アイデアを実装するために必要なメソッドやプロパティが足りていないか。セキュリティ モデルについてご質問やご意見がありましたら、対応する GitHub リポジトリで仕様に関する問題を報告するか、既存の問題にご意見を記入してください。

実装に関する問題を報告する

Chrome の実装でバグが見つかりましたか? または、実装が仕様と異なっていますか? new.crbug.com でバグを報告します。可能な限り詳細に、再現の簡単な手順を記載し、[Components] ボックスに「Blink>Network>WebSockets」と入力します。Glitch は、すばやく簡単に再現できるケースの共有にうってつけです。

API をサポートすることを伝える

WebSocketStream API の使用を計画していますか。公開サポートによって、Chrome チームは機能の優先順位付けを行うことができ、他のブラウザ ベンダーに対し、そのサポートの重要性を示すことができます。

ハッシュタグ #WebSocketStream を使用して @ChromiumDev にツイートを送信し、どこでどのように使用しているかをお知らせください。

関連情報

謝辞

WebSocketStream API は、Adam RiceYutaka Hirano によって実装されました。ヒーロー画像(作成者: Daan MooijUnsplash