From 832a0facba2f60cead6be23fbe59707e44db7e64 Mon Sep 17 00:00:00 2001 From: Miguel Serrano Date: Tue, 7 Feb 2023 10:14:50 +0100 Subject: [PATCH] Merge pull request #11658 from overleaf/bg-add-concurrency-to-migration add concurrency to the history migration script GitOrigin-RevId: 059a1a8b402627b03cb6dab79b5da22189f32704 --- .../web/scripts/history/migrate_history.js | 47 ++++++++++++++++--- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/services/web/scripts/history/migrate_history.js b/services/web/scripts/history/migrate_history.js index 50aa9c91a3..9e35912165 100644 --- a/services/web/scripts/history/migrate_history.js +++ b/services/web/scripts/history/migrate_history.js @@ -11,6 +11,7 @@ const { waitForDb } = require('../../app/src/infrastructure/mongodb') const minimist = require('minimist') const fs = require('fs') const util = require('util') +const pLimit = require('p-limit') const logger = require('@overleaf/logger') logger.initialize('history-migration') // disable logging to stdout from internal modules @@ -31,14 +32,16 @@ const argv = minimist(process.argv.slice(2), { string: ['output'], alias: { verbose: 'v', + output: 'o', 'dry-run': 'd', + concurrency: 'j', 'use-query-hint': 'q', 'retry-failed': 'r', 'archive-on-failure': 'a', }, default: { output: DEFAULT_OUTPUT_FILE, - 'write-concurrency': 10, + concurrency: 1, 'batch-size': 100, 'max-upgrades-to-attempt': false, 'max-failures': 50, @@ -122,6 +125,9 @@ async function migrateProjects(projectsToMigrate) { let projectsFailed = 0 console.log('Starting migration...') + if (argv.concurrency > 1) { + console.log(`Using ${argv.concurrency} concurrent migrations`) + } // send log output for each migration to a file const output = fs.createWriteStream(argv.output, { flags: 'a' }) console.log(`Writing log output to ${argv.output}`) @@ -129,25 +135,43 @@ async function migrateProjects(projectsToMigrate) { function logJson(obj) { logger.log(JSON.stringify(obj)) } + // limit the number of concurrent migrations + const limit = pLimit(argv.concurrency) + const jobs = [] // throttle progress reporting to 2x per second const progressBar = createProgressBar() let i = 0 const N = projectsToMigrate.length const progressBarTimer = setInterval(() => { + if (INTERRUPT) { + return // don't update the progress bar if we're shutting down + } progressBar( i, N, `Migrated: ${projectsMigrated}, Failed: ${projectsFailed}` ) }, 500) - for (const project of projectsToMigrate) { + + async function _migrateProject(project) { + if (INTERRUPT) { + return // don't start any new jobs if we're shutting down + } const startTime = new Date() try { - if (INTERRUPT) { - break - } const result = await upgradeProject(project._id) + i++ + if (INTERRUPT && limit.activeCount > 1) { + // an interrupt was requested while this job was running + // report that we're waiting for the remaining jobs to finish + console.log( + `Waiting for remaining ${ + limit.activeCount - 1 + } active jobs to finish\r` + ) + } if (result.error) { + // failed to migrate this project logJson({ project_id: project._id, result, @@ -157,6 +181,7 @@ async function migrateProjects(projectsToMigrate) { }) projectsFailed++ } else { + // successfully migrated this project logJson({ project_id: project._id, result, @@ -166,6 +191,7 @@ async function migrateProjects(projectsToMigrate) { projectsMigrated++ } } catch (err) { + // unexpected error from the migration projectsFailed++ logJson({ project_id: project._id, @@ -174,8 +200,13 @@ async function migrateProjects(projectsToMigrate) { endTime: new Date(), }) } - i++ } + + for (const project of projectsToMigrate) { + jobs.push(limit(_migrateProject, project)) + } + // wait for all the queued jobs to complete + await Promise.all(jobs) clearInterval(progressBarTimer) progressBar(i, N, `Migrated: ${projectsMigrated}, Failed: ${projectsFailed}`) process.stdout.write('\n') @@ -210,7 +241,9 @@ async function main() { // then history could get into a broken state // Instead, skip any unprocessed projects and exit() at end of the batch. process.on('SIGINT', function () { - console.log('\nCaught SIGINT, waiting for in process upgrades to complete') + console.log( + '\nCaught SIGINT, waiting for all in-progess upgrades to complete' + ) INTERRUPT = true })