领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

写了五年前端,你可能真的不懂 WritableStream?

nixiaole 2025-08-02 20:57:11 知识剖析 2 ℃

家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发,您的支持是我不断创作的动力。

前面章节讲述过可读流,本篇文章将带着大家一起聊聊可写流。

1. 可写流的构造

可写流是一个可以写入数据的目标,在 JavaScript 中用 WritableStream 对象表示,其充当底层接收器(用于写入原始数据的底层 I/O 接收器)顶层的抽象。

数据通过写入器 (Writer) 一次写入一个数据块,数据块可以采用多种形式,与读取器中的数据块类似。开发者可以使用任何代码来生成可供写入的数据块,而写入器及其相关代码称为生产者 (Producer)。

当一个写入器被创建并开始写入流(活动写入器)时,则相当于被锁定到当前流,一次只能有一个写入器写入可写流。如果希望另一个写入器写入流则需要先释放,然后再连接另一个写入器。

  • 内部队列 (Internal Queue) :用于跟踪已写入流但尚未被底层接收器处理的数据块
  • 排队策略 (Queuing Strategy): 是一个对象,其根据流内部队列的状态决定流应如何发出背压信号。排队策略会为每个块分配一个大小,并将队列中所有块的总大小与高水位线进行比较
  • 控制器 (Controller):每个可写流都有一个关联的控制器,允许开发者控制该流,例如:中止流

2. 如何使用可写流

WritableStream 接口提供了一个标准的抽象,用于将流数据写入目标,该对象内置了背压和队列功能。其有一个可选的 underlineSink 参数,包含定义构造流实例行为的方法和属性。

underlyingSink 可以包含以下可选的、由开发者自定义的方法,同时传递给这些方法的 controller 参数是
WritableStreamDefaultController。

  • start(controller):此方法在对象构造完成后立即调用,此方法的内容旨在访问底层接收器。如果此过程是异步,则可以返回一个 Promise 来表示成功或失败

注意:可以在该阶段打开文件 (fs.openSync)、建立网络连接、分配内存等,也可以设置状态或变量,与外部系统通信等等

  • write(chunk, controller):当新数据块准备好写入底层接收器时调用此方法,其可以返回一个 Promise 来指示写入操作的成功或失败。此方法仅在先前的写入操作成功后才调用,而不会在流关闭或中止后调用
  • close(controller):如果应用发出信号表示已完成向流写入数据块,则会调用此方法。其会执行一切必要操作以完成对底层接收器的写入,并释放对其访问权限。如果此过程是异步的,则可以返回一个 Promise。只有在所有排队的写入操作都成功后,才会调用此方法。
  • abort(reason):如果应用发出信号表示希望突然关闭流并将其置于错误状态,则会调用此方法。此时可以清理任何占用的资源,类似于 close(),但即使写入操作已排队也会调用 abort(),此时数据块将被丢弃。如果此过程是异步,则可以返回一个 Promise 。reason 参数包含一个 DOMString,用于描述流中止的原因。

WritableStream 的使用非常简单:

const writableStream = new WritableStream({
  start(controller) {},
  write(chunk, controller) {},
  close(controller) {},
  abort(reason) {},
});

其中
WritableStreamDefaultController
表示一个控制器,用于在设置期间、提交更多数据块进行写入时以及写入结束时控制 WritableStream 的状态。构造 WritableStream 时,底层接收器会获得一个相应的
WritableStreamDefaultController ,其支持以下属性和方法:

  • error():该方法会导致与关联流的任何后续交互都产生错误
  • signal:返回 AbortSignal 实例,允许在需要时停止 WritableStream 操作

WritableStream() 函数的第二个参数用于定义流的排队策略,包含两个参数:

  • highWaterMark:一个非负数,表示使用此排队策略的流的高水位线
  • size(chunk):用于计算并返回给定块值的有限非负大小,结果用于确定背压,并通过相应的 WritableStreamDefaultWriter.desiredSize 属性来体现

下面示例在 WritableStream 中使用 size(chunk)、highWaterMark、writer.desiredSize、writer.ready 等来确定背压:

const writableStream = new WritableStream(
  {
    write(chunk, controller) {
      console.log(`Writing chunk: ${chunk}`);
      return new Promise((resolve) => {
        // 增加异步处理时间以模拟更慢的处理
        setTimeout(() => {
          console.log(`Chunk processed: ${chunk}`);
          resolve();
        }, 500);
      });
    },
    close() {
      console.log("Stream closed");
    },
    abort(err) {
      console.error("Stream error:", err);
    },
  },
  {
    // 使用 chunk 的长度作为大小
    size(chunk) {
      return chunk.length;
    },
    // 设置高水位线为 8,写入一个字符串就满了
    highWaterMark: 8,
  }
);
const writer = writableStream.getWriter();
// 模拟数据写入,且让数据块更快地达到高水位线
const dataChunks = ["12345678", "90ABCDEF", "GHIJKLMN", "OPQRSTUV", "WXYZ1234"];
// 真实写入数据
async function writeData() {
  for (let i = 0; i < dataChunks.length; i++) {
    const chunk = dataChunks[i];
    console.log(`Attempting to write chunk: ${chunk}`);
    // 写入数据块到流
    const writePromise = writer.write(chunk);
    // 检查流的 desiredSize 以检测背压
    if (writer.desiredSize <= 0) {
      console.log("Backpressure detected: Stream is applying backpressure");
      await writer.ready;
      // 等待流准备好继续写入
    }
    await writePromise;
    console.log(`Chunk ${chunk} has been written`);
  }
  // 关闭流
  await writer.close();
  console.log("All data written and stream closed.");
}
writeData().catch(console.error);

3. WritableStream 的 getWriter() 和 write() 方法

要写入可写流,需要一个写入器
WritableStreamDefaultWriter
,WritableStream.getWriter() 方法返回一个新的
WritableStreamDefaultWriter 实例,并将流锁定到该实例。在流锁定期间,无法获取其他写入器。


WritableStreamDefaultWriter 接口的
write() 方法将传递的数据块写入 WritableStream 及其底层接收器,然后返回一个 Promise 表示写入成功或失败。请注意,“成功” 的含义取决于底层接收器;可能表示数据块已被接受,而不一定表示已安全地保存到最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write("The first chunk!");

开发者还可以通过访问可写流的 WritableStream.locked 属性来检查可写流是否被锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? "indeed" : "not"} locked.`);

以下示例创建了一个带有自定义接收器的 WritableStream,然后调用流的 getWriter() 方法,该方法返回一个
WritableStreamDefaultWriter 实例。接下来,将几个字符串写入流。最后, close() 返回一个 Promise,当所有写入操作成功完成后,该 Promise 将被 resolve。

const writableStream = new WritableStream({
  // 当新的数据块准备好写入底层接收器时调用
  write(chunk) {
    const textElement = document.getElementById("text-output");
    textElement.textContent += chunk;
  },
});

const writer = writableStream.getWriter();
try {
  writer.write("Hello,");
  writer.write("world!\n");
  writer.write("This has been a demo!\n");
  await writer.close();
  // 等待所有 chunk 写入成功
  console.log("All chunks written");
} catch (error) {
  console.error("Stream error:", error);
}

4. 使用可写流的示例

4.1 将可读流通过管道传输到可写流

开发者可以通过可读流的 pipeTo() 方法将可读流传输到可写流。

ReadableStream.pipeTo() 将当前的 ReadableStream 传输到指定的 WritableStream,并返回一个 Promise,该 Promise 在传输成功时返回 fulfilled,在遇到任何错误时返回 rejection。

const readableStream = new ReadableStream({
  start(controller) {
    // 构造时调用
    console.log("[start readable]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  pull(controller) {
    // 当控制器 controller 队列为空时调用
    console.log("[pull]");
    controller.enqueue("d");
    controller.close();
  },
  cancel(reason) {
    // 流取消时调用
    console.log("[cancel]", reason);
  },
});
const writableStream = new WritableStream({
  start(controller) {
    // 构造时调用
    console.log("[start writable]");
  },
  async write(chunk, controller) {
    // 调用 writer.write() 时触发
    console.log("[write]", chunk);
    // 等待下一次 write
    await new Promise((resolve) =>
      setTimeout(() => {
        document.body.textContent += chunk;
        resolve();
      }, 1_000)
    );
  },
  close(controller) {
    console.log("[close]");
  },
  abort(reason) {
    console.log("[abort]", reason);
  },
});
await readableStream.pipeTo(writableStream);
console.log("[finished]");

4.2 将接口返回值流式传递给 DOM

下面示例将请求的
https://slow.com/test.json 的内容流式写入到指定的 DOM 元素中:

window.addEventListener("DOMContentLoaded", () => {
  const renderButton2 = document.getElementById("renderButton2");
  const renderTarget2 = document.getElementById("renderTarget2");
  renderButton2.addEventListener("click", async () => {
    renderButton2.disabled = true;
    try {
      await streamHTMLInto(renderTarget2);
    } finally {
      renderButton2.disabled = false;
    }
  });
});
async function streamHTMLInto(targetElement) {
  const response = await fetch("https://slow.com/test.json");
  await response.body
    .pipeThrough(new TextDecoderStream())
    .pipeTo(appendToDOM(targetElement));
}

// 插入到 DOM
function appendToDOM(targetElement) {
  if (targetElement.firstChild) {
    targetElement.removeChild(targetElement.firstChild);
  }
  const newDocument = document.implementation.createHTMLDocument();
  newDocument.write("<div>");
  // 此时 document.write 只是写了一个 <div> 元素
  targetElement.appendChild(newDocument.body.firstChild);
  // appendChild 的参数 dom 如果一开始就存在,调用后只是简单移动元素
  // 目标元素 targetElement 此时只有一个 <div> 标签,而且还没有闭合
  return new WritableStream({
    write(chunk) {
      newDocument.write(chunk);
    },
    close() {
      newDocument.close();
    },
    abort(reason) {
      newDocument.close();
    },
  });
}

上面 appendToDOM 函数的 “魔法” 之处在于:其创建了一个临时 HTML 文档,用 document.write() 动态写入内容,并通过 DOM 操作把其挂载到页面上;再配合 WritableStream,实现了边下载边渲染的效果,非常适合流式 HTML 加载场景。

参考资料

https://web.dev/articles/streams

https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Concepts

https://jfhr.me/streaming-html-inside-the-dom/

https://developer.mozilla.org/en-US/docs/Web/API/DOMImplementation/createHTMLDocument

Tags:

最近发表
标签列表