Merge pull request #22061 from overleaf/jpa-reduce-idle-time

[history-v1] back_fill_file_hash: reduce idle time between batches

GitOrigin-RevId: 9b24bb882b158f33915d2e0ab2f82273eac09739
This commit is contained in:
Jakob Ackermann
2024-11-21 15:56:19 +01:00
committed by Copybot
parent 2c6dd4e56b
commit e4574bff7e
3 changed files with 127 additions and 42 deletions

View File

@@ -253,8 +253,14 @@ function expressifyErrorHandler(fn) {
* Map values in `array` with the async function `fn`
*
* Limit the number of unresolved promises to `concurrency`.
* @template T
* @template V
* @param {number} concurrency
* @param {Array<T>} array
* @param {(arg: T) => Promise<V>} fn
* @return {Promise<Array<Awaited<V>>>}
*/
function promiseMapWithLimit(concurrency, array, fn) {
async function promiseMapWithLimit(concurrency, array, fn) {
const limit = pLimit(concurrency)
return Promise.all(array.map(x => limit(() => fn(x))))
return await Promise.all(array.map(x => limit(() => fn(x))))
}

View File

@@ -8,10 +8,12 @@ import Stream from 'node:stream'
import zLib from 'node:zlib'
import { setTimeout } from 'node:timers/promises'
import { Binary, ObjectId } from 'mongodb'
import pLimit from 'p-limit'
import logger from '@overleaf/logger'
import {
batchedUpdate,
objectIdFromInput,
renderObjectId,
READ_PREFERENCE_SECONDARY,
} from '@overleaf/mongo-utils/batchedUpdate.js'
import OError from '@overleaf/o-error'
@@ -20,7 +22,6 @@ import {
NoKEKMatchedError,
NotFoundError,
} from '@overleaf/object-persistor/src/Errors.js'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs'
import {
BlobStore,
@@ -100,6 +101,7 @@ const LOGGING_IDENTIFIER = process.env.LOGGING_IDENTIFIER || BATCH_RANGE_START
// Concurrency for downloading from GCS and updating hashes in mongo
const CONCURRENCY = parseInt(process.env.CONCURRENCY || '100', 10)
const CONCURRENT_BATCHES = parseInt(process.env.CONCURRENT_BATCHES || '2', 10)
// Retries for processing a given file
const RETRIES = parseInt(process.env.RETRIES || '10', 10)
const RETRY_DELAY_MS = parseInt(process.env.RETRY_DELAY_MS || '100', 10)
@@ -123,6 +125,19 @@ const projectsCollection = db.collection('projects')
const deletedProjectsCollection = db.collection('deletedProjects')
const deletedFilesCollection = db.collection('deletedFiles')
const concurrencyLimit = pLimit(CONCURRENCY)
/**
* @template T
* @template V
* @param {Array<T>} array
* @param {(arg: T) => Promise<V>} fn
* @return {Promise<Array<Awaited<V>>>}
*/
async function processConcurrently(array, fn) {
return await Promise.all(array.map(x => concurrencyLimit(() => fn(x))))
}
const STATS = {
projects: 0,
blobs: 0,
@@ -215,6 +230,7 @@ function printStats(isLast = false) {
...bandwidthStats(STATS, now - processStart),
eventLoop: nextEventLoopStats,
diff: computeDiff(nextEventLoopStats, now),
deferredBatches: Array.from(deferredBatches.keys()),
})
if (isLast) {
console.warn(logLine)
@@ -468,9 +484,7 @@ async function uploadBlobToAWS(entry, blob, filePath) {
* @return {Promise<void>}
*/
async function processFiles(files) {
if (files.length === 0) return // all processed
await promiseMapWithLimit(
CONCURRENCY,
await processConcurrently(
files,
/**
* @param {QueueEntry} entry
@@ -497,15 +511,65 @@ async function processFiles(files) {
)
}
/** @type {Map<string, Promise>} */
const deferredBatches = new Map()
async function waitForDeferredQueues() {
// Wait for ALL pending batches to finish, especially wait for their mongo
// writes to finish to avoid extra work when resuming the batch.
const all = await Promise.allSettled(deferredBatches.values())
// Now that all batches finished, we can throw if needed.
for (const res of all) {
if (res.status === 'rejected') {
throw res.reason
}
}
}
/**
* @param {Array<Project>} batch
* @param {string} prefix
*/
async function queueNextBatch(batch, prefix = 'rootFolder.0') {
if (gracefulShutdownInitiated) {
throw new Error('graceful shutdown: aborting batch processing')
}
// Read ids now, the batch will get trimmed by processBatch shortly.
const start = renderObjectId(batch[0]._id)
const end = renderObjectId(batch[batch.length - 1]._id)
const deferred = processBatch(batch, prefix)
.then(() => {
console.error(`Actually completed batch ending ${end}`)
})
.catch(err => {
logger.error({ err, start, end }, 'fatal error processing batch')
throw err
})
.finally(() => {
deferredBatches.delete(end)
})
deferredBatches.set(end, deferred)
if (deferredBatches.size >= CONCURRENT_BATCHES) {
// Wait for any of the deferred batches to finish before fetching the next.
// We should never have more than CONCURRENT_BATCHES batches in memory.
await Promise.race(deferredBatches.values())
}
}
/**
* @param {Array<Project>} batch
* @param {string} prefix
* @return {Promise<void>}
*/
async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
const deletedFiles = await collectDeletedFiles(batch)
const { nBlobs, blobs } = await collectProjectBlobs(batch)
const { nBackedUpBlobs, backedUpBlobs } = await collectBackedUpBlobs(batch)
async function processBatch(batch, prefix = 'rootFolder.0') {
const [deletedFiles, { nBlobs, blobs }, { nBackedUpBlobs, backedUpBlobs }] =
await Promise.all([
collectDeletedFiles(batch),
collectProjectBlobs(batch),
collectBackedUpBlobs(batch),
])
const files = Array.from(
findFileInBatch(batch, prefix, deletedFiles, blobs, backedUpBlobs)
)
@@ -533,8 +597,7 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
}
)
await processFiles(files)
await promiseMapWithLimit(
CONCURRENCY,
await processConcurrently(
files,
/**
* @param {QueueEntry} entry
@@ -544,9 +607,6 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
await entry.ctx.flushMongoQueues()
}
)
if (gracefulShutdownInitiated) {
throw new Error('graceful shutdown: aborting batch processing')
}
}
/**
@@ -554,7 +614,7 @@ async function handleLiveTreeBatch(batch, prefix = 'rootFolder.0') {
* @return {Promise<void>}
*/
async function handleDeletedFileTreeBatch(batch) {
await handleLiveTreeBatch(
await queueNextBatch(
batch.map(d => d.project),
'project.rootFolder.0'
)
@@ -1063,37 +1123,51 @@ function estimateBlobSize(blob) {
}
async function updateLiveFileTrees() {
await batchedUpdate(
projectsCollection,
{ 'overleaf.history.id': { $exists: true } },
handleLiveTreeBatch,
{ rootFolder: 1, _id: 1, 'overleaf.history.id': 1 },
{},
{
BATCH_RANGE_START,
BATCH_RANGE_END,
}
)
try {
await batchedUpdate(
projectsCollection,
{ 'overleaf.history.id': { $exists: true } },
queueNextBatch,
{ rootFolder: 1, _id: 1, 'overleaf.history.id': 1 },
{},
{
BATCH_RANGE_START,
BATCH_RANGE_END,
}
)
} catch (err) {
gracefulShutdownInitiated = true
throw err
} finally {
await waitForDeferredQueues()
}
console.warn('Done updating live projects')
}
async function updateDeletedFileTrees() {
await batchedUpdate(
deletedProjectsCollection,
{
'deleterData.deletedProjectId': {
$gt: new ObjectId(BATCH_RANGE_START),
$lte: new ObjectId(BATCH_RANGE_END),
try {
await batchedUpdate(
deletedProjectsCollection,
{
'deleterData.deletedProjectId': {
$gt: new ObjectId(BATCH_RANGE_START),
$lte: new ObjectId(BATCH_RANGE_END),
},
'project.overleaf.history.id': { $exists: true },
},
'project.overleaf.history.id': { $exists: true },
},
handleDeletedFileTreeBatch,
{
'project.rootFolder': 1,
'project._id': 1,
'project.overleaf.history.id': 1,
}
)
handleDeletedFileTreeBatch,
{
'project.rootFolder': 1,
'project._id': 1,
'project.overleaf.history.id': 1,
}
)
} catch (err) {
gracefulShutdownInitiated = true
throw err
} finally {
await waitForDeferredQueues()
}
console.warn('Done updating deleted projects')
}

View File

@@ -540,6 +540,11 @@ describe('back_fill_file_hash script', function () {
delete stats[key]
}
delete stats.LOGGING_IDENTIFIER
expect(stats.deferredBatches).to.have.length(
0,
'should not have any remaining deferred batches'
)
delete stats.deferredBatches
return { stats, result }
}