Merge pull request #30000 from overleaf/bg-extend-backup-blob-script

extend backup blob script

GitOrigin-RevId: 226e624f0fd93bfe4890acce728ce8005f9787c5
This commit is contained in:
Brian Gough
2025-12-05 15:43:22 +00:00
committed by Copybot
parent 6a93b6e76a
commit b9888957e4
2 changed files with 109 additions and 14 deletions

View File

@@ -167,7 +167,7 @@ export async function storeBlobBackup(projectId, hash) {
* @return {Promise<*>}
* @private
*/
export async function _blobIsBackedUp(projectId, hash) {
export async function blobIsBackedUp(projectId, hash) {
const blobs = await backedUpBlobs.findOne(
{
_id: new ObjectId(projectId),
@@ -185,7 +185,7 @@ export async function _blobIsBackedUp(projectId, hash) {
* @param {Blob} blob - The blob that is being backed up
* @param {string} tmpPath - The path to a temporary file storing the contents of the blob.
* @param {CachedPerProjectEncryptedS3Persistor} [persistor] - The persistor to use (optional)
* @return {Promise<void>}
* @return {Promise<string|void>}
*/
export async function backupBlob(historyId, blob, tmpPath, persistor) {
const hash = blob.getHash()
@@ -200,17 +200,17 @@ export async function backupBlob(historyId, blob, tmpPath, persistor) {
if (globalBlob && !globalBlob.demoted) {
recordBackupConclusion('skipped', 'global')
logger.debug({ projectId, hash }, 'Blob is global - skipping backup')
return
return 'global'
}
try {
if (await _blobIsBackedUp(projectId, hash)) {
if (await blobIsBackedUp(projectId, hash)) {
recordBackupConclusion('skipped', 'already_backed_up')
logger.debug(
{ projectId, hash },
'Blob already backed up - skipping backup'
)
return
return 'already-recorded'
}
} catch (error) {
logger.warn({ error }, 'Failed to check if blob is backed up')
@@ -241,7 +241,7 @@ export async function backupBlob(historyId, blob, tmpPath, persistor) {
await storeBlobBackup(projectId, hash)
recordBackupConclusion('failure', 'already_backed_up')
// Blob already backed up so report success
return
return 'already-written'
}
recordBackupConclusion('failure')
logger.warn({ error, projectId, hash }, 'Failed to upload blob to backup')

View File

@@ -1,18 +1,34 @@
// @ts-check
import commandLineArgs from 'command-line-args'
import { backupBlob, downloadBlobToDir } from '../lib/backupBlob.mjs'
import {
backupBlob,
downloadBlobToDir,
blobIsBackedUp,
} from '../lib/backupBlob.mjs'
import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs'
import withTmpDir from '../../api/controllers/with_tmp_dir.js'
import {
BlobStore,
GLOBAL_BLOBS,
loadGlobalBlobs,
makeProjectKey,
} from '../lib/blob_store/index.js'
import {
getBackupStatus,
unsetBackedUpBlobHashes,
} from '../lib/backup_store/index.js'
import chunkStore from '../lib/chunk_store/index.js'
import assert from '../lib/assert.js'
import knex from '../lib/knex.js'
import { client } from '../lib/mongodb.js'
import redis from '../lib/redis.js'
import { setTimeout } from 'node:timers/promises'
import fs from 'node:fs'
import pLimit from 'p-limit'
import Events from 'node:events'
// Silence warning.
Events.setMaxListeners(20)
await loadGlobalBlobs()
@@ -120,6 +136,32 @@ async function initialiseJobs({ historyId, hash, input }) {
return [{ hash, historyId }]
}
/**
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
*/
/** @type {Map<string, Promise<CachedPerProjectEncryptedS3Persistor>>} */
const persistorCache = new Map()
/**
 * Get the per-project backup persistor for a history id, creating it on first
 * use.
 *
 * The pending promise (not the resolved value) is stored in the cache, so
 * repeated calls for the same history id share a single `forProject` call.
 * If that promise rejects, its cache entry is removed so that a later call
 * retries instead of replaying the same failure for the rest of the run.
 *
 * @param {string} historyId
 * @returns {Promise<CachedPerProjectEncryptedS3Persistor>}
 */
function getPersistor(historyId) {
  const cached = persistorCache.get(historyId)
  if (cached) {
    return cached
  }
  const persistorPromise = backupPersistor.forProject(
    projectBlobsBucket,
    makeProjectKey(historyId, '')
  )
  persistorCache.set(historyId, persistorPromise)
  // Evict failed lookups. The identity check avoids deleting a newer entry
  // that may have replaced this one. The caller still receives the original
  // rejecting promise and handles the error itself.
  persistorPromise.catch(() => {
    if (persistorCache.get(historyId) === persistorPromise) {
      persistorCache.delete(historyId)
    }
  })
  return persistorPromise
}
// Track processed objects to handle input csv files with duplicate entries
const processedObjects = new Set()
/**
*
* @param {string} historyId
@@ -127,16 +169,48 @@ async function initialiseJobs({ historyId, hash, input }) {
* @return {Promise<void>}
*/
export async function downloadAndBackupBlob(historyId, hash) {
const key = `${historyId}:${hash}`
if (processedObjects.has(key)) {
console.log(`${historyId} ${hash} skipping previously processed blob`)
return
} else {
processedObjects.add(key)
}
const backend = chunkStore.getBackend(historyId)
const projectId = await backend.resolveHistoryIdToMongoProjectId(historyId)
// Check whether the project still exists
try {
await getBackupStatus(projectId)
} catch (err) {
if (err instanceof Error && err.message === 'Project not found') {
console.log(`${historyId} ${hash} project not found (expired)`)
return
} else if (err instanceof Error && err.message === 'Project deleted') {
console.log(`${historyId} ${hash} project deleted but not expired`)
// continue and allow backing up blob for a deleted project in case it is undeleted in future
} else {
throw err
}
}
// Force clearing of any backed up blob record
if (options.clear) {
await unsetBackedUpBlobHashes(projectId, [hash])
} else if (await blobIsBackedUp(projectId, hash)) {
// Check if the blob is already backed up
console.log(`${historyId} ${hash} already backed up`)
return
}
const persistor = await getPersistor(historyId)
const blobStore = new BlobStore(historyId)
const blob = await blobStore.getBlob(hash)
if (!blob) {
throw new Error(`Blob ${hash} could not be loaded`)
throw new Error(`Blob ${hash} could not be loaded for history ${historyId}`)
}
await withTmpDir(`blob-${hash}`, async tmpDir => {
await withTmpDir(`blob-${historyId}-${hash}`, async tmpDir => {
const filePath = await downloadBlobToDir(historyId, blob, tmpDir)
console.log(`Downloaded blob ${hash} to ${filePath}`)
await backupBlob(historyId, blob, filePath)
console.log('Backed up blob')
console.log(`${historyId} ${hash} Downloaded blob ${filePath}`)
const status = await backupBlob(historyId, blob, filePath, persistor)
console.log(`${historyId} ${hash} Blob`, status ?? 'backed up')
})
}
@@ -146,6 +220,8 @@ const options = commandLineArgs([
{ name: 'historyId', type: String },
{ name: 'hash', type: String },
{ name: 'input', type: String },
{ name: 'concurrency', alias: 'c', type: Number, defaultValue: 1 },
{ name: 'clear', type: Boolean },
])
try {
@@ -162,12 +238,31 @@ if (!Array.isArray(jobs)) {
process.exit(1)
}
for (const { historyId, hash } of jobs) {
const limit = pLimit(options.concurrency)
let successCount = 0
let failedCount = 0
const totalJobs = jobs.length
/**
 * Run a single backup job and record its outcome in the success/failure
 * counters.
 *
 * Errors are caught here rather than rethrown: they are logged, the process
 * exit code is set to 1, and the failure counter is incremented, so a failed
 * job does not reject the promise returned for it.
 *
 * @param {string} historyId
 * @param {string} hash
 */
async function runJob(historyId, hash) {
try {
await downloadAndBackupBlob(historyId, hash)
successCount++
} catch (error) {
console.error(error)
// Prefix the log line with the job identifiers so the failure can be traced
// back to a specific blob.
console.error(`${historyId} ${hash} Error:`, error)
// Signal overall failure via the exit code without stopping other jobs.
process.exitCode = 1
failedCount++
}
}
const promises = jobs.map(({ historyId, hash }) =>
limit(runJob, historyId, hash)
)
await Promise.all(promises)
console.log(
`Backup complete: ${successCount} succeeded, ${failedCount} failed, ${totalJobs} total`
)
await gracefulShutdown()