Merge pull request #24104 from overleaf/bg-batched-update-concurrency-warning

prevent concurrent execution of batchedUpdate

GitOrigin-RevId: 90853ccd83943d8cd7b01fd11f152512d806e9a7
This commit is contained in:
Brian Gough
2025-04-03 11:24:59 +01:00
committed by Copybot
parent fc6df69e41
commit 9f527f10e1

View File

@@ -16,6 +16,7 @@ let VERBOSE_LOGGING
let BATCH_RANGE_START
let BATCH_RANGE_END
let BATCH_MAX_TIME_SPAN_IN_MS
let BATCHED_UPDATE_RUNNING = false
/**
* @typedef {import("mongodb").Collection} Collection
@@ -211,57 +212,66 @@ async function batchedUpdate(
findOptions,
batchedUpdateOptions
) {
ID_EDGE_PAST = await getIdEdgePast(collection)
if (!ID_EDGE_PAST) {
console.warn(
`The collection ${collection.collectionName} appears to be empty.`
)
return 0
// only a single batchedUpdate can run at a time due to global variables
if (BATCHED_UPDATE_RUNNING) {
throw new Error('batchedUpdate is already running')
}
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
findOptions = findOptions || {}
findOptions.readPreference = READ_PREFERENCE_SECONDARY
projection = projection || { _id: 1 }
let nextBatch
let updated = 0
let start = BATCH_RANGE_START
while (start !== BATCH_RANGE_END) {
let end = getNextEnd(start)
nextBatch = await getNextBatch(
collection,
query,
start,
end,
projection,
findOptions
)
if (nextBatch.length > 0) {
end = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length
if (VERBOSE_LOGGING) {
console.log(
`Running update on batch with ids ${JSON.stringify(
nextBatch.map(entry => entry._id)
)}`
)
} else {
console.error(`Running update on batch ending ${renderObjectId(end)}`)
}
if (typeof update === 'function') {
await update(nextBatch)
} else {
await performUpdate(collection, nextBatch, update)
}
try {
BATCHED_UPDATE_RUNNING = true
ID_EDGE_PAST = await getIdEdgePast(collection)
if (!ID_EDGE_PAST) {
console.warn(
`The collection ${collection.collectionName} appears to be empty.`
)
return 0
}
console.error(`Completed batch ending ${renderObjectId(end)}`)
start = end
refreshGlobalOptionsForBatchedUpdate(batchedUpdateOptions)
findOptions = findOptions || {}
findOptions.readPreference = READ_PREFERENCE_SECONDARY
projection = projection || { _id: 1 }
let nextBatch
let updated = 0
let start = BATCH_RANGE_START
while (start !== BATCH_RANGE_END) {
let end = getNextEnd(start)
nextBatch = await getNextBatch(
collection,
query,
start,
end,
projection,
findOptions
)
if (nextBatch.length > 0) {
end = nextBatch[nextBatch.length - 1]._id
updated += nextBatch.length
if (VERBOSE_LOGGING) {
console.log(
`Running update on batch with ids ${JSON.stringify(
nextBatch.map(entry => entry._id)
)}`
)
} else {
console.error(`Running update on batch ending ${renderObjectId(end)}`)
}
if (typeof update === 'function') {
await update(nextBatch)
} else {
await performUpdate(collection, nextBatch, update)
}
}
console.error(`Completed batch ending ${renderObjectId(end)}`)
start = end
}
return updated
} finally {
BATCHED_UPDATE_RUNNING = false
}
return updated
}
/**