mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-06-10 06:39:01 +02:00
Merge pull request #21763 from overleaf/bg-backup-script
initial script for running backups GitOrigin-RevId: d22c373de30738d8080d40dce10790f0bdcb9f51
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
// @ts-check
|
||||
import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs'
|
||||
import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
|
||||
import { GLOBAL_BLOBS, makeProjectKey, BlobStore } from './blob_store/index.js'
|
||||
import Stream from 'node:stream'
|
||||
import fs from 'node:fs'
|
||||
import Crypto from 'node:crypto'
|
||||
@@ -11,6 +11,7 @@ import logger from '@overleaf/logger/logging-manager.js'
|
||||
import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
import metrics from '@overleaf/metrics'
|
||||
import zLib from 'node:zlib'
|
||||
import Path from 'node:path'
|
||||
|
||||
const HIGHWATER_MARK = 1024 * 1024
|
||||
|
||||
@@ -18,6 +19,10 @@ const HIGHWATER_MARK = 1024 * 1024
|
||||
* @typedef {import("overleaf-editor-core").Blob} Blob
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
|
||||
*/
|
||||
|
||||
/**
|
||||
* Increment a metric to record the outcome of a backup operation.
|
||||
*
|
||||
@@ -28,6 +33,34 @@ function recordBackupConclusion(status, reason = 'none') {
|
||||
metrics.inc('blob_backed_up', 1, { status, reason })
|
||||
}
|
||||
|
||||
/**
|
||||
* Downloads a blob to a specified directory
|
||||
*
|
||||
* @param {string} historyId - The history ID of the project the blob belongs to
|
||||
* @param {Blob} blob - The blob to download
|
||||
* @param {string} tmpDir - The directory path where the blob will be downloaded
|
||||
* @returns {Promise<string>} The full path where the blob was downloaded
|
||||
*/
|
||||
export async function downloadBlobToDir(historyId, blob, tmpDir) {
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const blobHash = blob.getHash()
|
||||
const src = await blobStore.getStream(blobHash)
|
||||
const filePath = Path.join(tmpDir, `${historyId}-${blobHash}`)
|
||||
try {
|
||||
const dst = fs.createWriteStream(filePath, {
|
||||
highWaterMark: HIGHWATER_MARK,
|
||||
flags: 'wx',
|
||||
})
|
||||
await Stream.promises.pipeline(src, dst)
|
||||
return filePath
|
||||
} catch (error) {
|
||||
try {
|
||||
await fs.promises.unlink(filePath)
|
||||
} catch {}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the actual upload of the blob to the backup storage.
|
||||
*
|
||||
@@ -36,7 +69,7 @@ function recordBackupConclusion(status, reason = 'none') {
|
||||
* @param {string} path - The path to the file to upload (should have been stored on disk already)
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function uploadBlobToBackup(historyId, blob, path) {
|
||||
export async function uploadBlobToBackup(historyId, blob, path, persistor) {
|
||||
const md5 = Crypto.createHash('md5')
|
||||
const filePathCompressed = path + '.gz'
|
||||
let backupSource
|
||||
@@ -70,7 +103,6 @@ export async function uploadBlobToBackup(historyId, blob, path) {
|
||||
)
|
||||
}
|
||||
const key = makeProjectKey(historyId, blob.getHash())
|
||||
const persistor = await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
await persistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
key,
|
||||
@@ -119,7 +151,7 @@ async function _convertLegacyHistoryIdToProjectId(historyId) {
|
||||
* @param {string} hash
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function storeBlobBackup(projectId, hash) {
|
||||
export async function storeBlobBackup(projectId, hash) {
|
||||
await backedUpBlobs.updateOne(
|
||||
{ _id: new ObjectId(projectId) },
|
||||
{ $addToSet: { blobs: new Binary(Buffer.from(hash, 'hex')) } },
|
||||
@@ -152,9 +184,10 @@ export async function _blobIsBackedUp(projectId, hash) {
|
||||
* @param {string} historyId - history ID for a project (can be postgres format or mongo format)
|
||||
* @param {Blob} blob - The blob that is being backed up
|
||||
* @param {string} tmpPath - The path to a temporary file storing the contents of the blob.
|
||||
* @param {CachedPerProjectEncryptedS3Persistor} [persistor] - The persistor to use (optional)
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function backupBlob(historyId, blob, tmpPath) {
|
||||
export async function backupBlob(historyId, blob, tmpPath, persistor) {
|
||||
const hash = blob.getHash()
|
||||
|
||||
let projectId = historyId
|
||||
@@ -183,10 +216,22 @@ export async function backupBlob(historyId, blob, tmpPath) {
|
||||
logger.warn({ error }, 'Failed to check if blob is backed up')
|
||||
// We'll try anyway - we'll catch the error if it was backed up
|
||||
}
|
||||
|
||||
// If we weren't passed a persistor for this project, create one.
|
||||
// This will fetch the key from AWS, so it's prefereable to use
|
||||
// the same persistor for all blobs in a project where possible.
|
||||
if (!persistor) {
|
||||
logger.debug(
|
||||
{ historyId, hash },
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
}
|
||||
persistor ??= await backupPersistor.forProject(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, '')
|
||||
)
|
||||
try {
|
||||
logger.debug({ projectId, hash }, 'Starting blob backup')
|
||||
await uploadBlobToBackup(historyId, blob, tmpPath)
|
||||
await uploadBlobToBackup(historyId, blob, tmpPath, persistor)
|
||||
await storeBlobBackup(projectId, hash)
|
||||
recordBackupConclusion('success')
|
||||
} catch (error) {
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
/**
|
||||
* Provides a generator function to back up project chunks and blobs.
|
||||
*/
|
||||
|
||||
import {
|
||||
getProjectChunksFromVersion,
|
||||
loadAtVersion,
|
||||
loadByChunkRecord,
|
||||
} from './chunk_store/index.js'
|
||||
|
||||
import {
|
||||
GLOBAL_BLOBS, // NOTE: must call loadGlobalBlobs() before using this
|
||||
BlobStore,
|
||||
} from './blob_store/index.js'
|
||||
|
||||
import assert from './assert.js'
|
||||
|
||||
async function lookBehindForSeenBlobs(
|
||||
projectId,
|
||||
chunk,
|
||||
lastBackedUpVersion,
|
||||
seenBlobs
|
||||
) {
|
||||
if (chunk.startVersion === 0) {
|
||||
return // this is the first chunk, no need to check for blobs in the previous chunk
|
||||
}
|
||||
if (chunk.startVersion > 0 && lastBackedUpVersion > chunk.startVersion) {
|
||||
return // the snapshot in this chunk has already been backed up
|
||||
}
|
||||
if (
|
||||
chunk.startVersion > 0 &&
|
||||
lastBackedUpVersion === chunk.startVersion // same as previousChunk.endVersion
|
||||
) {
|
||||
// the snapshot in this chunk has not been backed up
|
||||
// so we find the set of backed up blobs from the previous chunk
|
||||
const previousChunk = await loadAtVersion(projectId, lastBackedUpVersion)
|
||||
const previousChunkHistory = previousChunk.getHistory()
|
||||
previousChunkHistory.findBlobHashes(seenBlobs)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Records blob hashes that have been previously seen in a chunk's history.
|
||||
*
|
||||
* @param {Object} chunk - The chunk containing history data
|
||||
* @param {number} currentBackedUpVersion - The version number that has been backed up
|
||||
* @param {Set<string>} seenBlobs - Set to collect previously seen blob hashes
|
||||
* @returns {void}
|
||||
*/
|
||||
function recordPreviouslySeenBlobs(chunk, currentBackedUpVersion, seenBlobs) {
|
||||
// We need to look at the chunk and decide how far we have backed up.
|
||||
// If we have not backed up this chunk at all, we need to backup the blobs
|
||||
// in the snapshot. Otherwise we need to backup the blobs in the changes
|
||||
// that have occurred since the last backup.
|
||||
const history = chunk.getHistory()
|
||||
const startVersion = chunk.getStartVersion()
|
||||
if (currentBackedUpVersion === 0) {
|
||||
// If we have only backed up version 0 (i.e. the first change)
|
||||
// then that includes the initial snapshot, so we consider
|
||||
// the blobs of the initial snapshot as seen. If the project
|
||||
// has not been backed up at all then currentBackedUpVersion
|
||||
// will be undefined.
|
||||
history.snapshot.findBlobHashes(seenBlobs)
|
||||
} else if (currentBackedUpVersion > startVersion) {
|
||||
history.snapshot.findBlobHashes(seenBlobs)
|
||||
for (let i = 0; i < currentBackedUpVersion - startVersion; i++) {
|
||||
history.changes[i].findBlobHashes(seenBlobs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Collects new blob objects that need to be backed up from a given chunk.
|
||||
*
|
||||
* @param {Object} chunk - The chunk object containing history data
|
||||
* @param {Object} blobStore - Storage interface for retrieving blobs
|
||||
* @param {Set<string>} seenBlobs - Set of blob hashes that have already been processed
|
||||
* @returns {Promise<Object[]>} Array of blob objects that need to be backed up
|
||||
* @throws {Error} If blob retrieval fails
|
||||
*/
|
||||
async function collectNewBlobsForBackup(chunk, blobStore, seenBlobs) {
|
||||
/** @type {Set<string>} */
|
||||
const blobHashes = new Set()
|
||||
const history = chunk.getHistory()
|
||||
// Get all the blobs in this chunk, then exclude the seenBlobs and global blobs
|
||||
history.findBlobHashes(blobHashes)
|
||||
const blobsToBackup = await blobStore.getBlobs(
|
||||
[...blobHashes].filter(
|
||||
hash =>
|
||||
hash &&
|
||||
!seenBlobs.has(hash) &&
|
||||
(!GLOBAL_BLOBS.has(hash) || GLOBAL_BLOBS.get(hash).demoted)
|
||||
)
|
||||
)
|
||||
return blobsToBackup
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously generates backups for a project based on provided versions.
|
||||
* @param {string} projectId - The ID of the project's history to back up.
|
||||
* @param {number} lastBackedUpVersion - The last version that was successfully backed up.
|
||||
* @yields {AsyncGenerator<{ chunkRecord: object, chunkToBackup: object, chunkBuffer: Buffer, blobsToBackup: object[] }>}
|
||||
* Yields chunk records and corresponding data needed for backups.
|
||||
*/
|
||||
export async function* backupGenerator(projectId, lastBackedUpVersion) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
assert.maybe.integer(lastBackedUpVersion, 'bad lastBackedUpVersion')
|
||||
|
||||
const blobStore = new BlobStore(projectId)
|
||||
|
||||
/** @type {Set<string>} */
|
||||
const seenBlobs = new Set() // records the blobs that are already backed up
|
||||
|
||||
const firstPendingVersion =
|
||||
lastBackedUpVersion >= 0 ? lastBackedUpVersion + 1 : 0
|
||||
let isStartingChunk = true
|
||||
let currentBackedUpVersion = lastBackedUpVersion
|
||||
const chunkRecordIterator = getProjectChunksFromVersion(
|
||||
projectId,
|
||||
firstPendingVersion
|
||||
)
|
||||
|
||||
for await (const chunkRecord of chunkRecordIterator) {
|
||||
const { chunk, chunkBuffer } = await loadByChunkRecord(
|
||||
projectId,
|
||||
chunkRecord
|
||||
)
|
||||
|
||||
if (isStartingChunk) {
|
||||
await lookBehindForSeenBlobs(
|
||||
projectId,
|
||||
chunkRecord,
|
||||
lastBackedUpVersion,
|
||||
seenBlobs
|
||||
)
|
||||
isStartingChunk = false
|
||||
}
|
||||
|
||||
recordPreviouslySeenBlobs(chunk, currentBackedUpVersion, seenBlobs)
|
||||
|
||||
const blobsToBackup = await collectNewBlobsForBackup(
|
||||
chunk,
|
||||
blobStore,
|
||||
seenBlobs
|
||||
)
|
||||
|
||||
yield { chunkRecord, chunkToBackup: chunk, chunkBuffer, blobsToBackup }
|
||||
|
||||
// After we generate a backup of this chunk, mark the backed up blobs as seen
|
||||
blobsToBackup.forEach(blob => seenBlobs.add(blob.getHash()))
|
||||
currentBackedUpVersion = chunkRecord.endVersion
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
const { Binary, ObjectId } = require('mongodb')
|
||||
const { projects, backedUpBlobs } = require('../mongodb')
|
||||
|
||||
// List projects with pending backups older than the specified interval
|
||||
function listPendingBackups(timeIntervalMs = 0) {
|
||||
const cutoffTime = new Date(Date.now() - timeIntervalMs)
|
||||
const cursor = projects.find(
|
||||
{
|
||||
'overleaf.backup.pendingChangeAt': {
|
||||
$exists: true,
|
||||
$lt: cutoffTime,
|
||||
},
|
||||
},
|
||||
{
|
||||
projection: { 'overleaf.backup': 1, 'overleaf.history': 1 },
|
||||
sort: { 'overleaf.backup.pendingChangeAt': 1 },
|
||||
}
|
||||
)
|
||||
return cursor
|
||||
}
|
||||
|
||||
// Retrieve the history ID for a given project without giving direct access to the
|
||||
// projects collection.
|
||||
|
||||
async function getHistoryId(projectId) {
|
||||
const project = await projects.findOne(
|
||||
{ _id: new ObjectId(projectId) },
|
||||
{
|
||||
projection: {
|
||||
'overleaf.history.id': 1,
|
||||
},
|
||||
}
|
||||
)
|
||||
if (!project) {
|
||||
throw new Error('Project not found')
|
||||
}
|
||||
return project.overleaf.history.id
|
||||
}
|
||||
|
||||
async function getBackupStatus(projectId) {
|
||||
const project = await projects.findOne(
|
||||
{ _id: new ObjectId(projectId) },
|
||||
{
|
||||
projection: {
|
||||
'overleaf.history': 1,
|
||||
'overleaf.backup': 1,
|
||||
},
|
||||
}
|
||||
)
|
||||
if (!project) {
|
||||
throw new Error('Project not found')
|
||||
}
|
||||
return {
|
||||
backupStatus: project.overleaf.backup,
|
||||
historyId: `${project.overleaf.history.id}`,
|
||||
currentEndVersion: project.overleaf.history.currentEndVersion,
|
||||
currentEndTimestamp: project.overleaf.history.currentEndTimestamp,
|
||||
}
|
||||
}
|
||||
|
||||
async function setBackupVersion(
|
||||
projectId,
|
||||
previousBackedUpVersion,
|
||||
currentBackedUpVersion,
|
||||
currentBackedUpAt
|
||||
) {
|
||||
// FIXME: include a check to handle race conditions
|
||||
// to make sure only one process updates the version numbers
|
||||
const result = await projects.updateOne(
|
||||
{
|
||||
_id: new ObjectId(projectId),
|
||||
'overleaf.backup.lastBackedUpVersion': previousBackedUpVersion,
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'overleaf.backup.lastBackedUpVersion': currentBackedUpVersion,
|
||||
'overleaf.backup.lastBackedUpAt': currentBackedUpAt,
|
||||
},
|
||||
}
|
||||
)
|
||||
if (result.matchedCount === 0 || result.modifiedCount === 0) {
|
||||
throw new Error('Failed to update backup version')
|
||||
}
|
||||
}
|
||||
|
||||
async function updateCurrentMetadataIfNotSet(projectId, latestChunkMetadata) {
|
||||
await projects.updateOne(
|
||||
{
|
||||
_id: new ObjectId(projectId),
|
||||
'overleaf.history.currentEndVersion': { $exists: false },
|
||||
'overleaf.history.currentEndTimestamp': { $exists: false },
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'overleaf.history.currentEndVersion': latestChunkMetadata.endVersion,
|
||||
'overleaf.history.currentEndTimestamp':
|
||||
latestChunkMetadata.endTimestamp,
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the pending change timestamp for a project's backup status
|
||||
* @param {string} projectId - The ID of the project to update
|
||||
* @param {Date} backupStartTime - The timestamp to set for pending changes
|
||||
* @returns {Promise<void>}
|
||||
*
|
||||
* If the project's last backed up version matches the current end version,
|
||||
* the pending change timestamp is removed. Otherwise, it's set to the provided
|
||||
* backup start time.
|
||||
*/
|
||||
async function updatePendingChangeTimestamp(projectId, backupStartTime) {
|
||||
await projects.updateOne({ _id: new ObjectId(projectId) }, [
|
||||
{
|
||||
$set: {
|
||||
'overleaf.backup.pendingChangeAt': {
|
||||
$cond: {
|
||||
if: {
|
||||
$eq: [
|
||||
'$overleaf.backup.lastBackedUpVersion',
|
||||
'$overleaf.history.currentEndVersion',
|
||||
],
|
||||
},
|
||||
then: '$$REMOVE',
|
||||
else: backupStartTime,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
])
|
||||
}
|
||||
|
||||
async function getBackedUpBlobHashes(projectId) {
|
||||
const result = await backedUpBlobs.findOne(
|
||||
{ _id: new ObjectId(projectId) },
|
||||
{ projection: { blobs: 1 } }
|
||||
)
|
||||
if (!result) {
|
||||
return new Set()
|
||||
}
|
||||
const hashes = result.blobs.map(b => b.buffer.toString('hex'))
|
||||
return new Set(hashes)
|
||||
}
|
||||
|
||||
async function unsetBackedUpBlobHashes(projectId, hashes) {
|
||||
const binaryHashes = hashes.map(h => new Binary(Buffer.from(h, 'hex')))
|
||||
const result = await backedUpBlobs.findOneAndUpdate(
|
||||
{ _id: new ObjectId(projectId) },
|
||||
{
|
||||
$pullAll: {
|
||||
blobs: binaryHashes,
|
||||
},
|
||||
},
|
||||
{ returnDocument: 'after' }
|
||||
)
|
||||
if (result && result.blobs.length === 0) {
|
||||
await backedUpBlobs.deleteOne({
|
||||
_id: new ObjectId(projectId),
|
||||
blobs: { $size: 0 },
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
getHistoryId,
|
||||
getBackupStatus,
|
||||
setBackupVersion,
|
||||
updateCurrentMetadataIfNotSet,
|
||||
updatePendingChangeTimestamp,
|
||||
listPendingBackups,
|
||||
getBackedUpBlobHashes,
|
||||
unsetBackedUpBlobHashes,
|
||||
}
|
||||
@@ -355,6 +355,9 @@ class BlobStore {
|
||||
nonGlobalHashes.push(hash)
|
||||
}
|
||||
}
|
||||
if (nonGlobalHashes.length === 0) {
|
||||
return blobs // to avoid unnecessary database lookup
|
||||
}
|
||||
const projectBlobs = await this.backend.findBlobs(
|
||||
this.projectId,
|
||||
nonGlobalHashes
|
||||
|
||||
@@ -222,6 +222,19 @@ async function getChunkIdForVersion(projectId, version) {
|
||||
return chunkRecord.id
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the chunk metadata for a given version of a project.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {number} version
|
||||
* @return {Promise.<{id: string|number, startVersion: number, endVersion: number}>}
|
||||
*/
|
||||
async function getChunkMetadataForVersion(projectId, version) {
|
||||
const backend = getBackend(projectId)
|
||||
const chunkRecord = await backend.getChunkForVersion(projectId, version)
|
||||
return chunkRecord
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of a project's chunk ids
|
||||
*/
|
||||
@@ -231,6 +244,62 @@ async function getProjectChunkIds(projectId) {
|
||||
return chunkIds
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of a projects chunks directly
|
||||
*/
|
||||
async function getProjectChunks(projectId) {
|
||||
const backend = getBackend(projectId)
|
||||
const chunkIds = await backend.getProjectChunks(projectId)
|
||||
return chunkIds
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the chunk for a given chunk record, including blob metadata.
|
||||
*/
|
||||
async function loadByChunkRecord(projectId, chunkRecord) {
|
||||
const blobStore = new BlobStore(projectId)
|
||||
const batchBlobStore = new BatchBlobStore(blobStore)
|
||||
const { raw: rawHistory, buffer: chunkBuffer } =
|
||||
await historyStore.loadRawWithBuffer(projectId, chunkRecord.id)
|
||||
const history = History.fromRaw(rawHistory)
|
||||
await lazyLoadHistoryFiles(history, batchBlobStore)
|
||||
return {
|
||||
chunk: new Chunk(history, chunkRecord.endVersion - history.countChanges()),
|
||||
chunkBuffer,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously retrieves project chunks starting from a specific version.
|
||||
*
|
||||
* This generator function yields chunk records for a given project starting from the specified version (inclusive).
|
||||
* It continues to fetch and yield subsequent chunk records until the end version of the latest chunk metadata is reached.
|
||||
* If you want to fetch all the chunks *after* a version V, call this function with V+1.
|
||||
*
|
||||
* @param {string} projectId - The ID of the project.
|
||||
* @param {number} version - The starting version to retrieve chunks from.
|
||||
* @returns {AsyncGenerator<Object, void, undefined>} An async generator that yields chunk records.
|
||||
*/
|
||||
async function* getProjectChunksFromVersion(projectId, version) {
|
||||
const backend = getBackend(projectId)
|
||||
const latestChunkMetadata = await loadLatestRaw(projectId)
|
||||
if (!latestChunkMetadata || version > latestChunkMetadata.endVersion) {
|
||||
return
|
||||
}
|
||||
let chunkRecord = await backend.getChunkForVersion(projectId, version)
|
||||
while (chunkRecord != null) {
|
||||
yield chunkRecord
|
||||
if (chunkRecord.endVersion >= latestChunkMetadata.endVersion) {
|
||||
break
|
||||
} else {
|
||||
chunkRecord = await backend.getChunkForVersion(
|
||||
projectId,
|
||||
chunkRecord.endVersion + 1
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the given chunk from the database.
|
||||
*
|
||||
@@ -333,11 +402,15 @@ module.exports = {
|
||||
loadLatestRaw,
|
||||
loadAtVersion,
|
||||
loadAtTimestamp,
|
||||
loadByChunkRecord,
|
||||
create,
|
||||
update,
|
||||
destroy,
|
||||
getChunkIdForVersion,
|
||||
getChunkMetadataForVersion,
|
||||
getProjectChunkIds,
|
||||
getProjectChunks,
|
||||
getProjectChunksFromVersion,
|
||||
deleteProjectChunks,
|
||||
deleteOldChunks,
|
||||
AlreadyInitialized,
|
||||
|
||||
@@ -99,6 +99,21 @@ async function getProjectChunkIds(projectId) {
|
||||
return await cursor.map(record => record._id).toArray()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of a projects chunks directly
|
||||
*/
|
||||
async function getProjectChunks(projectId) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
|
||||
const cursor = mongodb.chunks
|
||||
.find(
|
||||
{ projectId: new ObjectId(projectId), state: 'active' },
|
||||
{ projection: { state: 0 } }
|
||||
)
|
||||
.sort({ startVersion: 1 })
|
||||
return await cursor.map(chunkFromRecord).toArray()
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a pending chunk before sending it to object storage.
|
||||
*/
|
||||
@@ -298,6 +313,7 @@ module.exports = {
|
||||
getChunkForVersion,
|
||||
getChunkForTimestamp,
|
||||
getProjectChunkIds,
|
||||
getProjectChunks,
|
||||
insertPendingChunk,
|
||||
confirmCreate,
|
||||
confirmUpdate,
|
||||
|
||||
@@ -102,6 +102,20 @@ async function getProjectChunkIds(projectId) {
|
||||
return records.map(record => record.id)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of a projects chunks directly
|
||||
*/
|
||||
async function getProjectChunks(projectId) {
|
||||
assert.postgresId(projectId, `bad projectId ${projectId}`)
|
||||
projectId = parseInt(projectId, 10)
|
||||
|
||||
const records = await knex('chunks')
|
||||
.select()
|
||||
.where('doc_id', projectId)
|
||||
.orderBy('end_version')
|
||||
return records.map(chunkFromRecord)
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a pending chunk before sending it to object storage.
|
||||
*/
|
||||
@@ -269,6 +283,7 @@ module.exports = {
|
||||
getChunkForVersion,
|
||||
getChunkForTimestamp,
|
||||
getProjectChunkIds,
|
||||
getProjectChunks,
|
||||
insertPendingChunk,
|
||||
confirmCreate,
|
||||
confirmUpdate,
|
||||
|
||||
@@ -21,6 +21,7 @@ const streams = require('./streams')
|
||||
const Chunk = core.Chunk
|
||||
|
||||
const gzip = promisify(zlib.gzip)
|
||||
const gunzip = promisify(zlib.gunzip)
|
||||
|
||||
class LoadError extends OError {
|
||||
/**
|
||||
@@ -114,6 +115,32 @@ class HistoryStore {
|
||||
}
|
||||
}
|
||||
|
||||
async loadRawWithBuffer(projectId, chunkId) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
assert.chunkId(chunkId, 'bad chunkId')
|
||||
|
||||
const key = getKey(projectId, chunkId)
|
||||
|
||||
logger.debug({ projectId, chunkId }, 'loadBuffer started')
|
||||
try {
|
||||
const buf = await streams.readStreamToBuffer(
|
||||
await this.#persistor.getObjectStream(this.#bucket, key)
|
||||
)
|
||||
const unzipped = await gunzip(buf)
|
||||
return {
|
||||
buffer: buf,
|
||||
raw: JSON.parse(unzipped.toString('utf-8')),
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof objectPersistor.Errors.NotFoundError) {
|
||||
throw new Chunk.NotPersistedError(projectId)
|
||||
}
|
||||
throw new LoadError(projectId, chunkId, err)
|
||||
} finally {
|
||||
logger.debug({ projectId, chunkId }, 'loadBuffer finished')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compress and store a {@link History}.
|
||||
*
|
||||
|
||||
@@ -0,0 +1,937 @@
|
||||
// @ts-check
|
||||
|
||||
import logger from '@overleaf/logger'
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import { History } from 'overleaf-editor-core'
|
||||
import { getProjectChunks, loadLatestRaw } from '../lib/chunk_store/index.js'
|
||||
import { client } from '../lib/mongodb.js'
|
||||
import knex from '../lib/knex.js'
|
||||
import { historyStore } from '../lib/history_store.js'
|
||||
import pLimit from 'p-limit'
|
||||
import {
|
||||
GLOBAL_BLOBS,
|
||||
loadGlobalBlobs,
|
||||
makeProjectKey,
|
||||
BlobStore,
|
||||
} from '../lib/blob_store/index.js'
|
||||
import {
|
||||
listPendingBackups,
|
||||
getBackupStatus,
|
||||
setBackupVersion,
|
||||
updateCurrentMetadataIfNotSet,
|
||||
updatePendingChangeTimestamp,
|
||||
getBackedUpBlobHashes,
|
||||
unsetBackedUpBlobHashes,
|
||||
} from '../lib/backup_store/index.js'
|
||||
import { backupBlob, downloadBlobToDir } from '../lib/backupBlob.mjs'
|
||||
import {
|
||||
backupPersistor,
|
||||
chunksBucket,
|
||||
projectBlobsBucket,
|
||||
} from '../lib/backupPersistor.mjs'
|
||||
import { backupGenerator } from '../lib/backupGenerator.mjs'
|
||||
import { promises as fs } from 'node:fs'
|
||||
import os from 'node:os'
|
||||
import path from 'node:path'
|
||||
import projectKey from '../lib/project_key.js'
|
||||
import Crypto from 'node:crypto'
|
||||
import Stream from 'node:stream'
|
||||
import { EventEmitter } from 'node:events'
|
||||
import { batchedUpdate } from '@overleaf/mongo-utils/batchedUpdate.js'
|
||||
import { createGunzip } from 'node:zlib'
|
||||
import { text } from 'node:stream/consumers'
|
||||
import { fromStream as blobHashFromStream } from '../lib/blob_hash.js'
|
||||
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
|
||||
EventEmitter.defaultMaxListeners = 20
|
||||
|
||||
logger.initialize('history-v1-backup')
|
||||
|
||||
let DRY_RUN = false
|
||||
let RETRY_LIMIT = 3
|
||||
const RETRY_DELAY = 1000
|
||||
let CONCURRENCY = 4
|
||||
let BATCH_CONCURRENCY = 1
|
||||
let BLOB_LIMITER = pLimit(CONCURRENCY)
|
||||
|
||||
async function retry(fn, times, delayMs) {
|
||||
let attempts = times
|
||||
while (attempts > 0) {
|
||||
try {
|
||||
const result = await fn()
|
||||
return result
|
||||
} catch (err) {
|
||||
attempts--
|
||||
if (attempts === 0) throw err
|
||||
await new Promise(resolve => setTimeout(resolve, delayMs))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function wrapWithRetry(fn, retries, delayMs) {
|
||||
return async (...args) => {
|
||||
const result = await retry(() => fn(...args), retries, delayMs)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
const downloadWithRetry = wrapWithRetry(
|
||||
downloadBlobToDir,
|
||||
RETRY_LIMIT,
|
||||
RETRY_DELAY
|
||||
)
|
||||
// FIXME: this creates a new backupPersistor for each blob
|
||||
// so there is no caching of the DEK
|
||||
const backupWithRetry = wrapWithRetry(backupBlob, RETRY_LIMIT, RETRY_DELAY)
|
||||
|
||||
async function findNewBlobs(projectId, blobs) {
|
||||
const newBlobs = []
|
||||
const existingBackedUpBlobHashes = await getBackedUpBlobHashes(projectId)
|
||||
for (const blob of blobs) {
|
||||
const hash = blob.getHash()
|
||||
if (existingBackedUpBlobHashes.has(blob.getHash())) {
|
||||
logger.debug({ projectId, hash }, 'Blob is already backed up, skipping')
|
||||
continue
|
||||
}
|
||||
const globalBlob = GLOBAL_BLOBS.get(hash)
|
||||
if (globalBlob && !globalBlob.demoted) {
|
||||
logger.debug(
|
||||
{ projectId, hash },
|
||||
'Blob is a global blob and not demoted, skipping'
|
||||
)
|
||||
continue
|
||||
}
|
||||
newBlobs.push(blob)
|
||||
}
|
||||
return newBlobs
|
||||
}
|
||||
|
||||
async function cleanBackedUpBlobs(projectId, blobs) {
|
||||
const hashes = blobs.map(blob => blob.getHash())
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would remove blobs',
|
||||
hashes.join(' '),
|
||||
'from project',
|
||||
projectId
|
||||
)
|
||||
return
|
||||
}
|
||||
await unsetBackedUpBlobHashes(projectId, hashes)
|
||||
}
|
||||
|
||||
async function backupSingleBlob(projectId, historyId, blob, tmpDir, persistor) {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would back up blob',
|
||||
JSON.stringify(blob),
|
||||
'in history',
|
||||
historyId,
|
||||
'for project',
|
||||
projectId
|
||||
)
|
||||
return
|
||||
}
|
||||
logger.info({ blob, historyId }, 'backing up blob')
|
||||
const blobPath = await downloadWithRetry(historyId, blob, tmpDir)
|
||||
await backupWithRetry(historyId, blob, blobPath, persistor)
|
||||
}
|
||||
|
||||
async function backupBlobs(projectId, historyId, blobs, limiter, persistor) {
|
||||
let tmpDir
|
||||
try {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'blob-backup-'))
|
||||
|
||||
const blobBackupOperations = blobs.map(blob =>
|
||||
limiter(backupSingleBlob, projectId, historyId, blob, tmpDir, persistor)
|
||||
)
|
||||
|
||||
await Promise.allSettled(blobBackupOperations)
|
||||
} finally {
|
||||
if (tmpDir) {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function backupChunk(
|
||||
projectId,
|
||||
historyId,
|
||||
chunkBackupPersistorForProject,
|
||||
chunkToBackup,
|
||||
chunkRecord,
|
||||
chunkBuffer
|
||||
) {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would back up chunk',
|
||||
JSON.stringify(chunkRecord),
|
||||
'in history',
|
||||
historyId,
|
||||
'for project',
|
||||
projectId,
|
||||
'key',
|
||||
makeChunkKey(historyId, chunkToBackup.startVersion)
|
||||
)
|
||||
return
|
||||
}
|
||||
const key = makeChunkKey(historyId, chunkToBackup.startVersion)
|
||||
logger.info({ chunkRecord, historyId, projectId, key }, 'backing up chunk')
|
||||
const md5 = Crypto.createHash('md5').update(chunkBuffer)
|
||||
await chunkBackupPersistorForProject.sendStream(
|
||||
chunksBucket,
|
||||
makeChunkKey(historyId, chunkToBackup.startVersion),
|
||||
Stream.Readable.from([chunkBuffer]),
|
||||
{
|
||||
contentType: 'application/json',
|
||||
contentEncoding: 'gzip',
|
||||
contentLength: chunkBuffer.byteLength,
|
||||
sourceMd5: md5.digest('hex'),
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
async function updateBackupStatus(
|
||||
projectId,
|
||||
lastBackedUpVersion,
|
||||
chunkRecord,
|
||||
startOfBackupTime
|
||||
) {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would set backup version to',
|
||||
chunkRecord.endVersion,
|
||||
'with lastBackedUpTimestamp',
|
||||
startOfBackupTime
|
||||
)
|
||||
return
|
||||
}
|
||||
logger.info(
|
||||
{ projectId, chunkRecord, startOfBackupTime },
|
||||
'setting backupVersion and lastBackedUpTimestamp'
|
||||
)
|
||||
await setBackupVersion(
|
||||
projectId,
|
||||
lastBackedUpVersion,
|
||||
chunkRecord.endVersion,
|
||||
startOfBackupTime
|
||||
)
|
||||
}
|
||||
|
||||
// Define command-line options
|
||||
const optionDefinitions = [
|
||||
{
|
||||
name: 'projectId',
|
||||
alias: 'p',
|
||||
type: String,
|
||||
description: 'The ID of the project to backup',
|
||||
defaultOption: true,
|
||||
},
|
||||
{
|
||||
name: 'help',
|
||||
alias: 'h',
|
||||
type: Boolean,
|
||||
description: 'Display this usage guide.',
|
||||
},
|
||||
{
|
||||
name: 'status',
|
||||
alias: 's',
|
||||
type: Boolean,
|
||||
description: 'Display project status.',
|
||||
},
|
||||
{
|
||||
name: 'list',
|
||||
alias: 'l',
|
||||
type: Boolean,
|
||||
description: 'List projects that need to be backed up',
|
||||
},
|
||||
{
|
||||
name: 'dry-run',
|
||||
alias: 'n',
|
||||
type: Boolean,
|
||||
description: 'Perform a dry run without making any changes.',
|
||||
},
|
||||
{
|
||||
name: 'retries',
|
||||
alias: 'r',
|
||||
type: Number,
|
||||
description: 'Number of retries, default is 3.',
|
||||
},
|
||||
{
|
||||
name: 'concurrency',
|
||||
alias: 'c',
|
||||
type: Number,
|
||||
description: 'Number of concurrent blob downloads (default: 1)',
|
||||
},
|
||||
{
|
||||
name: 'batch-concurrency',
|
||||
alias: 'b',
|
||||
type: Number,
|
||||
description: 'Number of concurrent project operations (default: 1)',
|
||||
},
|
||||
{
|
||||
name: 'pending',
|
||||
alias: 'P',
|
||||
type: Boolean,
|
||||
description: 'Backup all pending projects.',
|
||||
},
|
||||
{
|
||||
name: 'interval',
|
||||
alias: 'i',
|
||||
type: Number,
|
||||
description: 'Time interval in seconds for pending backups (default: 3600)',
|
||||
defaultValue: 3600,
|
||||
},
|
||||
{
|
||||
name: 'init',
|
||||
alias: 'I',
|
||||
type: Boolean,
|
||||
description: 'Initialize backups for all projects.',
|
||||
},
|
||||
{
|
||||
name: 'start-date',
|
||||
type: String,
|
||||
description: 'Start date for initialization (ISO format)',
|
||||
},
|
||||
{
|
||||
name: 'end-date',
|
||||
type: String,
|
||||
description: 'End date for initialization (ISO format)',
|
||||
},
|
||||
{
|
||||
name: 'compare',
|
||||
alias: 'C',
|
||||
type: Boolean,
|
||||
description:
|
||||
'Compare backup with original chunks. With --start-date and --end-date compares all projects in range.',
|
||||
},
|
||||
]
|
||||
|
||||
function handleOptions() {
|
||||
const options = commandLineArgs(optionDefinitions)
|
||||
|
||||
if (options.help) {
|
||||
console.log('Usage:')
|
||||
optionDefinitions.forEach(option => {
|
||||
console.log(` --${option.name}, -${option.alias}: ${option.description}`)
|
||||
})
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
const projectIdRequired =
|
||||
!options.list &&
|
||||
!options.pending &&
|
||||
!options.init &&
|
||||
!(options.compare && options['start-date'] && options['end-date'])
|
||||
|
||||
if (projectIdRequired && !options.projectId) {
|
||||
console.error('Error: projectId is required')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (options.pending && options.projectId) {
|
||||
console.error('Error: --pending cannot be specified with projectId')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (options.pending && (options.list || options.status)) {
|
||||
console.error('Error: --pending is exclusive with --list and --status')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (options.init && options.pending) {
|
||||
console.error('Error: --init cannot be specified with --pending')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (
|
||||
(options['start-date'] || options['end-date']) &&
|
||||
!options.init &&
|
||||
!options.compare
|
||||
) {
|
||||
console.error(
|
||||
'Error: date options can only be used with --init or --compare'
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (
|
||||
options.compare &&
|
||||
!options.projectId &&
|
||||
!(options['start-date'] && options['end-date'])
|
||||
) {
|
||||
console.error(
|
||||
'Error: --compare requires either projectId or both --start-date and --end-date'
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
DRY_RUN = options['dry-run'] || false
|
||||
RETRY_LIMIT = options.retries || 3
|
||||
CONCURRENCY = options.concurrency || 1
|
||||
BATCH_CONCURRENCY = options['batch-concurrency'] || 1
|
||||
BLOB_LIMITER = pLimit(CONCURRENCY)
|
||||
return options
|
||||
}
|
||||
|
||||
async function displayBackupStatus(projectId) {
|
||||
const result = await analyseBackupStatus(projectId)
|
||||
console.log('Backup status:', JSON.stringify(result))
|
||||
}
|
||||
|
||||
async function analyseBackupStatus(projectId) {
|
||||
const { backupStatus, historyId, currentEndVersion, currentEndTimestamp } =
|
||||
await getBackupStatus(projectId)
|
||||
// TODO: when we have confidence that the latestChunkMetadata always matches
|
||||
// the values from the backupStatus we can skip loading it here
|
||||
const latestChunkMetadata = await loadLatestRaw(historyId)
|
||||
if (
|
||||
currentEndVersion &&
|
||||
currentEndVersion !== latestChunkMetadata.endVersion
|
||||
) {
|
||||
// compare the current end version with the latest chunk metadata to check that
|
||||
// the updates to the project collection are reliable
|
||||
// expect some failures due to the time window between getBackupStatus and
|
||||
// loadLatestRaw where the project is being actively edited.
|
||||
logger.warn(
|
||||
{
|
||||
projectId,
|
||||
historyId,
|
||||
currentEndVersion,
|
||||
currentEndTimestamp,
|
||||
latestChunkMetadata,
|
||||
},
|
||||
'currentEndVersion does not match latest chunk metadata'
|
||||
)
|
||||
}
|
||||
|
||||
if (DRY_RUN) {
|
||||
console.log('Project:', projectId)
|
||||
console.log('History ID:', historyId)
|
||||
console.log('Latest Chunk Metadata:', JSON.stringify(latestChunkMetadata))
|
||||
console.log('Current end version:', currentEndVersion)
|
||||
console.log('Current end timestamp:', currentEndTimestamp)
|
||||
console.log('Backup status:', backupStatus ?? 'none')
|
||||
}
|
||||
if (!backupStatus) {
|
||||
if (DRY_RUN) {
|
||||
console.log('No backup status found - doing full backup')
|
||||
}
|
||||
}
|
||||
const lastBackedUpVersion = backupStatus?.lastBackedUpVersion
|
||||
const endVersion = latestChunkMetadata.endVersion
|
||||
if (endVersion >= 0 && endVersion === lastBackedUpVersion) {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Project is up to date, last backed up at version',
|
||||
lastBackedUpVersion
|
||||
)
|
||||
}
|
||||
} else if (endVersion < lastBackedUpVersion) {
|
||||
throw new Error('backup is ahead of project')
|
||||
} else {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Project needs to be backed up from',
|
||||
lastBackedUpVersion,
|
||||
'to',
|
||||
endVersion
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
historyId,
|
||||
lastBackedUpVersion,
|
||||
currentVersion: latestChunkMetadata.endVersion || 0,
|
||||
upToDate: endVersion >= 0 && lastBackedUpVersion === endVersion,
|
||||
pendingChangeAt: backupStatus?.pendingChangeAt,
|
||||
currentEndVersion,
|
||||
currentEndTimestamp,
|
||||
latestChunkMetadata,
|
||||
}
|
||||
}
|
||||
|
||||
async function displayPendingBackups(options) {
|
||||
const intervalMs = options.interval * 1000
|
||||
for await (const project of listPendingBackups(intervalMs)) {
|
||||
console.log(
|
||||
'Project:',
|
||||
project._id.toHexString(),
|
||||
'backup status:',
|
||||
JSON.stringify(project.overleaf.backup),
|
||||
'history status:',
|
||||
JSON.stringify(project.overleaf.history, [
|
||||
'currentEndVersion',
|
||||
'currentEndTimestamp',
|
||||
])
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
function makeChunkKey(projectId, startVersion) {
|
||||
return path.join(projectKey.format(projectId), projectKey.pad(startVersion))
|
||||
}
|
||||
|
||||
async function backupProject(projectId, options) {
|
||||
// FIXME: flush the project first!
|
||||
// Let's assume the the flush happens externally and triggers this backup
|
||||
const backupStartTime = new Date()
|
||||
// find the last backed up version
|
||||
const {
|
||||
historyId,
|
||||
lastBackedUpVersion,
|
||||
currentVersion,
|
||||
upToDate,
|
||||
pendingChangeAt,
|
||||
currentEndVersion,
|
||||
latestChunkMetadata,
|
||||
} = await analyseBackupStatus(projectId)
|
||||
|
||||
if (upToDate) {
|
||||
logger.info(
|
||||
{
|
||||
projectId,
|
||||
historyId,
|
||||
lastBackedUpVersion,
|
||||
currentVersion,
|
||||
pendingChangeAt,
|
||||
},
|
||||
'backup is up to date'
|
||||
)
|
||||
|
||||
if (
|
||||
currentEndVersion === undefined &&
|
||||
latestChunkMetadata.endVersion >= 0
|
||||
) {
|
||||
if (DRY_RUN) {
|
||||
console.log('Would update current metadata to', latestChunkMetadata)
|
||||
} else {
|
||||
await updateCurrentMetadataIfNotSet(projectId, latestChunkMetadata)
|
||||
}
|
||||
}
|
||||
|
||||
// clear the pending changes timestamp if the backup is complete
|
||||
if (pendingChangeAt) {
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would update or clear pending changes timestamp',
|
||||
backupStartTime
|
||||
)
|
||||
} else {
|
||||
await updatePendingChangeTimestamp(projectId, backupStartTime)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
logger.info(
|
||||
{
|
||||
projectId,
|
||||
historyId,
|
||||
lastBackedUpVersion,
|
||||
currentVersion,
|
||||
pendingChangeAt,
|
||||
},
|
||||
'backing up project'
|
||||
)
|
||||
|
||||
// this persistor works for both the chunks and blobs buckets,
|
||||
// because they use the same DEK
|
||||
const backupPersistorForProject = await backupPersistor.forProject(
|
||||
chunksBucket,
|
||||
makeProjectKey(historyId, '')
|
||||
)
|
||||
|
||||
let previousBackedUpVersion = lastBackedUpVersion
|
||||
|
||||
for await (const {
|
||||
blobsToBackup,
|
||||
chunkToBackup,
|
||||
chunkRecord,
|
||||
chunkBuffer,
|
||||
} of backupGenerator(historyId, lastBackedUpVersion)) {
|
||||
// backup the blobs first
|
||||
// this can be done in parallel but must fail if any blob cannot be backed up
|
||||
// if the blob already exists in the backup then that is allowed
|
||||
const newBlobs = await findNewBlobs(projectId, blobsToBackup)
|
||||
|
||||
await backupBlobs(
|
||||
projectId,
|
||||
historyId,
|
||||
newBlobs,
|
||||
BLOB_LIMITER,
|
||||
backupPersistorForProject
|
||||
)
|
||||
|
||||
// then backup the original compressed chunk using the startVersion as the key
|
||||
await backupChunk(
|
||||
projectId,
|
||||
historyId,
|
||||
backupPersistorForProject,
|
||||
chunkToBackup,
|
||||
chunkRecord,
|
||||
chunkBuffer
|
||||
)
|
||||
|
||||
// persist the backup status in mongo for the current chunk
|
||||
await updateBackupStatus(
|
||||
projectId,
|
||||
previousBackedUpVersion,
|
||||
chunkRecord,
|
||||
backupStartTime
|
||||
)
|
||||
|
||||
previousBackedUpVersion = chunkRecord.endVersion
|
||||
|
||||
await cleanBackedUpBlobs(projectId, blobsToBackup)
|
||||
}
|
||||
|
||||
// update the current end version and timestamp if they are not set
|
||||
if (currentEndVersion === undefined && latestChunkMetadata.endVersion >= 0) {
|
||||
if (DRY_RUN) {
|
||||
console.log('Would update current metadata to', latestChunkMetadata)
|
||||
} else {
|
||||
await updateCurrentMetadataIfNotSet(projectId, latestChunkMetadata)
|
||||
}
|
||||
}
|
||||
|
||||
// clear the pending changes timestamp if the backup is complete, otherwise set it to the time
|
||||
// when the backup started (to pick up the new changes on the next backup)
|
||||
if (DRY_RUN) {
|
||||
console.log(
|
||||
'Would update or clear pending changes timestamp',
|
||||
backupStartTime
|
||||
)
|
||||
} else {
|
||||
await updatePendingChangeTimestamp(projectId, backupStartTime)
|
||||
}
|
||||
}
|
||||
|
||||
function convertToISODate(dateStr) {
|
||||
if (!dateStr) return undefined
|
||||
// Expecting YYYY-MM-DD format
|
||||
if (!/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
|
||||
throw new Error('Date must be in YYYY-MM-DD format')
|
||||
}
|
||||
return new Date(dateStr + 'T00:00:00.000Z').toISOString()
|
||||
}
|
||||
|
||||
async function initializeProjects(options) {
|
||||
const limiter = pLimit(BATCH_CONCURRENCY)
|
||||
|
||||
async function processBatch(batch) {
|
||||
const batchOperations = batch.map(project =>
|
||||
limiter(backupProject, project._id.toHexString(), options)
|
||||
)
|
||||
await Promise.allSettled(batchOperations)
|
||||
}
|
||||
|
||||
const query = {
|
||||
'overleaf.history.id': { $exists: true },
|
||||
'overleaf.backup.lastBackedUpVersion': { $exists: false },
|
||||
'overleaf.backup.pendingChangeAt': { $exists: false },
|
||||
}
|
||||
|
||||
await batchedUpdate(
|
||||
client.db().collection('projects'),
|
||||
query,
|
||||
processBatch,
|
||||
{
|
||||
_id: 1,
|
||||
},
|
||||
{ readPreference: 'secondary' },
|
||||
{
|
||||
BATCH_RANGE_START: convertToISODate(options['start-date']),
|
||||
BATCH_RANGE_END: convertToISODate(options['end-date']),
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
async function backupPendingProjects(options) {
|
||||
const intervalMs = options.interval * 1000
|
||||
for await (const project of listPendingBackups(intervalMs)) {
|
||||
const projectId = project._id.toHexString()
|
||||
console.log(`Backing up pending project with ID: ${projectId}`)
|
||||
await backupProject(projectId, options)
|
||||
}
|
||||
}
|
||||
|
||||
class BlobComparator {
|
||||
constructor(backupPersistorForProject) {
|
||||
this.cache = new Map()
|
||||
this.backupPersistorForProject = backupPersistorForProject
|
||||
}
|
||||
|
||||
async compareBlob(historyId, blob) {
|
||||
let computedHash = this.cache.get(blob.hash)
|
||||
const fromCache = !!computedHash
|
||||
|
||||
if (!computedHash) {
|
||||
const blobKey = makeProjectKey(historyId, blob.hash)
|
||||
const backupBlobStream =
|
||||
await this.backupPersistorForProject.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
blobKey,
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
computedHash = await blobHashFromStream(blob.byteLength, backupBlobStream)
|
||||
this.cache.set(blob.hash, computedHash)
|
||||
}
|
||||
|
||||
const matches = computedHash === blob.hash
|
||||
return {
|
||||
matches,
|
||||
computedHash,
|
||||
fromCache,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function compareBackups(projectId, options) {
|
||||
console.log(`Comparing backups for project ${projectId}`)
|
||||
const { historyId } = await getBackupStatus(projectId)
|
||||
const chunks = await getProjectChunks(historyId)
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const backupPersistorForProject = await backupPersistor.forProject(
|
||||
chunksBucket,
|
||||
makeProjectKey(historyId, '')
|
||||
)
|
||||
|
||||
let totalChunkMatches = 0
|
||||
let totalChunkMismatches = 0
|
||||
let totalChunksNotFound = 0
|
||||
let totalBlobMatches = 0
|
||||
let totalBlobMismatches = 0
|
||||
let totalBlobsNotFound = 0
|
||||
const errors = []
|
||||
const blobComparator = new BlobComparator(backupPersistorForProject)
|
||||
|
||||
for (const chunk of chunks) {
|
||||
try {
|
||||
// Compare chunk content
|
||||
const originalChunk = await historyStore.loadRaw(historyId, chunk.id)
|
||||
const key = makeChunkKey(historyId, chunk.startVersion)
|
||||
try {
|
||||
const backupChunkStream =
|
||||
await backupPersistorForProject.getObjectStream(chunksBucket, key)
|
||||
const backupStr = await text(backupChunkStream.pipe(createGunzip()))
|
||||
const originalStr = JSON.stringify(originalChunk)
|
||||
const backupChunk = JSON.parse(backupStr)
|
||||
const backupStartVersion = chunk.startVersion
|
||||
const backupEndVersion = chunk.startVersion + backupChunk.changes.length
|
||||
|
||||
if (originalStr === backupStr) {
|
||||
console.log(
|
||||
`✓ Chunk ${chunk.id} (v${chunk.startVersion}-v${chunk.endVersion}) matches`
|
||||
)
|
||||
totalChunkMatches++
|
||||
} else if (originalStr === JSON.stringify(JSON.parse(backupStr))) {
|
||||
console.log(
|
||||
`✓ Chunk ${chunk.id} (v${chunk.startVersion}-v${chunk.endVersion}) matches (after normalisation)`
|
||||
)
|
||||
totalChunkMatches++
|
||||
} else if (backupEndVersion < chunk.endVersion) {
|
||||
console.log(
|
||||
`✗ Chunk ${chunk.id} is ahead of backup (v${chunk.startVersion}-v${chunk.endVersion} vs v${backupStartVersion}-v${backupEndVersion})`
|
||||
)
|
||||
totalChunkMismatches++
|
||||
errors.push({ chunkId: chunk.id, error: 'Chunk ahead of backup' })
|
||||
} else {
|
||||
console.log(
|
||||
`✗ Chunk ${chunk.id} (v${chunk.startVersion}-v${chunk.endVersion}) MISMATCH`
|
||||
)
|
||||
totalChunkMismatches++
|
||||
errors.push({ chunkId: chunk.id, error: 'Chunk mismatch' })
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof NotFoundError) {
|
||||
console.log(`✗ Chunk ${chunk.id} not found in backup`)
|
||||
totalChunksNotFound++
|
||||
errors.push({ chunkId: chunk.id, error: `Chunk not found` })
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
const history = History.fromRaw(originalChunk)
|
||||
|
||||
// Compare blobs in chunk
|
||||
const blobHashes = new Set()
|
||||
history.findBlobHashes(blobHashes)
|
||||
const blobs = await blobStore.getBlobs(Array.from(blobHashes))
|
||||
for (const blob of blobs) {
|
||||
if (GLOBAL_BLOBS.has(blob.hash)) {
|
||||
const globalBlob = GLOBAL_BLOBS.get(blob.hash)
|
||||
console.log(
|
||||
` ✓ Blob ${blob.hash} is a global blob`,
|
||||
globalBlob.demoted ? '(demoted)' : ''
|
||||
)
|
||||
continue
|
||||
}
|
||||
try {
|
||||
const { matches, computedHash, fromCache } =
|
||||
await blobComparator.compareBlob(historyId, blob)
|
||||
|
||||
if (matches) {
|
||||
console.log(
|
||||
` ✓ Blob ${blob.hash} hash matches (${blob.byteLength} bytes)` +
|
||||
(fromCache ? ' (from cache)' : '')
|
||||
)
|
||||
totalBlobMatches++
|
||||
} else {
|
||||
console.log(
|
||||
` ✗ Blob ${blob.hash} hash mismatch (original: ${blob.hash}, backup: ${computedHash}) (${blob.byteLength} bytes, ${blob.stringLength} string length)` +
|
||||
(fromCache ? ' (from cache)' : '')
|
||||
)
|
||||
totalBlobMismatches++
|
||||
errors.push({
|
||||
chunkId: chunk.id,
|
||||
error: `Blob ${blob.hash} hash mismatch`,
|
||||
})
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof NotFoundError) {
|
||||
console.log(` ✗ Blob ${blob.hash} not found in backup`)
|
||||
totalBlobsNotFound++
|
||||
errors.push({
|
||||
chunkId: chunk.id,
|
||||
error: `Blob ${blob.hash} not found`,
|
||||
})
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Error comparing chunk ${chunk.id}:`, err)
|
||||
errors.push({ chunkId: chunk.id, error: err })
|
||||
}
|
||||
}
|
||||
|
||||
// Print summary
|
||||
console.log('\nComparison Summary:')
|
||||
console.log('==================')
|
||||
console.log(`Total chunks: ${chunks.length}`)
|
||||
console.log(`Chunk matches: ${totalChunkMatches}`)
|
||||
console.log(`Chunk mismatches: ${totalChunkMismatches}`)
|
||||
console.log(`Chunk not found: ${totalChunksNotFound}`)
|
||||
console.log(`Blob matches: ${totalBlobMatches}`)
|
||||
console.log(`Blob mismatches: ${totalBlobMismatches}`)
|
||||
console.log(`Blob not found: ${totalBlobsNotFound}`)
|
||||
console.log(`Errors: ${errors.length}`)
|
||||
|
||||
if (errors.length > 0) {
|
||||
console.log('\nErrors:')
|
||||
errors.forEach(({ chunkId, error }) => {
|
||||
console.log(` Chunk ${chunkId}: ${error}`)
|
||||
})
|
||||
throw new Error('Backup comparison FAILED')
|
||||
} else {
|
||||
console.log('Backup comparison successful')
|
||||
}
|
||||
}
|
||||
|
||||
async function compareAllProjects(options) {
|
||||
const limiter = pLimit(BATCH_CONCURRENCY)
|
||||
let totalErrors = 0
|
||||
let totalProjects = 0
|
||||
|
||||
async function processBatch(batch) {
|
||||
const batchOperations = batch.map(project =>
|
||||
limiter(async () => {
|
||||
const projectId = project._id.toHexString()
|
||||
totalProjects++
|
||||
try {
|
||||
console.log(`\nComparing project ${projectId} (${totalProjects})`)
|
||||
await compareBackups(projectId, options)
|
||||
} catch (err) {
|
||||
totalErrors++
|
||||
console.error(`Failed to compare project ${projectId}:`, err)
|
||||
}
|
||||
})
|
||||
)
|
||||
await Promise.allSettled(batchOperations)
|
||||
}
|
||||
|
||||
const query = {
|
||||
'overleaf.history.id': { $exists: true },
|
||||
'overleaf.backup.lastBackedUpVersion': { $exists: true },
|
||||
}
|
||||
|
||||
await batchedUpdate(
|
||||
client.db().collection('projects'),
|
||||
query,
|
||||
processBatch,
|
||||
{
|
||||
_id: 1,
|
||||
'overleaf.history': 1,
|
||||
'overleaf.backup': 1,
|
||||
},
|
||||
{ readPreference: 'secondary' },
|
||||
{
|
||||
BATCH_RANGE_START: convertToISODate(options['start-date']),
|
||||
BATCH_RANGE_END: convertToISODate(options['end-date']),
|
||||
}
|
||||
)
|
||||
|
||||
console.log('\nComparison Summary:')
|
||||
console.log('==================')
|
||||
console.log(`Total projects processed: ${totalProjects}`)
|
||||
console.log(`Projects with errors: ${totalErrors}`)
|
||||
|
||||
if (totalErrors > 0) {
|
||||
throw new Error('Some project comparisons failed')
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const options = handleOptions()
|
||||
await loadGlobalBlobs()
|
||||
const projectId = options.projectId
|
||||
|
||||
if (options.status) {
|
||||
await displayBackupStatus(projectId)
|
||||
} else if (options.list) {
|
||||
await displayPendingBackups(options)
|
||||
} else if (options.pending) {
|
||||
await backupPendingProjects(options)
|
||||
} else if (options.init) {
|
||||
await initializeProjects(options)
|
||||
} else if (options.compare) {
|
||||
if (options['start-date'] && options['end-date']) {
|
||||
await compareAllProjects(options)
|
||||
} else {
|
||||
await compareBackups(projectId, options)
|
||||
}
|
||||
} else {
|
||||
await backupProject(projectId, options)
|
||||
}
|
||||
}
|
||||
|
||||
main()
|
||||
.then(() => {
|
||||
console.log('Completed')
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('Error backing up project:', err)
|
||||
process.exit(1)
|
||||
})
|
||||
.finally(() => {
|
||||
knex
|
||||
.destroy()
|
||||
.then(() => {
|
||||
console.log('Postgres connection closed')
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('Error closing Postgres connection:', err)
|
||||
})
|
||||
client
|
||||
.close()
|
||||
.then(() => {
|
||||
console.log('MongoDB connection closed')
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('Error closing MongoDB connection:', err)
|
||||
})
|
||||
})
|
||||
@@ -0,0 +1,104 @@
|
||||
// @ts-check
|
||||
import { readFileSync } from 'node:fs'
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import { client } from '../lib/mongodb.js'
|
||||
import {
|
||||
getBackedUpBlobHashes,
|
||||
unsetBackedUpBlobHashes,
|
||||
} from '../lib/backup_store/index.js'
|
||||
|
||||
let gracefulShutdownInitiated = false
|
||||
|
||||
// Parse command line arguments
|
||||
const args = commandLineArgs([
|
||||
{ name: 'input', type: String, alias: 'i', defaultOption: true },
|
||||
{ name: 'commit', type: Boolean, default: false },
|
||||
])
|
||||
|
||||
if (!args.input) {
|
||||
console.error(
|
||||
'Usage: node remove_backed_up_blobs.mjs --input <csv-file> [--commit]'
|
||||
)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
if (!args.commit) {
|
||||
console.log('Running in dry-run mode. Use --commit to apply changes.')
|
||||
}
|
||||
|
||||
// Signal handling
|
||||
process.on('SIGINT', handleSignal)
|
||||
process.on('SIGTERM', handleSignal)
|
||||
|
||||
function handleSignal() {
|
||||
console.warn('Graceful shutdown initiated')
|
||||
gracefulShutdownInitiated = true
|
||||
}
|
||||
|
||||
// Process CSV and remove blobs
|
||||
async function main() {
|
||||
const projectBlobs = new Map()
|
||||
const lines = readFileSync(args.input, 'utf8').split('\n')
|
||||
const SHA1_HEX_REGEX = /^[a-f0-9]{40}$/
|
||||
|
||||
// Skip header
|
||||
for (const line of lines.slice(1)) {
|
||||
if (!line.trim() || gracefulShutdownInitiated) break
|
||||
|
||||
const [projectId, path] = line.split(',')
|
||||
const pathParts = path.split('/')
|
||||
const hash = pathParts[3] + pathParts[4]
|
||||
|
||||
if (!SHA1_HEX_REGEX.test(hash)) {
|
||||
console.warn(`Invalid SHA1 hash for project ${projectId}: ${hash}`)
|
||||
continue
|
||||
}
|
||||
|
||||
if (!projectBlobs.has(projectId)) {
|
||||
projectBlobs.set(projectId, new Set())
|
||||
}
|
||||
projectBlobs.get(projectId).add(hash)
|
||||
}
|
||||
|
||||
// Process each project
|
||||
for (const [projectId, hashes] of projectBlobs) {
|
||||
if (gracefulShutdownInitiated) break
|
||||
|
||||
if (!args.commit) {
|
||||
console.log(
|
||||
`DRY-RUN: would remove ${hashes.size} blobs from project ${projectId}`
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
const originalHashes = await getBackedUpBlobHashes(projectId)
|
||||
if (originalHashes.size === 0) {
|
||||
continue
|
||||
}
|
||||
const result = await unsetBackedUpBlobHashes(
|
||||
projectId,
|
||||
Array.from(hashes)
|
||||
)
|
||||
if (result) {
|
||||
console.log(
|
||||
`Project ${projectId}: want to remove ${hashes.size}, removed ${originalHashes.size - result.blobs.length}, ${result.blobs.length} remaining`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Error updating project ${projectId}:`, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the script
|
||||
main()
|
||||
.catch(err => {
|
||||
console.error('Fatal error:', err)
|
||||
process.exitCode = 1
|
||||
})
|
||||
.finally(() => {
|
||||
client
|
||||
.close()
|
||||
.catch(err => console.error('Error closing MongoDB connection:', err))
|
||||
})
|
||||
@@ -0,0 +1,210 @@
|
||||
import commandLineArgs from 'command-line-args'
|
||||
import {
|
||||
loadAtVersion,
|
||||
getChunkMetadataForVersion,
|
||||
getProjectChunksFromVersion,
|
||||
} from '../lib/chunk_store/index.js'
|
||||
import { client } from '../lib/mongodb.js'
|
||||
import knex from '../lib/knex.js'
|
||||
import {
|
||||
loadGlobalBlobs,
|
||||
BlobStore,
|
||||
makeProjectKey,
|
||||
} from '../lib/blob_store/index.js'
|
||||
import { TextDecoder } from 'node:util'
|
||||
import { backupPersistor, projectBlobsBucket } from '../lib/backupPersistor.mjs'
|
||||
import fs from 'node:fs'
|
||||
import { pipeline } from 'node:stream/promises'
|
||||
import os from 'node:os'
|
||||
import path from 'node:path'
|
||||
import { createHash } from 'node:crypto'
|
||||
|
||||
const optionDefinitions = [
|
||||
{ name: 'historyId', alias: 'p', type: String },
|
||||
{ name: 'version', alias: 'v', type: Number },
|
||||
{ name: 'blob', alias: 'b', type: String },
|
||||
{ name: 'remote', alias: 'r', type: Boolean },
|
||||
{ name: 'keep', alias: 'k', type: Boolean },
|
||||
]
|
||||
|
||||
async function listChunks(historyId) {
|
||||
for await (const chunkRecord of getProjectChunksFromVersion(historyId, 0)) {
|
||||
console.log('Chunk record:', chunkRecord)
|
||||
}
|
||||
}
|
||||
|
||||
async function displayChunk(historyId, version) {
|
||||
await loadGlobalBlobs()
|
||||
const chunkRecord = await getChunkMetadataForVersion(historyId, version)
|
||||
const chunk = await loadAtVersion(historyId, version)
|
||||
console.log('Chunk record', chunkRecord)
|
||||
console.log('Start version', chunk.getStartVersion())
|
||||
console.log('Number of changes', chunk.getChanges().length)
|
||||
console.log(JSON.stringify(chunk))
|
||||
}
|
||||
|
||||
async function fetchBlobRemote(historyId, blobHash) {
|
||||
const backupPersistorForProject = await backupPersistor.forProject(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, '')
|
||||
)
|
||||
const blobKey = makeProjectKey(historyId, blobHash)
|
||||
return {
|
||||
stream: await backupPersistorForProject.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
blobKey,
|
||||
{ autoGunzip: true }
|
||||
),
|
||||
metadata: { hash: blobHash },
|
||||
source: 'remote backup',
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchBlobLocal(historyId, blobHash) {
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const blob = await blobStore.getBlob(blobHash)
|
||||
if (!blob) throw new Error(`Blob ${blobHash} not found`)
|
||||
return {
|
||||
stream: await blobStore.getStream(blobHash),
|
||||
metadata: blob,
|
||||
source: 'local storage',
|
||||
}
|
||||
}
|
||||
|
||||
async function displayBlobContent(filepath, metadata, source, blobHash) {
|
||||
console.log('Source:', source)
|
||||
console.log('Blob metadata:', metadata)
|
||||
|
||||
// Compute git hash using streaming
|
||||
const stat = fs.statSync(filepath)
|
||||
const header = `blob ${stat.size}\0`
|
||||
const hash = createHash('sha1')
|
||||
hash.update(header)
|
||||
|
||||
const hashStream = fs.createReadStream(filepath)
|
||||
for await (const chunk of hashStream) {
|
||||
hash.update(chunk)
|
||||
}
|
||||
const gitHash = hash.digest('hex')
|
||||
|
||||
// Check content type and display preview
|
||||
const fd = fs.openSync(filepath, 'r')
|
||||
try {
|
||||
const headBuf = Buffer.alloc(16)
|
||||
const tailBuf = Buffer.alloc(16)
|
||||
|
||||
try {
|
||||
// Stream through TextDecoderStream to check for valid UTF-8
|
||||
const textStream = fs.createReadStream(filepath)
|
||||
const decoder = new TextDecoder('utf-8', { fatal: true })
|
||||
for await (const chunk of textStream) {
|
||||
decoder.decode(chunk, { stream: true })
|
||||
}
|
||||
decoder.decode()
|
||||
// If we get here, it's valid UTF-8
|
||||
if (stat.size <= 1024) {
|
||||
console.log('Content (text):', await fs.readFileSync(filepath, 'utf8'))
|
||||
} else {
|
||||
console.log('Content (text, truncated):')
|
||||
console.log(` Length: ${stat.size} bytes`)
|
||||
fs.readSync(fd, headBuf, 0, 16, 0)
|
||||
fs.readSync(fd, tailBuf, 0, 16, stat.size - 16)
|
||||
console.log(
|
||||
' Content:',
|
||||
headBuf.toString('utf8') +
|
||||
' ...(truncated)... ' +
|
||||
tailBuf.toString('utf8')
|
||||
)
|
||||
}
|
||||
} catch (e) {
|
||||
// Binary content - show head and tail
|
||||
console.log('Content (binary):')
|
||||
console.log(` Length: ${stat.size} bytes`)
|
||||
|
||||
if (stat.size <= 32) {
|
||||
// Small file - read it all
|
||||
const buf = Buffer.alloc(stat.size)
|
||||
fs.readSync(fd, buf, 0, stat.size, 0)
|
||||
const hexBytes = buf.toString('hex').match(/../g).join(' ')
|
||||
console.log(' Bytes:', hexBytes)
|
||||
} else {
|
||||
// Read tail for large files
|
||||
fs.readSync(fd, headBuf, 0, 16, 0)
|
||||
fs.readSync(fd, tailBuf, 0, 16, stat.size - 16)
|
||||
const headHex = headBuf.toString('hex').match(/../g).join(' ')
|
||||
const tailHex = tailBuf.toString('hex').match(/../g).join(' ')
|
||||
console.log(' Bytes:', headHex + ' ... ' + tailHex)
|
||||
}
|
||||
console.log(' Git-style SHA1:', gitHash)
|
||||
if (gitHash !== blobHash) {
|
||||
console.log(' Warning: Git hash differs from blob hash!\x1b[0m')
|
||||
console.log(' Blob hash:', blobHash)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
fs.closeSync(fd)
|
||||
}
|
||||
}
|
||||
|
||||
async function withTempDir(prefix, fn, options = {}) {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), prefix))
|
||||
try {
|
||||
return await Promise.resolve(fn(tmpDir))
|
||||
} finally {
|
||||
if (!options.keep) {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true })
|
||||
} else {
|
||||
console.log('Keeping temporary file:', path.join(tmpDir, 'blob'))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function displayBlob(historyId, blobHash, options) {
|
||||
try {
|
||||
const { stream, metadata, source } = await (options.remote
|
||||
? fetchBlobRemote(historyId, blobHash)
|
||||
: fetchBlobLocal(historyId, blobHash))
|
||||
|
||||
await withTempDir(
|
||||
'blob-show-',
|
||||
async tmpDir => {
|
||||
const tmpPath = path.join(tmpDir, 'blob')
|
||||
await pipeline(stream, fs.createWriteStream(tmpPath))
|
||||
await displayBlobContent(tmpPath, metadata, source, blobHash)
|
||||
},
|
||||
{ keep: options.keep }
|
||||
)
|
||||
} catch (err) {
|
||||
if (err.code === 'NoSuchKey') {
|
||||
throw new Error(`Blob ${blobHash} not found in backup`)
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const { historyId, version, blob, remote, keep } =
|
||||
commandLineArgs(optionDefinitions)
|
||||
if (!historyId) {
|
||||
console.error('Error: --historyId is required.')
|
||||
process.exit(1)
|
||||
}
|
||||
if (version != null) {
|
||||
await displayChunk(historyId, version)
|
||||
} else if (blob != null) {
|
||||
await displayBlob(historyId, blob, { remote, keep })
|
||||
} else {
|
||||
await listChunks(historyId)
|
||||
}
|
||||
}
|
||||
|
||||
main()
|
||||
.then(() => console.log('Done.'))
|
||||
.catch(err => {
|
||||
console.error('Error:', err)
|
||||
process.exit(1)
|
||||
})
|
||||
.finally(() => {
|
||||
knex.destroy().catch(err => console.error('Error closing Postgres:', err))
|
||||
client.close().catch(err => console.error('Error closing MongoDB:', err))
|
||||
})
|
||||
@@ -0,0 +1,624 @@
|
||||
import config from 'config'
|
||||
import { ObjectId } from 'mongodb'
|
||||
import { expect } from 'chai'
|
||||
import {
|
||||
backedUpBlobs,
|
||||
client,
|
||||
globalBlobs,
|
||||
} from '../../../../storage/lib/mongodb.js'
|
||||
import persistor from '../../../../storage/lib/persistor.js'
|
||||
import {
|
||||
loadGlobalBlobs,
|
||||
BlobStore,
|
||||
makeProjectKey,
|
||||
} from '../../../../storage/lib/blob_store/index.js'
|
||||
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
import projectKey from '../../../../storage/lib/project_key.js'
|
||||
import { getBackupStatus } from '../../../../storage/lib/backup_store/index.js'
|
||||
import { text, buffer } from 'node:stream/consumers'
|
||||
import { createGunzip } from 'node:zlib'
|
||||
import { Change, Operation, File, TextOperation } from 'overleaf-editor-core'
|
||||
import ChunkStore from '../../../../storage/lib/chunk_store/index.js'
|
||||
import persistChanges from '../../../../storage/lib/persist_changes.js'
|
||||
import { historyStore } from '../../../../storage/lib/history_store.js'
|
||||
import { execFile } from 'node:child_process'
|
||||
import { promisify } from 'node:util'
|
||||
import testFiles from '../storage/support/test_files.js'
|
||||
import fs from 'node:fs'
|
||||
import {
|
||||
backupBlob,
|
||||
storeBlobBackup,
|
||||
} from '../../../../storage/lib/backupBlob.mjs'
|
||||
import {
|
||||
backupPersistor,
|
||||
projectBlobsBucket,
|
||||
chunksBucket,
|
||||
} from '../../../../storage/lib/backupPersistor.mjs'
|
||||
import { Readable } from 'node:stream'
|
||||
|
||||
const projectsCollection = client.db().collection('projects')
|
||||
|
||||
/**
|
||||
* @param {ObjectId} projectId
|
||||
* @param {number} version
|
||||
* @return {string}
|
||||
*/
|
||||
function makeChunkKey(projectId, version) {
|
||||
return projectKey.format(projectId) + '/' + projectKey.pad(version)
|
||||
}
|
||||
|
||||
describe('backup script', function () {
|
||||
let project
|
||||
let projectId, historyId
|
||||
let limitsToPersistImmediately
|
||||
|
||||
before(function () {
|
||||
// Used to provide a limit which forces us to persist all of the changes
|
||||
const farFuture = new Date()
|
||||
farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000)
|
||||
limitsToPersistImmediately = {
|
||||
minChangeTimestamp: farFuture,
|
||||
maxChangeTimestamp: farFuture,
|
||||
maxChanges: 10,
|
||||
maxChunkChanges: 10,
|
||||
}
|
||||
})
|
||||
|
||||
beforeEach(async function () {
|
||||
// Set up test projects with proper history metadata
|
||||
projectId = new ObjectId()
|
||||
historyId = projectId.toString()
|
||||
project = {
|
||||
_id: projectId,
|
||||
overleaf: {
|
||||
history: {
|
||||
id: historyId,
|
||||
currentEndVersion: 0, // Will be updated as changes are made
|
||||
currentEndTimestamp: new Date(), // Will be updated as changes are made
|
||||
},
|
||||
backup: {
|
||||
// Start with no backup state
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Pre-load the global blobs
|
||||
await loadGlobalBlobs()
|
||||
|
||||
// Clean up any pre-existing test data
|
||||
await projectsCollection.deleteMany({
|
||||
_id: projectId,
|
||||
})
|
||||
await backedUpBlobs.deleteMany({}) // Clear any existing backedUpBlobs entries
|
||||
})
|
||||
|
||||
describe('with simple project content', function () {
|
||||
const contentString = 'hello world'
|
||||
const newContentString = 'hello world more'
|
||||
const graphPngPath = testFiles.path('graph.png')
|
||||
const graphPngBuf = fs.readFileSync(graphPngPath)
|
||||
const graphPngHash = testFiles.GRAPH_PNG_HASH
|
||||
const nonBmpPath = testFiles.path('non_bmp.txt')
|
||||
const DUMMY_HASH = '1111111111111111111111111111111111111111'
|
||||
|
||||
beforeEach(async function () {
|
||||
// Create initial project
|
||||
await projectsCollection.insertOne(project)
|
||||
|
||||
// Initialize project in chunk store
|
||||
await ChunkStore.initializeProject(historyId)
|
||||
|
||||
const blobStore = new BlobStore(historyId)
|
||||
|
||||
// Create the blobs and then back them up using backupBlob
|
||||
const graphPngBlob = await blobStore.putFile(graphPngPath)
|
||||
await backupBlob(historyId, graphPngBlob, graphPngPath)
|
||||
|
||||
// Add initial content using persistChanges
|
||||
const file = File.fromString(contentString)
|
||||
const addFileOp = Operation.addFile('main.tex', file)
|
||||
const addGraphFileOp = Operation.addFile(
|
||||
'graph.png',
|
||||
File.fromHash(testFiles.GRAPH_PNG_HASH)
|
||||
)
|
||||
const change1 = new Change([addFileOp, addGraphFileOp], new Date(), [])
|
||||
|
||||
await persistChanges(historyId, [change1], limitsToPersistImmediately, 0)
|
||||
|
||||
// Add a second change with a proper TextOperation
|
||||
// For text operation: first number is how many chars to retain, then the text to insert
|
||||
const textOp = TextOperation.fromJSON({
|
||||
textOperation: [contentString.length, ' more'], // Keep existing content, append ' more'
|
||||
})
|
||||
const editOp = Operation.editFile('main.tex', textOp)
|
||||
const change2 = new Change([editOp], new Date(), [])
|
||||
|
||||
// store an unrelated hash in the backedUpBlobs collection,
|
||||
// so we can test that only the backed up hashes are cleared.
|
||||
await storeBlobBackup(historyId, DUMMY_HASH)
|
||||
|
||||
await persistChanges(historyId, [change2], limitsToPersistImmediately, 1)
|
||||
})
|
||||
|
||||
it('should perform an initial backup', async function () {
|
||||
// Run backup script for initial version
|
||||
const { stdout } = await runBackupScript(['--projectId', projectId])
|
||||
expect(stdout).to.not.include(
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
|
||||
// Verify backup state
|
||||
const result = await getBackupStatus(projectId)
|
||||
expect(result.backupStatus.lastBackedUpVersion).to.equal(2)
|
||||
expect(result.backupStatus.lastBackedUpAt).to.be.an.instanceOf(Date)
|
||||
expect(result.currentEndTimestamp).to.be.an.instanceOf(Date)
|
||||
expect(result.backupStatus.pendingChangeAt).to.be.undefined
|
||||
|
||||
// Verify graph.png blob was backed up
|
||||
const graphBlobStream = await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, graphPngHash),
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
const graphBlobContent = await buffer(graphBlobStream)
|
||||
expect(graphBlobContent.equals(graphPngBuf)).to.be.true
|
||||
|
||||
// Verify chunk was backed up
|
||||
const chunkStream = await backupPersistor.getObjectStream(
|
||||
chunksBucket,
|
||||
makeChunkKey(historyId, 0)
|
||||
)
|
||||
const chunkContent = await text(chunkStream.pipe(createGunzip()))
|
||||
const chunk = await ChunkStore.loadLatestRaw(historyId)
|
||||
const rawHistory = await historyStore.loadRaw(historyId, chunk.id)
|
||||
expect(JSON.parse(chunkContent)).to.deep.equal(rawHistory)
|
||||
|
||||
// Unrelated entries from backedUpBlobs should be not cleared
|
||||
const backedUpBlobsDoc = await backedUpBlobs.findOne({
|
||||
_id: project._id,
|
||||
})
|
||||
expect(backedUpBlobsDoc).not.to.be.null
|
||||
expect(backedUpBlobsDoc.blobs).to.have.length(1)
|
||||
expect(backedUpBlobsDoc.blobs[0].toString('hex')).to.equal(DUMMY_HASH)
|
||||
})
|
||||
|
||||
it('should perform an incremental backup', async function () {
|
||||
// Backup first version
|
||||
const { stdout: stdout1 } = await runBackupScript([
|
||||
'--projectId',
|
||||
projectId,
|
||||
])
|
||||
expect(stdout1).to.not.include(
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
|
||||
// Verify first backup
|
||||
const result1 = await getBackupStatus(projectId)
|
||||
expect(result1.backupStatus.lastBackedUpVersion).to.equal(2)
|
||||
|
||||
// Persist additional changes
|
||||
const additionalTextOp = TextOperation.fromJSON({
|
||||
textOperation: [newContentString.length, ' even more'], // Keep existing content, append ' even more'
|
||||
})
|
||||
const additionalEditOp = Operation.editFile('main.tex', additionalTextOp)
|
||||
const additionalChange = new Change([additionalEditOp], new Date(), [])
|
||||
|
||||
// add the nonbmp file
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const nonBmpBlob = await blobStore.putFile(nonBmpPath)
|
||||
await backupBlob(historyId, nonBmpBlob, nonBmpPath)
|
||||
|
||||
// Verify that the non-BMP file was backed up when the file was added
|
||||
const newBackedUpBlobs = await backedUpBlobs.findOne({
|
||||
_id: project._id,
|
||||
})
|
||||
expect(newBackedUpBlobs).not.to.be.null
|
||||
expect(newBackedUpBlobs.blobs).to.have.length(2)
|
||||
expect(
|
||||
newBackedUpBlobs.blobs.map(b => b.toString('hex'))
|
||||
).to.have.members([testFiles.NON_BMP_TXT_HASH, DUMMY_HASH])
|
||||
|
||||
const addNonBmpFileOp = Operation.addFile(
|
||||
'non_bmp.txt',
|
||||
File.fromHash(testFiles.NON_BMP_TXT_HASH)
|
||||
)
|
||||
const additionalChange2 = new Change([addNonBmpFileOp], new Date(), [])
|
||||
|
||||
await persistChanges(
|
||||
historyId,
|
||||
[additionalChange, additionalChange2],
|
||||
limitsToPersistImmediately,
|
||||
2
|
||||
)
|
||||
|
||||
const afterChangeResult = await getBackupStatus(projectId)
|
||||
// Verify that the currentEndVersion and currentEndTimestamp are updated
|
||||
expect(afterChangeResult.currentEndVersion).to.equal(4)
|
||||
expect(afterChangeResult.currentEndTimestamp)
|
||||
.to.be.an.instanceOf(Date)
|
||||
.and.to.be.greaterThan(result1.currentEndTimestamp)
|
||||
// Persisting a change should not modify the backup version and timestamp
|
||||
expect(afterChangeResult.backupStatus.lastBackedUpVersion).to.equal(2)
|
||||
expect(afterChangeResult.backupStatus.lastBackedUpAt)
|
||||
.to.be.an.instanceOf(Date)
|
||||
.and.to.deep.equal(result1.backupStatus.lastBackedUpAt)
|
||||
// but it should update the pendingChangeAt timestamp
|
||||
expect(afterChangeResult.backupStatus.pendingChangeAt)
|
||||
.to.be.an.instanceOf(Date)
|
||||
.and.to.be.greaterThan(result1.backupStatus.lastBackedUpAt)
|
||||
|
||||
// Second backup
|
||||
const { stdout: stdout2 } = await runBackupScript([
|
||||
'--projectId',
|
||||
projectId,
|
||||
])
|
||||
expect(stdout2).to.not.include(
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
|
||||
// Verify incremental backup
|
||||
const result2 = await getBackupStatus(projectId)
|
||||
// The backup version and timestamp should be updated
|
||||
expect(result2.backupStatus.lastBackedUpVersion).to.equal(4)
|
||||
expect(result2.backupStatus.lastBackedUpAt)
|
||||
.to.be.an.instanceOf(Date)
|
||||
.and.to.be.greaterThan(result1.backupStatus.lastBackedUpAt)
|
||||
// The currentEndVersion and currentEndTimestamp should not be modified
|
||||
expect(result2.currentEndVersion).to.equal(4)
|
||||
expect(result2.currentEndTimestamp)
|
||||
.to.be.an.instanceOf(Date)
|
||||
.and.to.deep.equal(afterChangeResult.currentEndTimestamp)
|
||||
// The pendingChangeAt timestamp should be cleared when the backup is complete
|
||||
expect(result2.backupStatus.pendingChangeAt).to.be.undefined
|
||||
|
||||
// Verify additional blob was backed up
|
||||
const newBlobStream = await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, testFiles.NON_BMP_TXT_HASH),
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
const newBlobContent = await buffer(newBlobStream)
|
||||
expect(newBlobContent).to.deep.equal(
|
||||
fs.readFileSync(testFiles.path('non_bmp.txt'))
|
||||
)
|
||||
|
||||
// Check chunk was backed up
|
||||
const chunkStream = await backupPersistor.getObjectStream(
|
||||
chunksBucket,
|
||||
makeChunkKey(historyId, 0)
|
||||
)
|
||||
const chunkContent = await text(chunkStream.pipe(createGunzip()))
|
||||
const chunk = await ChunkStore.loadLatestRaw(historyId)
|
||||
const rawHistory = await historyStore.loadRaw(historyId, chunk.id)
|
||||
expect(JSON.parse(chunkContent)).to.deep.equal(rawHistory)
|
||||
|
||||
// Unrelated entries from backedUpBlobs should be not cleared
|
||||
const backedUpBlobsDoc = await backedUpBlobs.findOne({
|
||||
_id: project._id,
|
||||
})
|
||||
expect(backedUpBlobsDoc).not.to.be.null
|
||||
expect(backedUpBlobsDoc.blobs).to.have.length(1)
|
||||
expect(backedUpBlobsDoc.blobs[0].toString('hex')).to.equal(DUMMY_HASH)
|
||||
})
|
||||
|
||||
it('should not backup global blobs', async function () {
|
||||
const globalBlobString = 'a'
|
||||
const globalBlobHash = testFiles.STRING_A_HASH
|
||||
await globalBlobs.insertOne({
|
||||
_id: globalBlobHash,
|
||||
byteLength: globalBlobString.length,
|
||||
stringLength: globalBlobString.length,
|
||||
})
|
||||
const bucket = config.get('blobStore.globalBucket')
|
||||
for (const { key, content } of [
|
||||
{
|
||||
key: '2e/65/efe2a145dda7ee51d1741299f848e5bf752e',
|
||||
content: globalBlobString,
|
||||
},
|
||||
]) {
|
||||
const stream = Readable.from([content])
|
||||
await persistor.sendStream(bucket, key, stream)
|
||||
}
|
||||
await loadGlobalBlobs()
|
||||
|
||||
// Create a change using the global blob
|
||||
const addFileOp = Operation.addFile(
|
||||
'global.tex',
|
||||
File.fromHash(globalBlobHash)
|
||||
)
|
||||
const change = new Change([addFileOp], new Date(), [])
|
||||
|
||||
await persistChanges(historyId, [change], limitsToPersistImmediately, 2)
|
||||
|
||||
// Run backup
|
||||
await runBackupScript(['--projectId', projectId])
|
||||
|
||||
// Verify global blob wasn't backed up
|
||||
try {
|
||||
await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, globalBlobHash),
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
expect.fail('Should not find global blob in project blobs')
|
||||
} catch (err) {
|
||||
expect(err).to.be.an.instanceOf(NotFoundError)
|
||||
}
|
||||
})
|
||||
|
||||
it('should back up global blobs if they are demoted', async function () {
|
||||
const demotedBlobString = 'ab'
|
||||
const demotedBlobHash = testFiles.STRING_AB_HASH
|
||||
await globalBlobs.insertOne({
|
||||
_id: demotedBlobHash,
|
||||
byteLength: demotedBlobString.length,
|
||||
stringLength: demotedBlobString.length,
|
||||
demoted: true,
|
||||
})
|
||||
const bucket = config.get('blobStore.globalBucket')
|
||||
for (const { key, content } of [
|
||||
{
|
||||
key: '9a/e9/e86b7bd6cb1472d9373702d8249973da0832',
|
||||
content: demotedBlobString,
|
||||
},
|
||||
]) {
|
||||
const stream = Readable.from([content])
|
||||
await persistor.sendStream(bucket, key, stream)
|
||||
}
|
||||
await loadGlobalBlobs()
|
||||
|
||||
// Create a change using the global blob
|
||||
const addFileOp = Operation.addFile(
|
||||
'demoted.tex',
|
||||
File.fromHash(demotedBlobHash)
|
||||
)
|
||||
const change = new Change([addFileOp], new Date(), [])
|
||||
|
||||
await persistChanges(historyId, [change], limitsToPersistImmediately, 2)
|
||||
|
||||
// Run backup
|
||||
const { stdout } = await runBackupScript(['--projectId', projectId])
|
||||
expect(stdout).to.not.include(
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
|
||||
// Check chunk was backed up
|
||||
const chunkStream = await backupPersistor.getObjectStream(
|
||||
chunksBucket,
|
||||
makeChunkKey(historyId, 0)
|
||||
)
|
||||
const chunkContent = await text(chunkStream.pipe(createGunzip()))
|
||||
const chunk = await ChunkStore.loadLatestRaw(historyId)
|
||||
const rawHistory = await historyStore.loadRaw(historyId, chunk.id)
|
||||
expect(JSON.parse(chunkContent)).to.deep.equal(rawHistory)
|
||||
|
||||
// Verify that the demoted global blob was backed up
|
||||
try {
|
||||
const demotedBlobStream = await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, demotedBlobHash),
|
||||
{
|
||||
autoGunzip: true,
|
||||
}
|
||||
)
|
||||
const demotedBlobContent = await buffer(demotedBlobStream)
|
||||
expect(demotedBlobContent).to.deep.equal(Buffer.from(demotedBlobString))
|
||||
} catch (err) {
|
||||
expect.fail('Should find demoted global blob in project blobs')
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe('with complex project content', function () {
|
||||
beforeEach(async function () {
|
||||
// Create initial project
|
||||
await projectsCollection.insertOne(project)
|
||||
|
||||
// Initialize project in chunk store
|
||||
await ChunkStore.initializeProject(historyId)
|
||||
|
||||
const blobStore = new BlobStore(historyId)
|
||||
|
||||
// Set up test files with varying content
|
||||
const testFilesData = {
|
||||
mainTex: { name: 'main.tex', content: 'Initial content' },
|
||||
chapter1: { name: 'chapter1.tex', content: 'Chapter 1 content' },
|
||||
chapter2: { name: 'chapter2.tex', content: 'Chapter 2 content' },
|
||||
bibliography: {
|
||||
name: 'bibliography.bib',
|
||||
content: '@article{key1,\n title={Title1}\n}',
|
||||
newContent: '@article{key2,\n title={Title2}\n}',
|
||||
},
|
||||
graph: {
|
||||
name: 'graph.png',
|
||||
path: testFiles.path('graph.png'),
|
||||
hash: testFiles.GRAPH_PNG_HASH,
|
||||
},
|
||||
unicodeFile: {
|
||||
name: 'unicodeFile.tex',
|
||||
path: testFiles.path('non_bmp.txt'),
|
||||
hash: testFiles.NON_BMP_TXT_HASH,
|
||||
},
|
||||
}
|
||||
|
||||
const textFiles = [
|
||||
testFilesData.mainTex,
|
||||
testFilesData.chapter1,
|
||||
testFilesData.chapter2,
|
||||
testFilesData.bibliography,
|
||||
]
|
||||
const binaryFiles = [testFilesData.graph, testFilesData.unicodeFile]
|
||||
|
||||
// Add binary files first
|
||||
await Promise.all(binaryFiles.map(file => blobStore.putFile(file.path)))
|
||||
|
||||
// Back up the binary files
|
||||
await Promise.all(
|
||||
binaryFiles.map(async file => {
|
||||
await backupBlob(
|
||||
historyId,
|
||||
await blobStore.putFile(file.path),
|
||||
file.path
|
||||
)
|
||||
})
|
||||
)
|
||||
|
||||
// Create operations to add all files initially
|
||||
const addFileOperations = Object.values(testFilesData).map(file => {
|
||||
if (file.path) {
|
||||
return Operation.addFile(file.name, File.fromHash(file.hash))
|
||||
}
|
||||
return Operation.addFile(file.name, File.fromString(file.content))
|
||||
})
|
||||
|
||||
// Initial change adding all files
|
||||
const initialChange = new Change(addFileOperations, new Date(), [])
|
||||
await persistChanges(
|
||||
historyId,
|
||||
[initialChange],
|
||||
limitsToPersistImmediately,
|
||||
0
|
||||
)
|
||||
|
||||
// Generate a series of edit operations for each text file
|
||||
const editOperations = []
|
||||
for (let i = 0; i < 50; i++) {
|
||||
const targetFile = textFiles[i % textFiles.length]
|
||||
if (!targetFile.path) {
|
||||
// Skip binary/unicode files
|
||||
const appendText = `\n\nEdit ${i + 1}`
|
||||
targetFile.content += appendText
|
||||
const textOp = TextOperation.fromJSON({
|
||||
textOperation: [
|
||||
targetFile.content.length - appendText.length,
|
||||
appendText,
|
||||
],
|
||||
})
|
||||
const editOp = Operation.editFile(targetFile.name, textOp)
|
||||
editOperations.push(new Change([editOp], new Date(), []))
|
||||
}
|
||||
}
|
||||
|
||||
// Add a delete operation
|
||||
const deleteChange = new Change(
|
||||
[Operation.removeFile(testFilesData.bibliography.name)],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
editOperations.push(deleteChange)
|
||||
|
||||
// Add the file back with different content
|
||||
const addBackChange = new Change(
|
||||
[
|
||||
Operation.addFile(
|
||||
testFilesData.bibliography.name,
|
||||
File.fromString(testFilesData.bibliography.newContent)
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
editOperations.push(addBackChange)
|
||||
// Persist all changes
|
||||
await persistChanges(
|
||||
historyId,
|
||||
editOperations,
|
||||
limitsToPersistImmediately,
|
||||
1
|
||||
)
|
||||
})
|
||||
|
||||
it('should backup all chunks and blobs from a complex project history', async function () {
|
||||
// Run backup script
|
||||
const { stdout } = await runBackupScript(['--projectId', projectId])
|
||||
expect(stdout).to.not.include(
|
||||
'warning: persistor not passed to backupBlob'
|
||||
)
|
||||
|
||||
// Verify backup state
|
||||
const result = await getBackupStatus(projectId)
|
||||
expect(result.backupStatus.lastBackedUpVersion).to.equal(53) // 1 initial change + 50 edits + 1 delete + 1 add back
|
||||
expect(result.backupStatus.lastBackedUpAt).to.be.an.instanceOf(Date)
|
||||
expect(result.currentEndTimestamp).to.be.an.instanceOf(Date)
|
||||
expect(result.backupStatus.pendingChangeAt).to.be.undefined
|
||||
|
||||
// Verify that binary files were backed up
|
||||
for (const hash of [
|
||||
testFiles.GRAPH_PNG_HASH,
|
||||
testFiles.NON_BMP_TXT_HASH,
|
||||
]) {
|
||||
const blobStream = await backupPersistor.getObjectStream(
|
||||
projectBlobsBucket,
|
||||
makeProjectKey(historyId, hash),
|
||||
{ autoGunzip: true }
|
||||
)
|
||||
expect(blobStream).to.exist
|
||||
}
|
||||
|
||||
// Get all chunks and verify they were backed up
|
||||
const listing = await backupPersistor
|
||||
._getClientForBucket(chunksBucket)
|
||||
.listObjectsV2({
|
||||
Bucket: chunksBucket,
|
||||
Prefix: projectKey.format(historyId) + '/',
|
||||
})
|
||||
.promise()
|
||||
const chunkKeys = listing.Contents.map(item => item.Key)
|
||||
expect(chunkKeys.length).to.equal(6) // Should have multiple chunks
|
||||
|
||||
const localChunks = await ChunkStore.getProjectChunks(historyId)
|
||||
const chunksByStartVersion = new Map()
|
||||
for (const chunkRecord of localChunks) {
|
||||
chunksByStartVersion.set(chunkRecord.startVersion, chunkRecord)
|
||||
}
|
||||
|
||||
// Verify the content of each chunk matches what's in the history store
|
||||
for (const chunkKey of chunkKeys) {
|
||||
const chunkStream = await backupPersistor.getObjectStream(
|
||||
chunksBucket,
|
||||
chunkKey
|
||||
)
|
||||
const chunkContent = await text(chunkStream.pipe(createGunzip()))
|
||||
const startVersion = parseInt(chunkKey.split('/').pop(), 10)
|
||||
const chunk = chunksByStartVersion.get(startVersion)
|
||||
const rawHistory = await historyStore.loadRaw(historyId, chunk.id)
|
||||
expect(JSON.parse(chunkContent)).to.deep.equal(rawHistory)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
/**
|
||||
* Run the backup script with given arguments
|
||||
* @param {string[]} args
|
||||
*/
|
||||
async function runBackupScript(args) {
|
||||
const TIMEOUT = 20 * 1000
|
||||
let result
|
||||
try {
|
||||
result = await promisify(execFile)(
|
||||
'node',
|
||||
['storage/scripts/backup.mjs', ...args],
|
||||
{
|
||||
encoding: 'utf-8',
|
||||
timeout: TIMEOUT,
|
||||
env: {
|
||||
...process.env,
|
||||
LOG_LEVEL: 'debug', // Override LOG_LEVEL of acceptance tests
|
||||
},
|
||||
}
|
||||
)
|
||||
result.status = 0
|
||||
} catch (err) {
|
||||
const { stdout, stderr, code } = err
|
||||
if (typeof code !== 'number') {
|
||||
console.log(err)
|
||||
}
|
||||
result = { stdout, stderr, status: code }
|
||||
}
|
||||
if (result.status !== 0) {
|
||||
console.log(result)
|
||||
}
|
||||
expect(result.status).to.equal(0)
|
||||
return result
|
||||
}
|
||||
@@ -5,8 +5,14 @@ import {
|
||||
makeBlobForFile,
|
||||
getStringLengthOfFile,
|
||||
makeProjectKey,
|
||||
BlobStore,
|
||||
} from '../../../../storage/lib/blob_store/index.js'
|
||||
import { backupBlob } from '../../../../storage/lib/backupBlob.mjs'
|
||||
import { Blob } from 'overleaf-editor-core'
|
||||
import { insertBlob } from '../../../../storage/lib/blob_store/mongo.js'
|
||||
import {
|
||||
backupBlob,
|
||||
downloadBlobToDir,
|
||||
} from '../../../../storage/lib/backupBlob.mjs'
|
||||
import fs from 'node:fs'
|
||||
import path from 'node:path'
|
||||
import os from 'node:os'
|
||||
@@ -202,3 +208,71 @@ describe('backupBlob', function () {
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
describe('downloadBlobToDir', function () {
|
||||
let tmpDirDownload
|
||||
const historyId = 'abc123def456abc789def123'
|
||||
|
||||
before(async function () {
|
||||
tmpDirDownload = await fs.promises.mkdtemp(
|
||||
path.join(os.tmpdir(), 'downloadBlobTest-')
|
||||
)
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await fs.promises.rm(tmpDirDownload, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
it('should download the blob successfully', async function () {
|
||||
const data = 'hello world'
|
||||
// Use putString instead of writing a source file and using makeBlobForFile
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const blob = await blobStore.putString(data)
|
||||
|
||||
// Now call downloadBlobToDir which will use blobStore.getStream internally
|
||||
const downloadedFilePath = await downloadBlobToDir(
|
||||
historyId,
|
||||
blob,
|
||||
tmpDirDownload
|
||||
)
|
||||
const contents = await fs.promises.readFile(downloadedFilePath, 'utf8')
|
||||
expect(contents).to.equal(data)
|
||||
})
|
||||
|
||||
it('should delete the file on error (if file already exists)', async function () {
|
||||
const data = 'data that will not be written'
|
||||
const blobStore = new BlobStore(historyId)
|
||||
const blob = await blobStore.putString(data)
|
||||
const hash = blob.getHash()
|
||||
const fileName = `${historyId}-${hash}`
|
||||
|
||||
// Pre-create the destination file to trigger a failure due to an existing file
|
||||
const downloadedFilePath = path.join(tmpDirDownload, fileName)
|
||||
await fs.promises.writeFile(downloadedFilePath, 'preexisting content')
|
||||
|
||||
try {
|
||||
await downloadBlobToDir(historyId, blob, tmpDirDownload)
|
||||
expect.fail('should not reach here')
|
||||
} catch (error) {
|
||||
// Check that the file was deleted
|
||||
await expect(fs.promises.access(downloadedFilePath)).to.be.rejected
|
||||
}
|
||||
})
|
||||
|
||||
it('should not leave an empty file if download fails', async function () {
|
||||
// Create a blob with a hash that does not exist in the blob store
|
||||
const hash = '0000000000000000000000000000000000000000'
|
||||
const blob = new Blob(hash, 12, 12)
|
||||
await insertBlob(historyId, blob)
|
||||
const fileName = `${historyId}-${hash}`
|
||||
try {
|
||||
await downloadBlobToDir(historyId, blob, tmpDirDownload)
|
||||
expect.fail('should not reach here')
|
||||
} catch (error) {
|
||||
expect(error).to.be.instanceOf(Blob.NotFoundError)
|
||||
const downloadedFilePath = path.join(tmpDirDownload, fileName)
|
||||
// Check that the file was deleted
|
||||
await expect(fs.promises.access(downloadedFilePath)).to.be.rejected
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
@@ -0,0 +1,338 @@
|
||||
import { expect } from 'chai'
|
||||
import { backupGenerator } from '../../../../storage/lib/backupGenerator.mjs'
|
||||
import ChunkStore from '../../../../storage/lib/chunk_store/index.js'
|
||||
import persistChanges from '../../../../storage/lib/persist_changes.js'
|
||||
import {
|
||||
Change,
|
||||
Operation,
|
||||
TextOperation,
|
||||
AddFileOperation,
|
||||
File,
|
||||
} from 'overleaf-editor-core'
|
||||
import { ObjectId } from 'mongodb'
|
||||
import testFiles from './support/test_files.js'
|
||||
import { BlobStore } from '../../../../storage/lib/blob_store/index.js'
|
||||
import fs from 'node:fs'
|
||||
import blobHash from '../../../../storage/lib/blob_hash.js'
|
||||
|
||||
const scenarios = [
|
||||
{
|
||||
description: 'Postgres history',
|
||||
createProject: ChunkStore.initializeProject,
|
||||
},
|
||||
{
|
||||
description: 'Mongo history',
|
||||
createProject: () =>
|
||||
ChunkStore.initializeProject(new ObjectId().toString()),
|
||||
},
|
||||
]
|
||||
|
||||
for (const scenario of scenarios) {
|
||||
describe(`backupGenerator with ${scenario.description}`, function () {
|
||||
let projectId
|
||||
let limitsToPersistImmediately
|
||||
let blobStore
|
||||
const NUM_CHUNKS = 3
|
||||
const FINAL_VERSION = 24
|
||||
|
||||
before(function () {
|
||||
// used to provide a limit which forces us to persist all of the changes
|
||||
const farFuture = new Date()
|
||||
farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000)
|
||||
limitsToPersistImmediately = {
|
||||
minChangeTimestamp: farFuture,
|
||||
maxChangeTimestamp: farFuture,
|
||||
maxChunkChanges: 10,
|
||||
}
|
||||
})
|
||||
|
||||
beforeEach(async function () {
|
||||
projectId = await scenario.createProject()
|
||||
blobStore = new BlobStore(projectId)
|
||||
|
||||
// Add test files first
|
||||
await Promise.all([
|
||||
blobStore.putFile(testFiles.path('graph.png')),
|
||||
blobStore.putFile(testFiles.path('non_bmp.txt')),
|
||||
])
|
||||
|
||||
const HELLO_TXT = fs.readFileSync(testFiles.path('hello.txt')).toString()
|
||||
|
||||
// Create a sample project history for testing, with a chunk size of 10
|
||||
//
|
||||
// 1. Add a text file main.tex with contents from hello.txt
|
||||
// 2. Add a binary file image.png with contents from graph.png
|
||||
// 3. Add a text file other.tex with empty contents
|
||||
// 4. Apply 10 changes that append characters to the end of other.tex giving 'aaaaaaaaaa'
|
||||
// In applying the 10 changes we hit the first chunk boundary and create a new chunk.
|
||||
// The first chunk contains the 3 file operations and 7 changes
|
||||
// to other.tex which is now "aaaaaaa" (7 characters)
|
||||
// snapshot: {}
|
||||
// changes: add main.tex, add image.png, add other.tex, 7 changes to other.tex
|
||||
// The second chunk has a snapshot with the existing files
|
||||
// snapshot: main.tex, image.png, other.tex="aaaaaaa" (7 characters)
|
||||
// changes: 3 changes to other.tex, each appending 'a'
|
||||
// 5. Now we add a new file non_bmp.txt with non-BMP characters
|
||||
// 6. Finally we apply 10 more changes to other.tex, each appending another 'a' to give 'aaaaaaaaaaaaaaaaaaaa' (20 characters)
|
||||
// In applying the 10 changes we hit another chunk boundary and create a third chunk.
|
||||
// The final state of the second chunk is
|
||||
// snapshot: main.tex, image.png, other.tex="aaaaaaa" (7 characters)
|
||||
// changes:
|
||||
// 3 changes to other.tex, each appending 'a'
|
||||
// add file non_bmp.txt,
|
||||
// 6 changes to other.tex, each appending 'a'
|
||||
// The third chunk will contain the last 4 changes to other.tex
|
||||
// snapshot: main.tex, image.png, non_bmp.tex, other.tex="aaaaaaaaaaaaaaaa" (16 characters)
|
||||
// changes: 4 changes to other.tex, each appending 'a'
|
||||
|
||||
const textChange = new Change(
|
||||
[new AddFileOperation('main.tex', File.fromString(HELLO_TXT))],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
const binaryChange = new Change(
|
||||
[
|
||||
new AddFileOperation(
|
||||
'image.png',
|
||||
File.fromHash(testFiles.GRAPH_PNG_HASH)
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
const otherChange = new Change(
|
||||
[new AddFileOperation('other.tex', File.fromString(''))],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
// now append characters to the end of the contents of other.tex
|
||||
const otherEdits = Array.from(
|
||||
{ length: 10 },
|
||||
(_, i) =>
|
||||
new Change(
|
||||
[
|
||||
Operation.editFile(
|
||||
'other.tex',
|
||||
TextOperation.fromJSON({
|
||||
textOperation: i === 0 ? ['a'] : [i, 'a'],
|
||||
})
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
)
|
||||
const newFile = new Change(
|
||||
[
|
||||
new AddFileOperation(
|
||||
'non_bmp.txt',
|
||||
File.fromHash(testFiles.NON_BMP_TXT_HASH)
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
const moreOtherEdits = Array.from(
|
||||
{ length: 10 },
|
||||
(_, i) =>
|
||||
new Change(
|
||||
[
|
||||
Operation.editFile(
|
||||
'other.tex',
|
||||
TextOperation.fromJSON({ textOperation: [i + 10, 'a'] })
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
)
|
||||
)
|
||||
|
||||
await persistChanges(
|
||||
projectId,
|
||||
[
|
||||
textChange,
|
||||
binaryChange,
|
||||
otherChange,
|
||||
...otherEdits,
|
||||
newFile,
|
||||
...moreOtherEdits,
|
||||
],
|
||||
limitsToPersistImmediately,
|
||||
0
|
||||
)
|
||||
})
|
||||
|
||||
it('should yield correct data for an initial backup', async function () {
|
||||
const results = []
|
||||
for await (const result of backupGenerator(projectId)) {
|
||||
results.push(result)
|
||||
}
|
||||
|
||||
// There should be 3 chunks
|
||||
expect(results).to.have.length(NUM_CHUNKS)
|
||||
|
||||
// First chunk
|
||||
expect(results[0].chunkRecord.startVersion).to.equal(0)
|
||||
expect(results[0].chunkRecord.endVersion).to.equal(10)
|
||||
expect(results[0].blobsToBackup).to.have.deep.members([
|
||||
{
|
||||
hash: testFiles.HELLO_TXT_HASH,
|
||||
byteLength: testFiles.HELLO_TXT_BYTE_LENGTH,
|
||||
stringLength: testFiles.HELLO_TXT_UTF8_LENGTH,
|
||||
},
|
||||
{
|
||||
hash: testFiles.GRAPH_PNG_HASH,
|
||||
byteLength: testFiles.GRAPH_PNG_BYTE_LENGTH,
|
||||
stringLength: null,
|
||||
},
|
||||
{
|
||||
hash: File.EMPTY_FILE_HASH,
|
||||
byteLength: 0,
|
||||
stringLength: 0,
|
||||
},
|
||||
])
|
||||
|
||||
// Second chunk
|
||||
expect(results[1].chunkRecord.startVersion).to.equal(10)
|
||||
expect(results[1].chunkRecord.endVersion).to.equal(20)
|
||||
expect(results[1].blobsToBackup).to.have.deep.members([
|
||||
{
|
||||
hash: blobHash.fromString('a'.repeat(7)),
|
||||
byteLength: 7,
|
||||
stringLength: 7,
|
||||
},
|
||||
{
|
||||
hash: testFiles.NON_BMP_TXT_HASH,
|
||||
byteLength: testFiles.NON_BMP_TXT_BYTE_LENGTH,
|
||||
stringLength: null,
|
||||
},
|
||||
])
|
||||
|
||||
// Third chunk
|
||||
expect(results[2].chunkRecord.startVersion).to.equal(20)
|
||||
expect(results[2].chunkRecord.endVersion).to.equal(24)
|
||||
expect(results[2].blobsToBackup).to.have.deep.members([
|
||||
{
|
||||
hash: blobHash.fromString('a'.repeat(16)),
|
||||
byteLength: 16,
|
||||
stringLength: 16,
|
||||
},
|
||||
])
|
||||
})
|
||||
|
||||
for (
|
||||
let lastBackedUpVersion = 0;
|
||||
lastBackedUpVersion <= FINAL_VERSION;
|
||||
lastBackedUpVersion++
|
||||
) {
|
||||
it(`should yield the expected data when the last backed up version was ${lastBackedUpVersion}`, async function () {
|
||||
const results = []
|
||||
for await (const result of backupGenerator(
|
||||
projectId,
|
||||
lastBackedUpVersion
|
||||
)) {
|
||||
results.push(result)
|
||||
}
|
||||
|
||||
const chunkDefinitions = [
|
||||
{
|
||||
chunk: { startVersion: 0, endVersion: 10 },
|
||||
blobs: [
|
||||
{
|
||||
version: 1,
|
||||
blob: {
|
||||
hash: testFiles.HELLO_TXT_HASH,
|
||||
byteLength: testFiles.HELLO_TXT_BYTE_LENGTH,
|
||||
stringLength: testFiles.HELLO_TXT_UTF8_LENGTH,
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 2,
|
||||
blob: {
|
||||
hash: testFiles.GRAPH_PNG_HASH,
|
||||
byteLength: testFiles.GRAPH_PNG_BYTE_LENGTH,
|
||||
stringLength: null,
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 3,
|
||||
blob: {
|
||||
hash: File.EMPTY_FILE_HASH,
|
||||
byteLength: 0,
|
||||
stringLength: 0,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
chunk: { startVersion: 10, endVersion: 20 },
|
||||
blobs: [
|
||||
{
|
||||
version: 11,
|
||||
blob: {
|
||||
hash: blobHash.fromString('a'.repeat(7)),
|
||||
byteLength: 7,
|
||||
stringLength: 7,
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 14,
|
||||
blob: {
|
||||
hash: testFiles.NON_BMP_TXT_HASH,
|
||||
byteLength: testFiles.NON_BMP_TXT_BYTE_LENGTH,
|
||||
stringLength: null,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
chunk: { startVersion: 20, endVersion: 24 },
|
||||
blobs: [
|
||||
{
|
||||
version: 21,
|
||||
blob: {
|
||||
hash: blobHash.fromString('a'.repeat(16)),
|
||||
byteLength: 16,
|
||||
stringLength: 16,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
const expectedChunks = chunkDefinitions
|
||||
.filter(({ chunk }) => lastBackedUpVersion < chunk.endVersion)
|
||||
.map(({ chunk }) => chunk)
|
||||
const expectedBlobs = chunkDefinitions
|
||||
.filter(({ chunk }) => lastBackedUpVersion < chunk.endVersion)
|
||||
.map(({ blobs }) =>
|
||||
blobs
|
||||
.filter(({ version }) => lastBackedUpVersion < version)
|
||||
.map(({ blob }) => blob)
|
||||
)
|
||||
|
||||
expect(results).to.have.length(expectedChunks.length)
|
||||
expect(results).to.have.length(expectedBlobs.length)
|
||||
|
||||
results.forEach((result, i) => {
|
||||
expect(result.chunkRecord).to.deep.include(expectedChunks[i])
|
||||
expect(result.blobsToBackup).to.have.deep.members(expectedBlobs[i])
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
it(`should not back up blobs that have already been backed up in previous chunks`, async function () {
|
||||
const results = []
|
||||
for await (const result of backupGenerator(projectId)) {
|
||||
results.push(result)
|
||||
}
|
||||
const seenBlobs = new Set()
|
||||
for (const result of results) {
|
||||
for (const blob of result.blobsToBackup) {
|
||||
expect(seenBlobs).to.not.include(blob.hash)
|
||||
seenBlobs.add(blob.hash)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -326,6 +326,88 @@ describe('chunkStore', function () {
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when iterating the chunks with getProjectChunksFromVersion', function () {
|
||||
// The first chunk has startVersion:0 and endVersion:2
|
||||
for (let startVersion = 0; startVersion <= 2; startVersion++) {
|
||||
it(`returns all chunk records when starting from version ${startVersion}`, async function () {
|
||||
const chunkRecords = []
|
||||
for await (const chunk of chunkStore.getProjectChunksFromVersion(
|
||||
projectId,
|
||||
startVersion
|
||||
)) {
|
||||
chunkRecords.push(chunk)
|
||||
}
|
||||
const expectedChunks = [firstChunk, secondChunk, thirdChunk]
|
||||
expect(chunkRecords).to.have.length(expectedChunks.length)
|
||||
chunkRecords.forEach((chunkRecord, index) => {
|
||||
expect(chunkRecord.startVersion).to.deep.equal(
|
||||
expectedChunks[index].getStartVersion()
|
||||
)
|
||||
expect(chunkRecord.endVersion).to.deep.equal(
|
||||
expectedChunks[index].getEndVersion()
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// The second chunk has startVersion:2 and endVersion:4
|
||||
for (let startVersion = 3; startVersion <= 4; startVersion++) {
|
||||
it(`returns two chunk records when starting from version ${startVersion}`, async function () {
|
||||
const chunkRecords = []
|
||||
for await (const chunk of chunkStore.getProjectChunksFromVersion(
|
||||
projectId,
|
||||
startVersion
|
||||
)) {
|
||||
chunkRecords.push(chunk)
|
||||
}
|
||||
const expectedChunks = [secondChunk, thirdChunk]
|
||||
expect(chunkRecords).to.have.length(expectedChunks.length)
|
||||
chunkRecords.forEach((chunkRecord, index) => {
|
||||
expect(chunkRecord.startVersion).to.deep.equal(
|
||||
expectedChunks[index].getStartVersion()
|
||||
)
|
||||
expect(chunkRecord.endVersion).to.deep.equal(
|
||||
expectedChunks[index].getEndVersion()
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// The third chunk has startVersion:4 and endVersion:5
|
||||
for (let startVersion = 5; startVersion <= 5; startVersion++) {
|
||||
it(`returns one chunk record when starting from version ${startVersion}`, async function () {
|
||||
const chunkRecords = []
|
||||
for await (const chunk of chunkStore.getProjectChunksFromVersion(
|
||||
projectId,
|
||||
startVersion
|
||||
)) {
|
||||
chunkRecords.push(chunk)
|
||||
}
|
||||
const expectedChunks = [thirdChunk]
|
||||
expect(chunkRecords).to.have.length(expectedChunks.length)
|
||||
chunkRecords.forEach((chunkRecord, index) => {
|
||||
expect(chunkRecord.startVersion).to.deep.equal(
|
||||
expectedChunks[index].getStartVersion()
|
||||
)
|
||||
expect(chunkRecord.endVersion).to.deep.equal(
|
||||
expectedChunks[index].getEndVersion()
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
it('returns no chunk records when starting from a version after the last chunk', async function () {
|
||||
const chunkRecords = []
|
||||
for await (const chunk of chunkStore.getProjectChunksFromVersion(
|
||||
projectId,
|
||||
6
|
||||
)) {
|
||||
chunkRecords.push(chunk)
|
||||
}
|
||||
expect(chunkRecords).to.have.length(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('when saving to object storage fails', function () {
|
||||
|
||||
Reference in New Issue
Block a user