From e4574bff7ecfd6acecfa76f114cffd4aecf2f234 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Thu, 21 Nov 2024 15:56:19 +0100 Subject: [PATCH] Merge pull request #22061 from overleaf/jpa-reduce-idle-time [history-v1] back_fill_file_hash: reduce idle time between batches GitOrigin-RevId: 9b24bb882b158f33915d2e0ab2f82273eac09739 --- libraries/promise-utils/index.js | 10 +- .../storage/scripts/back_fill_file_hash.mjs | 154 +++++++++++++----- .../js/storage/back_fill_file_hash.test.mjs | 5 + 3 files changed, 127 insertions(+), 42 deletions(-) diff --git a/libraries/promise-utils/index.js b/libraries/promise-utils/index.js index 370a1c2441..c71b17127b 100644 --- a/libraries/promise-utils/index.js +++ b/libraries/promise-utils/index.js @@ -253,8 +253,14 @@ function expressifyErrorHandler(fn) { * Map values in `array` with the async function `fn` * * Limit the number of unresolved promises to `concurrency`. + * @template T + * @template V + * @param {number} concurrency + * @param {Array} array + * @param {(arg: T) => Promise} fn + * @return {Promise>>} */ -function promiseMapWithLimit(concurrency, array, fn) { +async function promiseMapWithLimit(concurrency, array, fn) { const limit = pLimit(concurrency) - return Promise.all(array.map(x => limit(() => fn(x)))) + return await Promise.all(array.map(x => limit(() => fn(x)))) } diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index 101e19134f..1aaf533b16 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -8,10 +8,12 @@ import Stream from 'node:stream' import zLib from 'node:zlib' import { setTimeout } from 'node:timers/promises' import { Binary, ObjectId } from 'mongodb' +import pLimit from 'p-limit' import logger from '@overleaf/logger' import { batchedUpdate, objectIdFromInput, + renderObjectId, READ_PREFERENCE_SECONDARY, } from '@overleaf/mongo-utils/batchedUpdate.js' import OError from '@overleaf/o-error' @@ -20,7 +22,6 @@ import { NoKEKMatchedError, NotFoundError, } from '@overleaf/object-persistor/src/Errors.js' -import { promiseMapWithLimit } from '@overleaf/promise-utils' import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs' import { BlobStore, @@ -100,6 +101,7 @@ const LOGGING_IDENTIFIER = process.env.LOGGING_IDENTIFIER || BATCH_RANGE_START // Concurrency for downloading from GCS and updating hashes in mongo const CONCURRENCY = parseInt(process.env.CONCURRENCY || '100', 10) +const CONCURRENT_BATCHES = parseInt(process.env.CONCURRENT_BATCHES || '2', 10) // Retries for processing a given file const RETRIES = parseInt(process.env.RETRIES || '10', 10) const RETRY_DELAY_MS = parseInt(process.env.RETRY_DELAY_MS || '100', 10) @@ -123,6 +125,19 @@ const projectsCollection = db.collection('projects') const deletedProjectsCollection = db.collection('deletedProjects') const deletedFilesCollection = db.collection('deletedFiles') +const concurrencyLimit = pLimit(CONCURRENCY) + +/** + * @template T + * @template V + * @param {Array} array + * @param {(arg: T) => Promise} fn + * @return {Promise>>} + */ +async function processConcurrently(array, fn) { + return await Promise.all(array.map(x => concurrencyLimit(() => fn(x)))) +} + const STATS = { projects: 0, blobs: 0, @@ -215,6 +230,7 @@ function printStats(isLast = false) { ...bandwidthStats(STATS, now - processStart), eventLoop: nextEventLoopStats, diff: computeDiff(nextEventLoopStats, now), + deferredBatches: Array.from(deferredBatches.keys()), }) if (isLast) { console.warn(logLine) @@ -468,9 +484,7 @@ async function uploadBlobToAWS(entry, blob, filePath) { * @return {Promise} */ async function processFiles(files) { - if (files.length === 0) return // all processed - await promiseMapWithLimit( - CONCURRENCY, + await processConcurrently( files, /** * @param {QueueEntry} entry @@ -497,15 +511,65 @@ async function processFiles(files) { ) } +/** @type {Map} */ +const deferredBatches = new Map() + +async function waitForDeferredQueues() { + // Wait for ALL pending batches to finish, especially wait for their mongo + // writes to finish to avoid extra work when resuming the batch. + const all = await Promise.allSettled(deferredBatches.values()) + // Now that all batches finished, we can throw if needed. + for (const res of all) { + if (res.status === 'rejected') { + throw res.reason + } + } +} + +/** + * @param {Array} batch + * @param {string} prefix + */ +async function queueNextBatch(batch, prefix = 'rootFolder.0') { + if (gracefulShutdownInitiated) { + throw new Error('graceful shutdown: aborting batch processing') + } + + // Read ids now, the batch will get trimmed by processBatch shortly. + const start = renderObjectId(batch[0]._id) + const end = renderObjectId(batch[batch.length - 1]._id) + const deferred = processBatch(batch, prefix) + .then(() => { + console.error(`Actually completed batch ending ${end}`) + }) + .catch(err => { + logger.error({ err, start, end }, 'fatal error processing batch') + throw err + }) + .finally(() => { + deferredBatches.delete(end) + }) + deferredBatches.set(end, deferred) + + if (deferredBatches.size >= CONCURRENT_BATCHES) { + // Wait for any of the deferred batches to finish before fetching the next. + // We should never have more than CONCURRENT_BATCHES batches in memory. + await Promise.race(deferredBatches.values()) + } +} + /** * @param {Array} batch * @param {string} prefix * @return {Promise} */ -async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { - const deletedFiles = await collectDeletedFiles(batch) - const { nBlobs, blobs } = await collectProjectBlobs(batch) - const { nBackedUpBlobs, backedUpBlobs } = await collectBackedUpBlobs(batch) +async function processBatch(batch, prefix = 'rootFolder.0') { + const [deletedFiles, { nBlobs, blobs }, { nBackedUpBlobs, backedUpBlobs }] = + await Promise.all([ + collectDeletedFiles(batch), + collectProjectBlobs(batch), + collectBackedUpBlobs(batch), + ]) const files = Array.from( findFileInBatch(batch, prefix, deletedFiles, blobs, backedUpBlobs) ) @@ -533,8 +597,7 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { } ) await processFiles(files) - await promiseMapWithLimit( - CONCURRENCY, + await processConcurrently( files, /** * @param {QueueEntry} entry @@ -544,9 +607,6 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { await entry.ctx.flushMongoQueues() } ) - if (gracefulShutdownInitiated) { - throw new Error('graceful shutdown: aborting batch processing') - } } /** @@ -554,7 +614,7 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') { * @return {Promise} */ async function handleDeletedFileTreeBatch(batch) { - await handleLiveTreeBatch( + await queueNextBatch( batch.map(d => d.project), 'project.rootFolder.0' ) @@ -1063,37 +1123,51 @@ function estimateBlobSize(blob) { } async function updateLiveFileTrees() { - await batchedUpdate( - projectsCollection, - { 'overleaf.history.id': { $exists: true } }, - handleLiveTreeBatch, - { rootFolder: 1, _id: 1, 'overleaf.history.id': 1 }, - {}, - { - BATCH_RANGE_START, - BATCH_RANGE_END, - } - ) + try { + await batchedUpdate( + projectsCollection, + { 'overleaf.history.id': { $exists: true } }, + queueNextBatch, + { rootFolder: 1, _id: 1, 'overleaf.history.id': 1 }, + {}, + { + BATCH_RANGE_START, + BATCH_RANGE_END, + } + ) + } catch (err) { + gracefulShutdownInitiated = true + throw err + } finally { + await waitForDeferredQueues() + } console.warn('Done updating live projects') } async function updateDeletedFileTrees() { - await batchedUpdate( - deletedProjectsCollection, - { - 'deleterData.deletedProjectId': { - $gt: new ObjectId(BATCH_RANGE_START), - $lte: new ObjectId(BATCH_RANGE_END), + try { + await batchedUpdate( + deletedProjectsCollection, + { + 'deleterData.deletedProjectId': { + $gt: new ObjectId(BATCH_RANGE_START), + $lte: new ObjectId(BATCH_RANGE_END), + }, + 'project.overleaf.history.id': { $exists: true }, }, - 'project.overleaf.history.id': { $exists: true }, - }, - handleDeletedFileTreeBatch, - { - 'project.rootFolder': 1, - 'project._id': 1, - 'project.overleaf.history.id': 1, - } - ) + handleDeletedFileTreeBatch, + { + 'project.rootFolder': 1, + 'project._id': 1, + 'project.overleaf.history.id': 1, + } + ) + } catch (err) { + gracefulShutdownInitiated = true + throw err + } finally { + await waitForDeferredQueues() + } console.warn('Done updating deleted projects') } diff --git a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs index c42cfd1108..bc77df7422 100644 --- a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs @@ -540,6 +540,11 @@ describe('back_fill_file_hash script', function () { delete stats[key] } delete stats.LOGGING_IDENTIFIER + expect(stats.deferredBatches).to.have.length( + 0, + 'should not have any remaining deferred batches' + ) + delete stats.deferredBatches return { stats, result } }