From 478e264817148157022dc1f2fcfb0b423303bf4d Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 29 Apr 2025 17:21:13 +0200 Subject: [PATCH] [fetch-utils] fix leak of abort event handlers in AbortSignal (#25172) GitOrigin-RevId: 992496010eb1cbe571b2e87fab8e7227b0d64538 --- libraries/fetch-utils/index.js | 32 +++++++++------- .../fetch-utils/test/unit/FetchUtilsTests.js | 37 ++++++++++++++++++- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/libraries/fetch-utils/index.js b/libraries/fetch-utils/index.js index 643dcc752b..60e8180c7d 100644 --- a/libraries/fetch-utils/index.js +++ b/libraries/fetch-utils/index.js @@ -23,11 +23,11 @@ async function fetchJson(url, opts = {}) { } async function fetchJsonWithResponse(url, opts = {}) { - const { fetchOpts } = parseOpts(opts) + const { fetchOpts, detachSignal } = parseOpts(opts) fetchOpts.headers = fetchOpts.headers ?? {} fetchOpts.headers.Accept = fetchOpts.headers.Accept ?? 'application/json' - const response = await performRequest(url, fetchOpts) + const response = await performRequest(url, fetchOpts, detachSignal) if (!response.ok) { const body = await maybeGetResponseBody(response) throw new RequestFailedError(url, opts, response, body) @@ -53,8 +53,8 @@ async function fetchStream(url, opts = {}) { } async function fetchStreamWithResponse(url, opts = {}) { - const { fetchOpts, abortController } = parseOpts(opts) - const response = await performRequest(url, fetchOpts) + const { fetchOpts, abortController, detachSignal } = parseOpts(opts) + const response = await performRequest(url, fetchOpts, detachSignal) if (!response.ok) { const body = await maybeGetResponseBody(response) @@ -76,8 +76,8 @@ async function fetchStreamWithResponse(url, opts = {}) { * @throws {RequestFailedError} if the response has a failure status code */ async function fetchNothing(url, opts = {}) { - const { fetchOpts } = parseOpts(opts) - const response = await performRequest(url, fetchOpts) + const { fetchOpts, detachSignal } = parseOpts(opts) + const response = await performRequest(url, fetchOpts, detachSignal) if (!response.ok) { const body = await maybeGetResponseBody(response) throw new RequestFailedError(url, opts, response, body) @@ -108,9 +108,9 @@ async function fetchRedirect(url, opts = {}) { * @throws {RequestFailedError} if the response has a non redirect status code or missing Location header */ async function fetchRedirectWithResponse(url, opts = {}) { - const { fetchOpts } = parseOpts(opts) + const { fetchOpts, detachSignal } = parseOpts(opts) fetchOpts.redirect = 'manual' - const response = await performRequest(url, fetchOpts) + const response = await performRequest(url, fetchOpts, detachSignal) if (response.status < 300 || response.status >= 400) { const body = await maybeGetResponseBody(response) throw new RequestFailedError(url, opts, response, body) @@ -142,8 +142,8 @@ async function fetchString(url, opts = {}) { } async function fetchStringWithResponse(url, opts = {}) { - const { fetchOpts } = parseOpts(opts) - const response = await performRequest(url, fetchOpts) + const { fetchOpts, detachSignal } = parseOpts(opts) + const response = await performRequest(url, fetchOpts, detachSignal) if (!response.ok) { const body = await maybeGetResponseBody(response) throw new RequestFailedError(url, opts, response, body) @@ -178,13 +178,14 @@ function parseOpts(opts) { const abortController = new AbortController() fetchOpts.signal = abortController.signal + let detachSignal = () => {} if (opts.signal) { - abortOnSignal(abortController, opts.signal) + detachSignal = abortOnSignal(abortController, opts.signal) } if (opts.body instanceof Readable) { abortOnDestroyedRequest(abortController, fetchOpts.body) } - return { fetchOpts, abortController } + return { fetchOpts, abortController, detachSignal } } function setupJsonBody(fetchOpts, json) { @@ -208,6 +209,9 @@ function abortOnSignal(abortController, signal) { abortController.abort(signal.reason) } signal.addEventListener('abort', listener) + return () => { + signal.removeEventListener('abort', listener) + } } function abortOnDestroyedRequest(abortController, stream) { @@ -226,11 +230,12 @@ function abortOnDestroyedResponse(abortController, response) { }) } -async function performRequest(url, fetchOpts) { +async function performRequest(url, fetchOpts, detachSignal) { let response try { response = await fetch(url, fetchOpts) } catch (err) { + detachSignal() if (fetchOpts.body instanceof Readable) { fetchOpts.body.destroy() } @@ -239,6 +244,7 @@ async function performRequest(url, fetchOpts) { method: fetchOpts.method ?? 'GET', }) } + response.body.on('close', detachSignal) if (fetchOpts.body instanceof Readable) { response.body.on('close', () => { if (!fetchOpts.body.readableEnded) { diff --git a/libraries/fetch-utils/test/unit/FetchUtilsTests.js b/libraries/fetch-utils/test/unit/FetchUtilsTests.js index e9fd0ff231..691e90778d 100644 --- a/libraries/fetch-utils/test/unit/FetchUtilsTests.js +++ b/libraries/fetch-utils/test/unit/FetchUtilsTests.js @@ -1,6 +1,9 @@ const { expect } = require('chai') +const fs = require('node:fs') +const events = require('node:events') const { FetchError, AbortError } = require('node-fetch') const { Readable } = require('node:stream') +const { pipeline } = require('node:stream/promises') const { once } = require('node:events') const { TestServer } = require('./helpers/TestServer') const selfsigned = require('selfsigned') @@ -203,6 +206,31 @@ describe('fetch-utils', function () { ).to.be.rejectedWith(AbortError) expect(stream.destroyed).to.be.true }) + + it('detaches from signal on success', async function () { + const signal = AbortSignal.timeout(10_000) + for (let i = 0; i < 20; i++) { + const s = await fetchStream(this.url('/hello'), { signal }) + expect(events.getEventListeners(signal, 'abort')).to.have.length(1) + await pipeline(s, fs.createWriteStream('/dev/null')) + expect(events.getEventListeners(signal, 'abort')).to.have.length(0) + } + }) + + it('detaches from signal on error', async function () { + const signal = AbortSignal.timeout(10_000) + for (let i = 0; i < 20; i++) { + try { + await fetchStream(this.url('/500'), { signal }) + } catch (err) { + if (err instanceof RequestFailedError && err.response.status === 500) + continue + throw err + } finally { + expect(events.getEventListeners(signal, 'abort')).to.have.length(0) + } + } + }) }) describe('fetchNothing', function () { @@ -391,9 +419,16 @@ async function* infiniteIterator() { async function abortOnceReceived(func, server) { const controller = new AbortController() const promise = func(controller.signal) + expect(events.getEventListeners(controller.signal, 'abort')).to.have.length(1) await once(server.events, 'request-received') controller.abort() - return await promise + try { + return await promise + } finally { + expect(events.getEventListeners(controller.signal, 'abort')).to.have.length( + 0 + ) + } } async function expectRequestAborted(req) {