tech share
  • tech-share
  • Engineering
    • 登录鉴权
    • SSR 页面路由
    • npm 版本号
    • 缓存
    • 数据库容灾
    • 动态效果导出 gif
    • Chrome-devtools
    • C 端 H5 性能优化
    • Docker
    • Monorepo 最佳实践
    • 技术架构演化
    • 项目规范最佳实践
    • snowpack
    • 静态资源重试
    • 前端页面渲染分析
    • Git
    • 前端重构
    • 微前端
    • 项目依赖分析
    • 前端监控原理
    • webpack
    • BS 架构与 CS 架构
    • HTTPS
    • package-lock.json 生成逻辑
    • SVN(Subversion)
    • 数据库分类
    • gulp
    • 前端架构
    • Bundle & Bundless
    • 控制反转 IoC
  • JavaScript
    • Javascript 性能
    • JavaScript 原型(2) - 原型与原型链
    • JavaScript 原型(1) - 构造函数
    • JavaScript - Promise
    • ES6 解构赋值
    • 前端离线化
    • Proxy
    • Object.defineProperty()简介
    • TypeScript
  • MachineLearning
    • GAN生成对抗网络
    • 虚拟对抗训练
    • 深度度量学习
    • 原型网络
    • PyTorch优化器
    • 隐马尔可夫模型2
    • Shapley Value 算法
    • Embarassingly Autoencoder算法
    • AutoRec算法及其后续发展
    • 深度学习常用激活函数
    • 序列预测ConvTran算法
    • 联邦学习
    • 深度学习推荐系统算法整理
    • 隐马尔可夫模型
    • 黎曼优化方法
    • FM算法
    • 机器学习常见评价指标
    • VAE算法
    • Adam优化器详解
    • Transformer算法
    • Self-attention 推荐算法
    • CNN 卷积神经网络
    • 图嵌入
    • 集成学习算法
    • RecBole开源框架
    • NCE-PLRec
    • 深度学习初始化方法
    • RNN循环神经网络
    • PyTorch数据处理
    • PyTorch安装和基本操作
    • XGBoost算法
    • NCF算法与简单MF的对比
    • 计算最佳传输
  • CSS
    • 什么是BFC
    • 纯CSS实现可拖动布局
    • 滚动穿透解决方案
  • React
    • React 生命周期
    • React Ref
    • React Hooks
    • SWR
    • React 数据流
    • React 函数式组件和类组件的区别
  • 可视化
    • OffscreenCanvas
    • Echarts 平滑曲线端点为什么不平滑
    • 颜色空间
    • 词云布局解析
    • 3D 数学基础
    • Canvas 图片处理
    • GLGL ES
    • WebGL 中绘制直线
    • Graphics API
    • 现代计算机图形学基础
    • Canvas 灰度
  • Vue
    • Vue2.x全局挂载整理
    • Vue2.6.x源码阅读
      • Vue2.6.x源码阅读 - 2.目录结构分析
      • Vue2.6.x源码阅读 - 4.源码阅读-platform
      • Vue2.6.x源码阅读 - 1.准备工作
      • Vue2.6.x源码阅读 - 5.源码阅读-core-Vue构造函数
      • Vue2.6.x源码阅读 - 7.源码阅读-core-响应式原理
      • Vue2.6.x源码阅读 - 3.源码阅读-shared
      • Vue2.6.x源码阅读 - 6.源码阅读-core-组件挂载
    • Vue + TypeScript Web应用实践
    • Vue2.x指令
    • nextTick()的使用
    • vue-cli2.x 的使用与项目结构分析
    • Vue响应式原理及总结
    • VueX的使用
    • Electron-Vue + Python 桌面应用实践
    • Vite
    • Vue组件通信整理
    • 记录一个问题的探索过程
  • Linux
    • memcg
  • GameDev
    • 游戏中的几种投影视图
    • 从零开始写软渲染器06
    • 从零开始写软渲染器05
    • 从零开始写软渲染器04
    • 从零开始写软渲染器03
    • 从零开始写软渲染器02
    • 从零开始写软渲染器01
    • 从零开始写软渲染器00
    • 现代游戏常用的几种寻路方案(一)
  • Node
    • NPM Dependency
    • Node 优势
    • Node Stream
    • Node 模块系统
  • HTML
    • html5语义与结构元素
  • 跨端
    • Flutter 介绍
  • Golang
    • Golang 基础
  • AR
    • SceneKit
由 GitBook 提供支持
在本页
  • 概念
  • Stream 在 Node 中的具体应用
  • 类型
  • Readable
  • Flowing Mode
  • Non-Flowing Mode
  • Writable
  • pipe
  • Duplex
  • Transform

这有帮助吗?

  1. Node

Node Stream

概念

Stream 是 Node.js 中最重要的组件和模式之一,流是一组有序的,有起点和终点的字节数据传输手段,它是一个抽象的接口。 流是数据的集合,区别在于流中的数据可能不会立刻就全部可用,并且你无需一次性地把这些数据全部放入内存。这使得流在操作大量数据或是数据从外部来源逐段发送过来的时候变得非常有用。 每一个流对象都是 EventEmitter 类的一个实例,都有相应的 on 和 emit 方法

stream 是 node 的核心模块,引入方式如下:

let Stream = require("stream");

Stream 在 Node 中的具体应用

  1. http 协议中的 请求 req 和响应 res

  2. tcp 协议中的套接字对象 sockets

  3. fs 文件模块中的可读流和可写流

  4. process 进程模块中的 stdout stderr

  5. zlib 中的 streams

类型

  1. Readable

  2. Writeable

  3. Duplex

  4. Transform

Readable

Readable Stream 存在两种模式,一种是叫做 Flowing Mode,流动模式,在 Stream 上绑定 ondata 方法就会自动触发这个模式

Flowing Mode

const readable = getReadableStreamSomehow();
readable.on("data", chunk => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

资源的数据流并不是直接流向消费者,而是先 push 到缓存池,缓存池有一个水位标记 highWatermark,超过这个标记阈值,push 的时候会返回 false,什么场景下会出现这种情况呢?

  1. 消费者主动执行了 .pause()

  2. 消费速度比数据 push 到缓存池的生产速度慢

有个专有名词来形成这种情况,叫做「背压」,Writable Stream 也存在类似的情况。

流动模式,这个名词还是很形象的,缓存池就像一个水桶,消费者通过管口接水,同时,资源池就像一个水泵,不断地往水桶中泵水,而 highWaterMark 是水桶的浮标,达到阈值就停止蓄水。

Non-Flowing Mode

暂停模式,这是 Stream 的预设模式,Stream 实例的 _readableState.flow 有三个状态,分别是:

  • _readableState.flow = null,暂时没有消费者过来

  • _readableState.flow = false,主动触发了 .pause()

  • _readableState.flow = true,流动模式

当我们监听了 onreadable 事件后,会进入这种模式,比如:

const myReadable = new MyReadable(dataSource);
myReadable.setEncoding("utf8");
myReadable.on("readable", () => {});

监听 readable 的回调函数第一个参数不会传递内容,需要我们通过 myReadable.read() 主动读取

资源池会不断地往缓存池输送数据,直到 highWaterMark 阈值,消费者监听了 readable 事件并不会消费数据,需要主动调用 .read([size]) 函数才会从缓存池取出,并且可以带上 size 参数,用多少就取多少

只要数据达到缓存池都会触发一次 readable 事件,有可能出现「消费者正在消费数据的时候,又触发了一次 readable 事件,那么下次回调中 read 到的数据可能为空」的情况。我们可以通过 _readableState.buffer 来查看缓存池到底缓存了多少资源

let once = false;
myReadable.on("readable", chunk => {
  console.log(myReadable._readableState.buffer.length);
  if (once) return;
  once = true;
  console.log(myReadable.read());
});

上面的代码我们只消费一次缓存池的数据,那么在消费后,缓存池又收到了一次资源池的 push 操作,此时还会触发一次 readable 事件,我们可以看看这次存了多大的 buffer。

需要注意的是,buffer 大小也是有上限的,默认设置为 16kb,也就是 16384 个字节长度,它最大可设置为 8Mb,没记错的话,这个值好像是 Node 的 new space memory 的大小。

上面介绍了 Readable Stream 大概的机制,还有很多细节部分没有提到,比如 Flowing Mode 在不同 Node 版本中的 Stream 实现不太一样,实际上,它有三个版本,上面提到的是第 2 和 第 3 个版本的实现;再比如 Mixins Mode 模式,一般我们只推荐(允许)使用 ondata 和 onreadable 的一种来处理 Readable Stream,但是如果要求在 Non-Flowing Mode 的情况下使用 ondata 如何实现呢?那么就可以考虑 Mixins Mode 了。

Writable

数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入队列池缓存起来

当生产者写入速度过快,把队列池装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,Writable Stream 会给生产者发送一个 drain 消息,让它恢复生产。

pipe

用法

readable.pipe(writable);

readable 通过 pipe(管道)传输给 writable

Duplex

双工的意思,它的输入和输出可以没有任何关系

Duplex Stream 实现特别简单,它继承了 Readable Stream,并拥有 Writable Stream 的方法,它的 read 和 write 是两个管道,彼此互不干扰

Transform

Transform Stream 集成了 Duplex Stream,它同样具备 Readable 和 Writable 的能力,只不过它的输入和输出是存在相互关联的,中间做了一次转换处理。常见的处理有 Gzip 压缩、解压等

Transform 的处理就是通过 _transform 函数将 Duplex 的 Readable 连接到 Writable,由于 Readable 的生产效率与 Writable 的消费效率是一样的,所以这里 Transform 内部不存在「背压」问题,背压问题的源头是外部的生产者和消费者速度差造成的

上一页Node 优势下一页Node 模块系统

最后更新于4年前

这有帮助吗?