diff --git a/package-lock.json b/package-lock.json index e7b462be63..82b46b24bc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43729,6 +43729,7 @@ "minimist": "^1.2.8", "mongodb-legacy": "6.1.3", "overleaf-editor-core": "*", + "p-queue": "^8.1.0", "request": "^2.88.2" }, "devDependencies": { diff --git a/services/project-history/app/js/RedisManager.js b/services/project-history/app/js/RedisManager.js index fe17508452..2f79a10a91 100644 --- a/services/project-history/app/js/RedisManager.js +++ b/services/project-history/app/js/RedisManager.js @@ -298,6 +298,26 @@ async function getFirstOpTimestamp(projectId) { return firstOpTimestamp } +async function getFirstOpTimestamps(projectIds) { + const keys = projectIds.map(projectId => + Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }) + ) + const results = await rclient.mget(keys) + const timestamps = results.map(result => { + // convert stored time back to a numeric timestamp + const timestamp = parseInt(result, 10) + + // check for invalid timestamp + if (isNaN(timestamp)) { + return null + } + + // convert numeric timestamp to a date object + return new Date(timestamp) + }) + return timestamps +} + async function clearFirstOpTimestamp(projectId) { const key = Keys.projectHistoryFirstOpTimestamp({ project_id: projectId }) await rclient.del(key) @@ -357,6 +377,7 @@ const getProjectIdsWithHistoryOpsCountCb = callbackify( ) const setFirstOpTimestampCb = callbackify(setFirstOpTimestamp) const getFirstOpTimestampCb = callbackify(getFirstOpTimestamp) +const getFirstOpTimestampsCb = callbackify(getFirstOpTimestamps) const clearFirstOpTimestampCb = callbackify(clearFirstOpTimestamp) const getProjectIdsWithFirstOpTimestampsCb = callbackify( getProjectIdsWithFirstOpTimestamps @@ -394,6 +415,7 @@ export { getProjectIdsWithHistoryOpsCountCb as getProjectIdsWithHistoryOpsCount, setFirstOpTimestampCb as setFirstOpTimestamp, getFirstOpTimestampCb as getFirstOpTimestamp, + getFirstOpTimestampsCb as getFirstOpTimestamps, clearFirstOpTimestampCb as clearFirstOpTimestamp, getProjectIdsWithFirstOpTimestampsCb as getProjectIdsWithFirstOpTimestamps, clearDanglingFirstOpTimestampCb as clearDanglingFirstOpTimestamp, @@ -413,6 +435,7 @@ export const promises = { getProjectIdsWithHistoryOpsCount, setFirstOpTimestamp, getFirstOpTimestamp, + getFirstOpTimestamps, clearFirstOpTimestamp, getProjectIdsWithFirstOpTimestamps, clearDanglingFirstOpTimestamp, diff --git a/services/project-history/config/settings.defaults.cjs b/services/project-history/config/settings.defaults.cjs index 2338718203..9e5a39868a 100644 --- a/services/project-history/config/settings.defaults.cjs +++ b/services/project-history/config/settings.defaults.cjs @@ -41,6 +41,9 @@ module.exports = { 10 ), }, + project_history: { + url: `http://${process.env.PROJECT_HISTORY_HOST || '127.0.0.1'}:3054`, + }, }, redis: { lock: { diff --git a/services/project-history/package.json b/services/project-history/package.json index 96cae3d726..2a54a807d3 100644 --- a/services/project-history/package.json +++ b/services/project-history/package.json @@ -37,6 +37,7 @@ "minimist": "^1.2.8", "mongodb-legacy": "6.1.3", "overleaf-editor-core": "*", + "p-queue": "^8.1.0", "request": "^2.88.2" }, "devDependencies": { diff --git a/services/project-history/scripts/flush_old.js b/services/project-history/scripts/flush_old.js new file mode 100644 index 0000000000..6dc140196e --- /dev/null +++ b/services/project-history/scripts/flush_old.js @@ -0,0 +1,191 @@ +#!/usr/bin/env node + +import Settings from '@overleaf/settings' +import minimist from 'minimist' +import logger from '@overleaf/logger' +import PQueue from 'p-queue' +import * as RedisManager from '../app/js/RedisManager.js' +import * as ErrorRecorder from '../app/js/ErrorRecorder.js' + +logger.logger.level('fatal') + +function usage() { + console.log(` +Usage: flush_old.js [options] + +Options: + -b, --batch-size Number of projects to process in each batch (default: 100) + -a, --max-age Maximum age of projects to keep (default: 3600) + -i, --interval Interval to spread the processing over (default: 300) + -c, --concurrency Number of concurrent jobs (default: 10) + -u, --buffer Buffer time in seconds to reserve at end (default: 15) + -n, --dry-run Show what would be done without making changes + -h, --help Show this help message + +Examples: + # Flush projects older than 24 hours with 5 concurrent jobs + flush_old.js --batch-size 100 --max-age 86400 -c 5 + + # Dry run to see what would be flushed + flush_old.js --max-age 3600 --dry-run +`) + process.exit(0) +} + +const argv = minimist(process.argv.slice(2), { + boolean: ['dry-run', 'help'], + alias: { + b: 'batch-size', + a: 'max-age', + i: 'interval', + c: 'concurrency', + n: 'dry-run', + u: 'buffer', + h: 'help', + }, + default: { + 'batch-size': 100, + 'max-age': 3600, + interval: 300, + concurrency: 10, + 'dry-run': false, + buffer: 15, + help: false, + }, +}) + +if (argv.help || process.argv.length === 2) { + usage() +} + +const batchSize = parseInt(argv['batch-size'], 10) +const maxAge = argv['max-age'] ? parseInt(argv['max-age'], 10) : null +const interval = parseInt(argv.interval, 10) || 300 +const concurrency = parseInt(argv.concurrency, 10) || 10 +const bufferTime = parseInt(argv.buffer, 10) || 15 +const dryRun = argv['dry-run'] + +/** + * Generator function that yields batches of items from an array + * @param {Array} array - The array to batch + * @param {number} size - The size of each batch + * @yields {Array} A batch of items + */ +function* getBatches(array, size) { + for (let i = 0; i < array.length; i += size) { + yield array.slice(i, i + size) + } +} + +let flushCount = 0 + +async function flushProject({ projectId, timestamp }) { + const url = `${Settings.apis.project_history.url}/project/${projectId}/flush` + if (dryRun) { + console.log(`[DRY RUN] would flush project ${projectId}`) + return + } + const response = await fetch(url, { + method: 'POST', + }) + flushCount++ + if (flushCount % 100 === 0) { + console.log('flushed', flushCount, 'projects, up to', timestamp) + } + if (!response.ok) { + throw new Error(`failed to flush project ${projectId}`) + } +} + +const SCRIPT_START_TIME = Date.now() // current time in milliseconds from start of script + +function olderThan(maxAge, timestamp) { + const age = (SCRIPT_START_TIME - timestamp) / 1000 + return age > maxAge +} + +async function main() { + const projectIds = await RedisManager.promises.getProjectIdsWithHistoryOps() + const failedProjects = await ErrorRecorder.promises.getFailedProjects() + const failedProjectIds = new Set(failedProjects.map(p => p.project_id)) + + const projectIdsToProcess = projectIds.filter(p => !failedProjectIds.has(p)) + console.log('number of projects with history ops', projectIds.length) + console.log( + 'number of failed projects to exclude', + projectIds.length - projectIdsToProcess.length + ) + const collectedProjects = [] + let nullCount = 0 + // iterate over the project ids in batches of doing a redis MGET to retrieve the first op timestamps + for (const batch of getBatches(projectIdsToProcess, batchSize)) { + const timestamps = await RedisManager.promises.getFirstOpTimestamps(batch) + const newProjects = batch + .map((projectId, idx) => { + return { projectId, timestamp: timestamps[idx] } + }) + .filter(({ timestamp }) => { + if (!timestamp) { + nullCount++ + } + return timestamp ? olderThan(maxAge, timestamp) : true + }) + collectedProjects.push(...newProjects) + } + // sort the collected projects by ascending timestamp + collectedProjects.sort((a, b) => a.timestamp - b.timestamp) + + console.log('number of projects to flush', collectedProjects.length) + console.log('number with null timestamps', nullCount) + + const elapsedTime = Math.floor((Date.now() - SCRIPT_START_TIME) / 1000) + console.log('elapsed time', elapsedTime, 'seconds, buffer time', bufferTime) + const remainingTime = Math.max(interval - elapsedTime - bufferTime, 0) + console.log('remaining time', remainingTime, 'seconds') + + const jobsPerSecond = Math.max( + Math.ceil(collectedProjects.length / Math.max(remainingTime, 60)), + 1 + ) + console.log('interval', interval, 'seconds') + console.log('jobs per second', jobsPerSecond) + console.log('concurrency', concurrency) + + const queue = new PQueue({ + concurrency, + interval: 1000, + intervalCap: jobsPerSecond, + }) + + const taskFns = collectedProjects.map(project => { + return async () => { + try { + await flushProject(project) + return { status: 'fulfilled', value: project } + } catch (error) { + return { status: 'rejected', reason: error, project } + } + } + }) + + const results = await queue.addAll(taskFns) + + console.log( + 'finished after', + Math.floor((Date.now() - SCRIPT_START_TIME) / 1000), + 'seconds' + ) + // count the number of successful and failed flushes + const success = results.filter(r => r.status === 'fulfilled').length + const failed = results.filter(r => r.status === 'rejected').length + console.log('completed', { success, failed }) +} + +main() + .then(() => { + process.exit(0) + }) + .catch(err => { + console.error(err) + process.exit(1) + })