diff --git a/services/history-v1/storage/lib/backupArchiver.mjs b/services/history-v1/storage/lib/backupArchiver.mjs new file mode 100644 index 0000000000..7e831140be --- /dev/null +++ b/services/history-v1/storage/lib/backupArchiver.mjs @@ -0,0 +1,236 @@ +// @ts-check +import path from 'node:path' +import projectKey from './project_key.js' +import { + chunksBucket, + backupPersistor, + projectBlobsBucket, +} from './backupPersistor.mjs' +import archiver from 'archiver' +import { Chunk, History } from 'overleaf-editor-core' +import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js' +import streams from './streams.js' +import objectPersistor from '@overleaf/object-persistor' +import OError from '@overleaf/o-error' +import chunkStore from './chunk_store/index.js' +import { loadChunk } from './backupVerifier.mjs' +import logger from '@overleaf/logger' + +/** + * @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor + */ + +/** + * @typedef {(import('archiver').Archiver)} Archiver + */ + +/** + * @typedef {(import('overleaf-editor-core').FileMap)} FileMap + */ + +/** + * + * @param historyId + * @return {Promise} + */ +async function getProjectPersistor(historyId) { + return await backupPersistor.forProjectRO( + projectBlobsBucket, + makeProjectKey(historyId, '') + ) +} + +/** + * + * @param persistor + * @param {string} key + * @return {Promise<{chunkData: any, buffer: Buffer}>} + */ +async function loadChunkByKey(persistor, key) { + try { + const buf = await streams.gunzipStreamToBuffer( + await persistor.getObjectStream(chunksBucket, key) + ) + return { chunkData: JSON.parse(buf.toString('utf-8')), buffer: buf } + } catch (err) { + if (err instanceof objectPersistor.Errors.NotFoundError) { + throw new Chunk.NotPersistedError('chunk not found') + } + if (err instanceof Error) { + throw OError.tag(err, 'Failed to load chunk', { key }) + } + throw err + } +} + +/** + * + * @param {string} historyId + * @param {string} hash + * @param {CachedPerProjectEncryptedS3Persistor} persistor + * @return {Promise} + */ +async function fetchBlob(historyId, hash, persistor) { + const path = makeProjectKey(historyId, hash) + return await persistor.getObjectStream(projectBlobsBucket, path, { + autoGunzip: true, + }) +} + +/** + * + * @param {History} history + * @param {Archiver} archive + * @param {CachedPerProjectEncryptedS3Persistor} projectCache + * @param {string} historyId + * @param {string} [prefix] Should include trailing slash (if length > 0) + * @returns {Promise} + */ +async function addChunkToArchive( + history, + archive, + projectCache, + historyId, + prefix = '' +) { + const chunkBlobs = new Set() + history.findBlobHashes(chunkBlobs) + const files = getBlobMap(history, chunkBlobs) + + logger.debug({ chunkBlobs, files }, 'Adding blobs to archive') + + for (const chunkBlob of chunkBlobs) { + if (GLOBAL_BLOBS.has(chunkBlob)) { + logger.debug('Skipping global blob:', chunkBlob) + continue + } + const blobStream = await fetchBlob(historyId, chunkBlob, projectCache) + + let name = chunkBlob + + if (files.has(chunkBlob)) { + name = files.get(chunkBlob) + } else { + logger.debug('Blob not found in file map:', chunkBlob) + } + + archive.append(blobStream, { + name: `${prefix}${name}`, + }) + } +} + +/** + * + * @param {string} historyId + * @return {Promise} + */ +async function findStartVersionOfLatestChunk(historyId) { + const backend = chunkStore.getBackend(historyId) + const chunk = await backend.getLatestChunk(historyId, { readOnly: true }) + if (!chunk) { + throw new Error('Latest chunk could not be loaded') + } + return chunk.startVersion +} + +/** + * + * @param {History} history + * @param {Set} chunkBlobs + * @return {Map} + */ +function getBlobMap(history, chunkBlobs) { + const files = new Map() + + history.changes.forEach(change => { + change.operations.forEach(operation => { + if (operation.getFile) { + const file = operation.getFile() + if (chunkBlobs.has(file.data.hash)) { + files.set(file.data.hash, operation.pathname) + } + } + }) + }) + return files +} + +/** + * Restore a project from the latest snapshot + * + * There is an assumption that the database backup has been restored. + * + * @param {Archiver} archive + * @param {string} historyId + * @return {Promise} + */ +export async function archiveLatestChunk(archive, historyId) { + const projectCache = await getProjectPersistor(historyId) + + const startVersion = await findStartVersionOfLatestChunk(historyId) + + const backedUpChunkRaw = await loadChunk( + historyId, + startVersion, + projectCache + ) + + const backedUpChunk = History.fromRaw(backedUpChunkRaw) + + await addChunkToArchive(backedUpChunk, archive, projectCache, historyId) + + return archive +} + +/** + * Download raw files from the backup. + * + * This can work without the database being backed up. + * + * It will split the project into chunks per directory and download the blobs alongside the chunk. + * + * @param {Archiver} archive + * @param {string} historyId + * @return {Promise} + */ +export async function archiveRawProject(archive, historyId) { + const projectCache = await getProjectPersistor(historyId) + + const key = path.join(projectKey.format(historyId), projectKey.pad(0)) + + const { contents: chunks } = await projectCache.listDirectory( + chunksBucket, + key + ) + + if (chunks.length === 0) { + throw new Error('No chunks found') + } + + for (const chunkRecord of chunks) { + logger.debug({ key: chunkRecord.Key }, 'Processing chunk') + if (!chunkRecord.Key) { + continue + } + const chunkId = chunkRecord.Key.split('/').pop() + const { chunkData, buffer } = await loadChunkByKey( + projectCache, + chunkRecord.Key + ) + + archive.append(buffer, { + name: `${historyId}/chunks/${chunkId}/chunk.json`, + }) + + const chunk = History.fromRaw(chunkData) + + await addChunkToArchive( + chunk, + archive, + projectCache, + historyId, + `${historyId}/chunks/${chunkId}/` + ) + } +} diff --git a/services/history-v1/storage/scripts/recover_zip_from_backup.mjs b/services/history-v1/storage/scripts/recover_zip_from_backup.mjs new file mode 100644 index 0000000000..cd63086c89 --- /dev/null +++ b/services/history-v1/storage/scripts/recover_zip_from_backup.mjs @@ -0,0 +1,162 @@ +// @ts-check +import { loadGlobalBlobs } from '../lib/blob_store/index.js' +import commandLineArgs from 'command-line-args' +import assert from '../lib/assert.js' +import fs from 'node:fs' +import { setTimeout } from 'node:timers/promises' +import { + archiveLatestChunk, + archiveRawProject, +} from '../lib/backupArchiver.mjs' +import knex from '../lib/knex.js' +import { client } from '../lib/mongodb.js' +import archiver from 'archiver' + +const SUPPORTED_MODES = ['raw', 'latest'] + +// outputFile needs to be available in the shutdown function (which may be called before it's declared) +// eslint-disable-next-line prefer-const +let outputFile + +/** + * Gracefully shutdown the process + * @param {number} code + */ +async function shutdown(code = 0) { + if (outputFile) { + outputFile.close() + } + await Promise.all([knex.destroy(), client.close()]) + await setTimeout(1000) + process.exit(code) +} + +/** + * @typedef {import('archiver').ZipArchive} ZipArchive + */ + +/** + * @typedef {import('archiver').ProgressData} ProgressData + */ + +/** + * @typedef {import('archiver').EntryData} EntryData + */ + +/** + * @typedef {Object} ArchiverError + * @property {string} message + * @property {string} code + * @property {Object} data + */ + +const { historyId, mode, output, verbose } = commandLineArgs([ + { name: 'historyId', type: String }, + { name: 'output', type: String }, + { name: 'mode', type: String, defaultValue: 'raw' }, + { name: 'verbose', type: String, defaultValue: false }, +]) + +if (!historyId) { + console.error('missing --historyId') + await shutdown(1) +} + +if (!output) { + console.error('missing --output') + + await shutdown(1) +} + +try { + assert.projectId(historyId) +} catch (error) { + console.error('Invalid history ID') + await shutdown(1) +} + +if (!SUPPORTED_MODES.includes(mode)) { + console.error( + 'Invalid mode; supported modes are: ' + SUPPORTED_MODES.join(', ') + ) + await shutdown(1) +} + +await loadGlobalBlobs() + +outputFile = fs.createWriteStream(output) + +const archive = archiver.create('zip', {}) + +archive.on('close', function () { + console.log(archive.pointer() + ' total bytes') + console.log(`Wrote ${output}`) + shutdown().catch(e => console.error('Error shutting down', e)) +}) + +archive.on( + 'error', + /** + * + * @param {ArchiverError} e + */ + function (e) { + console.error(`Error writing archive: ${e.message}`) + } +) + +archive.on('end', function () { + console.log(`Wrote ${archive.pointer()} total bytes to ${output}`) + shutdown().catch(e => console.error('Error shutting down', e)) +}) + +archive.on( + 'progress', + /** + * + * @param {ProgressData} progress + */ + function (progress) { + if (verbose) { + console.log(`${progress.entries.processed} / ${progress.entries.total}`) + } + } +) + +archive.on( + 'entry', + /** + * + * @param {EntryData} entry + */ + function (entry) { + if (verbose) { + console.log(`${entry.name} added`) + } + } +) + +archive.on( + 'warning', + /** + * + * @param {ArchiverError} warning + */ + function (warning) { + console.warn(`Warning writing archive: ${warning.message}`) + } +) + +switch (mode) { + case 'latest': + await archiveLatestChunk(archive, historyId) + break + case 'raw': + default: + await archiveRawProject(archive, historyId) + break +} + +archive.pipe(outputFile) + +await archive.finalize()