From b37b409994a12dff585fb710f52af076a934d1d3 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 27 Feb 2025 16:16:35 +0000 Subject: [PATCH] Merge pull request #23946 from overleaf/bg-backup-queue-implementation additional backup queue implementation GitOrigin-RevId: 03754e57a6c6798a07dcca6a5248dec61b3cdc7a --- .../history-v1/storage/scripts/backup.mjs | 76 +++++++++++-------- .../storage/scripts/backup_scheduler.mjs | 57 +++++++++++--- .../storage/scripts/backup_worker.mjs | 47 ++++++++++-- 3 files changed, 135 insertions(+), 45 deletions(-) diff --git a/services/history-v1/storage/scripts/backup.mjs b/services/history-v1/storage/scripts/backup.mjs index 1aebebbe68..a89950e416 100644 --- a/services/history-v1/storage/scripts/backup.mjs +++ b/services/history-v1/storage/scripts/backup.mjs @@ -47,6 +47,7 @@ EventEmitter.defaultMaxListeners = 20 logger.initialize('history-v1-backup') +// Settings shared between command-line and module usage let DRY_RUN = false let RETRY_LIMIT = 3 const RETRY_DELAY = 1000 @@ -54,6 +55,18 @@ let CONCURRENCY = 4 let BATCH_CONCURRENCY = 1 let BLOB_LIMITER = pLimit(CONCURRENCY) +/** + * Configure backup settings + * @param {Object} options Backup configuration options + */ +export function configureBackup(options = {}) { + DRY_RUN = options.dryRun || false + RETRY_LIMIT = options.retries || 3 + CONCURRENCY = options.concurrency || 1 + BATCH_CONCURRENCY = options.batchConcurrency || 1 + BLOB_LIMITER = pLimit(CONCURRENCY) +} + let gracefulShutdownInitiated = false process.on('SIGINT', handleSignal) @@ -483,7 +496,7 @@ function makeChunkKey(projectId, startVersion) { return path.join(projectKey.format(projectId), projectKey.pad(startVersion)) } -async function backupProject(projectId, options) { +export async function backupProject(projectId, options) { // FIXME: flush the project first! // Let's assume the the flush happens externally and triggers this backup const backupStartTime = new Date() @@ -627,7 +640,7 @@ function convertToISODate(dateStr) { return new Date(dateStr + 'T00:00:00.000Z').toISOString() } -async function initializeProjects(options) { +export async function initializeProjects(options) { const limiter = pLimit(BATCH_CONCURRENCY) async function processBatch(batch) { @@ -929,31 +942,34 @@ async function main() { } } -main() - .then(() => { - console.log( - gracefulShutdownInitiated ? 'Exited - graceful shutdown' : 'Completed' - ) - }) - .catch(err => { - console.error('Error backing up project:', err) - process.exit(1) - }) - .finally(() => { - knex - .destroy() - .then(() => { - console.log('Postgres connection closed') - }) - .catch(err => { - console.error('Error closing Postgres connection:', err) - }) - client - .close() - .then(() => { - console.log('MongoDB connection closed') - }) - .catch(err => { - console.error('Error closing MongoDB connection:', err) - }) - }) +// Only run command-line interface when script is run directly +if (import.meta.url === `file://${process.argv[1]}`) { + main() + .then(() => { + console.log( + gracefulShutdownInitiated ? 'Exited - graceful shutdown' : 'Completed' + ) + }) + .catch(err => { + console.error('Error backing up project:', err) + process.exit(1) + }) + .finally(() => { + knex + .destroy() + .then(() => { + console.log('Postgres connection closed') + }) + .catch(err => { + console.error('Error closing Postgres connection:', err) + }) + client + .close() + .then(() => { + console.log('MongoDB connection closed') + }) + .catch(err => { + console.error('Error closing MongoDB connection:', err) + }) + }) +} diff --git a/services/history-v1/storage/scripts/backup_scheduler.mjs b/services/history-v1/storage/scripts/backup_scheduler.mjs index 4e191832f3..44c6b7c4ec 100644 --- a/services/history-v1/storage/scripts/backup_scheduler.mjs +++ b/services/history-v1/storage/scripts/backup_scheduler.mjs @@ -21,13 +21,23 @@ const backupQueue = new Queue('backup', { const optionDefinitions = [ { name: 'clean', type: Boolean }, { name: 'status', type: Boolean }, - { name: 'add', type: String, multiple: true }, + { + name: 'add', + type: String, + multiple: true, + description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format', + }, { name: 'monitor', type: Boolean }, ] // Parse command line arguments const options = commandLineArgs(optionDefinitions) +// Helper to validate date format +function isValidDateFormat(dateStr) { + return /^\d{4}-\d{2}-\d{2}$/.test(dateStr) +} + // Setup queue event listeners function setupMonitoring() { console.log('Starting queue monitoring. Press Ctrl+C to exit.') @@ -81,6 +91,23 @@ function setupMonitoring() { }) } +async function addDateRangeJob(input) { + const [startDate, endDate] = input.split(':') + if (!isValidDateFormat(startDate) || !isValidDateFormat(endDate)) { + console.error( + `Invalid date format for "${input}". Use YYYY-MM-DD:YYYY-MM-DD` + ) + return + } + const job = await backupQueue.add( + { startDate, endDate }, + { jobId: `backup-${startDate}-to-${endDate}` } + ) + console.log( + `Added date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}` + ) +} + // Main execution block async function run() { const optionCount = [ @@ -107,19 +134,31 @@ async function run() { const counts = await backupQueue.getJobCounts() console.log('Current queue state:', JSON.stringify(counts)) } else if (options.add) { - const projectIds = Array.isArray(options.add) ? options.add : [options.add] - for (const projectId of projectIds) { - const job = await backupQueue.add({ projectId }, { jobId: projectId }) - console.log(`Added job for project: ${projectId}, job ID: ${job.id}`) + const inputs = Array.isArray(options.add) ? options.add : [options.add] + for (const input of inputs) { + if (input.includes(':')) { + // Handle date range format + await addDateRangeJob(input) + } else { + // Handle project ID format + const job = await backupQueue.add( + { projectId: input }, + { jobId: input } + ) + console.log(`Added job for project: ${input}, job ID: ${job.id}`) + } } } else if (options.monitor) { setupMonitoring() } else { console.log('Usage:') - console.log(' --clean Clean up completed and failed jobs') - console.log(' --status Show current job counts') - console.log(' --add [projectId] Add a job for the specified projectId') - console.log(' --monitor Monitor queue events') + console.log(' --clean Clean up completed and failed jobs') + console.log(' --status Show current job counts') + console.log(' --add [projectId] Add a job for the specified projectId') + console.log( + ' --add [YYYY-MM-DD:YYYY-MM-DD] Add a job for the specified date range' + ) + console.log(' --monitor Monitor queue events') } } diff --git a/services/history-v1/storage/scripts/backup_worker.mjs b/services/history-v1/storage/scripts/backup_worker.mjs index 9bc832fd7b..449a260c6a 100644 --- a/services/history-v1/storage/scripts/backup_worker.mjs +++ b/services/history-v1/storage/scripts/backup_worker.mjs @@ -2,11 +2,19 @@ import Queue from 'bull' import logger from '@overleaf/logger' import config from 'config' import metrics from '@overleaf/metrics' +import { + backupProject, + initializeProjects, + configureBackup, +} from './backup.mjs' const CONCURRENCY = 10 const redisOptions = config.get('redis.queue') const TIME_BUCKETS = [10, 100, 500, 1000, 5000, 10000, 30000, 60000] +// Configure backup settings to match worker concurrency +configureBackup({ concurrency: 5, batchConcurrency: 5 }) + // Create a Bull queue named 'backup' const backupQueue = new Queue('backup', { redis: redisOptions, @@ -37,18 +45,45 @@ backupQueue.on('error', error => { // Process jobs backupQueue.process(CONCURRENCY, async job => { - const { projectId } = job.data + const { projectId, startDate, endDate } = job.data + + if (projectId) { + return await runBackup(projectId) + } else if (startDate && endDate) { + return await runInit(startDate, endDate) + } else { + throw new Error('invalid job data') + } +}) + +async function runBackup(projectId) { const timer = new metrics.Timer( 'backup_worker_job_duration', 1, {}, TIME_BUCKETS ) - logger.info({ projectId }, 'processing backup for project') - await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 5000)) - timer.done() - return `backup completed ${projectId}` -}) + try { + logger.info({ projectId }, 'processing backup for project') + await backupProject(projectId, {}) + timer.done() + return `backup completed ${projectId}` + } catch (err) { + logger.error({ projectId, err }, 'backup failed') + throw err // Re-throw to mark job as failed + } +} + +async function runInit(startDate, endDate) { + try { + logger.info({ startDate, endDate }, 'initializing projects') + await initializeProjects({ 'start-date': startDate, 'end-date': endDate }) + return `initialization completed ${startDate} - ${endDate}` + } catch (err) { + logger.error({ startDate, endDate, err }, 'initialization failed') + throw err + } +} export async function drainQueue() { logger.info({ queue: backupQueue.name }, 'pausing queue')