[document-updater] migrate ProjectFlusher to async/await (#28796)

GitOrigin-RevId: 24f61d6c0fab5d65b962cc7031ce0b8c84d5a915
This commit is contained in:
Jakob Ackermann
2025-10-08 15:35:16 +02:00
committed by Copybot
parent a809f679fd
commit ec659f32ba
2 changed files with 90 additions and 142 deletions

View File

@@ -1,139 +1,95 @@
/* eslint-disable
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS101: Remove unnecessary use of Array.from
* DS102: Remove unnecessary code created because of implicit returns
* DS205: Consider reworking code to avoid use of IIFEs
* DS207: Consider shorter variations of null checks
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
const request = require('request')
const { setTimeout } = require('node:timers/promises')
const Settings = require('@overleaf/settings')
const RedisManager = require('./RedisManager')
const { rclient } = RedisManager
const docUpdaterKeys = Settings.redis.documentupdater.key_schema
const async = require('async')
const { rclient } = require('./RedisManager')
const ProjectManager = require('./ProjectManager')
const _ = require('lodash')
const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const { promiseMapSettledWithLimit } = require('@overleaf/promise-utils')
const docUpdaterKeys = Settings.redis.documentupdater.key_schema
const ProjectFlusher = {
// iterate over keys asynchronously using redis scan (non-blocking)
// handle all the cluster nodes or single redis server
_getKeys(pattern, limit, callback) {
const nodes = (typeof rclient.nodes === 'function'
? rclient.nodes('master')
: undefined) || [rclient]
const doKeyLookupForNode = (node, cb) =>
ProjectFlusher._getKeysFromNode(node, pattern, limit, cb)
return async.concatSeries(nodes, doKeyLookupForNode, callback)
},
_getKeysFromNode(node, pattern, limit, callback) {
if (limit == null) {
limit = 1000
}
let cursor = 0 // redis iterator
const keySet = {} // use hash to avoid duplicate results
const batchSize = limit != null ? Math.min(limit, 1000) : 1000
// scan over all keys looking for pattern
const doIteration = (
cb // avoid hitting redis too hard
) =>
node.scan(
cursor,
'MATCH',
pattern,
'COUNT',
batchSize,
function (error, reply) {
let keys
if (error != null) {
return callback(error)
}
;[cursor, keys] = Array.from(reply)
for (const key of Array.from(keys)) {
keySet[key] = true
}
keys = Object.keys(keySet)
const noResults = cursor === '0' // redis returns string results not numeric
const limitReached = limit != null && keys.length >= limit
if (noResults || limitReached) {
return callback(null, keys)
} else {
return setTimeout(doIteration, 10)
}
}
)
return doIteration()
},
// extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
// or docsInProject:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
_extractIds(keyList) {
const ids = (() => {
const result = []
for (const key of Array.from(keyList)) {
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
result.push(m[1])
}
return result
})()
return ids
},
flushAllProjects(options, callback) {
logger.info({ options }, 'flushing all projects')
return ProjectFlusher._getKeys(
docUpdaterKeys.docsInProject({ project_id: '*' }),
options.limit,
function (error, projectKeys) {
if (error != null) {
logger.err({ err: error }, 'error getting keys for flushing')
return callback(error)
}
const projectIds = ProjectFlusher._extractIds(projectKeys)
if (options.dryRun) {
return callback(null, projectIds)
}
const jobs = _.map(
projectIds,
projectId => cb =>
ProjectManager.flushAndDeleteProjectWithLocks(
projectId,
{ background: true },
cb
)
)
return async.parallelLimit(
async.reflectAll(jobs),
options.concurrency,
function (error, results) {
const success = []
const failure = []
_.each(results, function (result, i) {
if (result.error != null) {
return failure.push(projectIds[i])
} else {
return success.push(projectIds[i])
}
})
logger.info(
{ successCount: success.length, failureCount: failure.length },
'finished flushing all projects'
)
return callback(error, { success, failure })
}
)
}
)
},
// iterate over keys asynchronously using redis scan (non-blocking)
// handle all the cluster nodes or single redis server
async function _getKeys(pattern, limit) {
const nodes = (typeof rclient.nodes === 'function'
? rclient.nodes('master')
: undefined) || [rclient]
let keys = []
for (const node of nodes) {
keys = keys.concat(await _getKeysFromNode(node, pattern, limit))
}
return keys
}
module.exports = ProjectFlusher
module.exports.promises = promisifyAll(ProjectFlusher)
async function _getKeysFromNode(node, pattern, limit = 1000) {
let cursor = 0 // redis iterator
const keySet = new Set() // use hash to avoid duplicate results
const batchSize = Math.min(limit, 1000)
while (true) {
// scan over all keys looking for pattern
const reply = await node.scan(cursor, 'MATCH', pattern, 'COUNT', batchSize)
cursor = reply[0]
for (const key of reply[1]) {
keySet.add(key)
}
const noResults = cursor === '0' // redis returns string results not numeric
const limitReached = keySet.size >= limit
if (noResults || limitReached) {
return Array.from(keySet)
} else {
// avoid hitting redis too hard
await setTimeout(10)
}
}
}
// extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b
// or docsInProject:{57fd0b1f53a8396d22b2c24b} (for redis cluster)
function _extractIds(keyList) {
const result = []
for (const key of Array.from(keyList)) {
const m = key.match(/:\{?([0-9a-f]{24})\}?/) // extract object id
result.push(m[1])
}
return result
}
async function flushAllProjects(options) {
logger.info({ options }, 'flushing all projects')
const projectKeys = await _getKeys(
docUpdaterKeys.docsInProject({ project_id: '*' }),
options.limit
)
const projectIds = _extractIds(projectKeys)
if (options.dryRun) {
return projectIds
}
const results = await promiseMapSettledWithLimit(
options.concurrency,
projectIds,
projectId =>
ProjectManager.promises.flushAndDeleteProjectWithLocks(projectId, {
background: true,
})
)
const success = []
const failure = []
for (let i = 0; i < results.length; i++) {
if (results[i].status === 'rejected') {
failure.push(projectIds[i])
} else {
success.push(projectIds[i])
}
}
logger.info(
{ successCount: success.length, failureCount: failure.length },
'finished flushing all projects'
)
return { success, failure }
}
module.exports = {
_extractIds,
promises: {
flushAllProjects,
},
}

View File

@@ -32,15 +32,7 @@ Options:
}
console.log('Flushing all projects with options:', options)
return await new Promise((resolve, reject) => {
ProjectFlusher.flushAllProjects(options, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
await ProjectFlusher.promises.flushAllProjects(options)
}
main()