通过异步迭代简化 Node.js 流

使用异步迭代操作 Node.js 流相当有意思。这篇博客文章探讨如何做到这些。

目录:

概括:异步迭代与异步生成器

异步迭代是异步检索数据内容的协议(意味着在取得项目前可能暂停当前任务)。

异步迭代器用于强化异步迭代。举例来说,下面是一个异步迭代器函数:

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}
  • for-await-of 循环迭代输入 asyncIterable。该循环也作用于通常的异步函数。
  • yield 提供由该生成器返回的异步迭代值。

在文章接下来的部分,不再说明函数是异步函数还是异步生成器函数:

/** @returns a Promise */
async function asyncFunction() {
  /*···*/
}

/** @returns an async iterable */
async function* asyncGeneratorFunction() {
  /*···*/
}

流的核心思想是一种“分隔并攻克”大量数据的模式:当我们将大量数据分割为一些小的部分并一次处理一部分,我们可以处理它。

Node.js 提供多种类的流,举例来说:

  • readable streams(可读流) 是可以读取数据的流。换句话说,它们是数据的来源。一个例子是可读文件流,可以让我们读取文件的内容。
  • writable streams(可写流) 是可以写数据的流。换句话说,它们是数据的水池。一个例子是可写文件流,可以让我们向文件写数据。
  • transform stream(转换流) 是同时可读和可写的流。作为可写流,它提取数据,并转换(改变或丢弃)它们,然后作为可读流输出它们。

流程

为了处理数据流可分为多个步骤,我们可以管道化(连接)流:

  1. 通过可读流收集输入。
  2. 通过转换流处理每一步。
  3. 在最后一步,我们有两个选项:
    • 在最新的可读流中写入信息到一个可写流。于是可写流是管道步骤的最后一步。
    • 可以使用其它方法在最新的可读流中提取数据。

第(2)步可选。

文本编码

当创建文本流,这是最好总是使用同一种编码:

  • Node.js 文档有一份支持的编码和它们的默认拼写的列表,举例来说:
    • 'utf8'
    • 'utf16le'
    • 'base64'
  • 一些不同的拼写是允许的。可以使用 Buffer.isEncoding() 来检查哪一种是支持的:
    buffer.Buffer.isEncoding('utf8');
    // true
    buffer.Buffer.isEncoding('utf-8');
    // true
    buffer.Buffer.isEncoding('UTF-8');
    // true
    buffer.Buffer.isEncoding('UTF:8');
    // false
    

编码的默认值是 null,这等价于 'utf8'

帮助函数:readableToString()

有时候我们使用下面的帮助函数。不需要理解它如何工作,仅仅知道它做了什么。

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream
 * and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function(chunk) {
      data += chunk;
    });
    readable.on('end', function() {
      resolve(data);
    });
    readable.on('error', function(err) {
      reject(err);
    });
  });
}

该函数是通过基础事件 API 实现的。我们将在后面使用一个简单方式实现这些,通过异步迭代。

一些前置设定

  • 在这篇博客中,我们只使用文本流。
  • 在例子中,我们有时候会使用顶层 await,在这种情况下,我们假设我们是在一个模块的内部或者 async 函数的内部。
  • 无论哪一种换行符,我们两种都支持:
    • Unix: '\n'(LF)
    • Windows: '\r\n'(CRLF) 当前平台的换行符可以通过 os 模块的 the constant EOL 访问到。

可读流

创建可读流

从文件创建可读流

可以使用 fs.createReadStream() 创建可读流:

import * as fs from 'fs';

const readableStream = fs.createReadStream('tmp/test.txt', {
  encoding: 'utf8',
});

console(await readableToString(readableStream));
// 'This is a test!\n'

Readable.from():从迭代器创建可读流

静态方法 Readable.from(iterable, options?) 创建可读流,它保存数据在 iterable. iterable 的内容可以是同步迭代或异步迭代。参数 options 是可选的,可以使用标准的文本编码。

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), { encoding: 'utf8' });
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n',
);

从字符串创建可读流

Readable.from() 接受任何迭代,并且除此外也可以转换字符串为流:

import { Readable } from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, { encoding: 'utf8' });
assert.equal(await readableToString(readable), 'Some text!');

在 momentReadable.from() 将字符串与其它任何可迭代对象一样对待,因此在其代码点上进行迭代。从性能角度来看,这不是理想的选择,但是对于大多数用例来说应该可以。我希望 Readable.from() 经常与字符串一起使用,因此将来可能会进行优化。

通过 for-await-of 读取可读流

每一个可读流都是异步可迭代的,这意味着我们可以使用 for-await-of 循环读取它的内容:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream('tmp/test.txt', { encoding: 'utf8' });
logChunks(readable);

// Output:
// 'This is a test!\n'

收集可读流的内容为字符串

下面的函数是一个简单的实现,我们可以在这篇博客文章的开头看到它:

import { Readable } from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', { encoding: 'utf8' });
assert.equal(await readableToString2(readable), 'Good morning!');

注意,在这种情况下,我们不得不使用 async 函数,因为我们想要返回 Promise

通过 async generators(异步生成器)转换可读流

异步迭代提供一种优雅的替代方案,可以转换流以分多个步骤处理流数据:

  • 输入一个可读流。
  • 第一个转换由异步生成器执行,该异步生成器遍历可读流并按其认为合适的方式产生。
  • 可选的是,我们可以通过使用更多的异步生成器来进一步转换。
  • 最后,我们有几个选项来处理最后一个生成器返回的异步可迭代:
    • 我们可以通过 Readable.from() 将其转换为可读流(以后可以通过管道将其写入可写流)。
    • 我们可以使用异步函数来处理它。
    • 其它。

总之,这些处理管道包括几部分:

可读的
→ 第一个异步生成器[→ … → 最后一个异步生成器]
→ 可读或者异步方法\

在下面的例子中,最后一步是执行异步函数 logLines(),该函数在迭代器中输出项目到控制台。

import { Readable } from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n');
      if (eolIndex < 0) break;

      // line includes the EOL
      const line = previous.slice(0, eolIndex + 1);
      yield line;
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from('Text with\nmultiple\nlines.\n', {
  encoding: 'utf8',
});
logLines(numberLines(chunksToLines(chunks)));

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

可写流

为文件创建可写流

我们使用 fs.createWriteStream() 创建可写流:

const writableStream = fs.createWriteStream('tmp/log.txt', {
  encoding: 'utf8',
});

向可写流写数据

在这一节,我们了解三种向可写流写数据的方法:

  1. 通过可写流的方法 .write() 直接地向可写流写数据。
  2. 使用可读流的方法 .pipe 连接到可写流。
  3. 使用 stream 模块函数 pipeline() 连接可读流到可写流。

为了验证这些方法,我们以三种不同的方式实现相同功能函数 writeIterableToFile()

在异步函数中向可写流写数据

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import { once } from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, { encoding: 'utf8' });
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) {
      // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', { encoding: 'utf8' }),
  'One line of text.\n',
);

stream.finished() 的默认版本是基于回调函数的,但是该函数可以通过 util.promisify() 转换为 Promise 版本(行 A)。

我们可以使用下面两种模式:

  • 当处理反面时(行 B)写入可写流:
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
    
  • 关闭可写流并等待写入结束(行 C):
    writable.end();
    await finished(writable);
    

pipeline(readable, writable)

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(iterable, { encoding: 'utf8' });
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(['One', ' line of text.\n'], 'tmp/log.txt');
// ···

我们使用下面的模式(行 A):

await pipeline(readable, writable);

也可以使用 Readable.prototype.pipe(),但该方法有一个风险(如果可读流抛出错误,可写流不会默认关闭)。方法 stream.pipeline() 没有这个风险。

流相关功能

os 模块:

  • const EOL: string (Node.js 0.7.8+)

    用于获取当前操作系统平台行结束分隔符

  • Buffer.isEncoding(encoding: string): boolean (Node.js 0.9.1+)

    如果 encoding 编码名称是 Node.js 支持的文本编码格式,该方法返回 true支持的编码格式有:

    • 'utf8'
    • 'utf16le'
    • 'ascii'
    • 'latin1
    • 'base64'
    • 'hex' (每个字节为两个十六进制字符)

stream 模块:

  • Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (Nodejs 10.0.0+)

    可读流是异步迭代。举例来说,可以在 async 函数和异步迭代器中使用 for-await-of 循环迭代它们。

  • finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void>Node.js 10.0.0+

    该方法在可读流/可写流结束或发生错误时返回 Promise。 这个 Promise 版本是这样创建的:

    const finished = util.promisify(stream.finished);
    
  • pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void> (Node.js 10.0.0+)

    流之间的管道。当管道完成或发生错误时,将返回 Promise。 此 Promise 版本是这样创建:

    const pipeline = util.promisify(stream.pipeline);
    
  • Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (Node.js 12.3.0+)

    转换迭代为可读流。

    interface ReadableOptions {
      highWaterMark?: number;
      encoding?: string;
      objectMode?: boolean;
      read?(this: Readable, size: number): void;
      destroy?(
        this: Readable,
        error: Error | null,
        callback: (error: Error | null) => void,
      ): void;
      autoDestroy?: boolean;
    }
    

    这些选项与 Readable 构造函数效果相同,更多介绍文档

fs 模块:

  • createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (Node.js 2.3.0+)

    创建可读流。可以接受更多参数。

  • createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream (Node.js 2.3.0+)

    使用参数 .flags 可以指定是要写入还是要追加,以及如果文件存在或不存在会发生什么。可以接受更多参数。

本章节的静态类型信息基于 Definitely Typed

扩展阅读