WebSocketStream:将流与 WebSocket API 集成

通过应用回压,可防止应用被 WebSocket 消息淹没或向 WebSocket 服务器发送过多消息。

背景

WebSocket APIWebSocket 协议提供了 JavaScript 接口,从而能够在用户的浏览器和服务器之间打开双向互动通信会话。借助此 API,您可以向服务器发送消息并接收事件驱动型响应,而无需轮询服务器以获取回复。

Streams API

借助 Streams API,JavaScript 可以以编程方式访问通过网络接收的数据块流,并根据需要对其进行处理。在数据流上下文中,有一个重要的概念是背压。这是单个数据流或管道链调节读取或写入速度的过程。如果数据流本身或管道链中较后的数据流仍处于忙碌状态,且尚未准备好接受更多分块,则会通过链向后发送信号,以便根据需要减慢传送速度。

当前 WebSocket API 存在的问题

无法对收到的消息应用回压

使用当前的 WebSocket API,对消息做出响应是在 WebSocket.onmessage 中进行的,该函数是在从服务器收到消息时调用的 EventHandler

假设您有一个应用,每当收到新消息时都需要执行大量数据处理操作。您可能需要设置类似于以下代码的流程,并且由于您对 process() 调用的结果进行了 await,因此应该没问题,对吗?

// A heavy data crunching operation.
const process = async (data) => {
  return new Promise((resolve) => {
    window.setTimeout(() => {
      console.log('WebSocket message processed:', data);
      return resolve('done');
    }, 1000);
  });
};

webSocket.onmessage = async (event) => {
  const data = event.data;
  // Await the result of the processing step in the message handler.
  await process(data);
};

答错了!当前 WebSocket API 的问题在于,无法应用回压。如果消息到达速度比 process() 方法能够处理的速度快,则渲染进程将通过缓冲这些消息来填满内存,或者由于 CPU 使用率达到 100% 而无响应,或者同时出现这两种情况。

对已发送的消息应用回压不符合人体工学

可以对已发送的消息应用回压,但需要轮询 WebSocket.bufferedAmount 属性,这既低效又不符合人体工学。此只读属性会返回已使用 WebSocket.send() 调用加入队列但尚未传输到网络的数据字节数。发送所有队列数据后,此值会重置为零,但如果您继续调用 WebSocket.send(),该值将继续增加。

什么是 WebSocketStream API?

WebSocketStream API 通过将流与 WebSocket API 集成,解决了回压不存在或不符合人体工学的问题。这意味着,可以“免费”应用回压,而无需支付任何额外费用。

WebSocketStream API 的建议用例

可以使用此 API 的网站示例包括:

  • 需要保留互动性的高带宽 WebSocket 应用,尤其是视频和屏幕共享应用。
  • 同样,视频截取和其他在浏览器中生成大量需要上传到服务器的数据的应用。借助回压,客户端可以停止生成数据,而不是在内存中累积数据。

当前状态

步骤 状态
1. 创建铺垫消息 完成
2. 创建规范的初始草稿 进行中
3. 收集反馈并迭代设计 进行中
4. 来源试用 完成
5. 发布 尚未开始

如何使用 WebSocketStream API

WebSocketStream API 基于 promise,这使得在现代 JavaScript 世界中处理它会很自然。首先,构建一个新的 WebSocketStream 并将 WebSocket 服务器的网址传递给它。接下来,您需要等待连接变为 opened,这会导致 ReadableStream 和/或 WritableStream

通过调用 ReadableStream.getReader() 方法,您最终会获得 ReadableStreamDefaultReader,然后您可以从中read()数据,直到流结束,也就是直到它返回 {value: undefined, done: true} 格式的对象。

因此,通过调用 WritableStream.getWriter() 方法,您最终会获得 WritableStreamDefaultWriter,然后可以将数据write()到该对象。

  const wss = new WebSocketStream(WSS_URL);
  const {readable, writable} = await wss.opened;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      break;
    }
    const result = await process(value);
    await writer.write(result);
  }

背压

承诺的背压功能怎么样了? 您无需执行任何额外步骤,即可“免费”获得。如果 process() 需要额外的时间,则只有在流水线准备就绪后,才会使用下一条消息。同样,只有在安全的情况下,WritableStreamDefaultWriter.write() 步骤才会继续。

高级示例

WebSocketStream 的第二个参数是一个选项包,以便将来进行扩展。唯一的选项是 protocols,其行为与 WebSocket 构造函数的第二个参数相同:

const chatWSS = new WebSocketStream(CHAT_URL, {protocols: ['chat', 'chatv2']});
const {protocol} = await chatWSS.opened;

所选的 protocol 以及可能的 extensions 是通过 WebSocketStream.opened promise 提供的字典的一部分。此 promise 会提供有关实时连接的所有信息,因为连接是否失败与此无关。

const {readable, writable, protocol, extensions} = await chatWSS.opened;

有关已关闭的 WebSocketStream 连接的信息

WebSocket API 中的 WebSocket.oncloseWebSocket.onerror 事件提供的信息现在可通过 WebSocketStream.closed promise 获取。如果发生异常关闭,Promise 会被拒绝,否则它会解析为服务器发送的代码和原因。

CloseEvent 状态代码列表中介绍了所有可能的状态代码及其含义。

const {code, reason} = await chatWSS.closed;

关闭 WebSocketStream 连接

WebSocketStream 可以使用 AbortController 关闭。因此,请将 AbortSignal 传递给 WebSocketStream 构造函数。

const controller = new AbortController();
const wss = new WebSocketStream(URL, {signal: controller.signal});
setTimeout(() => controller.abort(), 1000);

作为替代方案,您还可以使用 WebSocketStream.close() 方法,但其主要目的是允许指定要发送到服务器的代码和原因。

wss.close({code: 4000, reason: 'Game over'});

渐进增强和互操作性

Chrome 目前是唯一实现 WebSocketStream API 的浏览器。 为了与传统版 WebSocket API 实现互操作性,无法对收到的消息应用回压。可以对已发送的消息应用回压,但需要轮询 WebSocket.bufferedAmount 属性,这既低效又不符合人体工学。

功能检测

如需检查是否支持 WebSocketStream API,请使用以下命令:

if ('WebSocketStream' in window) {
  // `WebSocketStream` is supported!
}

演示

在支持的浏览器中,您可以在嵌入的 iframe 中或直接在 Glitch 上查看 WebSocketStream API 的运作方式。

反馈

Chrome 团队希望了解您使用 WebSocketStream API 的体验。

请向我们说明 API 设计

API 是否存在某些方面未按预期运行? 或者,您是否缺少实现想法所需的方法或属性? 对安全模型有疑问或意见? 在相应的 GitHub 代码库中提交规范问题,或在现有问题中添加您的想法。

报告实现方面的问题

您是否发现了 Chrome 实现中的 bug? 或者实现方式是否与规范不同? 请访问 new.crbug.com 提交 bug。务必提供尽可能详细的信息,以及简单的重现说明,并在 Components 框中输入 Blink>Network>WebSockets故障非常适合分享快速轻松的再现案例。

显示对该 API 的支持

您是否打算使用 WebSocketStream API? 您的公开支持有助于 Chrome 团队确定功能的优先级,并向其他浏览器供应商表明支持这些功能的重要性。

使用 #WebSocketStream 标签向 @ChromiumDev 发推文,告诉我们您在哪里以及如何使用该功能。

实用链接

致谢

WebSocketStream API 由 Adam RiceYutaka Hirano 实现。