Merge pull request #23920 from overleaf/bg-backup-queue

initial history backup queue worker app

GitOrigin-RevId: e9647a2ec3caeb1fff843cead12164ba89fdb1f8
This commit is contained in:
Brian Gough
2025-02-27 09:18:17 +00:00
committed by Copybot
parent 788f6569d8
commit 763e5ba82c
6 changed files with 524 additions and 1 deletions

251
package-lock.json generated
View File

@@ -6306,6 +6306,12 @@
"react": "*"
}
},
"node_modules/@ioredis/commands": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz",
"integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==",
"license": "MIT"
},
"node_modules/@isaacs/cliui": {
"version": "8.0.2",
"resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz",
@@ -6860,6 +6866,84 @@
"sparse-bitfield": "^3.0.3"
}
},
"node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz",
"integrity": "sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz",
"integrity": "sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz",
"integrity": "sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==",
"cpu": [
"arm"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz",
"integrity": "sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz",
"integrity": "sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz",
"integrity": "sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
]
},
"node_modules/@napi-rs/canvas": {
"version": "0.1.65",
"resolved": "https://registry.npmjs.org/@napi-rs/canvas/-/canvas-0.1.65.tgz",
@@ -28519,6 +28603,15 @@
"es5-ext": "~0.10.2"
}
},
"node_modules/luxon": {
"version": "3.5.0",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz",
"integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==",
"license": "MIT",
"engines": {
"node": ">=12"
}
},
"node_modules/lz-string": {
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/lz-string/-/lz-string-1.5.0.tgz",
@@ -29905,6 +29998,37 @@
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="
},
"node_modules/msgpackr": {
"version": "1.11.2",
"resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.11.2.tgz",
"integrity": "sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==",
"license": "MIT",
"optionalDependencies": {
"msgpackr-extract": "^3.0.2"
}
},
"node_modules/msgpackr-extract": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz",
"integrity": "sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==",
"hasInstallScript": true,
"license": "MIT",
"optional": true,
"dependencies": {
"node-gyp-build-optional-packages": "5.2.2"
},
"bin": {
"download-msgpackr-prebuilds": "bin/download-prebuilds.js"
},
"optionalDependencies": {
"@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.3",
"@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.3",
"@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.3",
"@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.3",
"@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.3",
"@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3"
}
},
"node_modules/multer": {
"version": "1.4.5-lts.1",
"resolved": "https://registry.npmjs.org/multer/-/multer-1.4.5-lts.1.tgz",
@@ -30234,6 +30358,31 @@
"node": ">= 6.13.0"
}
},
"node_modules/node-gyp-build-optional-packages": {
"version": "5.2.2",
"resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz",
"integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==",
"license": "MIT",
"optional": true,
"dependencies": {
"detect-libc": "^2.0.1"
},
"bin": {
"node-gyp-build-optional-packages": "bin.js",
"node-gyp-build-optional-packages-optional": "optional.js",
"node-gyp-build-optional-packages-test": "build-test.js"
}
},
"node_modules/node-gyp-build-optional-packages/node_modules/detect-libc": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.0.3.tgz",
"integrity": "sha512-bwy0MGW55bG41VqxxypOsdSdGqLwXPI/focwgTYCFMbdUiBAxLg9CFzG08sz2aqzknwiX7Hkl0bQENjg8iLByw==",
"license": "Apache-2.0",
"optional": true,
"engines": {
"node": ">=8"
}
},
"node_modules/node-int64": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz",
@@ -42143,6 +42292,7 @@
"basic-auth": "^2.0.1",
"bluebird": "^3.7.2",
"body-parser": "^1.20.3",
"bull": "^4.16.5",
"bunyan": "^1.8.12",
"check-types": "^11.1.2",
"command-line-args": "^3.0.3",
@@ -42190,6 +42340,24 @@
"node": ">=0.12.0"
}
},
"services/history-v1/node_modules/bull": {
"version": "4.16.5",
"resolved": "https://registry.npmjs.org/bull/-/bull-4.16.5.tgz",
"integrity": "sha512-lDsx2BzkKe7gkCYiT5Acj02DpTwDznl/VNN7Psn7M3USPG7Vs/BaClZJJTAG+ufAR9++N1/NiUTdaFBWDIl5TQ==",
"license": "MIT",
"dependencies": {
"cron-parser": "^4.9.0",
"get-port": "^5.1.1",
"ioredis": "^5.3.2",
"lodash": "^4.17.21",
"msgpackr": "^1.11.2",
"semver": "^7.5.2",
"uuid": "^8.3.0"
},
"engines": {
"node": ">=12"
}
},
"services/history-v1/node_modules/chai-exclude": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/chai-exclude/-/chai-exclude-2.1.1.tgz",
@@ -42222,6 +42390,44 @@
"command-line-args": "bin.js"
}
},
"services/history-v1/node_modules/cron-parser": {
"version": "4.9.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz",
"integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==",
"license": "MIT",
"dependencies": {
"luxon": "^3.2.1"
},
"engines": {
"node": ">=12.0.0"
}
},
"services/history-v1/node_modules/debug": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz",
"integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==",
"license": "MIT",
"dependencies": {
"ms": "^2.1.3"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"services/history-v1/node_modules/denque": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz",
"integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
"license": "Apache-2.0",
"engines": {
"node": ">=0.10"
}
},
"services/history-v1/node_modules/find-replace": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/find-replace/-/find-replace-1.0.3.tgz",
@@ -42248,6 +42454,30 @@
"node": ">=10"
}
},
"services/history-v1/node_modules/ioredis": {
"version": "5.5.0",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.5.0.tgz",
"integrity": "sha512-7CutT89g23FfSa8MDoIFs2GYYa0PaNiW/OrT+nRyjRXHDZd17HmIgy+reOQ/yhh72NznNjGuS8kbCAcA4Ro4mw==",
"license": "MIT",
"dependencies": {
"@ioredis/commands": "^1.1.1",
"cluster-key-slot": "^1.1.0",
"debug": "^4.3.4",
"denque": "^2.1.0",
"lodash.defaults": "^4.2.0",
"lodash.isarguments": "^3.1.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.1.0"
},
"engines": {
"node": ">=12.22.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/ioredis"
}
},
"services/history-v1/node_modules/p-limit": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-6.2.0.tgz",
@@ -42263,6 +42493,18 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"services/history-v1/node_modules/semver": {
"version": "7.7.1",
"resolved": "https://registry.npmjs.org/semver/-/semver-7.7.1.tgz",
"integrity": "sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA==",
"license": "ISC",
"bin": {
"semver": "bin/semver.js"
},
"engines": {
"node": ">=10"
}
},
"services/history-v1/node_modules/test-value": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/test-value/-/test-value-2.1.0.tgz",
@@ -42275,6 +42517,15 @@
"node": ">=0.10.0"
}
},
"services/history-v1/node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"license": "MIT",
"bin": {
"uuid": "dist/bin/uuid"
}
},
"services/history-v1/node_modules/yocto-queue": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.1.1.tgz",

View File

@@ -0,0 +1,70 @@
// @ts-check
// Metrics must be initialized before importing anything else
import '@overleaf/metrics/initialize.js'
import http from 'node:http'
import { fileURLToPath } from 'node:url'
import { promisify } from 'node:util'
import express from 'express'
import logger from '@overleaf/logger'
import Metrics from '@overleaf/metrics'
import { expressify } from '@overleaf/promise-utils'
import { drainQueue, healthCheck } from './storage/scripts/backup_worker.mjs'
const app = express()
logger.initialize('history-v1-backup-worker')
Metrics.open_sockets.monitor()
Metrics.injectMetricsRoute(app)
app.use(Metrics.http.monitor(logger))
Metrics.leaked_sockets.monitor(logger)
Metrics.event_loop.monitor(logger)
Metrics.memory.monitor(logger)
app.get('/status', (req, res) => {
res.send('history-v1-backup-worker is up')
})
app.get(
'/health_check',
expressify(async (req, res) => {
await healthCheck()
res.sendStatus(200)
})
)
app.use((err, req, res, next) => {
req.logger.addFields({ err })
req.logger.setLevel('error')
next(err)
})
async function triggerGracefulShutdown(server, signal) {
logger.warn({ signal }, 'graceful shutdown: started shutdown sequence')
await drainQueue()
server.close(function () {
logger.warn({ signal }, 'graceful shutdown: closed server')
setTimeout(() => {
process.exit(0)
}, 1000)
})
}
/**
* @param {number} port
* @return {Promise<http.Server>}
*/
export async function startApp(port) {
await healthCheck()
const server = http.createServer(app)
await promisify(server.listen.bind(server, port))()
const signals = ['SIGINT', 'SIGTERM']
signals.forEach(signal => {
process.on(signal, () => triggerGracefulShutdown(server, signal))
})
return server
}
// Run this if we're called directly
if (process.argv[1] === fileURLToPath(import.meta.url)) {
const PORT = parseInt(process.env.PORT || '3103', 10)
await startApp(PORT)
}

View File

@@ -20,6 +20,7 @@
"basic-auth": "^2.0.1",
"bluebird": "^3.7.2",
"body-parser": "^1.20.3",
"bull": "^4.16.5",
"bunyan": "^1.8.12",
"check-types": "^11.1.2",
"command-line-args": "^3.0.3",

View File

@@ -0,0 +1,137 @@
import Queue from 'bull'
import config from 'config'
import commandLineArgs from 'command-line-args'
import logger from '@overleaf/logger'
logger.initialize('backup-queue')
// Use the same redis config as backup_worker
const redisOptions = config.get('redis.queue')
// Create a Bull queue named 'backup'
const backupQueue = new Queue('backup', {
redis: redisOptions,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
})
// Define command-line options
const optionDefinitions = [
{ name: 'clean', type: Boolean },
{ name: 'status', type: Boolean },
{ name: 'add', type: String, multiple: true },
{ name: 'monitor', type: Boolean },
]
// Parse command line arguments
const options = commandLineArgs(optionDefinitions)
// Setup queue event listeners
function setupMonitoring() {
console.log('Starting queue monitoring. Press Ctrl+C to exit.')
backupQueue.on('global:error', error => {
logger.info({ error }, 'Queue error')
})
backupQueue.on('global:waiting', jobId => {
logger.info({ jobId }, 'job is waiting')
})
backupQueue.on('global:active', jobId => {
logger.info({ jobId }, 'job is now active')
})
backupQueue.on('global:stalled', jobId => {
logger.info({ jobId }, 'job has stalled')
})
backupQueue.on('global:progress', (jobId, progress) => {
logger.info({ jobId, progress }, 'job progress')
})
backupQueue.on('global:completed', (jobId, result) => {
logger.info({ jobId, result }, 'job completed')
})
backupQueue.on('global:failed', (jobId, err) => {
logger.info({ jobId, err }, 'job failed')
})
backupQueue.on('global:paused', () => {
logger.info({}, 'Queue paused')
})
backupQueue.on('global:resumed', () => {
logger.info({}, 'Queue resumed')
})
backupQueue.on('global:cleaned', (jobs, type) => {
logger.info({ jobsCount: jobs.length, type }, 'Jobs cleaned')
})
backupQueue.on('global:drained', () => {
logger.info({}, 'Queue drained')
})
backupQueue.on('global:removed', jobId => {
logger.info({ jobId }, 'Job removed')
})
}
// Main execution block
async function run() {
const optionCount = [
options.clean,
options.status,
options.add,
options.monitor,
].filter(Boolean).length
if (optionCount > 1) {
console.error('Only one option can be specified')
process.exit(1)
}
if (options.clean) {
const beforeCounts = await backupQueue.getJobCounts()
console.log('Current queue state:', JSON.stringify(beforeCounts))
console.log('Cleaning completed and failed jobs...')
await backupQueue.clean(1, 'completed')
await backupQueue.clean(1, 'failed')
const afterCounts = await backupQueue.getJobCounts()
console.log('Current queue state:', JSON.stringify(afterCounts))
console.log('Queue cleaned successfully')
} else if (options.status) {
const counts = await backupQueue.getJobCounts()
console.log('Current queue state:', JSON.stringify(counts))
} else if (options.add) {
const projectIds = Array.isArray(options.add) ? options.add : [options.add]
for (const projectId of projectIds) {
const job = await backupQueue.add({ projectId }, { jobId: projectId })
console.log(`Added job for project: ${projectId}, job ID: ${job.id}`)
}
} else if (options.monitor) {
setupMonitoring()
} else {
console.log('Usage:')
console.log(' --clean Clean up completed and failed jobs')
console.log(' --status Show current job counts')
console.log(' --add [projectId] Add a job for the specified projectId')
console.log(' --monitor Monitor queue events')
}
}
// Run and handle errors
run()
.catch(err => {
console.error('Error:', err)
process.exit(1)
})
.then(result => {
// Only exit if not in monitor mode
if (!options.monitor) {
process.exit(0)
}
})

View File

@@ -0,0 +1,63 @@
import Queue from 'bull'
import logger from '@overleaf/logger'
import config from 'config'
import metrics from '@overleaf/metrics'
const CONCURRENCY = 10
const redisOptions = config.get('redis.queue')
const TIME_BUCKETS = [10, 100, 500, 1000, 5000, 10000, 30000, 60000]
// Create a Bull queue named 'backup'
const backupQueue = new Queue('backup', {
redis: redisOptions,
})
// Log queue events
backupQueue.on('active', job => {
logger.info({ job }, 'job is now active')
})
backupQueue.on('completed', (job, result) => {
metrics.inc('backup_worker_job', 1, { status: 'completed' })
logger.info({ job, result }, 'job completed')
})
backupQueue.on('failed', (job, err) => {
metrics.inc('backup_worker_job', 1, { status: 'failed' })
logger.error({ job, err }, 'job failed')
})
backupQueue.on('waiting', jobId => {
logger.info({ jobId }, 'job is waiting')
})
backupQueue.on('error', error => {
logger.error({ error }, 'queue error')
})
// Process jobs
backupQueue.process(CONCURRENCY, async job => {
const { projectId } = job.data
const timer = new metrics.Timer(
'backup_worker_job_duration',
1,
{},
TIME_BUCKETS
)
logger.info({ projectId }, 'processing backup for project')
await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 5000))
timer.done()
return `backup completed ${projectId}`
})
export async function drainQueue() {
logger.info({ queue: backupQueue.name }, 'pausing queue')
await backupQueue.pause(true) // pause this worker and wait for jobs to finish
logger.info({ queue: backupQueue.name }, 'closing queue')
await backupQueue.close()
}
export async function healthCheck() {
const count = await backupQueue.count()
metrics.gauge('backup_worker_queue_length', count)
}

View File

@@ -6,6 +6,7 @@
"app/js/**/*",
"backup-deletion-app.mjs",
"backup-verifier-app.mjs",
"backup-worker-app.mjs",
"benchmarks/**/*",
"config/**/*",
"migrations/**/*",
@@ -14,4 +15,4 @@
"test/**/*",
"types"
]
}
}