Merge pull request #26353 from overleaf/bg-history-redis-extend-persist-worker

extend persist worker to make parallel requests

GitOrigin-RevId: 8def7d5a8b5c9fcbe5fe45ac8f3ace503d31877a
This commit is contained in:
Eric Mc Sween
2025-06-16 08:44:12 -04:00
committed by Copybot
parent 01ad56140c
commit 5dfa4d6a46
3 changed files with 110 additions and 2 deletions

2
package-lock.json generated
View File

@@ -43182,6 +43182,7 @@
"license": "Proprietary",
"dependencies": {
"@google-cloud/secret-manager": "^5.6.0",
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
@@ -43211,6 +43212,7 @@
"mongodb": "6.12.0",
"overleaf-editor-core": "*",
"p-limit": "^6.2.0",
"p-queue": "^8.1.0",
"pg": "^8.7.1",
"pg-query-stream": "^4.2.4",
"swagger-tools": "^0.10.4",

View File

@@ -7,6 +7,7 @@
"private": true,
"dependencies": {
"@google-cloud/secret-manager": "^5.6.0",
"@overleaf/fetch-utils": "*",
"@overleaf/logger": "*",
"@overleaf/metrics": "*",
"@overleaf/mongo-utils": "*",
@@ -36,6 +37,7 @@
"mongodb": "6.12.0",
"overleaf-editor-core": "*",
"p-limit": "^6.2.0",
"p-queue": "^8.1.0",
"pg": "^8.7.1",
"pg-query-stream": "^4.2.4",
"swagger-tools": "^0.10.4",

View File

@@ -1,3 +1,6 @@
import config from 'config'
import PQueue from 'p-queue'
import { fetchNothing } from '@overleaf/fetch-utils'
import logger from '@overleaf/logger'
import commandLineArgs from 'command-line-args'
import * as redis from '../lib/redis.js'
@@ -17,9 +20,19 @@ EventEmitter.defaultMaxListeners = 11
const rclient = redis.rclientHistory
const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }]
const optionDefinitions = [
{ name: 'dry-run', alias: 'd', type: Boolean },
{ name: 'queue', type: Boolean },
{ name: 'max-time', type: Number },
{ name: 'min-rate', type: Number, defaultValue: 1 },
]
const options = commandLineArgs(optionDefinitions)
const DRY_RUN = options['dry-run'] || false
const USE_QUEUE = options.queue || false
const MAX_TIME = options['max-time'] || null
const MIN_RATE = options['min-rate']
const HISTORY_V1_URL = `http://${process.env.HISTORY_V1_HOST || 'localhost'}:${process.env.PORT || 3100}`
let isShuttingDown = false
logger.initialize('persist-redis-chunks')
@@ -39,15 +52,96 @@ async function persistProjectAction(projectId) {
}
}
async function requestProjectFlush(projectId) {
const job = await claimPersistJob(projectId)
logger.debug({ projectId }, 'sending project flush request')
const url = `${HISTORY_V1_URL}/api/projects/${projectId}/flush`
const credentials = Buffer.from(
`staging:${config.get('basicHttpAuth.password')}`
).toString('base64')
await fetchNothing(url, {
method: 'POST',
headers: {
Authorization: `Basic ${credentials}`,
},
})
if (job && job.close) {
await job.close()
}
}
async function persistQueuedProjects(queuedProjects) {
const totalCount = queuedProjects.size
// Compute the rate at which we need to dispatch requests
const targetRate = MAX_TIME > 0 ? Math.ceil(totalCount / MAX_TIME) : 0
// Rate limit to spread the requests over the interval.
const queue = new PQueue({
intervalCap: Math.max(MIN_RATE, targetRate),
interval: 1000, // use a 1 second interval
})
logger.info(
{ totalCount, targetRate, minRate: MIN_RATE, maxTime: MAX_TIME },
'dispatching project flush requests'
)
const startTime = Date.now()
let dispatchedCount = 0
for (const projectId of queuedProjects) {
if (isShuttingDown) {
logger.info('Shutting down, stopping project flush requests')
queue.clear()
break
}
queue.add(async () => {
try {
await requestProjectFlush(projectId)
} catch (err) {
logger.error({ err, projectId }, 'error while flushing project')
}
})
dispatchedCount++
if (dispatchedCount % 1000 === 0) {
logger.info(
{ count: dispatchedCount },
'dispatched project flush requests'
)
}
await queue.onEmpty()
}
const elapsedTime = Math.floor((Date.now() - startTime) / 1000)
logger.info(
{ count: totalCount, elapsedTime },
'dispatched project flush requests'
)
await queue.onIdle()
}
async function runPersistChunks() {
const queuedProjects = new Set()
async function queueProjectAction(projectId) {
queuedProjects.add(projectId)
}
await loadGlobalBlobs()
await scanAndProcessDueItems(
rclient,
'persistChunks',
'persist-time',
persistProjectAction,
USE_QUEUE ? queueProjectAction : persistProjectAction,
DRY_RUN
)
if (USE_QUEUE) {
if (isShuttingDown) {
logger.info('Shutting down, skipping queued project persistence')
return
}
logger.info(
{ count: queuedProjects.size },
'queued projects for persistence'
)
await persistQueuedProjects(queuedProjects)
}
}
async function main() {
@@ -67,9 +161,19 @@ async function main() {
}
}
function gracefulShutdown() {
if (isShuttingDown) {
return
}
isShuttingDown = true
logger.info({ isShuttingDown }, 'received shutdown signal, cleaning up...')
}
// Check if the module is being run directly
const currentScriptPath = fileURLToPath(import.meta.url)
if (process.argv[1] === currentScriptPath) {
process.on('SIGINT', gracefulShutdown)
process.on('SIGTERM', gracefulShutdown)
main()
}