From 6de1817ef50e01fea1cd5890ae0a93a6403e03b4 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Wed, 20 Nov 2024 15:19:49 +0100 Subject: [PATCH] Merge pull request #22013 from overleaf/jpa-flush-mongo-queues-early [history-v1] back_fill_file_hash: flush mongo write queues early GitOrigin-RevId: 9b6c6ff9861945e69e42d15dc19f4c03c39193e9 --- .../storage/scripts/back_fill_file_hash.mjs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index c1d76a8bcd..859c286eb5 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -676,17 +676,19 @@ async function updateFileRefInMongo(entry) { * @param {ProjectContext} ctx * @param {Folder} folder * @param {string} path + * @param {boolean} isInputLoop * @return Generator */ -function* findFiles(ctx, folder, path) { +function* findFiles(ctx, folder, path, isInputLoop = false) { let i = 0 for (const child of folder.folders) { - yield* findFiles(ctx, child, `${path}.folders.${i}`) + yield* findFiles(ctx, child, `${path}.folders.${i}`, isInputLoop) i++ } i = 0 for (const fileRef of folder.fileRefs) { if (!fileRef.hash) { + if (isInputLoop) ctx.remainingQueueEntries++ yield { ctx, cacheKey: fileRef._id.toString(), @@ -725,12 +727,14 @@ function* findFileInBatch( projectBlobs, projectBackedUpBlobs ) - yield* findFiles(ctx, project.rootFolder[0], prefix) + yield* findFiles(ctx, project.rootFolder[0], prefix, true) for (const fileId of projectDeletedFiles) { + ctx.remainingQueueEntries++ yield { ctx, cacheKey: fileId, fileId, path: '' } } for (const blob of projectBlobs) { if (projectBackedUpBlobs.has(blob.getHash())) continue + ctx.remainingQueueEntries++ yield { ctx, cacheKey: blob.getHash(), @@ -821,6 +825,9 @@ class ProjectContext { /** @type {Set} */ #historyBlobs + /** @type {number} */ + remainingQueueEntries = 0 + /** * @param {ObjectId} projectId * @param {string} historyId @@ -877,6 +884,10 @@ class ProjectContext { } async flushMongoQueuesIfNeeded() { + if (this.remainingQueueEntries === 0) { + await this.flushMongoQueues() + } + if (this.#completedBlobs.size > BATCH_HASH_WRITES) { await this.#storeBackedUpBlobs() } @@ -1022,7 +1033,11 @@ class ProjectContext { } else { this.#pendingFiles.set(entry.cacheKey, processFileWithCleanup(entry)) } - entry.hash = await this.#pendingFiles.get(entry.cacheKey) + try { + entry.hash = await this.#pendingFiles.get(entry.cacheKey) + } finally { + this.remainingQueueEntries-- + } this.queueFileForWritingHash(entry) await this.flushMongoQueuesIfNeeded() }