Merge pull request #22013 from overleaf/jpa-flush-mongo-queues-early

[history-v1] back_fill_file_hash: flush mongo write queues early

GitOrigin-RevId: 9b6c6ff9861945e69e42d15dc19f4c03c39193e9
This commit is contained in:
Jakob Ackermann
2024-11-20 15:19:49 +01:00
committed by Copybot
parent 73aea01f37
commit 6de1817ef5

View File

@@ -676,17 +676,19 @@ async function updateFileRefInMongo(entry) {
* @param {ProjectContext} ctx
* @param {Folder} folder
* @param {string} path
* @param {boolean} isInputLoop
* @return Generator<QueueEntry>
*/
function* findFiles(ctx, folder, path) {
function* findFiles(ctx, folder, path, isInputLoop = false) {
let i = 0
for (const child of folder.folders) {
yield* findFiles(ctx, child, `${path}.folders.${i}`)
yield* findFiles(ctx, child, `${path}.folders.${i}`, isInputLoop)
i++
}
i = 0
for (const fileRef of folder.fileRefs) {
if (!fileRef.hash) {
if (isInputLoop) ctx.remainingQueueEntries++
yield {
ctx,
cacheKey: fileRef._id.toString(),
@@ -725,12 +727,14 @@ function* findFileInBatch(
projectBlobs,
projectBackedUpBlobs
)
yield* findFiles(ctx, project.rootFolder[0], prefix)
yield* findFiles(ctx, project.rootFolder[0], prefix, true)
for (const fileId of projectDeletedFiles) {
ctx.remainingQueueEntries++
yield { ctx, cacheKey: fileId, fileId, path: '' }
}
for (const blob of projectBlobs) {
if (projectBackedUpBlobs.has(blob.getHash())) continue
ctx.remainingQueueEntries++
yield {
ctx,
cacheKey: blob.getHash(),
@@ -821,6 +825,9 @@ class ProjectContext {
/** @type {Set<string>} */
#historyBlobs
/** @type {number} */
remainingQueueEntries = 0
/**
* @param {ObjectId} projectId
* @param {string} historyId
@@ -877,6 +884,10 @@ class ProjectContext {
}
async flushMongoQueuesIfNeeded() {
if (this.remainingQueueEntries === 0) {
await this.flushMongoQueues()
}
if (this.#completedBlobs.size > BATCH_HASH_WRITES) {
await this.#storeBackedUpBlobs()
}
@@ -1022,7 +1033,11 @@ class ProjectContext {
} else {
this.#pendingFiles.set(entry.cacheKey, processFileWithCleanup(entry))
}
entry.hash = await this.#pendingFiles.get(entry.cacheKey)
try {
entry.hash = await this.#pendingFiles.get(entry.cacheKey)
} finally {
this.remainingQueueEntries--
}
this.queueFileForWritingHash(entry)
await this.flushMongoQueuesIfNeeded()
}