diff --git a/services/history-v1/backup-worker-app.mjs b/services/history-v1/backup-worker-app.mjs index b21e55aafe..0e75edb3ba 100644 --- a/services/history-v1/backup-worker-app.mjs +++ b/services/history-v1/backup-worker-app.mjs @@ -8,7 +8,7 @@ import express from 'express' import logger from '@overleaf/logger' import Metrics from '@overleaf/metrics' import { expressify } from '@overleaf/promise-utils' -import { drainQueue, healthCheck } from './storage/scripts/backup_worker.mjs' +import { healthCheck } from './storage/scripts/backup_worker.mjs' const app = express() logger.initialize('history-v1-backup-worker') @@ -39,7 +39,6 @@ app.use((err, req, res, next) => { async function triggerGracefulShutdown(server, signal) { logger.info({ signal }, 'graceful shutdown: started shutdown sequence') - await drainQueue() server.close(function () { logger.info({ signal }, 'graceful shutdown: closed server') setTimeout(() => { diff --git a/services/history-v1/storage/scripts/backup.mjs b/services/history-v1/storage/scripts/backup.mjs index e044fc69b5..656691f708 100644 --- a/services/history-v1/storage/scripts/backup.mjs +++ b/services/history-v1/storage/scripts/backup.mjs @@ -93,8 +93,10 @@ process.on('SIGINT', handleSignal) process.on('SIGTERM', handleSignal) function handleSignal() { - gracefulShutdownInitiated = true - logger.info({}, 'graceful shutdown initiated, draining queue') + if (!gracefulShutdownInitiated) { + gracefulShutdownInitiated = true + logger.info({}, 'graceful shutdown: waiting for backups to complete') + } } async function retry(fn, times, delayMs) { @@ -1071,6 +1073,45 @@ async function main() { } } +/** + * Close all database connections gracefully + * @returns {Promise} + */ +export async function closeConnections() { + /** @type {Error[]} */ + const errors = [] + + try { + await knex.destroy() + console.log('Postgres connection closed') + } catch (err) { + console.error('Error closing Postgres connection:', err) + errors.push(/** @type {Error} */ (err)) + } + + try { + await client.close() + console.log('MongoDB connection closed') + } catch (err) { + console.error('Error closing MongoDB connection:', err) + errors.push(/** @type {Error} */ (err)) + } + + try { + await redis.disconnect() + console.log('Redis connection closed') + } catch (err) { + console.error('Error closing Redis connection:', err) + errors.push(/** @type {Error} */ (err)) + } + + if (errors.length > 0) { + throw new Error( + `Failed to close ${errors.length} connection(s): ${errors.map(e => e.message).join(', ')}` + ) + } +} + // Only run command-line interface when script is run directly if (import.meta.url === `file://${process.argv[1]}`) { main() @@ -1083,30 +1124,7 @@ if (import.meta.url === `file://${process.argv[1]}`) { console.error('Error backing up project:', err) process.exit(1) }) - .finally(() => { - knex - .destroy() - .then(() => { - console.log('Postgres connection closed') - }) - .catch(err => { - console.error('Error closing Postgres connection:', err) - }) - client - .close() - .then(() => { - console.log('MongoDB connection closed') - }) - .catch(err => { - console.error('Error closing MongoDB connection:', err) - }) - redis - .disconnect() - .then(() => { - console.log('Redis connection closed') - }) - .catch(err => { - console.error('Error closing Redis connection:', err) - }) + .finally(async () => { + await closeConnections() }) } diff --git a/services/history-v1/storage/scripts/backup_worker.mjs b/services/history-v1/storage/scripts/backup_worker.mjs index 0196d02422..ba630293be 100644 --- a/services/history-v1/storage/scripts/backup_worker.mjs +++ b/services/history-v1/storage/scripts/backup_worker.mjs @@ -6,6 +6,7 @@ import { backupProject, initializeProjects, configureBackup, + closeConnections, } from './backup.mjs' const JOB_CONCURRENCY = parseInt(process.env.JOB_CONCURRENCY, 10) || 15 @@ -20,6 +21,19 @@ const LAG_TIME_BUCKETS_HRS = [ // Configure backup settings to match worker concurrency configureBackup({ concurrency: UPLOAD_CONCURRENCY, useSecondary: true }) +let gracefulShutdownInitiated = false + +process.on('SIGINT', handleSignal) +process.on('SIGTERM', handleSignal) + +async function handleSignal() { + if (!gracefulShutdownInitiated) { + gracefulShutdownInitiated = true + logger.info({}, 'graceful shutdown: stopping backup worker') + await drainQueue() + } +} + // Create a Bull queue named 'backup' const backupQueue = new Queue('backup', { redis: redisOptions, @@ -62,11 +76,11 @@ backupQueue.on('lock-extension-failed', (job, err) => { }) backupQueue.on('paused', () => { - logger.info('queue paused') + logger.info({}, 'queue paused') }) backupQueue.on('resumed', () => { - logger.info('queue resumed') + logger.info({}, 'queue resumed') }) // Process jobs @@ -138,10 +152,10 @@ async function runInit(startDate, endDate) { } export async function drainQueue() { - logger.info({ queue: backupQueue.name }, 'pausing queue') - await backupQueue.pause(true) // pause this worker and wait for jobs to finish logger.info({ queue: backupQueue.name }, 'closing queue') await backupQueue.close() + logger.info({ queue: backupQueue.name }, 'closing database connections') + await closeConnections() } export async function healthCheck() {