diff --git a/package-lock.json b/package-lock.json index 25aa65b02b..1f35c95a60 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17071,18 +17071,6 @@ "node": ">=6" } }, - "node_modules/cli": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/cli/-/cli-1.0.1.tgz", - "integrity": "sha1-IoF1NPJL+klQw01TLUjsvGIbjBQ=", - "dependencies": { - "exit": "0.1.2", - "glob": "^7.1.1" - }, - "engines": { - "node": ">=0.2.5" - } - }, "node_modules/cli-color": { "version": "2.0.3", "resolved": "https://registry.npmjs.org/cli-color/-/cli-color-2.0.3.tgz", @@ -21935,14 +21923,6 @@ "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" }, - "node_modules/exit": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/exit/-/exit-0.1.2.tgz", - "integrity": "sha1-BjJjj42HfMghB9MKD/8aF8uhzQw=", - "engines": { - "node": ">= 0.8.0" - } - }, "node_modules/expand-brackets": { "version": "2.1.4", "resolved": "https://registry.npmjs.org/expand-brackets/-/expand-brackets-2.1.4.tgz", @@ -43172,11 +43152,11 @@ "body-parser": "^1.20.3", "bunyan": "^1.8.15", "celebrate": "^15.0.3", - "cli": "^1.0.1", "diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688", "esmock": "^2.6.3", "express": "^4.21.0", "lodash": "^4.17.20", + "minimist": "^1.2.8", "mongodb-legacy": "6.1.3", "overleaf-editor-core": "*", "request": "^2.88.2" @@ -51705,11 +51685,11 @@ "celebrate": "^15.0.3", "chai": "^4.3.6", "chai-as-promised": "^7.1.1", - "cli": "^1.0.1", "diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688", "esmock": "^2.6.3", "express": "^4.21.0", "lodash": "^4.17.20", + "minimist": "^1.2.8", "mocha": "^10.2.0", "mongodb-legacy": "6.1.3", "nock": "^13.5.3", @@ -59870,15 +59850,6 @@ "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz", "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==" }, - "cli": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/cli/-/cli-1.0.1.tgz", - "integrity": "sha1-IoF1NPJL+klQw01TLUjsvGIbjBQ=", - "requires": { - "exit": "0.1.2", - "glob": "^7.1.1" - } - }, "cli-color": { "version": "2.0.3", "resolved": "https://registry.npmjs.org/cli-color/-/cli-color-2.0.3.tgz", @@ -63435,11 +63406,6 @@ "exegesis": "^4.1.0" } }, - "exit": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/exit/-/exit-0.1.2.tgz", - "integrity": "sha1-BjJjj42HfMghB9MKD/8aF8uhzQw=" - }, "expand-brackets": { "version": "2.1.4", "resolved": "https://registry.npmjs.org/expand-brackets/-/expand-brackets-2.1.4.tgz", diff --git a/services/history-v1/api/controllers/projects.js b/services/history-v1/api/controllers/projects.js index 2478052a43..80e4926aa8 100644 --- a/services/history-v1/api/controllers/projects.js +++ b/services/history-v1/api/controllers/projects.js @@ -86,6 +86,25 @@ async function getLatestHistory(req, res, next) { } } +async function getLatestHistoryRaw(req, res, next) { + const projectId = req.swagger.params.project_id.value + try { + const { startVersion, endVersion, endTimestamp } = + await chunkStore.loadLatestRaw(projectId) + res.json({ + startVersion, + endVersion, + endTimestamp, + }) + } catch (err) { + if (err instanceof Chunk.NotFoundError) { + render.notFound(res) + } else { + throw err + } + } +} + async function getHistory(req, res, next) { const projectId = req.swagger.params.project_id.value const version = req.swagger.params.version.value @@ -314,6 +333,7 @@ module.exports = { getLatestHashedContent: expressify(getLatestHashedContent), getLatestPersistedHistory: expressify(getLatestHistory), getLatestHistory: expressify(getLatestHistory), + getLatestHistoryRaw: expressify(getLatestHistoryRaw), getHistory: expressify(getHistory), getHistoryBefore: expressify(getHistoryBefore), getZip: expressify(getZip), diff --git a/services/history-v1/api/swagger/index.js b/services/history-v1/api/swagger/index.js index edfb68f4e4..3702c6ec07 100644 --- a/services/history-v1/api/swagger/index.js +++ b/services/history-v1/api/swagger/index.js @@ -84,6 +84,19 @@ module.exports = { }, }, }, + ChunkResponseRaw: { + properties: { + startVersion: { + type: 'number', + }, + endVersion: { + type: 'number', + }, + endTimestamp: { + type: 'string', + }, + }, + }, History: { properties: { snapshot: { diff --git a/services/history-v1/api/swagger/projects.js b/services/history-v1/api/swagger/projects.js index 39026022d0..1cea9a2ec3 100644 --- a/services/history-v1/api/swagger/projects.js +++ b/services/history-v1/api/swagger/projects.js @@ -321,6 +321,37 @@ exports.paths = { }, }, }, + '/projects/{project_id}/latest/history/raw': { + get: { + 'x-swagger-router-controller': 'projects', + operationId: 'getLatestHistoryRaw', + tags: ['Project'], + description: 'Get the metadata of latest sequence of changes.', + parameters: [ + { + name: 'project_id', + in: 'path', + description: 'project id', + required: true, + type: 'string', + }, + ], + responses: { + 200: { + description: 'Success', + schema: { + $ref: '#/definitions/ChunkResponseRaw', + }, + }, + 404: { + description: 'Not Found', + schema: { + $ref: '#/definitions/Error', + }, + }, + }, + }, + }, '/projects/{project_id}/latest/persistedHistory': { get: { 'x-swagger-router-controller': 'projects', diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index 9ccf948820..381854b45f 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -82,21 +82,31 @@ async function lazyLoadHistoryFiles(history, batchBlobStore) { * Load the latest Chunk stored for a project, including blob metadata. * * @param {string} projectId - * @return {Promise.} + * @return {Promise<{id: string, startVersion: number, endVersion: number, endTimestamp: Date}>} */ -async function loadLatest(projectId) { +async function loadLatestRaw(projectId) { assert.projectId(projectId, 'bad projectId') const backend = getBackend(projectId) - const blobStore = new BlobStore(projectId) - const batchBlobStore = new BatchBlobStore(blobStore) const chunkRecord = await backend.getLatestChunk(projectId) if (chunkRecord == null) { throw new Chunk.NotFoundError(projectId) } + return chunkRecord +} +/** + * Load the latest Chunk stored for a project, including blob metadata. + * + * @param {string} projectId + * @return {Promise.} + */ +async function loadLatest(projectId) { + const chunkRecord = await loadLatestRaw(projectId) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) + const blobStore = new BlobStore(projectId) + const batchBlobStore = new BatchBlobStore(blobStore) await lazyLoadHistoryFiles(history, batchBlobStore) return new Chunk(history, chunkRecord.startVersion) } @@ -318,6 +328,7 @@ module.exports = { getBackend, initializeProject, loadLatest, + loadLatestRaw, loadAtVersion, loadAtTimestamp, create, diff --git a/services/history-v1/storage/lib/chunk_store/mongo.js b/services/history-v1/storage/lib/chunk_store/mongo.js index f56131a25b..375bb9df08 100644 --- a/services/history-v1/storage/lib/chunk_store/mongo.js +++ b/services/history-v1/storage/lib/chunk_store/mongo.js @@ -252,6 +252,7 @@ function chunkFromRecord(record) { id: record._id.toString(), startVersion: record.startVersion, endVersion: record.endVersion, + endTimestamp: record.endTimestamp, } } diff --git a/services/history-v1/storage/lib/chunk_store/postgres.js b/services/history-v1/storage/lib/chunk_store/postgres.js index f6eead7354..78b0326ee0 100644 --- a/services/history-v1/storage/lib/chunk_store/postgres.js +++ b/services/history-v1/storage/lib/chunk_store/postgres.js @@ -81,6 +81,7 @@ function chunkFromRecord(record) { id: record.id, startVersion: record.start_version, endVersion: record.end_version, + endTimestamp: record.end_timestamp, } } diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js index 54d01548b8..c9c7aa44ed 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js @@ -69,6 +69,15 @@ describe('chunkStore', function () { await chunkStore.update(projectId, oldEndVersion, chunk) }) + it('records the correct metadata in db', async function () { + const raw = await chunkStore.loadLatestRaw(projectId) + expect(raw).to.deep.include({ + startVersion: 0, + endVersion: 2, + endTimestamp: lastChangeTimestamp, + }) + }) + it('records the correct timestamp', async function () { const chunk = await chunkStore.loadLatest(projectId) expect(chunk.getEndTimestamp()).to.deep.equal(lastChangeTimestamp) diff --git a/services/project-history/app/js/ErrorRecorder.js b/services/project-history/app/js/ErrorRecorder.js index 3c0570f822..5f90d7b62a 100644 --- a/services/project-history/app/js/ErrorRecorder.js +++ b/services/project-history/app/js/ErrorRecorder.js @@ -83,6 +83,10 @@ async function recordSyncStart(projectId) { ) } +/** + * @param projectId + * @return {Promise<{error: string, forceDebug?: boolean}|null>} + */ async function getFailureRecord(projectId) { return await db.projectHistoryFailures.findOne({ project_id: projectId }) } diff --git a/services/project-history/app/js/HistoryStoreManager.js b/services/project-history/app/js/HistoryStoreManager.js index eff25c8663..4ef67860a1 100644 --- a/services/project-history/app/js/HistoryStoreManager.js +++ b/services/project-history/app/js/HistoryStoreManager.js @@ -17,6 +17,7 @@ import * as Errors from './Errors.js' import * as LocalFileWriter from './LocalFileWriter.js' import * as HashManager from './HashManager.js' import * as HistoryBlobTranslator from './HistoryBlobTranslator.js' +import { promisifyMultiResult } from '@overleaf/promise-utils' const HTTP_REQUEST_TIMEOUT = Settings.overleaf.history.requestTimeout @@ -86,6 +87,28 @@ export function getMostRecentVersion(projectId, historyId, callback) { }) } +/** + * @param {string} projectId + * @param {string} historyId + * @param {(error: Error, rawChunk?: { startVersion: number, endVersion: number, endTimestamp: Date}) => void} callback + */ +export function getMostRecentVersionRaw(projectId, historyId, callback) { + const path = `projects/${historyId}/latest/history/raw` + logger.debug( + { projectId, historyId }, + 'getting raw chunk from history service' + ) + _requestHistoryService({ path, json: true }, (err, body) => { + if (err) return callback(OError.tag(err)) + const { startVersion, endVersion, endTimestamp } = body + callback(null, { + startVersion, + endVersion, + endTimestamp: new Date(endTimestamp), + }) + }) +} + function _requestChunk(options, callback) { _requestHistoryService(options, (err, chunk) => { if (err) { @@ -576,7 +599,13 @@ export const promises = { /** @type {(projectId: string, historyId: string) => Promise<{chunk: import('overleaf-editor-core/lib/types.js').RawChunk}>} */ getMostRecentChunk: promisify(getMostRecentChunk), getChunkAtVersion: promisify(getChunkAtVersion), - getMostRecentVersion: promisify(getMostRecentVersion), + getMostRecentVersion: promisifyMultiResult(getMostRecentVersion, [ + 'version', + 'projectStructureAndDocVersions', + 'lastChange', + 'mostRecentChunk', + ]), + getMostRecentVersionRaw: promisify(getMostRecentVersionRaw), getProjectBlob: promisify(getProjectBlob), getProjectBlobStream: promisify(getProjectBlobStream), sendChanges: promisify(sendChanges), diff --git a/services/project-history/app/js/SyncManager.js b/services/project-history/app/js/SyncManager.js index 67e89cc85a..271057cf25 100644 --- a/services/project-history/app/js/SyncManager.js +++ b/services/project-history/app/js/SyncManager.js @@ -53,7 +53,7 @@ async function startResync(projectId, options = {}) { await LockManager.promises.runWithLock( keys.projectHistoryLock({ project_id: projectId }), async extendLock => { - await _startResyncWithoutLock(projectId, options) + await startResyncWithoutLock(projectId, options) } ) } catch (error) { @@ -76,7 +76,7 @@ async function startHardResync(projectId, options = {}) { await clearResyncState(projectId) await RedisManager.promises.clearFirstOpTimestamp(projectId) await RedisManager.promises.destroyDocUpdatesQueue(projectId) - await _startResyncWithoutLock(projectId, options) + await startResyncWithoutLock(projectId, options) } ) } catch (error) { @@ -86,7 +86,8 @@ async function startHardResync(projectId, options = {}) { } } -async function _startResyncWithoutLock(projectId, options) { +// The caller must hold the lock and should record any errors via the ErrorRecorder. +async function startResyncWithoutLock(projectId, options) { await ErrorRecorder.promises.recordSyncStart(projectId) const syncState = await _getResyncState(projectId) @@ -159,6 +160,29 @@ async function clearResyncState(projectId) { }) } +/** + * @param {string} projectId + * @param {Date} date + * @return {Promise} + */ +async function clearResyncStateIfAllAfter(projectId, date) { + const rawSyncState = await db.projectHistorySyncState.findOne({ + project_id: new ObjectId(projectId.toString()), + }) + if (!rawSyncState) return // already cleared + const state = SyncState.fromRaw(projectId, rawSyncState) + if (state.isSyncOngoing()) return // new sync started + for (const { timestamp } of rawSyncState.history) { + if (timestamp < date) return // preserve old resync states + } + // expiresAt is cleared when starting a sync and bumped when making changes. + // Use expiresAt as read to ensure we only clear the confirmed state. + await db.projectHistorySyncState.deleteOne({ + project_id: new ObjectId(projectId.toString()), + expiresAt: rawSyncState.expiresAt, + }) +} + async function skipUpdatesDuringSync(projectId, updates) { const syncState = await _getResyncState(projectId) if (!syncState.isSyncOngoing()) { @@ -1132,6 +1156,7 @@ function trackingDirectivesEqual(a, b) { // EXPORTS const startResyncCb = callbackify(startResync) +const startResyncWithoutLockCb = callbackify(startResyncWithoutLock) const startHardResyncCb = callbackify(startHardResync) const setResyncStateCb = callbackify(setResyncState) const clearResyncStateCb = callbackify(clearResyncState) @@ -1174,6 +1199,7 @@ const expandSyncUpdatesCb = ( export { startResyncCb as startResync, + startResyncWithoutLockCb as startResyncWithoutLock, startHardResyncCb as startHardResync, setResyncStateCb as setResyncState, clearResyncStateCb as clearResyncState, @@ -1183,9 +1209,11 @@ export { export const promises = { startResync, + startResyncWithoutLock, startHardResync, setResyncState, clearResyncState, + clearResyncStateIfAllAfter, skipUpdatesDuringSync, expandSyncUpdates, } diff --git a/services/project-history/app/js/UpdatesProcessor.js b/services/project-history/app/js/UpdatesProcessor.js index bbce79e0be..788a8cdf29 100644 --- a/services/project-history/app/js/UpdatesProcessor.js +++ b/services/project-history/app/js/UpdatesProcessor.js @@ -60,6 +60,46 @@ export function getRawUpdates(projectId, batchSize, callback) { }) } +// Trigger resync and start processing under lock to avoid other operations to +// flush the resync updates. +export function startResyncAndProcessUpdatesUnderLock( + projectId, + opts, + callback +) { + const startTimeMs = Date.now() + LockManager.runWithLock( + keys.projectHistoryLock({ project_id: projectId }), + (extendLock, releaseLock) => { + SyncManager.startResyncWithoutLock(projectId, opts, err => { + if (err) return callback(OError.tag(err)) + extendLock(err => { + if (err) return callback(OError.tag(err)) + _countAndProcessUpdates( + projectId, + extendLock, + REDIS_READ_BATCH_SIZE, + releaseLock + ) + }) + }) + }, + (error, queueSize) => { + if (error) { + OError.tag(error) + } + ErrorRecorder.record(projectId, queueSize, error, callback) + if (queueSize > 0) { + const duration = (Date.now() - startTimeMs) / 1000 + Metrics.historyFlushDurationSeconds.observe(duration) + Metrics.historyFlushQueueSize.observe(queueSize) + } + // clear the timestamp in the background if the queue is now empty + RedisManager.clearDanglingFirstOpTimestamp(projectId, () => {}) + } + ) +} + // Process all updates for a project, only check project-level information once export function processUpdatesForProject(projectId, callback) { const startTimeMs = Date.now() @@ -631,5 +671,10 @@ function _sanitizeUpdate(update) { } export const promises = { + /** @type {(projectId: string) => Promise} */ processUpdatesForProject: promisify(processUpdatesForProject), + /** @type {(projectId: string, opts: any) => Promise} */ + startResyncAndProcessUpdatesUnderLock: promisify( + startResyncAndProcessUpdatesUnderLock + ), } diff --git a/services/project-history/package.json b/services/project-history/package.json index 3be1e70f54..34b3df08f1 100644 --- a/services/project-history/package.json +++ b/services/project-history/package.json @@ -30,11 +30,11 @@ "body-parser": "^1.20.3", "bunyan": "^1.8.15", "celebrate": "^15.0.3", - "cli": "^1.0.1", "diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688", "esmock": "^2.6.3", "express": "^4.21.0", "lodash": "^4.17.20", + "minimist": "^1.2.8", "mongodb-legacy": "6.1.3", "overleaf-editor-core": "*", "request": "^2.88.2" diff --git a/services/project-history/scripts/bulk_resync_file_fix_up.mjs b/services/project-history/scripts/bulk_resync_file_fix_up.mjs new file mode 100644 index 0000000000..c4a590fb62 --- /dev/null +++ b/services/project-history/scripts/bulk_resync_file_fix_up.mjs @@ -0,0 +1,324 @@ +// @ts-check +import Events from 'node:events' +import { setTimeout } from 'node:timers/promises' +import readline from 'node:readline' +import fs from 'node:fs' +import minimist from 'minimist' +import { ObjectId } from 'mongodb' +import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js' +import logger from '@overleaf/logger' +import Metrics from '@overleaf/metrics' +import OError from '@overleaf/o-error' +import { promiseMapWithLimit } from '@overleaf/promise-utils' +import { db, mongoClient } from '../app/js/mongodb.js' +import * as HistoryStoreManager from '../app/js/HistoryStoreManager.js' +import * as RedisManager from '../app/js/RedisManager.js' +import * as SyncManager from '../app/js/SyncManager.js' +import * as UpdatesProcessor from '../app/js/UpdatesProcessor.js' +import { NeedFullProjectStructureResyncError } from '../app/js/Errors.js' +import * as ErrorRecorder from '../app/js/ErrorRecorder.js' + +// Silence warning. +Events.setMaxListeners(20) + +// Enable caching for ObjectId.toString() +ObjectId.cacheHexString = true + +const READ_CONCURRENCY = parseInt(process.env.READ_CONCURRENCY || '100', 10) +const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY || '10', 10) +const FLUSH_RETRIES = parseInt(process.env.FLUSH_RETRIES || '20', 10) + +// Relevant dates: +// - 2024-12-19, start of event-hold removal in filestore bucket -> objects older than 24h are (soft-)deleted. +// - 2024-12-23, copy operation skipped in filestore when cloning project -> objects not created on clone. +// - 2025-01-24, no more filestore reads allowed in project-history -> no more empty files in history for 404s +const FILESTORE_SOFT_DELETE_START = new Date('2024-12-19T00:00:00Z') +const FILESTORE_READ_OFF = new Date('2025-01-24T15:00:00Z') + +const argv = minimist(process.argv.slice(2), { + string: ['logs'], +}) + +let gracefulShutdownInitiated = false + +process.on('SIGINT', handleSignal) +process.on('SIGTERM', handleSignal) + +function handleSignal() { + gracefulShutdownInitiated = true + console.warn('graceful shutdown initiated, draining queue') +} + +const STATS = { + processedLines: 0, + success: 0, + changed: 0, + failure: 0, + skipped: 0, + checkFailure: 0, +} + +function logStats() { + console.log( + JSON.stringify({ + time: new Date(), + gracefulShutdownInitiated, + ...STATS, + }) + ) +} +const logInterval = setInterval(logStats, 10_000) + +/** + * @typedef {Object} FileRef + * @property {ObjectId} _id + * @property {any} linkedFileData + */ + +/** + * @typedef {Object} Folder + * @property {Array} folders + * @property {Array} fileRefs + */ + +/** + * @typedef {Object} Project + * @property {ObjectId} _id + * @property {Date} lastUpdated + * @property {Array} rootFolder + * @property {{history: {id: (number|string)}}} overleaf + */ + +/** + * @param {Folder} folder + * @return {boolean} + */ +function checkFileTreeNeedsResync(folder) { + if (!folder) return false + if (Array.isArray(folder.fileRefs)) { + for (const fileRef of folder.fileRefs) { + if (fileRef.linkedFileData) return true + if (fileRef._id.getTimestamp() > FILESTORE_SOFT_DELETE_START) return true + } + } + if (Array.isArray(folder.folders)) { + for (const child of folder.folders) { + if (checkFileTreeNeedsResync(child)) return true + } + } + return false +} + +/** + * @param {string} projectId + * @param {string} historyId + * @return {Promise} + */ +async function getLastEndTimestamp(projectId, historyId) { + const raw = await HistoryStoreManager.promises.getMostRecentVersionRaw( + projectId, + historyId + ) + if (!raw) throw new Error('bug: history not initialized') + return raw.endTimestamp +} + +/** @type {Record Promise>} */ +const conditions = { + // cheap: in-memory mongo lookup + 'updated after filestore soft-delete': async function (project) { + return project.lastUpdated > FILESTORE_SOFT_DELETE_START + }, + // cheap: in-memory mongo lookup + 'file-tree requires re-sync': async function (project) { + return checkFileTreeNeedsResync(project.rootFolder?.[0]) + }, + // moderate: GET from Redis + 'has pending operations': async function (project) { + const n = await RedisManager.promises.countUnprocessedUpdates( + project._id.toString() + ) + return n > 0 + }, + // expensive: GET from Mongo/Postgres via history-v1 HTTP API call + 'has been flushed after filestore soft-delete': async function (project) { + // Resyncs started after soft-deleting can trigger 404s and result in empty files. + const endTimestamp = await getLastEndTimestamp( + project._id.toString(), + project.overleaf.history.id.toString() + ) + return endTimestamp > FILESTORE_SOFT_DELETE_START + }, +} + +/** + * @param {Project} project + * @return {Promise<{projectId: string, historyId: string} | null>} + */ +async function checkProject(project) { + if (gracefulShutdownInitiated) return null + if (project._id.getTimestamp() > FILESTORE_READ_OFF) { + STATS.skipped++ // Project created after all bugs were fixed. + return null + } + const projectId = project._id.toString() + const historyId = project.overleaf.history.id.toString() + for (const [condition, check] of Object.entries(conditions)) { + try { + if (await check(project)) return { projectId, historyId } + } catch (err) { + logger.err({ projectId, condition, err }, 'failed to check project') + STATS.checkFailure++ + return null + } + } + STATS.skipped++ + return null +} + +/** + * @param {string} projectId + * @param {string} historyId + * @return {Promise} + */ +async function processProject(projectId, historyId) { + if (gracefulShutdownInitiated) return + const t0 = performance.now() + try { + await tryProcessProject(projectId, historyId) + const latency = performance.now() - t0 + logger.info({ projectId, historyId, latency }, 'processed project') + STATS.success++ + } catch (err) { + logger.err({ err, projectId, historyId }, 'failed to process project') + STATS.failure++ + } +} + +/** + * @param {string} projectId + * @return {Promise} + */ +async function flushWithRetries(projectId) { + for (let attempt = 0; attempt < FLUSH_RETRIES; attempt++) { + try { + await UpdatesProcessor.promises.processUpdatesForProject(projectId) + return + } catch (err) { + logger.warn( + { projectId, err, attempt }, + 'failed to flush updates, trying again' + ) + if (gracefulShutdownInitiated) throw err + } + } + + try { + await UpdatesProcessor.promises.processUpdatesForProject(projectId) + } catch (err) { + // @ts-ignore err is Error + throw new OError('failed to flush updates', {}, err) + } +} + +/** + * @param {string} projectId + * @param {string} historyId + * @return {Promise} + */ +async function tryProcessProject(projectId, historyId) { + await flushWithRetries(projectId) + const start = new Date() + let needsFullSync = false + try { + await UpdatesProcessor.promises.startResyncAndProcessUpdatesUnderLock( + projectId, + { resyncProjectStructureOnly: true } + ) + } catch (err) { + if (err instanceof NeedFullProjectStructureResyncError) { + needsFullSync = true + } else { + throw err + } + } + if (needsFullSync) { + logger.warn( + { projectId, historyId }, + 'structure only resync not sufficient, doing full soft resync' + ) + await SyncManager.promises.startResync(projectId, {}) + await UpdatesProcessor.promises.processUpdatesForProject(projectId) + STATS.changed++ + } else { + const after = await getLastEndTimestamp(projectId, historyId) + if (after > start) { + STATS.changed++ + } + } + // Avoid db.projectHistorySyncState from growing for each project we resynced. + // MongoDB collections cannot shrink on their own. In case of success, purge + // the db entry created by this script right away. + await SyncManager.promises.clearResyncStateIfAllAfter(projectId, start) +} + +async function processBatch(projects) { + const projectIds = ( + await promiseMapWithLimit(READ_CONCURRENCY, projects, checkProject) + ).filter(id => !!id) + await promiseMapWithLimit(WRITE_CONCURRENCY, projectIds, ids => + processProject(ids.projectId, ids.historyId) + ) + + if (gracefulShutdownInitiated) throw new Error('graceful shutdown triggered') +} + +async function processProjectsFromLog() { + const rl = readline.createInterface({ + input: fs.createReadStream(argv.logs), + }) + for await (const line of rl) { + if (gracefulShutdownInitiated) break + STATS.processedLines++ + if (!line.startsWith('{')) continue + const { projectId, historyId, msg } = JSON.parse(line) + if (msg !== 'failed to process project') continue + await processProject(projectId, historyId) // does try/catch with logging + } +} + +async function main() { + if (argv.logs) { + await processProjectsFromLog() + return + } + await batchedUpdate(db.projects, {}, processBatch, { + _id: 1, + lastUpdated: 1, + 'overleaf.history': 1, + rootFolder: 1, + }) +} + +try { + try { + await main() + } finally { + clearInterval(logInterval) + logStats() + Metrics.close() + await mongoClient.close() + // TODO(das7pad): graceful shutdown for redis. Refactor process.exit when done. + } + console.log('Done.') + await setTimeout(1_000) + if (STATS.failure) { + process.exit(Math.min(STATS.failure, 99)) + } else { + process.exit(0) + } +} catch (err) { + logger.err({ err }, 'fatal error') + await setTimeout(1_000) + process.exit(100) +}