Use BackupBlobStore to get chunk content

GitOrigin-RevId: 91de53101ea289b43bdb08352aecc09ae89d0f74
This commit is contained in:
Andrew Rumble
2025-03-27 17:14:19 +00:00
committed by Copybot
parent 797f29d40a
commit f2b0a982ac
2 changed files with 271 additions and 67 deletions

View File

@@ -5,16 +5,172 @@ import {
chunksBucket,
backupPersistor,
projectBlobsBucket,
globalBlobsBucket as backupGlobalBlobsBucket,
} from './backupPersistor.mjs'
import archiver from 'archiver'
import { Chunk, History } from 'overleaf-editor-core'
import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
import core, { Chunk, History } from 'overleaf-editor-core'
import {
GLOBAL_BLOBS,
makeProjectKey,
getStringLengthOfFile,
makeGlobalKey,
} from './blob_store/index.js'
import streams from './streams.js'
import objectPersistor from '@overleaf/object-persistor'
import OError from '@overleaf/o-error'
import chunkStore from './chunk_store/index.js'
import { loadChunk } from './backupVerifier.mjs'
import logger from '@overleaf/logger'
import fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
import withTmpDir from '../../api/controllers/with_tmp_dir.js'
import { loadChunk } from './backupVerifier.mjs'
import globalBlobPersistor from './persistor.js'
import config from 'config'
import { NoKEKMatchedError } from '@overleaf/object-persistor/src/Errors.js'
const globalBlobsBucket = config.get('blobStore.globalBucket')
class BackupBlobStore {
/**
*
* @param {string} historyId
* @param {string} tmp
* @param {CachedPerProjectEncryptedS3Persistor} persistor
* @param {boolean} useBackupGlobalBlobs
*/
constructor(historyId, tmp, persistor, useBackupGlobalBlobs) {
this.historyId = historyId
this.tmp = tmp
this.blobs = new Map()
this.persistor = persistor
this.useBackupGlobalBlobs = useBackupGlobalBlobs
}
/**
* Required for BlobStore interface - not supported.
*
* @template T
* @return {Promise<T>}
*/
async getObject() {
throw new Error('Not implemented')
}
/**
*
* @param {Set<string>} hashes
* @return {Promise<void>}
*/
async fetchBlobs(hashes) {
for await (const hash of hashes) {
if (this.blobs.has(hash)) return
const path = `${this.tmp}/${hash}`
/** @type {core.Blob} */
let blob
/** @type {NodeJS.ReadableStream} */
let blobStream
if (GLOBAL_BLOBS.has(hash)) {
try {
const blobData = await this.fetchGlobalBlob(hash)
await pipeline(blobData.stream, fs.createWriteStream(path))
blob = blobData.blob
} catch (err) {
logger.warn({ hash, err }, 'Failed to fetch global blob')
continue
}
} else {
try {
blobStream = await fetchBlob(this.historyId, hash, this.persistor)
await pipeline(blobStream, fs.createWriteStream(path))
blob = await this.makeBlob(hash, path)
} catch (err) {
logger.warn({ err, hash }, 'Failed to fetch chunk blob')
continue
}
}
this.blobs.set(hash, blob)
}
}
/**
*
* @param {string} hash
* @return {Promise<{ blob: core.Blob, stream: NodeJS.ReadableStream }>}
*/
async fetchGlobalBlob(hash) {
const globalBlob = GLOBAL_BLOBS.get(hash)
if (!globalBlob) {
throw new Error('blob does not exist or is not a global blob')
}
let stream
const key = makeGlobalKey(hash)
if (this.useBackupGlobalBlobs) {
stream = await this.persistor.getObjectStream(
backupGlobalBlobsBucket,
key
)
} else {
stream = await globalBlobPersistor.getObjectStream(globalBlobsBucket, key)
}
return { blob: globalBlob.blob, stream }
}
/**
*
* @param {string} hash
* @param {string} pathname
* @return {Promise<core.Blob>}
*/
async makeBlob(hash, pathname) {
const stat = await fs.promises.stat(pathname)
const byteLength = stat.size
const stringLength = await getStringLengthOfFile(byteLength, pathname)
if (stringLength) {
return new core.Blob(hash, byteLength, stringLength)
}
return new core.Blob(hash, byteLength)
}
/**
*
* @param {string} hash
* @return {Promise<string>}
*/
async getString(hash) {
const stream = await this.getStream(hash)
const buffer = await streams.readStreamToBuffer(stream)
return buffer.toString()
}
/**
*
* @param {string} hash
* @return {Promise<fs.ReadStream>}
*/
async getStream(hash) {
return fs.createReadStream(this.getBlobPathname(hash))
}
/**
*
* @param {string} hash
* @return {Promise<core.Blob>}
*/
async getBlob(hash) {
return this.blobs.get(hash)
}
/**
*
* @param {string} hash
* @return {string}
*/
getBlobPathname(hash) {
return path.join(this.tmp, hash)
}
}
/**
* @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor
@@ -34,10 +190,21 @@ import logger from '@overleaf/logger'
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
*/
async function getProjectPersistor(historyId) {
return await backupPersistor.forProjectRO(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
try {
return await backupPersistor.forProjectRO(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
} catch (error) {
if (error instanceof NoKEKMatchedError) {
logger.info({}, 'no kek matched')
}
throw new BackupPersistorError(
'Failed to get project persistor',
{ historyId },
error instanceof Error ? error : undefined
)
}
}
/**
@@ -77,13 +244,19 @@ async function fetchBlob(historyId, hash, persistor) {
})
}
/**
* @typedef {object} AddChunkOptions
* @property {string} [prefix] Should include trailing slash (if length > 0)
* @property {boolean} [useBackupGlobalBlobs]
*/
/**
*
* @param {History} history
* @param {Archiver} archive
* @param {CachedPerProjectEncryptedS3Persistor} projectCache
* @param {string} historyId
* @param {string} [prefix] Should include trailing slash (if length > 0)
* @param {AddChunkOptions} [options]
* @returns {Promise<void>}
*/
async function addChunkToArchive(
@@ -91,33 +264,64 @@ async function addChunkToArchive(
archive,
projectCache,
historyId,
prefix = ''
{ prefix = '', useBackupGlobalBlobs = false } = {}
) {
const chunkBlobs = new Set()
history.findBlobHashes(chunkBlobs)
const files = getBlobMap(history, chunkBlobs)
logger.debug({ chunkBlobs, files }, 'Adding blobs to archive')
await withTmpDir('recovery-blob-', async tmpDir => {
const blobStore = new BackupBlobStore(
historyId,
tmpDir,
projectCache,
useBackupGlobalBlobs
)
await blobStore.fetchBlobs(chunkBlobs)
for (const chunkBlob of chunkBlobs) {
if (GLOBAL_BLOBS.has(chunkBlob)) {
logger.debug('Skipping global blob:', chunkBlob)
continue
await history.loadFiles('lazy', blobStore)
const snapshot = history.getSnapshot()
snapshot.applyAll(history.getChanges())
const filePaths = snapshot.getFilePathnames()
if (filePaths.length === 0) {
logger.warn(
{ historyId, projectVersion: snapshot.projectVersion },
'No files found in snapshot backup'
)
}
const blobStream = await fetchBlob(historyId, chunkBlob, projectCache)
for (const filePath of filePaths) {
/** @type {core.File | null | undefined} */
const file = snapshot.getFile(filePath)
if (!file) {
logger.error({ filePath }, 'File not found in snapshot')
continue
}
await file.load('eager', blobStore)
let name = chunkBlob
const hash = file.getHash()
if (files.has(chunkBlob)) {
name = files.get(chunkBlob)
} else {
logger.debug('Blob not found in file map:', chunkBlob)
/** @type {string | fs.ReadStream | null | undefined} */
let content = file.getContent({ filterTrackedDeletes: true })
if (content === null) {
if (!hash) {
logger.error({ filePath }, 'File does not have a hash')
continue
}
const blob = await blobStore.getBlob(hash)
if (!blob) {
logger.error({ filePath }, 'Blob not found in blob store')
continue
}
content = await blobStore.getStream(hash)
}
archive.append(content, {
name: `${prefix}${filePath}`,
})
}
archive.append(blobStream, {
name: `${prefix}${name}`,
})
}
})
}
/**
@@ -134,28 +338,6 @@ async function findStartVersionOfLatestChunk(historyId) {
return chunk.startVersion
}
/**
*
* @param {History} history
* @param {Set<string>} chunkBlobs
* @return {Map<string, string>}
*/
function getBlobMap(history, chunkBlobs) {
const files = new Map()
history.changes.forEach(change => {
change.operations.forEach(operation => {
if (operation.getFile) {
const file = operation.getFile()
if (chunkBlobs.has(file.data.hash)) {
files.set(file.data.hash, operation.pathname)
}
}
})
})
return files
}
/**
* Restore a project from the latest snapshot
*
@@ -163,9 +345,16 @@ function getBlobMap(history, chunkBlobs) {
*
* @param {Archiver} archive
* @param {string} historyId
* @param {boolean} [useBackupGlobalBlobs]
* @return {Promise<void>}
*/
export async function archiveLatestChunk(archive, historyId) {
export async function archiveLatestChunk(
archive,
historyId,
useBackupGlobalBlobs = false
) {
logger.info({ historyId, useBackupGlobalBlobs }, 'Archiving latest chunk')
const projectCache = await getProjectPersistor(historyId)
const startVersion = await findStartVersionOfLatestChunk(historyId)
@@ -178,7 +367,9 @@ export async function archiveLatestChunk(archive, historyId) {
const backedUpChunk = History.fromRaw(backedUpChunkRaw)
await addChunkToArchive(backedUpChunk, archive, projectCache, historyId)
await addChunkToArchive(backedUpChunk, archive, projectCache, historyId, {
useBackupGlobalBlobs,
})
return archive
}
@@ -192,16 +383,19 @@ export async function archiveLatestChunk(archive, historyId) {
*
* @param {Archiver} archive
* @param {string} historyId
* @param {boolean} [useBackupGlobalBlobs]
* @return {Promise<void>}
*/
export async function archiveRawProject(archive, historyId) {
export async function archiveRawProject(
archive,
historyId,
useBackupGlobalBlobs = false
) {
const projectCache = await getProjectPersistor(historyId)
const key = path.join(projectKey.format(historyId), projectKey.pad(0))
const { contents: chunks } = await projectCache.listDirectory(
chunksBucket,
key
projectKey.format(historyId)
)
if (chunks.length === 0) {
@@ -209,11 +403,13 @@ export async function archiveRawProject(archive, historyId) {
}
for (const chunkRecord of chunks) {
logger.debug({ key: chunkRecord.Key }, 'Processing chunk')
if (!chunkRecord.Key) {
logger.debug({ chunkRecord }, 'no key')
continue
}
const chunkId = chunkRecord.Key.split('/').pop()
logger.debug({ chunkId, key: chunkRecord.Key }, 'Processing chunk')
const { chunkData, buffer } = await loadChunkByKey(
projectCache,
chunkRecord.Key
@@ -225,12 +421,11 @@ export async function archiveRawProject(archive, historyId) {
const chunk = History.fromRaw(chunkData)
await addChunkToArchive(
chunk,
archive,
projectCache,
historyId,
`${historyId}/chunks/${chunkId}/`
)
await addChunkToArchive(chunk, archive, projectCache, historyId, {
prefix: `${historyId}/chunks/${chunkId}/`,
useBackupGlobalBlobs,
})
}
}
export class BackupPersistorError extends OError {}

View File

@@ -7,10 +7,16 @@ import { setTimeout } from 'node:timers/promises'
import {
archiveLatestChunk,
archiveRawProject,
BackupPersistorError,
} from '../lib/backupArchiver.mjs'
import knex from '../lib/knex.js'
import { client } from '../lib/mongodb.js'
import archiver from 'archiver'
import Events from 'node:events'
import { Chunk } from 'overleaf-editor-core'
// Silence warning.
Events.setMaxListeners(20)
const SUPPORTED_MODES = ['raw', 'latest']
@@ -26,7 +32,8 @@ async function shutdown(code = 0) {
if (outputFile) {
outputFile.close()
}
await Promise.all([knex.destroy(), client.close()])
await knex.destroy()
await client.close()
await setTimeout(1000)
process.exit(code)
}
@@ -73,7 +80,7 @@ try {
{ name: 'help', type: Boolean },
]))
} catch (err) {
console.error(err.message)
console.error(err instanceof Error ? err.message : err)
help = true
}
@@ -144,7 +151,9 @@ archive.on(
*/
function (progress) {
if (verbose) {
console.log(`${progress.entries.processed} / ${progress.entries.total}`)
console.log(
`${progress.entries.processed} processed out of ${progress.entries.total}`
)
}
}
)
@@ -169,7 +178,7 @@ archive.on(
* @param {ArchiverError} warning
*/
function (warning) {
console.warn(`Warning writing archive: ${warning.message}`)
console.warn(`Warning encountered when writing archive: ${warning.message}`)
}
)