mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-31 21:01:33 +02:00
Merge pull request #24080 from overleaf/bg-serialise-backup-initialisation
fix backup initialization to avoid incorrect use of batchedUpdate GitOrigin-RevId: 6984f3510c6b03b3dfda35efea8173f848e58eff
This commit is contained in:
@@ -37,7 +37,11 @@ import projectKey from '../lib/project_key.js'
|
||||
import Crypto from 'node:crypto'
|
||||
import Stream from 'node:stream'
|
||||
import { EventEmitter } from 'node:events'
|
||||
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
|
||||
import {
|
||||
objectIdFromInput,
|
||||
batchedUpdate,
|
||||
READ_PREFERENCE_SECONDARY,
|
||||
} from '@overleaf/mongo-utils/batchedUpdate.js'
|
||||
import { createGunzip } from 'node:zlib'
|
||||
import { text } from 'node:stream/consumers'
|
||||
import { fromStream as blobHashFromStream } from '../lib/blob_hash.js'
|
||||
@@ -668,7 +672,6 @@ export async function backupProject(projectId, options) {
|
||||
}
|
||||
|
||||
function convertToISODate(dateStr) {
|
||||
if (!dateStr) return undefined
|
||||
// Expecting YYYY-MM-DD format
|
||||
if (!/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
|
||||
throw new Error('Date must be in YYYY-MM-DD format')
|
||||
@@ -678,61 +681,41 @@ function convertToISODate(dateStr) {
|
||||
|
||||
export async function initializeProjects(options) {
|
||||
await ensureGlobalBlobsLoaded()
|
||||
const limiter = pLimit(BATCH_CONCURRENCY)
|
||||
let totalErrors = 0
|
||||
let totalProjects = 0
|
||||
async function backupProjectWithErrorLogging(projectId) {
|
||||
try {
|
||||
await backupProject(projectId, options)
|
||||
} catch (err) {
|
||||
logger.error({ projectId, err }, 'error backing up project')
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
async function processBatch(batch) {
|
||||
if (gracefulShutdownInitiated) {
|
||||
throw new Error('graceful shutdown')
|
||||
}
|
||||
const batchOperations = batch.map(project =>
|
||||
limiter(backupProjectWithErrorLogging, project._id.toHexString())
|
||||
)
|
||||
const results = await Promise.allSettled(batchOperations)
|
||||
const errors = results.filter(result => result.status === 'rejected').length
|
||||
if (errors > 0) {
|
||||
logger.error(
|
||||
{
|
||||
errors,
|
||||
batchSize: batch.length,
|
||||
batchStart: batch[0]._id.toHexString(),
|
||||
batchEnd: batch[batch.length - 1]._id.toHexString(),
|
||||
},
|
||||
'errors in batch'
|
||||
)
|
||||
}
|
||||
totalErrors += errors
|
||||
totalProjects += batch.length
|
||||
}
|
||||
|
||||
const query = {
|
||||
'overleaf.history.id': { $exists: true },
|
||||
'overleaf.backup.lastBackedUpVersion': { $exists: false },
|
||||
'overleaf.backup.pendingChangeAt': { $exists: false },
|
||||
_id: {
|
||||
$gte: objectIdFromInput(convertToISODate(options['start-date'])),
|
||||
$lt: objectIdFromInput(convertToISODate(options['end-date'])),
|
||||
},
|
||||
}
|
||||
|
||||
await batchedUpdate(
|
||||
client.db().collection('projects'),
|
||||
query,
|
||||
processBatch,
|
||||
{
|
||||
_id: 1,
|
||||
},
|
||||
{ readPreference: 'secondary' },
|
||||
{
|
||||
BATCH_RANGE_START: convertToISODate(options['start-date']),
|
||||
BATCH_RANGE_END: convertToISODate(options['end-date']),
|
||||
const cursor = client
|
||||
.db()
|
||||
.collection('projects')
|
||||
.find(query, {
|
||||
projection: { _id: 1 },
|
||||
readPreference: READ_PREFERENCE_SECONDARY,
|
||||
})
|
||||
|
||||
for await (const project of cursor) {
|
||||
if (gracefulShutdownInitiated) {
|
||||
console.warn('graceful shutdown: stopping project initialization')
|
||||
break
|
||||
}
|
||||
)
|
||||
totalProjects++
|
||||
const projectId = project._id.toHexString()
|
||||
try {
|
||||
await backupProject(projectId, options)
|
||||
} catch (err) {
|
||||
totalErrors++
|
||||
logger.error({ projectId, err }, 'error backing up project')
|
||||
}
|
||||
}
|
||||
|
||||
return { errors: totalErrors, projects: totalProjects }
|
||||
}
|
||||
|
||||
@@ -8,12 +8,12 @@ import {
|
||||
configureBackup,
|
||||
} from './backup.mjs'
|
||||
|
||||
const CONCURRENCY = 10
|
||||
const CONCURRENCY = 15
|
||||
const redisOptions = config.get('redis.queue')
|
||||
const TIME_BUCKETS = [10, 100, 500, 1000, 5000, 10000, 30000, 60000]
|
||||
|
||||
// Configure backup settings to match worker concurrency
|
||||
configureBackup({ concurrency: 50, batchConcurrency: 3 })
|
||||
configureBackup({ concurrency: 50, useSecondary: true })
|
||||
|
||||
// Create a Bull queue named 'backup'
|
||||
const backupQueue = new Queue('backup', {
|
||||
|
||||
Reference in New Issue
Block a user