Appearance
Stream与Pipe
Stream
Stream 是大多数 Node.js 应用程序所依赖的主要功能之一,比如 HTTP 请求、文件的读写操作等,下面何俊来解释流的使用与创建自定义流。
我们很少直接使用Stream,主要是其他模块使用Stream,比如网络、文件模块等。
基本概念
stream是从一个点到另一个点的数据流。可以将流理解为 Buffer 的搬运工,将 Buffer 一块块的搬运(流动)到目的地。比如向前端发送一个大文件内容时,使用 stream 可以边读取边发送,传统模式要一次读取文件再发送,所以使用Stream 可以为我们带来了更好的内存和时间效率。
- stream 用于处理数据的传输
- 在开发中我们多数使用的是对 stream 的封装,一般不需要自己写 stream 的控制
- stream 主要用在网络请求、文件处理等 IO 操作
- 在处理大文件时才可以体验到stream的效率
EventEmitter
Stream 流对象是 EventEmitter 的实例,所以拥有事件处理机制。
主要包含以下事件
- open 文件被打开时触发
- close 文件关闭时触发
- data 当有数据读取时触发
- end 数据读取完毕时触发,早于 close 事件
- error 在接收和写入过程中发生错误时触发
可读流
可读流指数据从源头(如磁盘)读取内存,也可以将流理解为 Buffer 的搬运工,将 Buffer 一块块的搬运(流动)到目的地。
- 数据会分块读取
- Buffer 数据是二进制的,所以结果是二进制表示
- 使用这种方式是一块一块读取处理数据,所以要比一次读取文件到内存性能更好
- stream使用默认的64KB的Buffer
基本操作
下面通过读取超大文件 bm.json ,来体验 Buffer 的操作大数据。
import { createReadStream, createWriteStream } from 'fs'
//创建可读流,将数据以块的形式读取,每次读取一点放在缓存区
const readStream = createReadStream('./bm.txt', {
encoding: 'utf8'
})
const writeStream = createWriteStream('a.txt')
//每次读取到数据时,会触发函数
readStream.on('data', (chunk) => {
console.log(chunk)
writeStream.write(chunk)
})
可以在读取时设置编码,指定Buffer大小
...
const readStream = createReadStream('./bm.txt', {encoding: 'utf8',highWaterMark: 2 })
...
http
我们在来看个 http 服务响应大文件的例子。
下面这种一次加载文件后响应会占用大量内容,同时用户会等待。
import { readFile } from 'fs'
import { createServer } from 'http'
const service = createServer((req, res) => {
readFile('bm.txt', (error, content) => {
res.end(content)
})
})
service.listen(3000)
现在改用流的方式,通过 Buffer 一块一块读取,然后响应数据
import { createReadStream, readFile } from 'fs'
import { createServer } from 'http'
const service = createServer((req, res) => {
const readStream = createReadStream('bm.txt')
readStream.on('data', (c) => {
res.write(c)
})
readStream.on('end', () => res.end())
})
service.listen(3000)
当然使用 pipe 管道可以将过程简化
import { createReadStream, readFile } from 'fs'
import { createServer } from 'http'
const service = createServer((req, res) => {
const readStream = createReadStream('bm.txt')
readStream.pipe(res)
})
service.listen(3000)
可写流
可写流 是消费上游流过来的数据,常见以下操作。
- 写入文件
- 压缩数据
- 接收客户端的请求
- 服务器响应数据到客户端
基本操作
下面使用可写流,将读取到的流写入到新文件,测试时需要一个特别大的 bm.json 才可以看到效果。
import { createReadStream, createWriteStream } from 'fs'
const readStream = createReadStream('./bm.json')
const writeStream = createWriteStream('./hj.json')
readStream.on('data', (chunk) => {
console.log('<读取了一个块...')
writeStream.write(chunk)
console.log('>写入了一个块')
})
流式写入
下面使用流生成1000行数据
import { createWriteStream } from 'fs'
const stream = createWriteStream('bm.txt')
for (let i = 0; i <= 1000; i++) {
stream.write('banmashou.com@何俊\n')
}
PIPE 管道
使用 PIPE 可以让我们对流的操作更简单。
基本使用
上面流的读取与写入我们是通过事件监听手动操作的,我们可以通过 PIPE 管道 简化该操作。从读取流中获取数据然后通过管道传递到写入流中完成数据的写入。
使用管道不需要手动监听数据事件,管道内部会自动完成。下面将上面 可写流 例子使用管道操作。
import { createReadStream, createWriteStream, read } from 'fs'
const readStream = createReadStream('./bm.json')
const writeStream = createWriteStream('./hj.json')
//使用管道将可读流的数据,传递给可写流创建文件
readStream.pipe(writeStream)
pipeline
使用 pipeline 工具函数可以实现管道操作,并可以方便的捕获错误
import { createReadStream, createWriteStream } from 'fs'
import { pipeline } from 'stream'
const readStream = createReadStream('./bm.txt')
const writeStream = createWriteStream('./hj.txt')
pipeline(readStream, writeStream, (error) => {
console.log('出错了')
throw error
})
自定义可读流
即然 stream 贯穿 node 的应用,所以在 node 中有固定的规范,这样不同 stream 才可以协同工作,pipe 管道才可以有效应用。下面来学习自定义流的控制,自定义流需要使用 node 内置的模块 stream。
选项参数
可读流需要继承 Readable 类实现,通过构造函数传递的选项如下
- encoding 数据编码,如果设置了会将读取的 Buffer 数据转为字符串
- objectMode 缓冲区数据内容为 JS 对象
- highWaterMark 对于字符串数据为缓存区大小,对于 JS 对象为对象的数量
非流动模式
非流动模式指我们对流自定义控制,通过监听 readable 事件完成。
import { Readable, ReadableOptions } from 'stream'
class BmReadStream extends Readable {
//data:操作的流数据 options:可读流选项
constructor(private data: any, options: ReadableOptions = {}) {
super(options)
}
//向缓存区中推入数据
_read(size: number): void {
this.push(this.data)
//数据已经推入完毕,如果不设置将一直推入
//你可以注释掉看看效果
this.push(null)
}
}
//创建流实例,并声明编码为 utf8,提出数据时将转换为 utf8 字符串
const bm = new BmReadStream('banmashou.com', { encoding: 'utf-8' })
//非流动模式,手动控制流
bm.on('readable', () => {
let chunk = ''
//每次读取一个字节数据
//每读一次后,从缓冲区中删除一个字节,直到缓存区读完
while ((chunk = bm.read(1))) {
//上面设置了 encoding:utf-8 ,数据会转为utf8字符串
console.log(chunk)
}
})
//数据读取完毕后的事件
bm.on('end', () => console.log('数据读取完成'))
流动模式
流动模式指对流的控制自动完成,需要监听 data 事件完成,流动模式自动完成,所以操作更方便。
下面使用流动模式操作对象数据
import { Readable, ReadableOptions } from 'stream'
class BmReadStream extends Readable {
//data:操作的流数据 options:可读流选项
constructor(private data: any, options: ReadableOptions = {}) {
super(options)
}
//向缓存区中推入数据
_read(size: number): void {
this.push(JSON.stringify(this.data))
//数据已经推入完毕,如果不设置将一直推入
//你可以注释掉看看效果
this.push(null)
}
}
//创建流实例,并声明编码为 utf8,提出数据时将转换为 utf8 字符串
const bm = new BmReadStream([{ name: 'banmashou.com' }, { name: '何俊' }], { encoding: 'utf-8', objectMode: true })
//非流动模式,手动控制流
bm.on('data', (chunk) => {
console.log(JSON.parse(chunk)[1].name)
})
//数据读取完毕后的事件
bm.on('end', () => console.log('数据读取完成'))
//读取时发生错误
bm.on('error', () => console.log('数据读取失败'))
自定义可写流
可写流需要继承 Writable 类实现,下面是自定义可写流的示例。
import { writeFile } from 'fs/promises'
import { Writable, WritableOptions } from 'stream'
//自定义可写流
class BmWritableStream extends Writable {
constructor(file: string, options: WritableOptions = {}) {
//objectMode: _write 方法的 chunk 参数为对象
super({ ...options, objectMode: true })
//创建文件或清空已存在文件内容
writeFile(file, '')
}
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null | undefined) => void): void {
//使用 Promise 向文件中追加内容,并调用 callback 触发可写流事件
writeFile(chunk.file, chunk.content, { flag: 'a' }).then(() => {
callback()
})
}
}
const bm = new BmWritableStream('bm.txt')
//写入完成事件
bm.on('finish', () => console.log('文件写入完成'))
//写入
bm.write({ file: 'bm.txt', content: 'banmashou.com@何俊' }, () => {
console.log('文件写入成功')
})
bm.write({ file: 'bm.txt', content: ' 斑马网' })
Transform 转换流
Transform流包含了Readable和Writeable特性,在读写过程中可以修改和变换数据。
比如使用 Transform 流,将数据转换后使用可读流推入缓存区,然后使用 Transform 流将缓存区数据使用可写流处理。
下面示例是提取年龄大于20岁的用户,首先我们先创建用户文件 user.json
[
{"name":"斑马兽","age":10},
{"name":"何俊","age":25}
]
下面使用 transform 转换流完成实例。执行过程是使用 createReadStream 可读流读取 user.json 文件,然后使用 createWriteStream 写入文件。
import { createReadStream, createWriteStream } from 'fs'
import { pipeline, Transform, TransformCallback, TransformOptions } from 'stream'
// 定义 Transform 流
class BmTransformStream extends Transform {
constructor(options: TransformOptions = {}) {
super(options)
}
//数据转换
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void {
//数据转换为JS对象,提取年龄大于20用户名
JSON.parse(String(chunk)).forEach((user: any) => {
if (user.age > 20) this.push(`${user.name} \n`)
})
callback()
}
// 写入流结束前调用
_flush(callback: TransformCallback): void {
this.push('\n以上结果由 何俊 统计')
callback()
}
}
//读取
const readStream = createReadStream('./user.json')
//转换流
const bmTransform = new BmTransformStream()
//写入流
const writeStream = createWriteStream('./hj.txt')
//使用管道转换流
pipeline(readStream, bmTransform, writeStream, (error) => {
if (error) console.log('执行失败', error)
})
文件上传
下面使用流并结合插件 multiparty 扩展包实现文件上传
import { createReadStream, createWriteStream, mkdirSync } from 'fs'
import { createServer } from 'http'
import multiparty from 'multiparty'
import { pipeline } from 'stream'
const service = createServer((req, res) => {
var form = new multiparty.Form()
form.parse(req, function (err: any, fields: any, files: any) {
//创建目录
mkdirSync('uploads')
//将临时文件使用流保存数据
pipeline(
createReadStream(files.file[0].path),
createWriteStream('./uploads/' + files.file[0].originalFilename),
(error) => {
error ? console.log(error) : res.end('文件上传成功')
},
)
})
})
service.listen(3000)