Merge pull request #25449 from overleaf/bg-histoy-redis-refactor-expire-worker

refactor the expire worker to make it easier to extend

GitOrigin-RevId: 7b277b243ed51ab3b46316d98b7157af95a9e42b
This commit is contained in:
Brian Gough
2025-05-09 09:05:43 +01:00
committed by Copybot
parent 0a0dc13030
commit 9e07549ecb
2 changed files with 162 additions and 102 deletions

View File

@@ -1,3 +1,5 @@
const logger = require('@overleaf/logger')
const BATCH_SIZE = 1000 // Default batch size for SCAN
/**
@@ -49,4 +51,134 @@ function extractKeyId(key) {
return null
}
module.exports = { scanRedisCluster, extractKeyId }
/**
* Fetches timestamps for a list of project IDs based on a given key name.
*
* @param {string[]} projectIds - Array of project identifiers.
* @param {object} rclient - The Redis client instance.
* @param {string} keyName - The base name for the Redis keys storing the timestamps (e.g., "expire-time", "persist-time").
* @param {number} currentTime - The current time (timestamp in milliseconds) to compare against.
* @returns {Promise<Array<{projectId: string, timestampValue: string}>>}
* A promise that resolves to an array of objects, each containing a projectId and
* its corresponding timestampValue, for due projects only.
*/
async function fetchOverdueProjects(projectIds, rclient, keyName, currentTime) {
if (!projectIds || projectIds.length === 0) {
return []
}
const timestampKeys = projectIds.map(id => `${keyName}:{${id}}`)
const timestamps = await rclient.mget(timestampKeys)
const dueProjects = []
for (let i = 0; i < projectIds.length; i++) {
const projectId = projectIds[i]
const timestampValue = timestamps[i]
if (timestampValue !== null) {
const timestamp = parseInt(timestampValue, 10)
if (!isNaN(timestamp) && currentTime > timestamp) {
dueProjects.push({ projectId, timestampValue })
}
}
}
return dueProjects
}
/**
* Scans Redis for keys matching a pattern derived from keyName, identifies items that are "due" based on a timestamp,
* and performs a specified action on them.
*
* @param {object} rclient - The Redis client instance.
* @param {string} taskName - A descriptive name for the task (used in logging).
* @param {string} keyName - The base name for the Redis keys (e.g., "expire-time", "persist-time").
* The function will derive the key prefix as `${keyName}:` and scan pattern as `${keyName}:{*}`.
* @param {function(string): Promise<void>} actionFn - An async function that takes a projectId and performs an action.
* @param {boolean} DRY_RUN - If true, logs actions that would be taken without performing them.
* @returns {Promise<{scannedKeyCount: number, processedKeyCount: number}>} Counts of scanned and processed keys.
*/
async function scanAndProcessDueItems(
rclient,
taskName,
keyName,
actionFn,
DRY_RUN
) {
let scannedKeyCount = 0
let processedKeyCount = 0
const START_TIME = Date.now()
const logContext = { taskName, dryRun: DRY_RUN }
const scanPattern = `${keyName}:{*}`
if (DRY_RUN) {
logger.info(logContext, `Starting ${taskName} scan in DRY RUN mode`)
} else {
logger.info(logContext, `Starting ${taskName} scan`)
}
for await (const keysBatch of scanRedisCluster(rclient, scanPattern)) {
scannedKeyCount += keysBatch.length
const projectIds = keysBatch.map(extractKeyId).filter(id => id != null)
if (projectIds.length === 0) {
continue
}
const currentTime = Date.now()
const overdueProjects = await fetchOverdueProjects(
projectIds,
rclient,
keyName,
currentTime
)
for (const project of overdueProjects) {
const { projectId } = project
if (DRY_RUN) {
logger.info(
{ ...logContext, projectId },
`[Dry Run] Would perform ${taskName} for project`
)
} else {
try {
await actionFn(projectId)
logger.info(
{ ...logContext, projectId },
`Successfully performed ${taskName} for project`
)
} catch (err) {
logger.error(
{ ...logContext, projectId, err },
`Error performing ${taskName} for project`
)
continue
}
}
processedKeyCount++
if (processedKeyCount % 1000 === 0 && processedKeyCount > 0) {
logger.info(
{ ...logContext, scannedKeyCount, processedKeyCount },
`${taskName} scan progress`
)
}
}
}
logger.info(
{
...logContext,
scannedKeyCount,
processedKeyCount,
elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000),
},
`${taskName} scan complete`
)
return { scannedKeyCount, processedKeyCount }
}
module.exports = {
scanRedisCluster,
extractKeyId,
scanAndProcessDueItems,
}

View File

@@ -1,11 +1,10 @@
const logger = require('@overleaf/logger')
const commandLineArgs = require('command-line-args') // Add this line
const commandLineArgs = require('command-line-args')
const redis = require('../lib/redis')
const { scanRedisCluster, extractKeyId } = require('../lib/scan')
const { scanAndProcessDueItems } = require('../lib/scan')
const { expireProject, claimExpireJob } = require('../lib/chunk_store/redis')
const rclient = redis.rclientHistory
const EXPIRE_TIME_KEY_PATTERN = `expire-time:{*}`
const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }]
const options = commandLineArgs(optionDefinitions)
@@ -13,112 +12,41 @@ const DRY_RUN = options['dry-run'] || false
logger.initialize('expire-redis-chunks')
function isExpiredKey(expireTimestamp, currentTime) {
const expireTime = parseInt(expireTimestamp, 10)
if (isNaN(expireTime)) {
return false
}
logger.debug(
{
expireTime,
currentTime,
expireIn: expireTime - currentTime,
expired: currentTime > expireTime,
},
'Checking if key is expired'
)
return currentTime > expireTime
}
async function fetchTimestamps(projectIds, rclient) {
const expireTimeKeys = projectIds.map(id => `expire-time:{${id}}`)
// For efficiency, we use MGET to fetch all the timestamps in a single request
const expireTimestamps = await rclient.mget(expireTimeKeys)
// Return an array of objects with projectId and expireTimestamp
const results = projectIds.map((projectId, index) => ({
projectId,
expireTimestamp: expireTimestamps[index],
}))
return results
}
async function processKeysBatch(projectIds, rclient) {
let clearedKeyCount = 0
if (projectIds.length === 0) {
return 0
}
const projects = await fetchTimestamps(projectIds, rclient)
const currentTime = Date.now()
for (const project of projects) {
const { projectId, expireTimestamp } = project
// For each key, do a quick check to see if the key is expired before calling
// the LUA script to expire the chunk atomically.
if (isExpiredKey(expireTimestamp, currentTime)) {
if (DRY_RUN) {
logger.info({ projectId }, '[Dry Run] Would expire chunk for project')
} else {
try {
const job = await claimExpireJob(projectId)
await expireProject(projectId)
await job.close()
} catch (err) {
logger.error({ projectId, err }, 'error expiring chunk for project')
continue
}
}
clearedKeyCount++
async function expireProjectAction(projectId) {
const job = await claimExpireJob(projectId)
try {
await expireProject(projectId)
} finally {
if (job && job.close) {
await job.close()
}
}
return clearedKeyCount
}
async function expireRedisChunks() {
let scannedKeyCount = 0
let clearedKeyCount = 0
const START_TIME = Date.now()
if (DRY_RUN) {
logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode')
} else {
logger.info({}, 'starting expireRedisChunks scan')
}
for await (const keysBatch of scanRedisCluster(
async function runExpireChunks() {
await scanAndProcessDueItems(
rclient,
EXPIRE_TIME_KEY_PATTERN
)) {
scannedKeyCount += keysBatch.length
clearedKeyCount += await processKeysBatch(
keysBatch.map(extractKeyId),
rclient
)
if (scannedKeyCount % 1000 === 0) {
logger.info(
{ scannedKeyCount, clearedKeyCount },
'expireRedisChunks scan progress'
)
}
}
logger.info(
{
scannedKeyCount,
clearedKeyCount,
elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000),
dryRun: DRY_RUN,
},
'expireRedisChunks scan complete'
'expireChunks',
'expire-time',
expireProjectAction,
DRY_RUN
)
await redis.disconnect()
}
// Check if the script is being run directly
if (require.main === module) {
expireRedisChunks().catch(err => {
logger.fatal({ err }, 'unhandled error in expireRedisChunks')
process.exit(1)
})
runExpireChunks()
.catch(err => {
logger.fatal(
{ err, taskName: 'expireChunks' },
'Unhandled error in runExpireChunks'
)
process.exit(1)
})
.finally(async () => {
await redis.disconnect()
})
} else {
// Export the function for module usage
module.exports = { expireRedisChunks }
module.exports = {
runExpireChunks,
}
}