diff --git a/services/history-v1/storage/lib/backupBlob.mjs b/services/history-v1/storage/lib/backupBlob.mjs index ecb5689eab..715ac68758 100644 --- a/services/history-v1/storage/lib/backupBlob.mjs +++ b/services/history-v1/storage/lib/backupBlob.mjs @@ -167,7 +167,7 @@ export async function storeBlobBackup(projectId, hash) { * @return {Promise<*>} * @private */ -export async function _blobIsBackedUp(projectId, hash) { +export async function blobIsBackedUp(projectId, hash) { const blobs = await backedUpBlobs.findOne( { _id: new ObjectId(projectId), @@ -185,7 +185,7 @@ export async function _blobIsBackedUp(projectId, hash) { * @param {Blob} blob - The blob that is being backed up * @param {string} tmpPath - The path to a temporary file storing the contents of the blob. * @param {CachedPerProjectEncryptedS3Persistor} [persistor] - The persistor to use (optional) - * @return {Promise} + * @return {Promise} */ export async function backupBlob(historyId, blob, tmpPath, persistor) { const hash = blob.getHash() @@ -200,17 +200,17 @@ export async function backupBlob(historyId, blob, tmpPath, persistor) { if (globalBlob && !globalBlob.demoted) { recordBackupConclusion('skipped', 'global') logger.debug({ projectId, hash }, 'Blob is global - skipping backup') - return + return 'global' } try { - if (await _blobIsBackedUp(projectId, hash)) { + if (await blobIsBackedUp(projectId, hash)) { recordBackupConclusion('skipped', 'already_backed_up') logger.debug( { projectId, hash }, 'Blob already backed up - skipping backup' ) - return + return 'already-recorded' } } catch (error) { logger.warn({ error }, 'Failed to check if blob is backed up') @@ -241,7 +241,7 @@ export async function backupBlob(historyId, blob, tmpPath, persistor) { await storeBlobBackup(projectId, hash) recordBackupConclusion('failure', 'already_backed_up') // Blob already backed up so report success - return + return 'already-written' } recordBackupConclusion('failure') logger.warn({ error, projectId, hash }, 
'Failed to upload blob to backup') diff --git a/services/history-v1/storage/scripts/backup_blob.mjs b/services/history-v1/storage/scripts/backup_blob.mjs index 314b05313e..c03c229bd3 100644 --- a/services/history-v1/storage/scripts/backup_blob.mjs +++ b/services/history-v1/storage/scripts/backup_blob.mjs @@ -1,18 +1,34 @@ // @ts-check import commandLineArgs from 'command-line-args' -import { backupBlob, downloadBlobToDir } from '../lib/backupBlob.mjs' +import { + backupBlob, + downloadBlobToDir, + blobIsBackedUp, +} from '../lib/backupBlob.mjs' +import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs' import withTmpDir from '../../api/controllers/with_tmp_dir.js' import { BlobStore, GLOBAL_BLOBS, loadGlobalBlobs, + makeProjectKey, } from '../lib/blob_store/index.js' +import { + getBackupStatus, + unsetBackedUpBlobHashes, +} from '../lib/backup_store/index.js' +import chunkStore from '../lib/chunk_store/index.js' import assert from '../lib/assert.js' import knex from '../lib/knex.js' import { client } from '../lib/mongodb.js' import redis from '../lib/redis.js' import { setTimeout } from 'node:timers/promises' import fs from 'node:fs' +import pLimit from 'p-limit' +import Events from 'node:events' + +// Silence warning. 
+Events.setMaxListeners(20)
 
 await loadGlobalBlobs()
 
@@ -120,6 +136,46 @@ async function initialiseJobs({ historyId, hash, input }) {
   return [{ hash, historyId }]
 }
 
+/**
+ * @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
+ */
+
+/**
+ * Pending or resolved persistors, keyed by history id.
+ * @type {Map<string, Promise<CachedPerProjectEncryptedS3Persistor>>}
+ */
+const persistorCache = new Map()
+
+/**
+ * Get the backup persistor for a project, creating it on first use and
+ * sharing the same promise between callers.
+ *
+ * @param {string} historyId
+ * @returns {Promise<CachedPerProjectEncryptedS3Persistor>}
+ */
+function getPersistor(historyId) {
+  let persistorPromise = persistorCache.get(historyId)
+  if (!persistorPromise) {
+    const pending = backupPersistor.forProject(
+      projectBlobsBucket,
+      makeProjectKey(historyId, '')
+    )
+    // Evict a failed lookup so that later blobs of the same project retry
+    // instead of replaying the cached rejection.
+    pending.catch(() => {
+      if (persistorCache.get(historyId) === pending) {
+        persistorCache.delete(historyId)
+      }
+    })
+    persistorCache.set(historyId, pending)
+    persistorPromise = pending
+  }
+  return persistorPromise
+}
+
+// Track processed objects to handle input csv files with duplicate entries
+const processedObjects = new Set()
+
 /**
  *
  * @param {string} historyId
@@ -127,16 +183,48 @@ async function initialiseJobs({ historyId, hash, input }) {
  * @return {Promise<void>}
  */
 export async function downloadAndBackupBlob(historyId, hash) {
+  const key = `${historyId}:${hash}`
+  if (processedObjects.has(key)) {
+    console.log(`${historyId} ${hash} skipping previously processed blob`)
+    return
+  } else {
+    processedObjects.add(key)
+  }
+  const backend = chunkStore.getBackend(historyId)
+  const projectId = await backend.resolveHistoryIdToMongoProjectId(historyId)
+  // Check whether the project still exists
+  try {
+    await getBackupStatus(projectId)
+  } catch (err) {
+    if (err instanceof Error && err.message === 'Project not found') {
+      console.log(`${historyId} ${hash} project not found (expired)`)
+      return
+    } else if (err instanceof Error && err.message === 'Project deleted') {
+      console.log(`${historyId} ${hash} project deleted but not expired`)
+      // continue and allow backing up blob for a deleted project in case it is undeleted in future
+    } else {
+      throw err
+    }
+  }
+  // Force clearing of any backed up blob record
+  if (options.clear) {
+    await unsetBackedUpBlobHashes(projectId, [hash])
+  } else if (await blobIsBackedUp(projectId, hash)) {
+    // Check if the blob is already backed up
+    console.log(`${historyId} ${hash} already backed up`)
+    return
+  }
+  const persistor = await getPersistor(historyId)
   const blobStore = new BlobStore(historyId)
   const blob = await blobStore.getBlob(hash)
   if (!blob) {
-    throw new Error(`Blob ${hash} could not be loaded`)
+    throw new Error(`Blob ${hash} could not be loaded for history ${historyId}`)
   }
-  await withTmpDir(`blob-${hash}`, async tmpDir => {
+  await withTmpDir(`blob-${historyId}-${hash}`, async tmpDir => {
     const filePath = await downloadBlobToDir(historyId, blob, tmpDir)
-    console.log(`Downloaded blob ${hash} to ${filePath}`)
-    await backupBlob(historyId, blob, filePath)
-    console.log('Backed up blob')
+    console.log(`${historyId} ${hash} Downloaded blob ${filePath}`)
+    const status = await backupBlob(historyId, blob, filePath, persistor)
+    console.log(`${historyId} ${hash} Blob`, status ?? 'backed up')
   })
 }
 
@@ -146,6 +234,8 @@ const options = commandLineArgs([
   { name: 'historyId', type: String },
   { name: 'hash', type: String },
   { name: 'input', type: String },
+  { name: 'concurrency', alias: 'c', type: Number, defaultValue: 1 },
+  { name: 'clear', type: Boolean },
 ])
 
 try {
@@ -162,12 +252,34 @@ if (!Array.isArray(jobs)) {
   process.exit(1)
 }
 
-for (const { historyId, hash } of jobs) {
+const limit = pLimit(options.concurrency)
+let successCount = 0
+let failedCount = 0
+const totalJobs = jobs.length
+
+/**
+ * Back up one blob and tally the outcome. Errors are logged and recorded in
+ * the exit code rather than rethrown, so one failure does not stop the batch.
+ *
+ * @param {string} historyId
+ * @param {string} hash
+ */
+async function runJob(historyId, hash) {
   try {
     await downloadAndBackupBlob(historyId, hash)
+    successCount++
   } catch (error) {
-    console.error(error)
+    console.error(`${historyId} ${hash} Error:`, error)
     process.exitCode = 1
+    failedCount++
   }
 }
+
+const promises = jobs.map(({ historyId, hash }) =>
+  limit(runJob, historyId, hash)
+)
+await Promise.all(promises)
+console.log(
+  `Backup complete: ${successCount} succeeded, ${failedCount} failed, ${totalJobs} total`
+)
 await gracefulShutdown()