mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
[project-history] script for fixing-up files/metadata with bulk resync (#23184)
* [history-v1] add cheap endpoint for checking time of last history write The /raw endpoint skips the GCS lookup for the chunk. * [project-history] script for fixing-up files/metadata with bulk resync * [project-history] upgrade structure only resync when full sync is needed * [project-history] start resync and process resync updates under lock * [project-history] stop retrying during graceful shutdown GitOrigin-RevId: 73184d5786e1d40f5b7e21f387fc37cf43f0ac2d
This commit is contained in:
38
package-lock.json
generated
38
package-lock.json
generated
@@ -17071,18 +17071,6 @@
|
||||
"node": ">=6"
|
||||
}
|
||||
},
|
||||
"node_modules/cli": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/cli/-/cli-1.0.1.tgz",
|
||||
"integrity": "sha1-IoF1NPJL+klQw01TLUjsvGIbjBQ=",
|
||||
"dependencies": {
|
||||
"exit": "0.1.2",
|
||||
"glob": "^7.1.1"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=0.2.5"
|
||||
}
|
||||
},
|
||||
"node_modules/cli-color": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/cli-color/-/cli-color-2.0.3.tgz",
|
||||
@@ -21935,14 +21923,6 @@
|
||||
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
|
||||
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
|
||||
},
|
||||
"node_modules/exit": {
|
||||
"version": "0.1.2",
|
||||
"resolved": "https://registry.npmjs.org/exit/-/exit-0.1.2.tgz",
|
||||
"integrity": "sha1-BjJjj42HfMghB9MKD/8aF8uhzQw=",
|
||||
"engines": {
|
||||
"node": ">= 0.8.0"
|
||||
}
|
||||
},
|
||||
"node_modules/expand-brackets": {
|
||||
"version": "2.1.4",
|
||||
"resolved": "https://registry.npmjs.org/expand-brackets/-/expand-brackets-2.1.4.tgz",
|
||||
@@ -43172,11 +43152,11 @@
|
||||
"body-parser": "^1.20.3",
|
||||
"bunyan": "^1.8.15",
|
||||
"celebrate": "^15.0.3",
|
||||
"cli": "^1.0.1",
|
||||
"diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688",
|
||||
"esmock": "^2.6.3",
|
||||
"express": "^4.21.0",
|
||||
"lodash": "^4.17.20",
|
||||
"minimist": "^1.2.8",
|
||||
"mongodb-legacy": "6.1.3",
|
||||
"overleaf-editor-core": "*",
|
||||
"request": "^2.88.2"
|
||||
@@ -51705,11 +51685,11 @@
|
||||
"celebrate": "^15.0.3",
|
||||
"chai": "^4.3.6",
|
||||
"chai-as-promised": "^7.1.1",
|
||||
"cli": "^1.0.1",
|
||||
"diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688",
|
||||
"esmock": "^2.6.3",
|
||||
"express": "^4.21.0",
|
||||
"lodash": "^4.17.20",
|
||||
"minimist": "^1.2.8",
|
||||
"mocha": "^10.2.0",
|
||||
"mongodb-legacy": "6.1.3",
|
||||
"nock": "^13.5.3",
|
||||
@@ -59870,15 +59850,6 @@
|
||||
"resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz",
|
||||
"integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A=="
|
||||
},
|
||||
"cli": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/cli/-/cli-1.0.1.tgz",
|
||||
"integrity": "sha1-IoF1NPJL+klQw01TLUjsvGIbjBQ=",
|
||||
"requires": {
|
||||
"exit": "0.1.2",
|
||||
"glob": "^7.1.1"
|
||||
}
|
||||
},
|
||||
"cli-color": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/cli-color/-/cli-color-2.0.3.tgz",
|
||||
@@ -63435,11 +63406,6 @@
|
||||
"exegesis": "^4.1.0"
|
||||
}
|
||||
},
|
||||
"exit": {
|
||||
"version": "0.1.2",
|
||||
"resolved": "https://registry.npmjs.org/exit/-/exit-0.1.2.tgz",
|
||||
"integrity": "sha1-BjJjj42HfMghB9MKD/8aF8uhzQw="
|
||||
},
|
||||
"expand-brackets": {
|
||||
"version": "2.1.4",
|
||||
"resolved": "https://registry.npmjs.org/expand-brackets/-/expand-brackets-2.1.4.tgz",
|
||||
|
||||
@@ -86,6 +86,25 @@ async function getLatestHistory(req, res, next) {
|
||||
}
|
||||
}
|
||||
|
||||
async function getLatestHistoryRaw(req, res, next) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
try {
|
||||
const { startVersion, endVersion, endTimestamp } =
|
||||
await chunkStore.loadLatestRaw(projectId)
|
||||
res.json({
|
||||
startVersion,
|
||||
endVersion,
|
||||
endTimestamp,
|
||||
})
|
||||
} catch (err) {
|
||||
if (err instanceof Chunk.NotFoundError) {
|
||||
render.notFound(res)
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function getHistory(req, res, next) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
const version = req.swagger.params.version.value
|
||||
@@ -314,6 +333,7 @@ module.exports = {
|
||||
getLatestHashedContent: expressify(getLatestHashedContent),
|
||||
getLatestPersistedHistory: expressify(getLatestHistory),
|
||||
getLatestHistory: expressify(getLatestHistory),
|
||||
getLatestHistoryRaw: expressify(getLatestHistoryRaw),
|
||||
getHistory: expressify(getHistory),
|
||||
getHistoryBefore: expressify(getHistoryBefore),
|
||||
getZip: expressify(getZip),
|
||||
|
||||
@@ -84,6 +84,19 @@ module.exports = {
|
||||
},
|
||||
},
|
||||
},
|
||||
ChunkResponseRaw: {
|
||||
properties: {
|
||||
startVersion: {
|
||||
type: 'number',
|
||||
},
|
||||
endVersion: {
|
||||
type: 'number',
|
||||
},
|
||||
endTimestamp: {
|
||||
type: 'string',
|
||||
},
|
||||
},
|
||||
},
|
||||
History: {
|
||||
properties: {
|
||||
snapshot: {
|
||||
|
||||
@@ -321,6 +321,37 @@ exports.paths = {
|
||||
},
|
||||
},
|
||||
},
|
||||
'/projects/{project_id}/latest/history/raw': {
|
||||
get: {
|
||||
'x-swagger-router-controller': 'projects',
|
||||
operationId: 'getLatestHistoryRaw',
|
||||
tags: ['Project'],
|
||||
description: 'Get the metadata of latest sequence of changes.',
|
||||
parameters: [
|
||||
{
|
||||
name: 'project_id',
|
||||
in: 'path',
|
||||
description: 'project id',
|
||||
required: true,
|
||||
type: 'string',
|
||||
},
|
||||
],
|
||||
responses: {
|
||||
200: {
|
||||
description: 'Success',
|
||||
schema: {
|
||||
$ref: '#/definitions/ChunkResponseRaw',
|
||||
},
|
||||
},
|
||||
404: {
|
||||
description: 'Not Found',
|
||||
schema: {
|
||||
$ref: '#/definitions/Error',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
'/projects/{project_id}/latest/persistedHistory': {
|
||||
get: {
|
||||
'x-swagger-router-controller': 'projects',
|
||||
|
||||
@@ -82,21 +82,31 @@ async function lazyLoadHistoryFiles(history, batchBlobStore) {
|
||||
* Load the latest Chunk stored for a project, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @return {Promise.<Chunk>}
|
||||
* @return {Promise<{id: string, startVersion: number, endVersion: number, endTimestamp: Date}>}
|
||||
*/
|
||||
async function loadLatest(projectId) {
|
||||
async function loadLatestRaw(projectId) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
|
||||
const backend = getBackend(projectId)
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const batchBlobStore = new BatchBlobStore(blobStore)
|
||||
const chunkRecord = await backend.getLatestChunk(projectId)
|
||||
if (chunkRecord == null) {
|
||||
throw new Chunk.NotFoundError(projectId)
|
||||
}
|
||||
return chunkRecord
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the latest Chunk stored for a project, including blob metadata.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @return {Promise.<Chunk>}
|
||||
*/
|
||||
async function loadLatest(projectId) {
|
||||
const chunkRecord = await loadLatestRaw(projectId)
|
||||
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const batchBlobStore = new BatchBlobStore(blobStore)
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
return new Chunk(history, chunkRecord.startVersion)
|
||||
}
|
||||
@@ -318,6 +328,7 @@ module.exports = {
|
||||
getBackend,
|
||||
initializeProject,
|
||||
loadLatest,
|
||||
loadLatestRaw,
|
||||
loadAtVersion,
|
||||
loadAtTimestamp,
|
||||
create,
|
||||
|
||||
@@ -252,6 +252,7 @@ function chunkFromRecord(record) {
|
||||
id: record._id.toString(),
|
||||
startVersion: record.startVersion,
|
||||
endVersion: record.endVersion,
|
||||
endTimestamp: record.endTimestamp,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,7 @@ function chunkFromRecord(record) {
|
||||
id: record.id,
|
||||
startVersion: record.start_version,
|
||||
endVersion: record.end_version,
|
||||
endTimestamp: record.end_timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -69,6 +69,15 @@ describe('chunkStore', function () {
|
||||
await chunkStore.update(projectId, oldEndVersion, chunk)
|
||||
})
|
||||
|
||||
it('records the correct metadata in db', async function () {
|
||||
const raw = await chunkStore.loadLatestRaw(projectId)
|
||||
expect(raw).to.deep.include({
|
||||
startVersion: 0,
|
||||
endVersion: 2,
|
||||
endTimestamp: lastChangeTimestamp,
|
||||
})
|
||||
})
|
||||
|
||||
it('records the correct timestamp', async function () {
|
||||
const chunk = await chunkStore.loadLatest(projectId)
|
||||
expect(chunk.getEndTimestamp()).to.deep.equal(lastChangeTimestamp)
|
||||
|
||||
@@ -83,6 +83,10 @@ async function recordSyncStart(projectId) {
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param projectId
|
||||
* @return {Promise<{error: string, forceDebug?: boolean}|null>}
|
||||
*/
|
||||
async function getFailureRecord(projectId) {
|
||||
return await db.projectHistoryFailures.findOne({ project_id: projectId })
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import * as Errors from './Errors.js'
|
||||
import * as LocalFileWriter from './LocalFileWriter.js'
|
||||
import * as HashManager from './HashManager.js'
|
||||
import * as HistoryBlobTranslator from './HistoryBlobTranslator.js'
|
||||
import { promisifyMultiResult } from '@overleaf/promise-utils'
|
||||
|
||||
const HTTP_REQUEST_TIMEOUT = Settings.overleaf.history.requestTimeout
|
||||
|
||||
@@ -86,6 +87,28 @@ export function getMostRecentVersion(projectId, historyId, callback) {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {string} historyId
|
||||
* @param {(error: Error, rawChunk?: { startVersion: number, endVersion: number, endTimestamp: Date}) => void} callback
|
||||
*/
|
||||
export function getMostRecentVersionRaw(projectId, historyId, callback) {
|
||||
const path = `projects/${historyId}/latest/history/raw`
|
||||
logger.debug(
|
||||
{ projectId, historyId },
|
||||
'getting raw chunk from history service'
|
||||
)
|
||||
_requestHistoryService({ path, json: true }, (err, body) => {
|
||||
if (err) return callback(OError.tag(err))
|
||||
const { startVersion, endVersion, endTimestamp } = body
|
||||
callback(null, {
|
||||
startVersion,
|
||||
endVersion,
|
||||
endTimestamp: new Date(endTimestamp),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function _requestChunk(options, callback) {
|
||||
_requestHistoryService(options, (err, chunk) => {
|
||||
if (err) {
|
||||
@@ -576,7 +599,13 @@ export const promises = {
|
||||
/** @type {(projectId: string, historyId: string) => Promise<{chunk: import('overleaf-editor-core/lib/types.js').RawChunk}>} */
|
||||
getMostRecentChunk: promisify(getMostRecentChunk),
|
||||
getChunkAtVersion: promisify(getChunkAtVersion),
|
||||
getMostRecentVersion: promisify(getMostRecentVersion),
|
||||
getMostRecentVersion: promisifyMultiResult(getMostRecentVersion, [
|
||||
'version',
|
||||
'projectStructureAndDocVersions',
|
||||
'lastChange',
|
||||
'mostRecentChunk',
|
||||
]),
|
||||
getMostRecentVersionRaw: promisify(getMostRecentVersionRaw),
|
||||
getProjectBlob: promisify(getProjectBlob),
|
||||
getProjectBlobStream: promisify(getProjectBlobStream),
|
||||
sendChanges: promisify(sendChanges),
|
||||
|
||||
@@ -53,7 +53,7 @@ async function startResync(projectId, options = {}) {
|
||||
await LockManager.promises.runWithLock(
|
||||
keys.projectHistoryLock({ project_id: projectId }),
|
||||
async extendLock => {
|
||||
await _startResyncWithoutLock(projectId, options)
|
||||
await startResyncWithoutLock(projectId, options)
|
||||
}
|
||||
)
|
||||
} catch (error) {
|
||||
@@ -76,7 +76,7 @@ async function startHardResync(projectId, options = {}) {
|
||||
await clearResyncState(projectId)
|
||||
await RedisManager.promises.clearFirstOpTimestamp(projectId)
|
||||
await RedisManager.promises.destroyDocUpdatesQueue(projectId)
|
||||
await _startResyncWithoutLock(projectId, options)
|
||||
await startResyncWithoutLock(projectId, options)
|
||||
}
|
||||
)
|
||||
} catch (error) {
|
||||
@@ -86,7 +86,8 @@ async function startHardResync(projectId, options = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
async function _startResyncWithoutLock(projectId, options) {
|
||||
// The caller must hold the lock and should record any errors via the ErrorRecorder.
|
||||
async function startResyncWithoutLock(projectId, options) {
|
||||
await ErrorRecorder.promises.recordSyncStart(projectId)
|
||||
|
||||
const syncState = await _getResyncState(projectId)
|
||||
@@ -159,6 +160,29 @@ async function clearResyncState(projectId) {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {Date} date
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function clearResyncStateIfAllAfter(projectId, date) {
|
||||
const rawSyncState = await db.projectHistorySyncState.findOne({
|
||||
project_id: new ObjectId(projectId.toString()),
|
||||
})
|
||||
if (!rawSyncState) return // already cleared
|
||||
const state = SyncState.fromRaw(projectId, rawSyncState)
|
||||
if (state.isSyncOngoing()) return // new sync started
|
||||
for (const { timestamp } of rawSyncState.history) {
|
||||
if (timestamp < date) return // preserve old resync states
|
||||
}
|
||||
// expiresAt is cleared when starting a sync and bumped when making changes.
|
||||
// Use expiresAt as read to ensure we only clear the confirmed state.
|
||||
await db.projectHistorySyncState.deleteOne({
|
||||
project_id: new ObjectId(projectId.toString()),
|
||||
expiresAt: rawSyncState.expiresAt,
|
||||
})
|
||||
}
|
||||
|
||||
async function skipUpdatesDuringSync(projectId, updates) {
|
||||
const syncState = await _getResyncState(projectId)
|
||||
if (!syncState.isSyncOngoing()) {
|
||||
@@ -1132,6 +1156,7 @@ function trackingDirectivesEqual(a, b) {
|
||||
// EXPORTS
|
||||
|
||||
const startResyncCb = callbackify(startResync)
|
||||
const startResyncWithoutLockCb = callbackify(startResyncWithoutLock)
|
||||
const startHardResyncCb = callbackify(startHardResync)
|
||||
const setResyncStateCb = callbackify(setResyncState)
|
||||
const clearResyncStateCb = callbackify(clearResyncState)
|
||||
@@ -1174,6 +1199,7 @@ const expandSyncUpdatesCb = (
|
||||
|
||||
export {
|
||||
startResyncCb as startResync,
|
||||
startResyncWithoutLockCb as startResyncWithoutLock,
|
||||
startHardResyncCb as startHardResync,
|
||||
setResyncStateCb as setResyncState,
|
||||
clearResyncStateCb as clearResyncState,
|
||||
@@ -1183,9 +1209,11 @@ export {
|
||||
|
||||
export const promises = {
|
||||
startResync,
|
||||
startResyncWithoutLock,
|
||||
startHardResync,
|
||||
setResyncState,
|
||||
clearResyncState,
|
||||
clearResyncStateIfAllAfter,
|
||||
skipUpdatesDuringSync,
|
||||
expandSyncUpdates,
|
||||
}
|
||||
|
||||
@@ -60,6 +60,46 @@ export function getRawUpdates(projectId, batchSize, callback) {
|
||||
})
|
||||
}
|
||||
|
||||
// Trigger resync and start processing under lock to avoid other operations to
|
||||
// flush the resync updates.
|
||||
export function startResyncAndProcessUpdatesUnderLock(
|
||||
projectId,
|
||||
opts,
|
||||
callback
|
||||
) {
|
||||
const startTimeMs = Date.now()
|
||||
LockManager.runWithLock(
|
||||
keys.projectHistoryLock({ project_id: projectId }),
|
||||
(extendLock, releaseLock) => {
|
||||
SyncManager.startResyncWithoutLock(projectId, opts, err => {
|
||||
if (err) return callback(OError.tag(err))
|
||||
extendLock(err => {
|
||||
if (err) return callback(OError.tag(err))
|
||||
_countAndProcessUpdates(
|
||||
projectId,
|
||||
extendLock,
|
||||
REDIS_READ_BATCH_SIZE,
|
||||
releaseLock
|
||||
)
|
||||
})
|
||||
})
|
||||
},
|
||||
(error, queueSize) => {
|
||||
if (error) {
|
||||
OError.tag(error)
|
||||
}
|
||||
ErrorRecorder.record(projectId, queueSize, error, callback)
|
||||
if (queueSize > 0) {
|
||||
const duration = (Date.now() - startTimeMs) / 1000
|
||||
Metrics.historyFlushDurationSeconds.observe(duration)
|
||||
Metrics.historyFlushQueueSize.observe(queueSize)
|
||||
}
|
||||
// clear the timestamp in the background if the queue is now empty
|
||||
RedisManager.clearDanglingFirstOpTimestamp(projectId, () => {})
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// Process all updates for a project, only check project-level information once
|
||||
export function processUpdatesForProject(projectId, callback) {
|
||||
const startTimeMs = Date.now()
|
||||
@@ -631,5 +671,10 @@ function _sanitizeUpdate(update) {
|
||||
}
|
||||
|
||||
export const promises = {
|
||||
/** @type {(projectId: string) => Promise<number>} */
|
||||
processUpdatesForProject: promisify(processUpdatesForProject),
|
||||
/** @type {(projectId: string, opts: any) => Promise<number>} */
|
||||
startResyncAndProcessUpdatesUnderLock: promisify(
|
||||
startResyncAndProcessUpdatesUnderLock
|
||||
),
|
||||
}
|
||||
|
||||
@@ -30,11 +30,11 @@
|
||||
"body-parser": "^1.20.3",
|
||||
"bunyan": "^1.8.15",
|
||||
"celebrate": "^15.0.3",
|
||||
"cli": "^1.0.1",
|
||||
"diff-match-patch": "overleaf/diff-match-patch#89805f9c671a77a263fc53461acd62aa7498f688",
|
||||
"esmock": "^2.6.3",
|
||||
"express": "^4.21.0",
|
||||
"lodash": "^4.17.20",
|
||||
"minimist": "^1.2.8",
|
||||
"mongodb-legacy": "6.1.3",
|
||||
"overleaf-editor-core": "*",
|
||||
"request": "^2.88.2"
|
||||
|
||||
324
services/project-history/scripts/bulk_resync_file_fix_up.mjs
Normal file
324
services/project-history/scripts/bulk_resync_file_fix_up.mjs
Normal file
@@ -0,0 +1,324 @@
|
||||
// @ts-check
|
||||
import Events from 'node:events'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import readline from 'node:readline'
|
||||
import fs from 'node:fs'
|
||||
import minimist from 'minimist'
|
||||
import { ObjectId } from 'mongodb'
|
||||
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
|
||||
import logger from '@overleaf/logger'
|
||||
import Metrics from '@overleaf/metrics'
|
||||
import OError from '@overleaf/o-error'
|
||||
import { promiseMapWithLimit } from '@overleaf/promise-utils'
|
||||
import { db, mongoClient } from '../app/js/mongodb.js'
|
||||
import * as HistoryStoreManager from '../app/js/HistoryStoreManager.js'
|
||||
import * as RedisManager from '../app/js/RedisManager.js'
|
||||
import * as SyncManager from '../app/js/SyncManager.js'
|
||||
import * as UpdatesProcessor from '../app/js/UpdatesProcessor.js'
|
||||
import { NeedFullProjectStructureResyncError } from '../app/js/Errors.js'
|
||||
import * as ErrorRecorder from '../app/js/ErrorRecorder.js'
|
||||
|
||||
// Silence warning.
|
||||
Events.setMaxListeners(20)
|
||||
|
||||
// Enable caching for ObjectId.toString()
|
||||
ObjectId.cacheHexString = true
|
||||
|
||||
const READ_CONCURRENCY = parseInt(process.env.READ_CONCURRENCY || '100', 10)
|
||||
const WRITE_CONCURRENCY = parseInt(process.env.WRITE_CONCURRENCY || '10', 10)
|
||||
const FLUSH_RETRIES = parseInt(process.env.FLUSH_RETRIES || '20', 10)
|
||||
|
||||
// Relevant dates:
|
||||
// - 2024-12-19, start of event-hold removal in filestore bucket -> objects older than 24h are (soft-)deleted.
|
||||
// - 2024-12-23, copy operation skipped in filestore when cloning project -> objects not created on clone.
|
||||
// - 2025-01-24, no more filestore reads allowed in project-history -> no more empty files in history for 404s
|
||||
const FILESTORE_SOFT_DELETE_START = new Date('2024-12-19T00:00:00Z')
|
||||
const FILESTORE_READ_OFF = new Date('2025-01-24T15:00:00Z')
|
||||
|
||||
const argv = minimist(process.argv.slice(2), {
|
||||
string: ['logs'],
|
||||
})
|
||||
|
||||
let gracefulShutdownInitiated = false
|
||||
|
||||
process.on('SIGINT', handleSignal)
|
||||
process.on('SIGTERM', handleSignal)
|
||||
|
||||
function handleSignal() {
|
||||
gracefulShutdownInitiated = true
|
||||
console.warn('graceful shutdown initiated, draining queue')
|
||||
}
|
||||
|
||||
const STATS = {
|
||||
processedLines: 0,
|
||||
success: 0,
|
||||
changed: 0,
|
||||
failure: 0,
|
||||
skipped: 0,
|
||||
checkFailure: 0,
|
||||
}
|
||||
|
||||
function logStats() {
|
||||
console.log(
|
||||
JSON.stringify({
|
||||
time: new Date(),
|
||||
gracefulShutdownInitiated,
|
||||
...STATS,
|
||||
})
|
||||
)
|
||||
}
|
||||
const logInterval = setInterval(logStats, 10_000)
|
||||
|
||||
/**
|
||||
* @typedef {Object} FileRef
|
||||
* @property {ObjectId} _id
|
||||
* @property {any} linkedFileData
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} Folder
|
||||
* @property {Array<Folder>} folders
|
||||
* @property {Array<FileRef>} fileRefs
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} Project
|
||||
* @property {ObjectId} _id
|
||||
* @property {Date} lastUpdated
|
||||
* @property {Array<Folder>} rootFolder
|
||||
* @property {{history: {id: (number|string)}}} overleaf
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Folder} folder
|
||||
* @return {boolean}
|
||||
*/
|
||||
function checkFileTreeNeedsResync(folder) {
|
||||
if (!folder) return false
|
||||
if (Array.isArray(folder.fileRefs)) {
|
||||
for (const fileRef of folder.fileRefs) {
|
||||
if (fileRef.linkedFileData) return true
|
||||
if (fileRef._id.getTimestamp() > FILESTORE_SOFT_DELETE_START) return true
|
||||
}
|
||||
}
|
||||
if (Array.isArray(folder.folders)) {
|
||||
for (const child of folder.folders) {
|
||||
if (checkFileTreeNeedsResync(child)) return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {string} historyId
|
||||
* @return {Promise<Date>}
|
||||
*/
|
||||
async function getLastEndTimestamp(projectId, historyId) {
|
||||
const raw = await HistoryStoreManager.promises.getMostRecentVersionRaw(
|
||||
projectId,
|
||||
historyId
|
||||
)
|
||||
if (!raw) throw new Error('bug: history not initialized')
|
||||
return raw.endTimestamp
|
||||
}
|
||||
|
||||
/** @type {Record<string, (project: Project) => Promise<boolean>>} */
|
||||
const conditions = {
|
||||
// cheap: in-memory mongo lookup
|
||||
'updated after filestore soft-delete': async function (project) {
|
||||
return project.lastUpdated > FILESTORE_SOFT_DELETE_START
|
||||
},
|
||||
// cheap: in-memory mongo lookup
|
||||
'file-tree requires re-sync': async function (project) {
|
||||
return checkFileTreeNeedsResync(project.rootFolder?.[0])
|
||||
},
|
||||
// moderate: GET from Redis
|
||||
'has pending operations': async function (project) {
|
||||
const n = await RedisManager.promises.countUnprocessedUpdates(
|
||||
project._id.toString()
|
||||
)
|
||||
return n > 0
|
||||
},
|
||||
// expensive: GET from Mongo/Postgres via history-v1 HTTP API call
|
||||
'has been flushed after filestore soft-delete': async function (project) {
|
||||
// Resyncs started after soft-deleting can trigger 404s and result in empty files.
|
||||
const endTimestamp = await getLastEndTimestamp(
|
||||
project._id.toString(),
|
||||
project.overleaf.history.id.toString()
|
||||
)
|
||||
return endTimestamp > FILESTORE_SOFT_DELETE_START
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Project} project
|
||||
* @return {Promise<{projectId: string, historyId: string} | null>}
|
||||
*/
|
||||
async function checkProject(project) {
|
||||
if (gracefulShutdownInitiated) return null
|
||||
if (project._id.getTimestamp() > FILESTORE_READ_OFF) {
|
||||
STATS.skipped++ // Project created after all bugs were fixed.
|
||||
return null
|
||||
}
|
||||
const projectId = project._id.toString()
|
||||
const historyId = project.overleaf.history.id.toString()
|
||||
for (const [condition, check] of Object.entries(conditions)) {
|
||||
try {
|
||||
if (await check(project)) return { projectId, historyId }
|
||||
} catch (err) {
|
||||
logger.err({ projectId, condition, err }, 'failed to check project')
|
||||
STATS.checkFailure++
|
||||
return null
|
||||
}
|
||||
}
|
||||
STATS.skipped++
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {string} historyId
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function processProject(projectId, historyId) {
|
||||
if (gracefulShutdownInitiated) return
|
||||
const t0 = performance.now()
|
||||
try {
|
||||
await tryProcessProject(projectId, historyId)
|
||||
const latency = performance.now() - t0
|
||||
logger.info({ projectId, historyId, latency }, 'processed project')
|
||||
STATS.success++
|
||||
} catch (err) {
|
||||
logger.err({ err, projectId, historyId }, 'failed to process project')
|
||||
STATS.failure++
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function flushWithRetries(projectId) {
|
||||
for (let attempt = 0; attempt < FLUSH_RETRIES; attempt++) {
|
||||
try {
|
||||
await UpdatesProcessor.promises.processUpdatesForProject(projectId)
|
||||
return
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ projectId, err, attempt },
|
||||
'failed to flush updates, trying again'
|
||||
)
|
||||
if (gracefulShutdownInitiated) throw err
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await UpdatesProcessor.promises.processUpdatesForProject(projectId)
|
||||
} catch (err) {
|
||||
// @ts-ignore err is Error
|
||||
throw new OError('failed to flush updates', {}, err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} projectId
|
||||
* @param {string} historyId
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function tryProcessProject(projectId, historyId) {
|
||||
await flushWithRetries(projectId)
|
||||
const start = new Date()
|
||||
let needsFullSync = false
|
||||
try {
|
||||
await UpdatesProcessor.promises.startResyncAndProcessUpdatesUnderLock(
|
||||
projectId,
|
||||
{ resyncProjectStructureOnly: true }
|
||||
)
|
||||
} catch (err) {
|
||||
if (err instanceof NeedFullProjectStructureResyncError) {
|
||||
needsFullSync = true
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
if (needsFullSync) {
|
||||
logger.warn(
|
||||
{ projectId, historyId },
|
||||
'structure only resync not sufficient, doing full soft resync'
|
||||
)
|
||||
await SyncManager.promises.startResync(projectId, {})
|
||||
await UpdatesProcessor.promises.processUpdatesForProject(projectId)
|
||||
STATS.changed++
|
||||
} else {
|
||||
const after = await getLastEndTimestamp(projectId, historyId)
|
||||
if (after > start) {
|
||||
STATS.changed++
|
||||
}
|
||||
}
|
||||
// Avoid db.projectHistorySyncState from growing for each project we resynced.
|
||||
// MongoDB collections cannot shrink on their own. In case of success, purge
|
||||
// the db entry created by this script right away.
|
||||
await SyncManager.promises.clearResyncStateIfAllAfter(projectId, start)
|
||||
}
|
||||
|
||||
async function processBatch(projects) {
|
||||
const projectIds = (
|
||||
await promiseMapWithLimit(READ_CONCURRENCY, projects, checkProject)
|
||||
).filter(id => !!id)
|
||||
await promiseMapWithLimit(WRITE_CONCURRENCY, projectIds, ids =>
|
||||
processProject(ids.projectId, ids.historyId)
|
||||
)
|
||||
|
||||
if (gracefulShutdownInitiated) throw new Error('graceful shutdown triggered')
|
||||
}
|
||||
|
||||
async function processProjectsFromLog() {
|
||||
const rl = readline.createInterface({
|
||||
input: fs.createReadStream(argv.logs),
|
||||
})
|
||||
for await (const line of rl) {
|
||||
if (gracefulShutdownInitiated) break
|
||||
STATS.processedLines++
|
||||
if (!line.startsWith('{')) continue
|
||||
const { projectId, historyId, msg } = JSON.parse(line)
|
||||
if (msg !== 'failed to process project') continue
|
||||
await processProject(projectId, historyId) // does try/catch with logging
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
if (argv.logs) {
|
||||
await processProjectsFromLog()
|
||||
return
|
||||
}
|
||||
await batchedUpdate(db.projects, {}, processBatch, {
|
||||
_id: 1,
|
||||
lastUpdated: 1,
|
||||
'overleaf.history': 1,
|
||||
rootFolder: 1,
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
try {
|
||||
await main()
|
||||
} finally {
|
||||
clearInterval(logInterval)
|
||||
logStats()
|
||||
Metrics.close()
|
||||
await mongoClient.close()
|
||||
// TODO(das7pad): graceful shutdown for redis. Refactor process.exit when done.
|
||||
}
|
||||
console.log('Done.')
|
||||
await setTimeout(1_000)
|
||||
if (STATS.failure) {
|
||||
process.exit(Math.min(STATS.failure, 99))
|
||||
} else {
|
||||
process.exit(0)
|
||||
}
|
||||
} catch (err) {
|
||||
logger.err({ err }, 'fatal error')
|
||||
await setTimeout(1_000)
|
||||
process.exit(100)
|
||||
}
|
||||
Reference in New Issue
Block a user