Develop

Node.js Stream 深入:大文件处理与管道模式的优雅实践

✎ -- 字 🕐 -- 分钟
字号

为什么需要 Stream

fs.readFile() 读一个 2GB 的日志文件会发生什么?内存爆炸。Stream 将数据分块(chunk)处理,内存占用恒定为 chunk 大小(通常是16KB-64KB):

const fs = require("fs");
const readStream = fs.createReadStream("bigfile.log", { highWaterMark: 64 * 1024 });
readStream.on("data", (chunk) => { processChunk(chunk); });
readStream.on("end", () => { console.log("Done"); });

四类 Stream

类型方向用途
Readable数据源 →文件读取、HTTP 请求体
Writable→ 目标文件写入、HTTP 响应
Transform→ 转换 →压缩、加密、解析
Duplex↔ 双向Socket、WebSocket

管道模式

const { pipeline } = require("stream/promises");
const { createGzip } = require("zlib");

await pipeline(
  fs.createReadStream("input.csv"),
  csvParser(),           // Transform: 解析 CSV
  transformRow(),        // Transform: 数据清洗
  stringify(),           // Transform: 转回字符串
  createGzip(),          // Transform: Gzip 压缩
  fs.createWriteStream("output.csv.gz")
);

背压(Backpressure)处理

当写入速度跟不上读取速度时,Stream 自动暂停读取,防止内存溢出。这是 Node.js Stream 最强大的内置机制:

const writeStream = fs.createWriteStream("out.txt");
for (let i = 0; i < 1e6; i++) {
  const canContinue = writeStream.write(`Line ${i}\n`);
  if (!canContinue) {
    await new Promise(resolve => writeStream.once("drain", resolve));
  }
}
writeStream.end();

常见实战场景

  • 日志处理:逐行读取大日志文件,过滤后写入数据库
  • 文件上传:分块接收上传流,实时计算哈希和进度
  • ETL 管道:读取 → 转换 → 写入,全程流式处理
  • 视频转码:ffmpeg Transform 流实时转码推流