WebSocketStream:将流与 WebSocket API 集成

通过应用回压,可防止应用被 WebSocket 消息淹没或向 WebSocket 服务器发送过多消息。

背景

WebSocket API 提供了 WebSocket 协议的 JavaScript 接口,借助该接口,您可以在用户的浏览器与服务器之间打开双向交互式通信会话。借助此 API,您可以向服务器发送消息并接收事件驱动型响应,而无需轮询服务器以获取回复。

Streams API

Streams API 可让 JavaScript 以编程方式访问通过网络接收的数据块流,并根据需要进行处理。在数据流上下文中,有一个重要的概念是背压。这是单个流或管道链调节读取或写入速度的过程。如果数据流本身或管道链中较后的数据流仍处于忙碌状态,且尚未准备好接受更多分块,则会通过链向后发送信号,以便根据需要减慢传送速度。

当前 WebSocket API 存在的问题

无法对收到的消息应用背压

使用当前的 WebSocket API 时,对消息的回应发生在 WebSocket.onmessage 中,当从服务器收到消息时,系统会调用 EventHandler

假设您有一个应用,每当收到新消息时都需要执行大量数据处理操作。您可能会设置类似于以下代码的流程,而且由于您通过 await 方法返回 process() 调用的结果,应该没问题吧?

// A heavy data crunching operation.
const process = async (data) => {
  return new Promise((resolve) => {
    window.setTimeout(() => {
      console.log('WebSocket message processed:', data);
      return resolve('done');
    }, 1000);
  });
};

webSocket.onmessage = async (event) => {
  const data = event.data;
  // Await the result of the processing step in the message handler.
  await process(data);
};

答错了!当前 WebSocket API 的问题在于,无法应用回压。当消息到达速度快于 process() 方法可以处理它们的速度时,渲染进程会通过缓冲这些消息来填满内存,或因 CPU 使用率 100% 而无响应,或者两者兼而有之。

对已发送的消息应用回压不符合人体工学

可以对已发送的消息应用回压,但需要轮询 WebSocket.bufferedAmount 属性,这既低效又不符合人体工学。此只读属性会返回使用 WebSocket.send() 调用加入队列但尚未传输到网络的数据的字节数。发送所有队列数据后,此值会重置为零,但如果您继续调用 WebSocket.send(),则该值会继续增加。

什么是 WebSocketStream API?

WebSocketStream API 通过将流与 WebSocket API 集成,解决了不存在或不符合人体工程学的背压问题。这意味着,可以“免费”应用回压,而无需支付任何额外费用。

WebSocketStream API 的建议用例

可以使用此 API 的网站示例包括:

  • 需要保留互动性的高带宽 WebSocket 应用,尤其是视频和屏幕共享应用。
  • 同样,视频捕获和其他在浏览器中生成大量需要上传到服务器的数据的应用。借助背压,客户端可以停止生成数据,而不是在内存中累积数据。

当前状态

步骤 状态
1. 创建铺垫消息 完成
2. 创建规范的初始草稿 进行中
3. 收集反馈并迭代设计 进行中
4. 源试用 完成
5. 发布 尚未开始

如何使用 WebSocketStream API

WebSocketStream API 基于 promise,这使得在现代 JavaScript 世界中处理它变得自然而然。首先,构建一个新的 WebSocketStream 并将 WebSocket 服务器的网址传递给它。接下来,您需要等待连接变为 opened,这会导致 ReadableStream 和/或 WritableStream

通过调用 ReadableStream.getReader() 方法,您最终会获得一个 ReadableStreamDefaultReader,然后您可以从该数据流的 read() 数据中开始read()数据,直到数据流完成(即,直到它返回 {value: undefined, done: true} 形式的对象为止)。

因此,通过调用 WritableStream.getWriter() 方法,您最终会获得 WritableStreamDefaultWriter,然后可以将数据write()到该对象。

  const wss = new WebSocketStream(WSS_URL);
  const {readable, writable} = await wss.opened;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      break;
    }
    const result = await process(value);
    await writer.write(result);
  }

背压

承诺的背压功能怎么样? 您无需执行任何额外步骤,即可“免费”获得此功能。如果 process() 需要额外的时间,则只有在流水线准备就绪后,才会使用下一条消息。同样,WritableStreamDefaultWriter.write() 步骤仅在安全的情况下才会继续。

高级示例

WebSocketStream 的第二个参数是一个允许将来扩展的选项包。唯一的选项是 protocols,其行为与 WebSocket 构造函数的第二个参数相同:

const chatWSS = new WebSocketStream(CHAT_URL, {protocols: ['chat', 'chatv2']});
const {protocol} = await chatWSS.opened;

所选的 protocol 以及可能的 extensions 是通过 WebSocketStream.opened promise 提供的字典的一部分。有关实时连接的所有信息都由此 promise 提供,因为如果连接失败,这些信息无关。

const {readable, writable, protocol, extensions} = await chatWSS.opened;

有关关闭的 WebSocketStream 连接的信息

WebSocket API 中的 WebSocket.oncloseWebSocket.onerror 事件提供的信息现在可通过 WebSocketStream.closed promise 获取。如果发生异常关闭,promise 将拒绝,否则将解析为服务器发送的代码和原因。

CloseEvent 状态代码列表中介绍了所有可能的状态代码及其含义。

const {code, reason} = await chatWSS.closed;

关闭 WebSocketStream 连接

WebSocketStream 可以使用 AbortController 关闭。因此,请将 AbortSignal 传递给 WebSocketStream 构造函数。

const controller = new AbortController();
const wss = new WebSocketStream(URL, {signal: controller.signal});
setTimeout(() => controller.abort(), 1000);

作为替代方案,您还可以使用 WebSocketStream.close() 方法,但其主要目的是允许指定要发送到服务器的代码和原因。

wss.close({code: 4000, reason: 'Game over'});

渐进增强和互操作性

Chrome 是目前唯一实现 WebSocketStream API 的浏览器。为了实现与经典 WebSocket API 的互操作性,无法对收到的消息应用背压。可以对已发送的消息应用回压,但需要轮询 WebSocket.bufferedAmount 属性,这既低效又不符合人体工学。

功能检测

如需检查是否支持 WebSocketStream API,请使用以下命令:

if ('WebSocketStream' in window) {
  // `WebSocketStream` is supported!
}

演示

在支持的浏览器中,您可以在嵌入的 iframe 中或直接在 Glitch 上查看 WebSocketStream API 的运作方式。

反馈

Chrome 团队希望了解您使用 WebSocketStream API 的体验。

请向我们说明 API 设计

API 是否存在某些方面未按预期运行? 或者,您是否缺少实现自己的想法所需的方法或属性? 对安全模型有疑问或意见? 在相应的 GitHub 代码库中提交规范问题,或在现有问题中添加您的想法。

报告实现存在的问题

您是否发现了 Chrome 实现中的 bug? 或者实现方式是否与规范不同? 在 new.crbug.com 上提交 bug。请务必提供尽可能多的细节信息,提供简单的重现说明,并在组件框中输入 Blink>Network>WebSocketsGlitch 非常适合用于分享快速简便的重现案例。

表示对 API 的支持

您打算使用 WebSocketStream API 吗? 您的公开支持可帮助 Chrome 团队确定各项功能的优先级,并向其他浏览器供应商展示支持这些功能的重要性。

使用 # 标签 #WebSocketStream@ChromiumDev 发送一条推文,告诉我们您使用它的位置和方式。

实用链接

致谢

WebSocketStream API 由 Adam RiceYutaka Hirano 实现。