From 9f527f10e1736b526c0bd3a3e743e3e7dc125e6e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 3 Apr 2025 11:24:59 +0100 Subject: [PATCH] Merge pull request #24104 from overleaf/bg-batched-update-concurrency-warning prevent concurrent execution of batchedUpdate GitOrigin-RevId: 90853ccd83943d8cd7b01fd11f152512d806e9a7 --- libraries/mongo-utils/batchedUpdate.js | 106 ++++++++++++++----------- 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/libraries/mongo-utils/batchedUpdate.js b/libraries/mongo-utils/batchedUpdate.js index 5e97f45aca..7e3ad677db 100644 --- a/libraries/mongo-utils/batchedUpdate.js +++ b/libraries/mongo-utils/batchedUpdate.js @@ -16,6 +16,7 @@ let VERBOSE_LOGGING let BATCH_RANGE_START let BATCH_RANGE_END let BATCH_MAX_TIME_SPAN_IN_MS +let BATCHED_UPDATE_RUNNING = false /** * @typedef {import("mongodb").Collection} Collection @@ -211,57 +212,66 @@ async function batchedUpdate( findOptions, batchedUpdateOptions ) { - ID_EDGE_PAST = await getIdEdgePast(collection) - if (!ID_EDGE_PAST) { - console.warn( - `The collection ${collection.collectionName} appears to be empty.` - ) - return 0 + // only a single batchedUpdate can run at a time due to global variables + if (BATCHED_UPDATE_RUNNING) { + throw new Error('batchedUpdate is already running') } - refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions) - - findOptions = findOptions || {} - findOptions.readPreference = READ_PREFERENCE_SECONDARY - - projection = projection || { _id: 1 } - let nextBatch - let updated = 0 - let start = BATCH_RANGE_START - - while (start !== BATCH_RANGE_END) { - let end = getNextEnd(start) - nextBatch = await getNextBatch( - collection, - query, - start, - end, - projection, - findOptions - ) - if (nextBatch.length > 0) { - end = nextBatch[nextBatch.length - 1]._id - updated += nextBatch.length - - if (VERBOSE_LOGGING) { - console.log( - `Running update on batch with ids ${JSON.stringify( - nextBatch.map(entry => entry._id) - )}` - ) - } else { - console.error(`Running update on batch ending ${renderObjectId(end)}`) - } - - if (typeof update === 'function') { - await update(nextBatch) - } else { - await performUpdate(collection, nextBatch, update) - } + try { + BATCHED_UPDATE_RUNNING = true + ID_EDGE_PAST = await getIdEdgePast(collection) + if (!ID_EDGE_PAST) { + console.warn( + `The collection ${collection.collectionName} appears to be empty.` + ) + return 0 } - console.error(`Completed batch ending ${renderObjectId(end)}`) - start = end + refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions) + + findOptions = findOptions || {} + findOptions.readPreference = READ_PREFERENCE_SECONDARY + + projection = projection || { _id: 1 } + let nextBatch + let updated = 0 + let start = BATCH_RANGE_START + + while (start !== BATCH_RANGE_END) { + let end = getNextEnd(start) + nextBatch = await getNextBatch( + collection, + query, + start, + end, + projection, + findOptions + ) + if (nextBatch.length > 0) { + end = nextBatch[nextBatch.length - 1]._id + updated += nextBatch.length + + if (VERBOSE_LOGGING) { + console.log( + `Running update on batch with ids ${JSON.stringify( + nextBatch.map(entry => entry._id) + )}` + ) + } else { + console.error(`Running update on batch ending ${renderObjectId(end)}`) + } + + if (typeof update === 'function') { + await update(nextBatch) + } else { + await performUpdate(collection, nextBatch, update) + } + } + console.error(`Completed batch ending ${renderObjectId(end)}`) + start = end + } + return updated + } finally { + BATCHED_UPDATE_RUNNING = false } - return updated } /**