Skip to content

Commit 418c33b

Browse files
Postpone handling source closing until all pending reads are settled
1 parent 5e26f17 commit 418c33b

1 file changed

Lines changed: 19 additions & 3 deletions

File tree

reference-implementation/lib/abstract-ops/readable-streams.js

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
146146

147147
// This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown.
148148
let currentWrite = promiseResolvedWith(undefined);
149+
let pendingReads = 0;
149150

150151
return new Promise((resolve, reject) => {
151152
let abortAlgorithm;
@@ -210,17 +211,28 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
210211
return promiseResolvedWith(true);
211212
}
212213
return new Promise((resolveRead, rejectRead) => {
214+
pendingReads++;
213215
ReadableStreamDefaultReaderRead(
214216
reader,
215217
{
216218
chunkSteps: chunk => {
219+
pendingReads--;
217220
currentWrite = transformPromiseWith(
218221
WritableStreamDefaultWriterWrite(writer, chunk), undefined, rethrowAssertionErrorRejection
219222
);
223+
checkState();
220224
resolveRead(false);
221225
},
222-
closeSteps: () => resolveRead(true),
223-
errorSteps: rejectRead
226+
closeSteps: () => {
227+
pendingReads--;
228+
checkState();
229+
resolveRead(true);
230+
},
231+
errorSteps: reason => {
232+
pendingReads--;
233+
checkState();
234+
rejectRead(reason);
235+
}
224236
}
225237
);
226238
});
@@ -267,7 +279,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
267279
destIsOrBecomesErroringOrErrored();
268280
} else if (sourceState === 'closed') {
269281
// Closing must be propagated forward
270-
sourceIsOrBecomesClosed();
282+
// If there are any pending read requests, wait for them to settle first to avoid dropping a chunk.
283+
// This is needed because [[PullSteps]] calls ReadableStreamClose() before calling chunkSteps.
284+
if (pendingReads === 0) {
285+
sourceIsOrBecomesClosed();
286+
}
271287
}
272288
}
273289

0 commit comments

Comments
 (0)