From 5dfa4d6a46c1fb7b5738a6a4e22be596fcd23434 Mon Sep 17 00:00:00 2001 From: Eric Mc Sween <5454374+emcsween@users.noreply.github.com> Date: Mon, 16 Jun 2025 08:44:12 -0400 Subject: [PATCH] Merge pull request #26353 from overleaf/bg-history-redis-extend-persist-worker extend persist worker to make parallel requests GitOrigin-RevId: 8def7d5a8b5c9fcbe5fe45ac8f3ace503d31877a --- package-lock.json | 2 + services/history-v1/package.json | 2 + .../storage/scripts/persist_redis_chunks.mjs | 108 +++++++++++++++++- 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index d41c60b6a5..2a3bb7696d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43182,6 +43182,7 @@ "license": "Proprietary", "dependencies": { "@google-cloud/secret-manager": "^5.6.0", + "@overleaf/fetch-utils": "*", "@overleaf/logger": "*", "@overleaf/metrics": "*", "@overleaf/mongo-utils": "*", @@ -43211,6 +43212,7 @@ "mongodb": "6.12.0", "overleaf-editor-core": "*", "p-limit": "^6.2.0", + "p-queue": "^8.1.0", "pg": "^8.7.1", "pg-query-stream": "^4.2.4", "swagger-tools": "^0.10.4", diff --git a/services/history-v1/package.json b/services/history-v1/package.json index 1fdfd95c45..4796cafd03 100644 --- a/services/history-v1/package.json +++ b/services/history-v1/package.json @@ -7,6 +7,7 @@ "private": true, "dependencies": { "@google-cloud/secret-manager": "^5.6.0", + "@overleaf/fetch-utils": "*", "@overleaf/logger": "*", "@overleaf/metrics": "*", "@overleaf/mongo-utils": "*", @@ -36,6 +37,7 @@ "mongodb": "6.12.0", "overleaf-editor-core": "*", "p-limit": "^6.2.0", + "p-queue": "^8.1.0", "pg": "^8.7.1", "pg-query-stream": "^4.2.4", "swagger-tools": "^0.10.4", diff --git a/services/history-v1/storage/scripts/persist_redis_chunks.mjs b/services/history-v1/storage/scripts/persist_redis_chunks.mjs index 41eea6fbd2..03381ac63a 100644 --- a/services/history-v1/storage/scripts/persist_redis_chunks.mjs +++ b/services/history-v1/storage/scripts/persist_redis_chunks.mjs @@ -1,3 +1,6 @@ +import config from 'config' +import PQueue from 'p-queue' +import { fetchNothing } from '@overleaf/fetch-utils' import logger from '@overleaf/logger' import commandLineArgs from 'command-line-args' import * as redis from '../lib/redis.js' @@ -17,9 +20,19 @@ EventEmitter.defaultMaxListeners = 11 const rclient = redis.rclientHistory -const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }] +const optionDefinitions = [ + { name: 'dry-run', alias: 'd', type: Boolean }, + { name: 'queue', type: Boolean }, + { name: 'max-time', type: Number }, + { name: 'min-rate', type: Number, defaultValue: 1 }, +] const options = commandLineArgs(optionDefinitions) const DRY_RUN = options['dry-run'] || false +const USE_QUEUE = options.queue || false +const MAX_TIME = options['max-time'] || null +const MIN_RATE = options['min-rate'] +const HISTORY_V1_URL = `http://${process.env.HISTORY_V1_HOST || 'localhost'}:${process.env.PORT || 3100}` +let isShuttingDown = false logger.initialize('persist-redis-chunks') @@ -39,15 +52,96 @@ async function persistProjectAction(projectId) { } } +async function requestProjectFlush(projectId) { + const job = await claimPersistJob(projectId) + logger.debug({ projectId }, 'sending project flush request') + const url = `${HISTORY_V1_URL}/api/projects/${projectId}/flush` + const credentials = Buffer.from( + `staging:${config.get('basicHttpAuth.password')}` + ).toString('base64') + await fetchNothing(url, { + method: 'POST', + headers: { + Authorization: `Basic ${credentials}`, + }, + }) + if (job && job.close) { + await job.close() + } +} + +async function persistQueuedProjects(queuedProjects) { + const totalCount = queuedProjects.size + // Compute the rate at which we need to dispatch requests + const targetRate = MAX_TIME > 0 ? Math.ceil(totalCount / MAX_TIME) : 0 + // Rate limit to spread the requests over the interval. + const queue = new PQueue({ + intervalCap: Math.max(MIN_RATE, targetRate), + interval: 1000, // use a 1 second interval + }) + logger.info( + { totalCount, targetRate, minRate: MIN_RATE, maxTime: MAX_TIME }, + 'dispatching project flush requests' + ) + const startTime = Date.now() + let dispatchedCount = 0 + for (const projectId of queuedProjects) { + if (isShuttingDown) { + logger.info('Shutting down, stopping project flush requests') + queue.clear() + break + } + queue.add(async () => { + try { + await requestProjectFlush(projectId) + } catch (err) { + logger.error({ err, projectId }, 'error while flushing project') + } + }) + dispatchedCount++ + if (dispatchedCount % 1000 === 0) { + logger.info( + { count: dispatchedCount }, + 'dispatched project flush requests' + ) + } + await queue.onEmpty() + } + const elapsedTime = Math.floor((Date.now() - startTime) / 1000) + logger.info( + { count: totalCount, elapsedTime }, + 'dispatched project flush requests' + ) + await queue.onIdle() +} + async function runPersistChunks() { + const queuedProjects = new Set() + + async function queueProjectAction(projectId) { + queuedProjects.add(projectId) + } + await loadGlobalBlobs() await scanAndProcessDueItems( rclient, 'persistChunks', 'persist-time', - persistProjectAction, + USE_QUEUE ? queueProjectAction : persistProjectAction, DRY_RUN ) + + if (USE_QUEUE) { + if (isShuttingDown) { + logger.info('Shutting down, skipping queued project persistence') + return + } + logger.info( + { count: queuedProjects.size }, + 'queued projects for persistence' + ) + await persistQueuedProjects(queuedProjects) + } } async function main() { @@ -67,9 +161,19 @@ async function main() { } } +function gracefulShutdown() { + if (isShuttingDown) { + return + } + isShuttingDown = true + logger.info({ isShuttingDown }, 'received shutdown signal, cleaning up...') +} + // Check if the module is being run directly const currentScriptPath = fileURLToPath(import.meta.url) if (process.argv[1] === currentScriptPath) { + process.on('SIGINT', gracefulShutdown) + process.on('SIGTERM', gracefulShutdown) main() }