diff --git a/services/history-v1/storage/lib/scan.js b/services/history-v1/storage/lib/scan.js index 45d0c327fe..925b0590c4 100644 --- a/services/history-v1/storage/lib/scan.js +++ b/services/history-v1/storage/lib/scan.js @@ -1,3 +1,5 @@ +const logger = require('@overleaf/logger') + const BATCH_SIZE = 1000 // Default batch size for SCAN /** @@ -49,4 +51,134 @@ function extractKeyId(key) { return null } -module.exports = { scanRedisCluster, extractKeyId } +/** + * Fetches timestamps for a list of project IDs based on a given key name. + * + * @param {string[]} projectIds - Array of project identifiers. + * @param {object} rclient - The Redis client instance. + * @param {string} keyName - The base name for the Redis keys storing the timestamps (e.g., "expire-time", "persist-time"). + * @param {number} currentTime - The current time (timestamp in milliseconds) to compare against. + * @returns {Promise>} + * A promise that resolves to an array of objects, each containing a projectId and + * its corresponding timestampValue, for due projects only. + */ +async function fetchOverdueProjects(projectIds, rclient, keyName, currentTime) { + if (!projectIds || projectIds.length === 0) { + return [] + } + const timestampKeys = projectIds.map(id => `${keyName}:{${id}}`) + const timestamps = await rclient.mget(timestampKeys) + + const dueProjects = [] + for (let i = 0; i < projectIds.length; i++) { + const projectId = projectIds[i] + const timestampValue = timestamps[i] + + if (timestampValue !== null) { + const timestamp = parseInt(timestampValue, 10) + if (!isNaN(timestamp) && currentTime > timestamp) { + dueProjects.push({ projectId, timestampValue }) + } + } + } + return dueProjects +} + +/** + * Scans Redis for keys matching a pattern derived from keyName, identifies items that are "due" based on a timestamp, + * and performs a specified action on them. + * + * @param {object} rclient - The Redis client instance. + * @param {string} taskName - A descriptive name for the task (used in logging). + * @param {string} keyName - The base name for the Redis keys (e.g., "expire-time", "persist-time"). + * The function will derive the key prefix as `${keyName}:` and scan pattern as `${keyName}:{*}`. + * @param {function(string): Promise} actionFn - An async function that takes a projectId and performs an action. + * @param {boolean} DRY_RUN - If true, logs actions that would be taken without performing them. + * @returns {Promise<{scannedKeyCount: number, processedKeyCount: number}>} Counts of scanned and processed keys. + */ +async function scanAndProcessDueItems( + rclient, + taskName, + keyName, + actionFn, + DRY_RUN +) { + let scannedKeyCount = 0 + let processedKeyCount = 0 + const START_TIME = Date.now() + const logContext = { taskName, dryRun: DRY_RUN } + + const scanPattern = `${keyName}:{*}` + + if (DRY_RUN) { + logger.info(logContext, `Starting ${taskName} scan in DRY RUN mode`) + } else { + logger.info(logContext, `Starting ${taskName} scan`) + } + + for await (const keysBatch of scanRedisCluster(rclient, scanPattern)) { + scannedKeyCount += keysBatch.length + const projectIds = keysBatch.map(extractKeyId).filter(id => id != null) + + if (projectIds.length === 0) { + continue + } + + const currentTime = Date.now() + const overdueProjects = await fetchOverdueProjects( + projectIds, + rclient, + keyName, + currentTime + ) + + for (const project of overdueProjects) { + const { projectId } = project + if (DRY_RUN) { + logger.info( + { ...logContext, projectId }, + `[Dry Run] Would perform ${taskName} for project` + ) + } else { + try { + await actionFn(projectId) + logger.info( + { ...logContext, projectId }, + `Successfully performed ${taskName} for project` + ) + } catch (err) { + logger.error( + { ...logContext, projectId, err }, + `Error performing ${taskName} for project` + ) + continue + } + } + processedKeyCount++ + + if (processedKeyCount % 1000 === 0 && processedKeyCount > 0) { + logger.info( + { ...logContext, scannedKeyCount, processedKeyCount }, + `${taskName} scan progress` + ) + } + } + } + + logger.info( + { + ...logContext, + scannedKeyCount, + processedKeyCount, + elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000), + }, + `${taskName} scan complete` + ) + return { scannedKeyCount, processedKeyCount } +} + +module.exports = { + scanRedisCluster, + extractKeyId, + scanAndProcessDueItems, +} diff --git a/services/history-v1/storage/scripts/expire_redis_chunks.js b/services/history-v1/storage/scripts/expire_redis_chunks.js index de4e130ed4..af2be097b6 100644 --- a/services/history-v1/storage/scripts/expire_redis_chunks.js +++ b/services/history-v1/storage/scripts/expire_redis_chunks.js @@ -1,11 +1,10 @@ const logger = require('@overleaf/logger') -const commandLineArgs = require('command-line-args') // Add this line +const commandLineArgs = require('command-line-args') const redis = require('../lib/redis') -const { scanRedisCluster, extractKeyId } = require('../lib/scan') +const { scanAndProcessDueItems } = require('../lib/scan') const { expireProject, claimExpireJob } = require('../lib/chunk_store/redis') const rclient = redis.rclientHistory -const EXPIRE_TIME_KEY_PATTERN = `expire-time:{*}` const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }] const options = commandLineArgs(optionDefinitions) @@ -13,112 +12,41 @@ const DRY_RUN = options['dry-run'] || false logger.initialize('expire-redis-chunks') -function isExpiredKey(expireTimestamp, currentTime) { - const expireTime = parseInt(expireTimestamp, 10) - if (isNaN(expireTime)) { - return false - } - logger.debug( - { - expireTime, - currentTime, - expireIn: expireTime - currentTime, - expired: currentTime > expireTime, - }, - 'Checking if key is expired' - ) - return currentTime > expireTime -} - -async function fetchTimestamps(projectIds, rclient) { - const expireTimeKeys = projectIds.map(id => `expire-time:{${id}}`) - // For efficiency, we use MGET to fetch all the timestamps in a single request - const expireTimestamps = await rclient.mget(expireTimeKeys) - // Return an array of objects with projectId and expireTimestamp - const results = projectIds.map((projectId, index) => ({ - projectId, - expireTimestamp: expireTimestamps[index], - })) - return results -} - -async function processKeysBatch(projectIds, rclient) { - let clearedKeyCount = 0 - if (projectIds.length === 0) { - return 0 - } - const projects = await fetchTimestamps(projectIds, rclient) - const currentTime = Date.now() - - for (const project of projects) { - const { projectId, expireTimestamp } = project - // For each key, do a quick check to see if the key is expired before calling - // the LUA script to expire the chunk atomically. - if (isExpiredKey(expireTimestamp, currentTime)) { - if (DRY_RUN) { - logger.info({ projectId }, '[Dry Run] Would expire chunk for project') - } else { - try { - const job = await claimExpireJob(projectId) - await expireProject(projectId) - await job.close() - } catch (err) { - logger.error({ projectId, err }, 'error expiring chunk for project') - continue - } - } - clearedKeyCount++ +async function expireProjectAction(projectId) { + const job = await claimExpireJob(projectId) + try { + await expireProject(projectId) + } finally { + if (job && job.close) { + await job.close() } } - return clearedKeyCount } -async function expireRedisChunks() { - let scannedKeyCount = 0 - let clearedKeyCount = 0 - const START_TIME = Date.now() - - if (DRY_RUN) { - logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode') - } else { - logger.info({}, 'starting expireRedisChunks scan') - } - - for await (const keysBatch of scanRedisCluster( +async function runExpireChunks() { + await scanAndProcessDueItems( rclient, - EXPIRE_TIME_KEY_PATTERN - )) { - scannedKeyCount += keysBatch.length - clearedKeyCount += await processKeysBatch( - keysBatch.map(extractKeyId), - rclient - ) - if (scannedKeyCount % 1000 === 0) { - logger.info( - { scannedKeyCount, clearedKeyCount }, - 'expireRedisChunks scan progress' - ) - } - } - logger.info( - { - scannedKeyCount, - clearedKeyCount, - elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000), - dryRun: DRY_RUN, - }, - 'expireRedisChunks scan complete' + 'expireChunks', + 'expire-time', + expireProjectAction, + DRY_RUN ) - await redis.disconnect() } -// Check if the script is being run directly if (require.main === module) { - expireRedisChunks().catch(err => { - logger.fatal({ err }, 'unhandled error in expireRedisChunks') - process.exit(1) - }) + runExpireChunks() + .catch(err => { + logger.fatal( + { err, taskName: 'expireChunks' }, + 'Unhandled error in runExpireChunks' + ) + process.exit(1) + }) + .finally(async () => { + await redis.disconnect() + }) } else { - // Export the function for module usage - module.exports = { expireRedisChunks } + module.exports = { + runExpireChunks, + } }