Merge pull request #29801 from overleaf/bg-history-refactor-backup-worker-shutdown

refactor history-v1-backup worker shutdown

GitOrigin-RevId: 9666a99b00b30e98844e7dd25932f1590d0879b3
This commit is contained in:
Brian Gough
2025-11-21 10:32:23 +00:00
committed by Copybot
parent e822ac0ee0
commit 1f356754de
3 changed files with 64 additions and 33 deletions

View File

@@ -8,7 +8,7 @@ import express from 'express'
import logger from '@overleaf/logger'
import Metrics from '@overleaf/metrics'
import { expressify } from '@overleaf/promise-utils'
import { drainQueue, healthCheck } from './storage/scripts/backup_worker.mjs'
import { healthCheck } from './storage/scripts/backup_worker.mjs'
const app = express()
logger.initialize('history-v1-backup-worker')
@@ -39,7 +39,6 @@ app.use((err, req, res, next) => {
async function triggerGracefulShutdown(server, signal) {
logger.info({ signal }, 'graceful shutdown: started shutdown sequence')
await drainQueue()
server.close(function () {
logger.info({ signal }, 'graceful shutdown: closed server')
setTimeout(() => {

View File

@@ -93,8 +93,10 @@ process.on('SIGINT', handleSignal)
process.on('SIGTERM', handleSignal)
function handleSignal() {
gracefulShutdownInitiated = true
logger.info({}, 'graceful shutdown initiated, draining queue')
if (!gracefulShutdownInitiated) {
gracefulShutdownInitiated = true
logger.info({}, 'graceful shutdown: waiting for backups to complete')
}
}
async function retry(fn, times, delayMs) {
@@ -1071,6 +1073,45 @@ async function main() {
}
}
/**
* Close all database connections gracefully
* @returns {Promise<void>}
*/
export async function closeConnections() {
/** @type {Error[]} */
const errors = []
try {
await knex.destroy()
console.log('Postgres connection closed')
} catch (err) {
console.error('Error closing Postgres connection:', err)
errors.push(/** @type {Error} */ (err))
}
try {
await client.close()
console.log('MongoDB connection closed')
} catch (err) {
console.error('Error closing MongoDB connection:', err)
errors.push(/** @type {Error} */ (err))
}
try {
await redis.disconnect()
console.log('Redis connection closed')
} catch (err) {
console.error('Error closing Redis connection:', err)
errors.push(/** @type {Error} */ (err))
}
if (errors.length > 0) {
throw new Error(
`Failed to close ${errors.length} connection(s): ${errors.map(e => e.message).join(', ')}`
)
}
}
// Only run command-line interface when script is run directly
if (import.meta.url === `file://${process.argv[1]}`) {
main()
@@ -1083,30 +1124,7 @@ if (import.meta.url === `file://${process.argv[1]}`) {
console.error('Error backing up project:', err)
process.exit(1)
})
.finally(() => {
knex
.destroy()
.then(() => {
console.log('Postgres connection closed')
})
.catch(err => {
console.error('Error closing Postgres connection:', err)
})
client
.close()
.then(() => {
console.log('MongoDB connection closed')
})
.catch(err => {
console.error('Error closing MongoDB connection:', err)
})
redis
.disconnect()
.then(() => {
console.log('Redis connection closed')
})
.catch(err => {
console.error('Error closing Redis connection:', err)
})
.finally(async () => {
await closeConnections()
})
}

View File

@@ -6,6 +6,7 @@ import {
backupProject,
initializeProjects,
configureBackup,
closeConnections,
} from './backup.mjs'
const JOB_CONCURRENCY = parseInt(process.env.JOB_CONCURRENCY, 10) || 15
@@ -20,6 +21,19 @@ const LAG_TIME_BUCKETS_HRS = [
// Configure backup settings to match worker concurrency
configureBackup({ concurrency: UPLOAD_CONCURRENCY, useSecondary: true })
let gracefulShutdownInitiated = false
process.on('SIGINT', handleSignal)
process.on('SIGTERM', handleSignal)
async function handleSignal() {
if (!gracefulShutdownInitiated) {
gracefulShutdownInitiated = true
logger.info({}, 'graceful shutdown: stopping backup worker')
await drainQueue()
}
}
// Create a Bull queue named 'backup'
const backupQueue = new Queue('backup', {
redis: redisOptions,
@@ -62,11 +76,11 @@ backupQueue.on('lock-extension-failed', (job, err) => {
})
backupQueue.on('paused', () => {
logger.info('queue paused')
logger.info({}, 'queue paused')
})
backupQueue.on('resumed', () => {
logger.info('queue resumed')
logger.info({}, 'queue resumed')
})
// Process jobs
@@ -138,10 +152,10 @@ async function runInit(startDate, endDate) {
}
export async function drainQueue() {
logger.info({ queue: backupQueue.name }, 'pausing queue')
await backupQueue.pause(true) // pause this worker and wait for jobs to finish
logger.info({ queue: backupQueue.name }, 'closing queue')
await backupQueue.close()
logger.info({ queue: backupQueue.name }, 'closing database connections')
await closeConnections()
}
export async function healthCheck() {