diff --git a/services/history-v1/storage/lib/backupArchiver.mjs b/services/history-v1/storage/lib/backupArchiver.mjs index 7e831140be..30a9708879 100644 --- a/services/history-v1/storage/lib/backupArchiver.mjs +++ b/services/history-v1/storage/lib/backupArchiver.mjs @@ -5,16 +5,172 @@ import { chunksBucket, backupPersistor, projectBlobsBucket, + globalBlobsBucket as backupGlobalBlobsBucket, } from './backupPersistor.mjs' -import archiver from 'archiver' -import { Chunk, History } from 'overleaf-editor-core' -import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js' +import core, { Chunk, History } from 'overleaf-editor-core' +import { + GLOBAL_BLOBS, + makeProjectKey, + getStringLengthOfFile, + makeGlobalKey, +} from './blob_store/index.js' import streams from './streams.js' import objectPersistor from '@overleaf/object-persistor' import OError from '@overleaf/o-error' import chunkStore from './chunk_store/index.js' -import { loadChunk } from './backupVerifier.mjs' import logger from '@overleaf/logger' +import fs from 'node:fs' +import { pipeline } from 'node:stream/promises' +import withTmpDir from '../../api/controllers/with_tmp_dir.js' +import { loadChunk } from './backupVerifier.mjs' +import globalBlobPersistor from './persistor.js' +import config from 'config' +import { NoKEKMatchedError } from '@overleaf/object-persistor/src/Errors.js' + +const globalBlobsBucket = config.get('blobStore.globalBucket') + +class BackupBlobStore { + /** + * + * @param {string} historyId + * @param {string} tmp + * @param {CachedPerProjectEncryptedS3Persistor} persistor + * @param {boolean} useBackupGlobalBlobs + */ + constructor(historyId, tmp, persistor, useBackupGlobalBlobs) { + this.historyId = historyId + this.tmp = tmp + this.blobs = new Map() + this.persistor = persistor + this.useBackupGlobalBlobs = useBackupGlobalBlobs + } + + /** + * Required for BlobStore interface - not supported. + * + * @template T + * @return {Promise} + */ + async getObject() { + throw new Error('Not implemented') + } + + /** + * + * @param {Set} hashes + * @return {Promise} + */ + async fetchBlobs(hashes) { + for await (const hash of hashes) { + if (this.blobs.has(hash)) return + const path = `${this.tmp}/${hash}` + /** @type {core.Blob} */ + let blob + /** @type {NodeJS.ReadableStream} */ + let blobStream + if (GLOBAL_BLOBS.has(hash)) { + try { + const blobData = await this.fetchGlobalBlob(hash) + await pipeline(blobData.stream, fs.createWriteStream(path)) + blob = blobData.blob + } catch (err) { + logger.warn({ hash, err }, 'Failed to fetch global blob') + continue + } + } else { + try { + blobStream = await fetchBlob(this.historyId, hash, this.persistor) + await pipeline(blobStream, fs.createWriteStream(path)) + blob = await this.makeBlob(hash, path) + } catch (err) { + logger.warn({ err, hash }, 'Failed to fetch chunk blob') + continue + } + } + + this.blobs.set(hash, blob) + } + } + + /** + * + * @param {string} hash + * @return {Promise<{ blob: core.Blob, stream: NodeJS.ReadableStream }>} + */ + async fetchGlobalBlob(hash) { + const globalBlob = GLOBAL_BLOBS.get(hash) + if (!globalBlob) { + throw new Error('blob does not exist or is not a global blob') + } + let stream + + const key = makeGlobalKey(hash) + + if (this.useBackupGlobalBlobs) { + stream = await this.persistor.getObjectStream( + backupGlobalBlobsBucket, + key + ) + } else { + stream = await globalBlobPersistor.getObjectStream(globalBlobsBucket, key) + } + return { blob: globalBlob.blob, stream } + } + + /** + * + * @param {string} hash + * @param {string} pathname + * @return {Promise} + */ + async makeBlob(hash, pathname) { + const stat = await fs.promises.stat(pathname) + const byteLength = stat.size + const stringLength = await getStringLengthOfFile(byteLength, pathname) + if (stringLength) { + return new core.Blob(hash, byteLength, stringLength) + } + return new core.Blob(hash, byteLength) + } + + /** + * + * @param {string} hash + * @return {Promise} + */ + async getString(hash) { + const stream = await this.getStream(hash) + const buffer = await streams.readStreamToBuffer(stream) + return buffer.toString() + } + + /** + * + * @param {string} hash + * @return {Promise} + */ + async getStream(hash) { + return fs.createReadStream(this.getBlobPathname(hash)) + } + + /** + * + * @param {string} hash + * @return {Promise} + */ + async getBlob(hash) { + return this.blobs.get(hash) + } + + /** + * + * @param {string} hash + * @return {string} + */ + getBlobPathname(hash) { + return path.join(this.tmp, hash) + } +} /** * @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor @@ -34,10 +190,21 @@ import logger from '@overleaf/logger' * @return {Promise} */ async function getProjectPersistor(historyId) { - return await backupPersistor.forProjectRO( - projectBlobsBucket, - makeProjectKey(historyId, '') - ) + try { + return await backupPersistor.forProjectRO( + projectBlobsBucket, + makeProjectKey(historyId, '') + ) + } catch (error) { + if (error instanceof NoKEKMatchedError) { + logger.info({}, 'no kek matched') + } + throw new BackupPersistorError( + 'Failed to get project persistor', + { historyId }, + error instanceof Error ? error : undefined + ) + } } /** @@ -77,13 +244,19 @@ async function fetchBlob(historyId, hash, persistor) { }) } +/** + * @typedef {object} AddChunkOptions + * @property {string} [prefix] Should include trailing slash (if length > 0) + * @property {boolean} [useBackupGlobalBlobs] + */ + /** * * @param {History} history * @param {Archiver} archive * @param {CachedPerProjectEncryptedS3Persistor} projectCache * @param {string} historyId - * @param {string} [prefix] Should include trailing slash (if length > 0) + * @param {AddChunkOptions} [options] * @returns {Promise} */ async function addChunkToArchive( @@ -91,33 +264,64 @@ async function addChunkToArchive( archive, projectCache, historyId, - prefix = '' + { prefix = '', useBackupGlobalBlobs = false } = {} ) { const chunkBlobs = new Set() history.findBlobHashes(chunkBlobs) - const files = getBlobMap(history, chunkBlobs) - logger.debug({ chunkBlobs, files }, 'Adding blobs to archive') + await withTmpDir('recovery-blob-', async tmpDir => { + const blobStore = new BackupBlobStore( + historyId, + tmpDir, + projectCache, + useBackupGlobalBlobs + ) + await blobStore.fetchBlobs(chunkBlobs) - for (const chunkBlob of chunkBlobs) { - if (GLOBAL_BLOBS.has(chunkBlob)) { - logger.debug('Skipping global blob:', chunkBlob) - continue + await history.loadFiles('lazy', blobStore) + + const snapshot = history.getSnapshot() + snapshot.applyAll(history.getChanges()) + + const filePaths = snapshot.getFilePathnames() + + if (filePaths.length === 0) { + logger.warn( + { historyId, projectVersion: snapshot.projectVersion }, + 'No files found in snapshot backup' + ) } - const blobStream = await fetchBlob(historyId, chunkBlob, projectCache) + for (const filePath of filePaths) { + /** @type {core.File | null | undefined} */ + const file = snapshot.getFile(filePath) + if (!file) { + logger.error({ filePath }, 'File not found in snapshot') + continue + } + await file.load('eager', blobStore) - let name = chunkBlob + const hash = file.getHash() - if (files.has(chunkBlob)) { - name = files.get(chunkBlob) - } else { - logger.debug('Blob not found in file map:', chunkBlob) + /** @type {string | fs.ReadStream | null | undefined} */ + let content = file.getContent({ filterTrackedDeletes: true }) + + if (content === null) { + if (!hash) { + logger.error({ filePath }, 'File does not have a hash') + continue + } + const blob = await blobStore.getBlob(hash) + if (!blob) { + logger.error({ filePath }, 'Blob not found in blob store') + continue + } + content = await blobStore.getStream(hash) + } + archive.append(content, { + name: `${prefix}${filePath}`, + }) } - - archive.append(blobStream, { - name: `${prefix}${name}`, - }) - } + }) } /** @@ -134,28 +338,6 @@ async function findStartVersionOfLatestChunk(historyId) { return chunk.startVersion } -/** - * - * @param {History} history - * @param {Set} chunkBlobs - * @return {Map} - */ -function getBlobMap(history, chunkBlobs) { - const files = new Map() - - history.changes.forEach(change => { - change.operations.forEach(operation => { - if (operation.getFile) { - const file = operation.getFile() - if (chunkBlobs.has(file.data.hash)) { - files.set(file.data.hash, operation.pathname) - } - } - }) - }) - return files -} - /** * Restore a project from the latest snapshot * @@ -163,9 +345,16 @@ function getBlobMap(history, chunkBlobs) { * * @param {Archiver} archive * @param {string} historyId + * @param {boolean} [useBackupGlobalBlobs] * @return {Promise} */ -export async function archiveLatestChunk(archive, historyId) { +export async function archiveLatestChunk( + archive, + historyId, + useBackupGlobalBlobs = false +) { + logger.info({ historyId, useBackupGlobalBlobs }, 'Archiving latest chunk') + const projectCache = await getProjectPersistor(historyId) const startVersion = await findStartVersionOfLatestChunk(historyId) @@ -178,7 +367,9 @@ export async function archiveLatestChunk(archive, historyId) { const backedUpChunk = History.fromRaw(backedUpChunkRaw) - await addChunkToArchive(backedUpChunk, archive, projectCache, historyId) + await addChunkToArchive(backedUpChunk, archive, projectCache, historyId, { + useBackupGlobalBlobs, + }) return archive } @@ -192,16 +383,19 @@ export async function archiveLatestChunk(archive, historyId) { * * @param {Archiver} archive * @param {string} historyId + * @param {boolean} [useBackupGlobalBlobs] * @return {Promise} */ -export async function archiveRawProject(archive, historyId) { +export async function archiveRawProject( + archive, + historyId, + useBackupGlobalBlobs = false +) { const projectCache = await getProjectPersistor(historyId) - const key = path.join(projectKey.format(historyId), projectKey.pad(0)) - const { contents: chunks } = await projectCache.listDirectory( chunksBucket, - key + projectKey.format(historyId) ) if (chunks.length === 0) { @@ -209,11 +403,13 @@ export async function archiveRawProject(archive, historyId) { } for (const chunkRecord of chunks) { - logger.debug({ key: chunkRecord.Key }, 'Processing chunk') if (!chunkRecord.Key) { + logger.debug({ chunkRecord }, 'no key') continue } const chunkId = chunkRecord.Key.split('/').pop() + logger.debug({ chunkId, key: chunkRecord.Key }, 'Processing chunk') + const { chunkData, buffer } = await loadChunkByKey( projectCache, chunkRecord.Key @@ -225,12 +421,11 @@ export async function archiveRawProject(archive, historyId) { const chunk = History.fromRaw(chunkData) - await addChunkToArchive( - chunk, - archive, - projectCache, - historyId, - `${historyId}/chunks/${chunkId}/` - ) + await addChunkToArchive(chunk, archive, projectCache, historyId, { + prefix: `${historyId}/chunks/${chunkId}/`, + useBackupGlobalBlobs, + }) } } + +export class BackupPersistorError extends OError {} diff --git a/services/history-v1/storage/scripts/recover_zip_from_backup.mjs b/services/history-v1/storage/scripts/recover_zip_from_backup.mjs index 7770057962..a16c450092 100644 --- a/services/history-v1/storage/scripts/recover_zip_from_backup.mjs +++ b/services/history-v1/storage/scripts/recover_zip_from_backup.mjs @@ -7,10 +7,16 @@ import { setTimeout } from 'node:timers/promises' import { archiveLatestChunk, archiveRawProject, + BackupPersistorError, } from '../lib/backupArchiver.mjs' import knex from '../lib/knex.js' import { client } from '../lib/mongodb.js' import archiver from 'archiver' +import Events from 'node:events' +import { Chunk } from 'overleaf-editor-core' + +// Silence warning. +Events.setMaxListeners(20) const SUPPORTED_MODES = ['raw', 'latest'] @@ -26,7 +32,8 @@ async function shutdown(code = 0) { if (outputFile) { outputFile.close() } - await Promise.all([knex.destroy(), client.close()]) + await knex.destroy() + await client.close() await setTimeout(1000) process.exit(code) } @@ -73,7 +80,7 @@ try { { name: 'help', type: Boolean }, ])) } catch (err) { - console.error(err.message) + console.error(err instanceof Error ? err.message : err) help = true } @@ -144,7 +151,9 @@ archive.on( */ function (progress) { if (verbose) { - console.log(`${progress.entries.processed} / ${progress.entries.total}`) + console.log( + `${progress.entries.processed} processed out of ${progress.entries.total}` + ) } } ) @@ -169,7 +178,7 @@ archive.on( * @param {ArchiverError} warning */ function (warning) { - console.warn(`Warning writing archive: ${warning.message}`) + console.warn(`Warning encountered when writing archive: ${warning.message}`) } )