Readable.map concurrency not running map on next item before previous finish
Version
v18.12.1
Platform
Darwin Razs-MBP 22.2.0 Darwin Kernel Version 22.2.0: Fri Nov 11 02:03:51 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T6000 arm64
Subsystem
stream
What steps will reproduce the bug?
{
const finishOrder = [];
const stream = Readable.from([10, 1, 5, 20]).map(async (item, { signal }) => {
await setTimeout(item, { signal });
finishOrder.push(item);
return item;
}, { concurrency: 2 });
(async () => {
await stream.toArray()
assert.deepStrictEqual(finishOrder, [1, 5, 10, 20]);
})().then(common.mustCall());
}
Same idea but with pretty output
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises')
async function run() {
const all = [];
const start = Date.now();
const data = Readable.from([
[1, 200],
[2, 500],
[3, 100],
[4, 300],
[5, 600],
[6, 60],
])
.map(async ([index, timeout]) => {
const waitedTime = Date.now() - start;
await setTimeout(timeout)
return {
index,
waitedTime,
timeTook: timeout,
};
}, { concurrency: 3 });
for await (const item of data) {
all.push(item);
}
printPretty(all)
}
run();
function printPretty(all) {
for (const item of all) {
const title = ` ${item.index} `;
// Reduce the number of dots needed
const waitedTime = Math.floor(item.waitedTime / 20);
const timeTook = Math.floor(item.timeTook / 20);
const line = [
// If not started late than don't add spaces for padding
waitedTime === 0 ? '' : ' '.repeat(title.length),
'░'.repeat(waitedTime),
title,
// Running time
'.'.repeat(timeTook),
].join('');
console.log(line);
}
}
How often does it reproduce? Is there a required condition?
always
What is the expected behavior?
that as soon as any item finishes it will start the next one until reaching the highWaterMark
(maybe we should add it as an option?)
the docs say:
concurrency
<number>
the maximum concurrent invocation of fn to call on the stream at once. Default: 1.
for the nodejs internal test the test should pass
For the Code with pretty output, the output should look like this:
Symbol meanings:
.
(dot) is running░
is waiting
1 ..........
2 .........................
3 .....
░░░░░ 4 ...............
░░░░░░░░░░ 5 ..............................
░░░░░░░░░░░░░░░░░░░░░░░ 6 ...
(as soon as one finished another one start)
What do you see instead?
For the nodejs internal test
node:internal/process/promises:289
triggerUncaughtException(err, true /* fromPromise */);
^
AssertionError [ERR_ASSERTION]: Expected values to be strictly deep-equal:
+ actual - expected
[
1,
+ 10,
+ 5,
- 5,
- 10,
20
]
at ~/dev/open-source/node/node-fork/test/parallel/test-stream-map.js:189:12
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
generatedMessage: true,
code: 'ERR_ASSERTION',
actual: [ 1, 10, 5, 20 ],
expected: [ 1, 5, 10, 20 ],
operator: 'deepStrictEqual'
}
For the Code with pretty output
function not running even though they can
Symbol meanings:
.
(dot) is running░
is waiting
1 ..........
2 .........................
3 .....
░░░░░░░░░░ 4 ...............
░░░░░░░░░░░░░░░░░░░░░░░░░ 5 ..............................
░░░░░░░░░░░░░░░░░░░░░░░░░ 6 ...
Additional information
No response