stream: construct
Provide a standardized way of asynchronously creating and initializing resources before performing any work.
Some streams need to first asynchronously create resources before they can perform any work. Currently this is implemented in the different stream implementations which both makes things partly duplicated, more difficult, more error prone and often incorrect to some degree (e.g. 'open'
and 'ready'
are emitted after 'close'
).
This PR provides a standardized way of asynchronously constructing streams and handles the "pending" state that occurs until construction has either completed or failed.
This will allow further simplification and improved consistency for various stream implementations such as fs
and net
stream.
Passes the graceful-fs test suite.
This will make it possible to easily implement more complex stream such as e.g. fs streams:
const { Writable } = require('stream');
const fs = require('fs')
class WriteStream extends Writable {
constructor (options) {
options.autoDestroy = true;
super(options);
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
const { Readable } = require('stream');
const fs = require('fs')
class ReadStream extends Readable {
constructor (options) {
options.autoDestroy = true;
super(options);
}
_construct({ filename }, callback) {
this.filename = filename;
this.fd = null;
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
Furthermore it makes it easier to e.g. add transforms into pipeline by inlining initialization:
const { Duplex } = require('stream');
const fs = require('fs')
stream.pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Duplex({
construct (options, callback) {
this.data = '';
callback();
},
transform (chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json')
);
Semver
I think this should be semver-major.
-
fs
: Emit some previously synchronous errors asynchronously. See updated test. Bug fix. -
fs
:open()
is removed but it still works if monkey-patched. -
fs
: Don't emit'close'
twice ifemitClose: true
. -
fs
:stream.fd
is nulled duringdestroy
instead offinal
. See updated test.
Otherwise, this should be pretty non breaking as far as I can tell. graceful-fs
tests pass and there are tests for compat.
The changes are mostly opt-in through the construct
method, except for the previously listed updates to fs
streams.
Checklist
-
make -j4 test
(UNIX), orvcbuild test
(Windows) passes -
tests and/or benchmarks are included -
documentation is changed or added -
commit message follows commit guidelines
Refs: https://github.com/nodejs/node/issues/29314, https://github.com/nodejs/node/issues/23133
NOTE TO SELF: After merge look into:
- Remove
emitClose
and make'close'
behaviour align with streams. - Deprecate
close(cb)
in favour ofend(cb)
. Makeclose(cb)
callend(cb)
. - https://github.com/nxtedition/node/pull/new/stream-sync-final