Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lib/web/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const {
getResponseState
} = require('./response')
const { HeadersList } = require('./headers')
const { Request, cloneRequest, getRequestDispatcher, getRequestState } = require('./request')
const { Request, cloneRequest, getRequestDispatcher, getRequestState, removeRequestAbortListener } = require('./request')
const zlib = require('node:zlib')
const {
makePolicyContainer,
Expand Down Expand Up @@ -208,7 +208,7 @@ function fetch (input, init = undefined) {
let controller = null

// 11. Add the following abort steps to requestObject’s signal:
addAbortListener(
const removeAbortListener = addAbortListener(
requestObject.signal,
() => {
// 1. Set locallyAborted to true.
Expand All @@ -228,6 +228,15 @@ function fetch (input, init = undefined) {
}
)
Comment on lines 211 to 229

// Remove the `abort` listeners registered above and in the Request
// constructor once the fetch has settled. Without this, reusing a single
// signal across many requests leaks listeners and Node.js emits a
// MaxListenersExceededWarning. See https://github.com/nodejs/undici/issues/5285
const cleanupAbortListeners = () => {
removeAbortListener()
removeRequestAbortListener(requestObject)
}

// 12. Let handleFetchDone given response response be to finalize and
// report timing with response, globalObject, and "fetch".
// see function handleFetchDone
Expand All @@ -252,13 +261,15 @@ function fetch (input, init = undefined) {
// deserializedError.

abortFetch(p, request, responseObject, controller.serializedAbortReason, controller.controller)
cleanupAbortListeners()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this function call to abortFetch.

return
}

// 3. If response is a network error, then reject p with a TypeError
// and terminate these substeps.
if (response.type === 'error') {
p.reject(new TypeError('fetch failed', { cause: response.error }))
cleanupAbortListeners()
return
}

Expand All @@ -273,7 +284,10 @@ function fetch (input, init = undefined) {

controller = fetching({
request,
processResponseEndOfBody: handleFetchDone,
processResponseEndOfBody: (response) => {
handleFetchDone(response)
cleanupAbortListeners()
},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be some unexpected behavior when cloning the response, so we need to investigate.

processResponse,
dispatcher: getRequestDispatcher(requestObject), // undici
// Keep requestObject alive to prevent its AbortController from being GC'd
Expand Down
35 changes: 32 additions & 3 deletions lib/web/fetch/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ class Request {

#state

/**
* Removes the `abort` listener that makes this request's signal follow the
* passed signal. `null` when no such listener was registered.
* @type {(() => void) | null}
*/
#abortCleanup = null

// https://fetch.spec.whatwg.org/#dom-request
constructor (input, init = undefined) {
webidl.util.markAsUncloneable(this)
Expand Down Expand Up @@ -436,12 +443,23 @@ class Request {
setMaxListeners(1500, signal)
}

util.addAbortListener(signal, abort)
const removeAbortListener = util.addAbortListener(signal, abort)
// The third argument must be a registry key to be unregistered.
// Without it, you cannot unregister.
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
// abort is used as the unregister key. (because it is unique)
requestFinalizer.register(ac, { signal, abort }, abort)

// Allow the listener to be removed deterministically once the fetch
// that owns this request has settled, instead of relying solely on the
// FinalizationRegistry (i.e. garbage collection). Reusing a single
// signal across many requests would otherwise leak listeners.
// See https://github.com/nodejs/undici/issues/5285
this.#abortCleanup = () => {
requestFinalizer.unregister(abort)
removeAbortListener()
this.#abortCleanup = null
}
Comment on lines +446 to +462
}
}

Expand Down Expand Up @@ -868,15 +886,25 @@ class Request {
static setRequestState (request, newState) {
request.#state = newState
}

/**
* Removes the `abort` listener that makes this request's signal follow the
* signal passed to its constructor, if any. Idempotent.
* @param {Request} request
*/
static removeRequestAbortListener (request) {
request.#abortCleanup?.()
}
}

const { setRequestSignal, getRequestDispatcher, setRequestDispatcher, setRequestHeaders, getRequestState, setRequestState } = Request
const { setRequestSignal, getRequestDispatcher, setRequestDispatcher, setRequestHeaders, getRequestState, setRequestState, removeRequestAbortListener } = Request
Reflect.deleteProperty(Request, 'setRequestSignal')
Reflect.deleteProperty(Request, 'getRequestDispatcher')
Reflect.deleteProperty(Request, 'setRequestDispatcher')
Reflect.deleteProperty(Request, 'setRequestHeaders')
Reflect.deleteProperty(Request, 'getRequestState')
Reflect.deleteProperty(Request, 'setRequestState')
Reflect.deleteProperty(Request, 'removeRequestAbortListener')

mixinBody(Request, getRequestState)

Expand Down Expand Up @@ -1111,5 +1139,6 @@ module.exports = {
fromInnerRequest,
cloneRequest,
getRequestDispatcher,
getRequestState
getRequestState,
removeRequestAbortListener
}
51 changes: 51 additions & 0 deletions test/fetch/issue-5285.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict'

const { test } = require('node:test')
const assert = require('node:assert')
const http = require('node:http')
const { once, getEventListeners } = require('node:events')
const { fetch } = require('../..')
const { closeServerAsPromise } = require('../utils/node-http')

// https://github.com/nodejs/undici/issues/5285
// Reusing a single AbortSignal across many fetch() calls must not leak
// `abort` listeners on the signal, which previously caused Node.js to emit
// a MaxListenersExceededWarning.
test('fetch removes the abort listener once the request settles', async (t) => {
const server = http.createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.end('hello')
})

t.after(closeServerAsPromise(server))
await once(server.listen(0), 'listening')

let warning = null
function onWarning (value) {
warning = value
}
process.on('warning', onWarning)
t.after(() => process.off('warning', onWarning))

const controller = new AbortController()
const { signal } = controller

const url = `http://localhost:${server.address().port}`

// Issue many more requests than the default max listeners (10) while
// sharing the same signal. Each settled request must remove its listener,
// otherwise a MaxListenersExceededWarning is emitted and the listeners leak.
for (let i = 0; i < 100; i++) {
const res = await fetch(url, { signal })
await res.text()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await res.text()
await res.arrayBuffer()

}

// Allow the trailing end-of-body cleanup of the final request, which is
// scheduled in a microtask, to run before asserting.
await new Promise((resolve) => setTimeout(resolve, 100))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  const { setImmediate } = require('node:timers/promises')
Suggested change
await new Promise((resolve) => setTimeout(resolve, 100))
await setImmediate()


// No `abort` listeners should remain registered on the signal once every
// request has settled.
assert.strictEqual(getEventListeners(signal, 'abort').length, 0)
assert.strictEqual(warning, null)
})
Loading