mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
Add script for restoring projects from backup
GitOrigin-RevId: b639d74989afa17638bbcba0f8ee34c978fcd5d9
This commit is contained in:
236
services/history-v1/storage/lib/backupArchiver.mjs
Normal file
236
services/history-v1/storage/lib/backupArchiver.mjs
Normal file
@@ -0,0 +1,236 @@
|
||||
// @ts-check
|
||||
import path from 'node:path'
|
||||
import projectKey from './project_key.js'
|
||||
import {
|
||||
chunksBucket,
|
||||
backupPersistor,
|
||||
projectBlobsBucket,
|
||||
} from './backupPersistor.mjs'
|
||||
import archiver from 'archiver'
|
||||
import { Chunk, History } from 'overleaf-editor-core'
|
||||
import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
|
||||
import streams from './streams.js'
|
||||
import objectPersistor from '@overleaf/object-persistor'
|
||||
import OError from '@overleaf/o-error'
|
||||
import chunkStore from './chunk_store/index.js'
|
||||
import { loadChunk } from './backupVerifier.mjs'
|
||||
import logger from '@overleaf/logger'
|
||||
|
||||
/**
|
||||
* @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {(import('archiver').Archiver)} Archiver
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {(import('overleaf-editor-core').FileMap)} FileMap
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
* @param historyId
|
||||
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
|
||||
*/
|
||||
async function getProjectPersistor(historyId) {
|
||||
return await backupPersistor.forProjectRO(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, '')
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param persistor
|
||||
* @param {string} key
|
||||
* @return {Promise<{chunkData: any, buffer: Buffer}>}
|
||||
*/
|
||||
async function loadChunkByKey(persistor, key) {
|
||||
try {
|
||||
const buf = await streams.gunzipStreamToBuffer(
|
||||
await persistor.getObjectStream(chunksBucket, key)
|
||||
)
|
||||
return { chunkData: JSON.parse(buf.toString('utf-8')), buffer: buf }
|
||||
} catch (err) {
|
||||
if (err instanceof objectPersistor.Errors.NotFoundError) {
|
||||
throw new Chunk.NotPersistedError('chunk not found')
|
||||
}
|
||||
if (err instanceof Error) {
|
||||
throw OError.tag(err, 'Failed to load chunk', { key })
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} historyId
|
||||
* @param {string} hash
|
||||
* @param {CachedPerProjectEncryptedS3Persistor} persistor
|
||||
* @return {Promise<NodeJS.ReadableStream>}
|
||||
*/
|
||||
async function fetchBlob(historyId, hash, persistor) {
|
||||
const path = makeProjectKey(historyId, hash)
|
||||
return await persistor.getObjectStream(projectBlobsBucket, path, {
|
||||
autoGunzip: true,
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {History} history
|
||||
* @param {Archiver} archive
|
||||
* @param {CachedPerProjectEncryptedS3Persistor} projectCache
|
||||
* @param {string} historyId
|
||||
* @param {string} [prefix] Should include trailing slash (if length > 0)
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async function addChunkToArchive(
|
||||
history,
|
||||
archive,
|
||||
projectCache,
|
||||
historyId,
|
||||
prefix = ''
|
||||
) {
|
||||
const chunkBlobs = new Set()
|
||||
history.findBlobHashes(chunkBlobs)
|
||||
const files = getBlobMap(history, chunkBlobs)
|
||||
|
||||
logger.debug({ chunkBlobs, files }, 'Adding blobs to archive')
|
||||
|
||||
for (const chunkBlob of chunkBlobs) {
|
||||
if (GLOBAL_BLOBS.has(chunkBlob)) {
|
||||
logger.debug('Skipping global blob:', chunkBlob)
|
||||
continue
|
||||
}
|
||||
const blobStream = await fetchBlob(historyId, chunkBlob, projectCache)
|
||||
|
||||
let name = chunkBlob
|
||||
|
||||
if (files.has(chunkBlob)) {
|
||||
name = files.get(chunkBlob)
|
||||
} else {
|
||||
logger.debug('Blob not found in file map:', chunkBlob)
|
||||
}
|
||||
|
||||
archive.append(blobStream, {
|
||||
name: `${prefix}${name}`,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} historyId
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
async function findStartVersionOfLatestChunk(historyId) {
|
||||
const backend = chunkStore.getBackend(historyId)
|
||||
const chunk = await backend.getLatestChunk(historyId, { readOnly: true })
|
||||
if (!chunk) {
|
||||
throw new Error('Latest chunk could not be loaded')
|
||||
}
|
||||
return chunk.startVersion
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {History} history
|
||||
* @param {Set<string>} chunkBlobs
|
||||
* @return {Map<string, string>}
|
||||
*/
|
||||
function getBlobMap(history, chunkBlobs) {
|
||||
const files = new Map()
|
||||
|
||||
history.changes.forEach(change => {
|
||||
change.operations.forEach(operation => {
|
||||
if (operation.getFile) {
|
||||
const file = operation.getFile()
|
||||
if (chunkBlobs.has(file.data.hash)) {
|
||||
files.set(file.data.hash, operation.pathname)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
return files
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore a project from the latest snapshot
|
||||
*
|
||||
* There is an assumption that the database backup has been restored.
|
||||
*
|
||||
* @param {Archiver} archive
|
||||
* @param {string} historyId
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function archiveLatestChunk(archive, historyId) {
|
||||
const projectCache = await getProjectPersistor(historyId)
|
||||
|
||||
const startVersion = await findStartVersionOfLatestChunk(historyId)
|
||||
|
||||
const backedUpChunkRaw = await loadChunk(
|
||||
historyId,
|
||||
startVersion,
|
||||
projectCache
|
||||
)
|
||||
|
||||
const backedUpChunk = History.fromRaw(backedUpChunkRaw)
|
||||
|
||||
await addChunkToArchive(backedUpChunk, archive, projectCache, historyId)
|
||||
|
||||
return archive
|
||||
}
|
||||
|
||||
/**
|
||||
* Download raw files from the backup.
|
||||
*
|
||||
* This can work without the database being backed up.
|
||||
*
|
||||
* It will split the project into chunks per directory and download the blobs alongside the chunk.
|
||||
*
|
||||
* @param {Archiver} archive
|
||||
* @param {string} historyId
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function archiveRawProject(archive, historyId) {
|
||||
const projectCache = await getProjectPersistor(historyId)
|
||||
|
||||
const key = path.join(projectKey.format(historyId), projectKey.pad(0))
|
||||
|
||||
const { contents: chunks } = await projectCache.listDirectory(
|
||||
chunksBucket,
|
||||
key
|
||||
)
|
||||
|
||||
if (chunks.length === 0) {
|
||||
throw new Error('No chunks found')
|
||||
}
|
||||
|
||||
for (const chunkRecord of chunks) {
|
||||
logger.debug({ key: chunkRecord.Key }, 'Processing chunk')
|
||||
if (!chunkRecord.Key) {
|
||||
continue
|
||||
}
|
||||
const chunkId = chunkRecord.Key.split('/').pop()
|
||||
const { chunkData, buffer } = await loadChunkByKey(
|
||||
projectCache,
|
||||
chunkRecord.Key
|
||||
)
|
||||
|
||||
archive.append(buffer, {
|
||||
name: `${historyId}/chunks/${chunkId}/chunk.json`,
|
||||
})
|
||||
|
||||
const chunk = History.fromRaw(chunkData)
|
||||
|
||||
await addChunkToArchive(
|
||||
chunk,
|
||||
archive,
|
||||
projectCache,
|
||||
historyId,
|
||||
`${historyId}/chunks/${chunkId}/`
|
||||
)
|
||||
}
|
||||
}
|
||||
162
services/history-v1/storage/scripts/recover_zip_from_backup.mjs
Normal file
162
services/history-v1/storage/scripts/recover_zip_from_backup.mjs
Normal file
@@ -0,0 +1,162 @@
|
||||
// @ts-check
|
||||
import { loadGlobalBlobs } from '../lib/blob_store/index.js'
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import assert from '../lib/assert.js'
|
||||
import fs from 'node:fs'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import {
|
||||
archiveLatestChunk,
|
||||
archiveRawProject,
|
||||
} from '../lib/backupArchiver.mjs'
|
||||
import knex from '../lib/knex.js'
|
||||
import { client } from '../lib/mongodb.js'
|
||||
import archiver from 'archiver'
|
||||
|
||||
const SUPPORTED_MODES = ['raw', 'latest']
|
||||
|
||||
// outputFile needs to be available in the shutdown function (which may be called before it's declared)
|
||||
// eslint-disable-next-line prefer-const
|
||||
let outputFile
|
||||
|
||||
/**
|
||||
* Gracefully shutdown the process
|
||||
* @param {number} code
|
||||
*/
|
||||
async function shutdown(code = 0) {
|
||||
if (outputFile) {
|
||||
outputFile.close()
|
||||
}
|
||||
await Promise.all([knex.destroy(), client.close()])
|
||||
await setTimeout(1000)
|
||||
process.exit(code)
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('archiver').ZipArchive} ZipArchive
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {import('archiver').ProgressData} ProgressData
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {import('archiver').EntryData} EntryData
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} ArchiverError
|
||||
* @property {string} message
|
||||
* @property {string} code
|
||||
* @property {Object} data
|
||||
*/
|
||||
|
||||
const { historyId, mode, output, verbose } = commandLineArgs([
|
||||
{ name: 'historyId', type: String },
|
||||
{ name: 'output', type: String },
|
||||
{ name: 'mode', type: String, defaultValue: 'raw' },
|
||||
{ name: 'verbose', type: String, defaultValue: false },
|
||||
])
|
||||
|
||||
if (!historyId) {
|
||||
console.error('missing --historyId')
|
||||
await shutdown(1)
|
||||
}
|
||||
|
||||
if (!output) {
|
||||
console.error('missing --output')
|
||||
|
||||
await shutdown(1)
|
||||
}
|
||||
|
||||
try {
|
||||
assert.projectId(historyId)
|
||||
} catch (error) {
|
||||
console.error('Invalid history ID')
|
||||
await shutdown(1)
|
||||
}
|
||||
|
||||
if (!SUPPORTED_MODES.includes(mode)) {
|
||||
console.error(
|
||||
'Invalid mode; supported modes are: ' + SUPPORTED_MODES.join(', ')
|
||||
)
|
||||
await shutdown(1)
|
||||
}
|
||||
|
||||
await loadGlobalBlobs()
|
||||
|
||||
outputFile = fs.createWriteStream(output)
|
||||
|
||||
const archive = archiver.create('zip', {})
|
||||
|
||||
archive.on('close', function () {
|
||||
console.log(archive.pointer() + ' total bytes')
|
||||
console.log(`Wrote ${output}`)
|
||||
shutdown().catch(e => console.error('Error shutting down', e))
|
||||
})
|
||||
|
||||
archive.on(
|
||||
'error',
|
||||
/**
|
||||
*
|
||||
* @param {ArchiverError} e
|
||||
*/
|
||||
function (e) {
|
||||
console.error(`Error writing archive: ${e.message}`)
|
||||
}
|
||||
)
|
||||
|
||||
archive.on('end', function () {
|
||||
console.log(`Wrote ${archive.pointer()} total bytes to ${output}`)
|
||||
shutdown().catch(e => console.error('Error shutting down', e))
|
||||
})
|
||||
|
||||
archive.on(
|
||||
'progress',
|
||||
/**
|
||||
*
|
||||
* @param {ProgressData} progress
|
||||
*/
|
||||
function (progress) {
|
||||
if (verbose) {
|
||||
console.log(`${progress.entries.processed} / ${progress.entries.total}`)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
archive.on(
|
||||
'entry',
|
||||
/**
|
||||
*
|
||||
* @param {EntryData} entry
|
||||
*/
|
||||
function (entry) {
|
||||
if (verbose) {
|
||||
console.log(`${entry.name} added`)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
archive.on(
|
||||
'warning',
|
||||
/**
|
||||
*
|
||||
* @param {ArchiverError} warning
|
||||
*/
|
||||
function (warning) {
|
||||
console.warn(`Warning writing archive: ${warning.message}`)
|
||||
}
|
||||
)
|
||||
|
||||
switch (mode) {
|
||||
case 'latest':
|
||||
await archiveLatestChunk(archive, historyId)
|
||||
break
|
||||
case 'raw':
|
||||
default:
|
||||
await archiveRawProject(archive, historyId)
|
||||
break
|
||||
}
|
||||
|
||||
archive.pipe(outputFile)
|
||||
|
||||
await archive.finalize()
|
||||
Reference in New Issue
Block a user