为什么需要 Stream
用 fs.readFile() 读一个 2GB 的日志文件会发生什么?内存爆炸。Stream 将数据分块(chunk)处理,内存占用恒定为 chunk 大小(通常是16KB-64KB):
const fs = require("fs");
const readStream = fs.createReadStream("bigfile.log", { highWaterMark: 64 * 1024 });
readStream.on("data", (chunk) => { processChunk(chunk); });
readStream.on("end", () => { console.log("Done"); });
四类 Stream
| 类型 | 方向 | 用途 |
|---|---|---|
| Readable | 数据源 → | 文件读取、HTTP 请求体 |
| Writable | → 目标 | 文件写入、HTTP 响应 |
| Transform | → 转换 → | 压缩、加密、解析 |
| Duplex | ↔ 双向 | Socket、WebSocket |
管道模式
const { pipeline } = require("stream/promises");
const { createGzip } = require("zlib");
await pipeline(
fs.createReadStream("input.csv"),
csvParser(), // Transform: 解析 CSV
transformRow(), // Transform: 数据清洗
stringify(), // Transform: 转回字符串
createGzip(), // Transform: Gzip 压缩
fs.createWriteStream("output.csv.gz")
);
背压(Backpressure)处理
当写入速度跟不上读取速度时,Stream 自动暂停读取,防止内存溢出。这是 Node.js Stream 最强大的内置机制:
const writeStream = fs.createWriteStream("out.txt");
for (let i = 0; i < 1e6; i++) {
const canContinue = writeStream.write(`Line ${i}\n`);
if (!canContinue) {
await new Promise(resolve => writeStream.once("drain", resolve));
}
}
writeStream.end();
常见实战场景
- 日志处理:逐行读取大日志文件,过滤后写入数据库
- 文件上传:分块接收上传流,实时计算哈希和进度
- ETL 管道:读取 → 转换 → 写入,全程流式处理
- 视频转码:ffmpeg Transform 流实时转码推流