Skip to content

stream: construct

Provide a standardized way of asynchronously creating and initializing resources before performing any work.

Some streams need to first asynchronously create resources before they can perform any work. Currently this is implemented in the different stream implementations which both makes things partly duplicated, more difficult, more error prone and often incorrect to some degree (e.g. 'open' and 'ready' are emitted after 'close').

This PR provides a standardized way of asynchronously constructing streams and handles the "pending" state that occurs until construction has either completed or failed.

This will allow further simplification and improved consistency for various stream implementations such as fs and net stream.

Passes the graceful-fs test suite.

This will make it possible to easily implement more complex stream such as e.g. fs streams:

const { Writable } = require('stream');
const fs = require('fs')

class WriteStream extends Writable {
  constructor (options) {
    options.autoDestroy = true;
    super(options);
  }
  _construct({ filename }, callback) {
    this.filename = filename;
    this.fd = null;
    fs.open(this.filename, (fd, err) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
}
const { Readable } = require('stream');
const fs = require('fs')

class ReadStream extends Readable {
  constructor (options) {
    options.autoDestroy = true;
    super(options);
  }
  _construct({ filename }, callback) {
    this.filename = filename;
    this.fd = null;
    fs.open(this.filename, (fd, err) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        this.push(bytesRead > 0 ? buf : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
}

Furthermore it makes it easier to e.g. add transforms into pipeline by inlining initialization:

const { Duplex } = require('stream');
const fs = require('fs')

stream.pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf-8'),
  new Duplex({
    construct (options, callback) {
      this.data = '';
      callback();
    },
    transform (chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
      } catch (err) {
        callback(err);
      }
    }
  }),
  fs.createWriteStream('valid-object.json')
);
Semver

I think this should be semver-major.

  • fs: Emit some previously synchronous errors asynchronously. See updated test. Bug fix.
  • fs: open() is removed but it still works if monkey-patched.
  • fs: Don't emit 'close' twice if emitClose: true.
  • fs: stream.fd is nulled during destroy instead of final. See updated test.

Otherwise, this should be pretty non breaking as far as I can tell. graceful-fs tests pass and there are tests for compat.

The changes are mostly opt-in through the construct method, except for the previously listed updates to fs streams.

Checklist
  • make -j4 test (UNIX), or vcbuild test (Windows) passes
  • tests and/or benchmarks are included
  • documentation is changed or added
  • commit message follows commit guidelines

Refs: https://github.com/nodejs/node/issues/29314, https://github.com/nodejs/node/issues/23133

NOTE TO SELF: After merge look into:

Merge request reports

Loading