Merge pull request #23946 from overleaf/bg-backup-queue-implementation

additional backup queue implementation

GitOrigin-RevId: 03754e57a6c6798a07dcca6a5248dec61b3cdc7a
This commit is contained in:
Brian Gough
2025-02-27 16:16:35 +00:00
committed by Copybot
parent 767591973c
commit b37b409994
3 changed files with 135 additions and 45 deletions

View File

@@ -47,6 +47,7 @@ EventEmitter.defaultMaxListeners = 20
logger.initialize('history-v1-backup')
// Settings shared between command-line and module usage
let DRY_RUN = false
let RETRY_LIMIT = 3
const RETRY_DELAY = 1000
@@ -54,6 +55,18 @@ let CONCURRENCY = 4
let BATCH_CONCURRENCY = 1
let BLOB_LIMITER = pLimit(CONCURRENCY)
/**
* Configure backup settings
* @param {Object} options Backup configuration options
*/
export function configureBackup(options = {}) {
DRY_RUN = options.dryRun || false
RETRY_LIMIT = options.retries || 3
CONCURRENCY = options.concurrency || 1
BATCH_CONCURRENCY = options.batchConcurrency || 1
BLOB_LIMITER = pLimit(CONCURRENCY)
}
let gracefulShutdownInitiated = false
process.on('SIGINT', handleSignal)
@@ -483,7 +496,7 @@ function makeChunkKey(projectId, startVersion) {
return path.join(projectKey.format(projectId), projectKey.pad(startVersion))
}
async function backupProject(projectId, options) {
export async function backupProject(projectId, options) {
// FIXME: flush the project first!
// Let's assume the the flush happens externally and triggers this backup
const backupStartTime = new Date()
@@ -627,7 +640,7 @@ function convertToISODate(dateStr) {
return new Date(dateStr + 'T00:00:00.000Z').toISOString()
}
async function initializeProjects(options) {
export async function initializeProjects(options) {
const limiter = pLimit(BATCH_CONCURRENCY)
async function processBatch(batch) {
@@ -929,31 +942,34 @@ async function main() {
}
}
main()
.then(() => {
console.log(
gracefulShutdownInitiated ? 'Exited - graceful shutdown' : 'Completed'
)
})
.catch(err => {
console.error('Error backing up project:', err)
process.exit(1)
})
.finally(() => {
knex
.destroy()
.then(() => {
console.log('Postgres connection closed')
})
.catch(err => {
console.error('Error closing Postgres connection:', err)
})
client
.close()
.then(() => {
console.log('MongoDB connection closed')
})
.catch(err => {
console.error('Error closing MongoDB connection:', err)
})
})
// Only run command-line interface when script is run directly
if (import.meta.url === `file://${process.argv[1]}`) {
main()
.then(() => {
console.log(
gracefulShutdownInitiated ? 'Exited - graceful shutdown' : 'Completed'
)
})
.catch(err => {
console.error('Error backing up project:', err)
process.exit(1)
})
.finally(() => {
knex
.destroy()
.then(() => {
console.log('Postgres connection closed')
})
.catch(err => {
console.error('Error closing Postgres connection:', err)
})
client
.close()
.then(() => {
console.log('MongoDB connection closed')
})
.catch(err => {
console.error('Error closing MongoDB connection:', err)
})
})
}

View File

@@ -21,13 +21,23 @@ const backupQueue = new Queue('backup', {
const optionDefinitions = [
{ name: 'clean', type: Boolean },
{ name: 'status', type: Boolean },
{ name: 'add', type: String, multiple: true },
{
name: 'add',
type: String,
multiple: true,
description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format',
},
{ name: 'monitor', type: Boolean },
]
// Parse command line arguments
const options = commandLineArgs(optionDefinitions)
// Helper to validate date format
function isValidDateFormat(dateStr) {
return /^\d{4}-\d{2}-\d{2}$/.test(dateStr)
}
// Setup queue event listeners
function setupMonitoring() {
console.log('Starting queue monitoring. Press Ctrl+C to exit.')
@@ -81,6 +91,23 @@ function setupMonitoring() {
})
}
async function addDateRangeJob(input) {
const [startDate, endDate] = input.split(':')
if (!isValidDateFormat(startDate) || !isValidDateFormat(endDate)) {
console.error(
`Invalid date format for "${input}". Use YYYY-MM-DD:YYYY-MM-DD`
)
return
}
const job = await backupQueue.add(
{ startDate, endDate },
{ jobId: `backup-${startDate}-to-${endDate}` }
)
console.log(
`Added date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}`
)
}
// Main execution block
async function run() {
const optionCount = [
@@ -107,19 +134,31 @@ async function run() {
const counts = await backupQueue.getJobCounts()
console.log('Current queue state:', JSON.stringify(counts))
} else if (options.add) {
const projectIds = Array.isArray(options.add) ? options.add : [options.add]
for (const projectId of projectIds) {
const job = await backupQueue.add({ projectId }, { jobId: projectId })
console.log(`Added job for project: ${projectId}, job ID: ${job.id}`)
const inputs = Array.isArray(options.add) ? options.add : [options.add]
for (const input of inputs) {
if (input.includes(':')) {
// Handle date range format
await addDateRangeJob(input)
} else {
// Handle project ID format
const job = await backupQueue.add(
{ projectId: input },
{ jobId: input }
)
console.log(`Added job for project: ${input}, job ID: ${job.id}`)
}
}
} else if (options.monitor) {
setupMonitoring()
} else {
console.log('Usage:')
console.log(' --clean Clean up completed and failed jobs')
console.log(' --status Show current job counts')
console.log(' --add [projectId] Add a job for the specified projectId')
console.log(' --monitor Monitor queue events')
console.log(' --clean Clean up completed and failed jobs')
console.log(' --status Show current job counts')
console.log(' --add [projectId] Add a job for the specified projectId')
console.log(
' --add [YYYY-MM-DD:YYYY-MM-DD] Add a job for the specified date range'
)
console.log(' --monitor Monitor queue events')
}
}

View File

@@ -2,11 +2,19 @@ import Queue from 'bull'
import logger from '@overleaf/logger'
import config from 'config'
import metrics from '@overleaf/metrics'
import {
backupProject,
initializeProjects,
configureBackup,
} from './backup.mjs'
const CONCURRENCY = 10
const redisOptions = config.get('redis.queue')
const TIME_BUCKETS = [10, 100, 500, 1000, 5000, 10000, 30000, 60000]
// Configure backup settings to match worker concurrency
configureBackup({ concurrency: 5, batchConcurrency: 5 })
// Create a Bull queue named 'backup'
const backupQueue = new Queue('backup', {
redis: redisOptions,
@@ -37,18 +45,45 @@ backupQueue.on('error', error => {
// Process jobs
backupQueue.process(CONCURRENCY, async job => {
const { projectId } = job.data
const { projectId, startDate, endDate } = job.data
if (projectId) {
return await runBackup(projectId)
} else if (startDate && endDate) {
return await runInit(startDate, endDate)
} else {
throw new Error('invalid job data')
}
})
async function runBackup(projectId) {
const timer = new metrics.Timer(
'backup_worker_job_duration',
1,
{},
TIME_BUCKETS
)
logger.info({ projectId }, 'processing backup for project')
await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 5000))
timer.done()
return `backup completed ${projectId}`
})
try {
logger.info({ projectId }, 'processing backup for project')
await backupProject(projectId, {})
timer.done()
return `backup completed ${projectId}`
} catch (err) {
logger.error({ projectId, err }, 'backup failed')
throw err // Re-throw to mark job as failed
}
}
async function runInit(startDate, endDate) {
try {
logger.info({ startDate, endDate }, 'initializing projects')
await initializeProjects({ 'start-date': startDate, 'end-date': endDate })
return `initialization completed ${startDate} - ${endDate}`
} catch (err) {
logger.error({ startDate, endDate, err }, 'initialization failed')
throw err
}
}
export async function drainQueue() {
logger.info({ queue: backupQueue.name }, 'pausing queue')