diff --git a/services/history-v1/config/default.json b/services/history-v1/config/default.json index 74c5bcd237..c609730650 100644 --- a/services/history-v1/config/default.json +++ b/services/history-v1/config/default.json @@ -23,6 +23,7 @@ } } }, + "backupRPOInMS": "360000", "chunkStore": { "historyStoreConcurrency": "4" }, diff --git a/services/history-v1/storage/lib/backupVerifier.mjs b/services/history-v1/storage/lib/backupVerifier.mjs index 55247b91d7..6e1bc3ee7b 100644 --- a/services/history-v1/storage/lib/backupVerifier.mjs +++ b/services/history-v1/storage/lib/backupVerifier.mjs @@ -1,16 +1,35 @@ // @ts-check import config from 'config' import OError from '@overleaf/o-error' -import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs' -import { Blob } from 'overleaf-editor-core' -import { BlobStore, makeProjectKey } from './blob_store/index.js' +import chunkStore from '../lib/chunk_store/index.js' +import { + backupPersistor, + chunksBucket, + projectBlobsBucket, +} from './backupPersistor.mjs' +import { Blob, Chunk, History } from 'overleaf-editor-core' +import { BlobStore, GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js' import blobHash from './blob_hash.js' import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js' +import logger from '@overleaf/logger' +import { text } from 'node:stream/consumers' +import { createGunzip } from 'node:zlib' +import path from 'node:path' +import projectKey from './project_key.js' + +const RPO = parseInt(config.get('backupRPOInMS'), 10) /** - * @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor + * @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor */ +/** + * @return {Date} + */ +export function getEndDateForRPO() { + return new Date(Date.now() - RPO) +} + /** * @param {string} historyId * @param {string} hash @@ -20,13 +39,13 @@ export async function verifyBlob(historyId, hash) { } /** + * * @param {string} historyId - * @param {Array} hashes + * @return {Promise} */ -export async function verifyBlobs(historyId, hashes) { - let projectCache +async function getProjectPersistor(historyId) { try { - projectCache = await backupPersistor.forProjectRO( + return await backupPersistor.forProjectRO( projectBlobsBucket, makeProjectKey(historyId, '') ) @@ -36,16 +55,19 @@ export async function verifyBlobs(historyId, hashes) { } throw err } - await verifyBlobsWithCache(historyId, projectCache, hashes) } /** * @param {string} historyId - * @param {CachedPerProjectEncryptedS3Persistor} projectCache * @param {Array} hashes + * @param {CachedPerProjectEncryptedS3Persistor} [projectCache] */ -export async function verifyBlobsWithCache(historyId, projectCache, hashes) { +export async function verifyBlobs(historyId, hashes, projectCache) { if (hashes.length === 0) throw new Error('bug: empty hashes') + + if (!projectCache) { + projectCache = await getProjectPersistor(historyId) + } const blobStore = new BlobStore(historyId) for (const hash of hashes) { const path = makeProjectKey(historyId, hash) @@ -58,7 +80,7 @@ export async function verifyBlobsWithCache(historyId, projectCache, hashes) { }) } catch (err) { if (err instanceof NotFoundError) { - throw new BackupCorruptedError('missing blob') + throw new BackupCorruptedError('missing blob', { path, hash }) } throw err } @@ -73,7 +95,114 @@ export async function verifyBlobsWithCache(historyId, projectCache, hashes) { } } +/** + * @param {string} historyId + * @param {Date} [endTimestamp] + */ +export async function verifyProjectWithErrorContext( + historyId, + endTimestamp = getEndDateForRPO() +) { + try { + await verifyProject(historyId, endTimestamp) + } catch (err) { + // @ts-ignore err is Error instance + throw OError.tag(err, 'verifyProject', { historyId, endTimestamp }) + } +} + +/** + * + * @param {string} historyId + * @param {number} startVersion + * @param {CachedPerProjectEncryptedS3Persistor} backupPersistorForProject + * @return {Promise} + */ +async function loadChunk(historyId, startVersion, backupPersistorForProject) { + const key = path.join( + projectKey.format(historyId), + projectKey.pad(startVersion) + ) + const backupChunkStream = await backupPersistorForProject.getObjectStream( + chunksBucket, + key + ) + const raw = await text(backupChunkStream.pipe(createGunzip())) + return JSON.parse(raw) +} + +/** + * @param {string} historyId + * @param {Date} endTimestamp + */ +export async function verifyProject(historyId, endTimestamp) { + const backend = chunkStore.getBackend(historyId) + const [first, last] = await Promise.all([ + backend.getFirstChunkBeforeTimestamp(historyId, endTimestamp), + backend.getLastActiveChunkBeforeTimestamp(historyId, endTimestamp), + ]) + + const chunksRecordsToVerify = [ + { + chunkId: first.id, + chunkLabel: 'first', + }, + ] + if (first.startVersion !== last.startVersion) { + chunksRecordsToVerify.push({ + chunkId: last.id, + chunkLabel: 'last before RPO', + }) + } + + const projectCache = await getProjectPersistor(historyId) + + const chunks = await Promise.all( + chunksRecordsToVerify.map(async chunk => { + try { + return History.fromRaw( + await loadChunk(historyId, chunk.startVersion, projectCache) + ) + } catch (err) { + if (err instanceof Chunk.NotPersistedError) { + throw new BackupRPOViolationError('backup RPO violation', chunk) + } + throw err + } + }) + ) + const seenBlobs = new Set() + const blobsToVerify = [] + for (const chunk of chunks) { + /** @type {Set} */ + const chunkBlobs = new Set() + chunk.findBlobHashes(chunkBlobs) + let hasAddedBlobFromThisChunk = false + for (const blobHash of chunkBlobs) { + if (seenBlobs.has(blobHash)) continue // old blob + if (GLOBAL_BLOBS.has(blobHash)) continue // global blob + seenBlobs.add(blobHash) + if (!hasAddedBlobFromThisChunk) { + blobsToVerify.push(blobHash) + hasAddedBlobFromThisChunk = true + } + } + } + if (blobsToVerify.length === 0) { + logger.debug( + { + historyId, + chunksRecordsToVerify: chunksRecordsToVerify.map(c => c.chunkId), + }, + 'chunks contain no blobs to verify' + ) + return + } + await verifyBlobs(historyId, blobsToVerify, projectCache) +} + export class BackupCorruptedError extends OError {} +export class BackupRPOViolationError extends OError {} export async function healthCheck() { /** @type {Array} */ diff --git a/services/history-v1/storage/lib/chunk_store/mongo.js b/services/history-v1/storage/lib/chunk_store/mongo.js index 6090c555bb..1c8e9f1b16 100644 --- a/services/history-v1/storage/lib/chunk_store/mongo.js +++ b/services/history-v1/storage/lib/chunk_store/mongo.js @@ -54,6 +54,35 @@ async function getChunkForVersion(projectId, version) { return chunkFromRecord(record) } +/** + * Get the metadata for the chunk that contains the given version before the endTime. + */ +async function getFirstChunkBeforeTimestamp(projectId, timestamp) { + assert.mongoId(projectId, 'bad projectId') + assert.date(timestamp, 'bad timestamp') + + const recordActive = await getChunkForVersion(projectId, 0) + if (recordActive && recordActive.endTimestamp <= timestamp) { + return recordActive + } + + // fallback to deleted chunk + const recordDeleted = await mongodb.chunks.findOne( + { + projectId: new ObjectId(projectId), + state: 'deleted', + startVersion: 0, + updatedAt: { $lte: timestamp }, // indexed for state=deleted + endTimestamp: { $lte: timestamp }, + }, + { sort: { updatedAt: -1 } } + ) + if (recordDeleted) { + return chunkFromRecord(recordDeleted) + } + throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp) +} + /** * Get the metadata for the chunk that contains the version that was current at * the given timestamp. @@ -86,6 +115,39 @@ async function getChunkForTimestamp(projectId, timestamp) { return chunkFromRecord(record) } +/** + * Get the metadata for the chunk that contains the version that was current before + * the given timestamp. + */ +async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) { + assert.mongoId(projectId, 'bad projectId') + assert.date(timestamp, 'bad timestamp') + + const record = await mongodb.chunks.findOne( + { + projectId: new ObjectId(projectId), + state: 'active', + $or: [ + { + endTimestamp: { + $lte: timestamp, + }, + }, + { + endTimestamp: null, + }, + ], + }, + // We use the index on the startVersion for sorting records. This assumes + // that timestamps go up with each version. + { sort: { startVersion: -1 } } + ) + if (record == null) { + throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp) + } + return chunkFromRecord(record) +} + /** * Get all of a project's chunk ids */ @@ -310,6 +372,8 @@ function chunkFromRecord(record) { module.exports = { getLatestChunk, + getFirstChunkBeforeTimestamp, + getLastActiveChunkBeforeTimestamp, getChunkForVersion, getChunkForTimestamp, getProjectChunkIds, diff --git a/services/history-v1/storage/lib/chunk_store/postgres.js b/services/history-v1/storage/lib/chunk_store/postgres.js index 072f1f1ce6..81df8e7021 100644 --- a/services/history-v1/storage/lib/chunk_store/postgres.js +++ b/services/history-v1/storage/lib/chunk_store/postgres.js @@ -46,6 +46,59 @@ async function getChunkForVersion(projectId, version) { return chunkFromRecord(record) } +/** + * Get the metadata for the chunk that contains the given version. + */ +async function getFirstChunkBeforeTimestamp(projectId, timestamp) { + assert.date(timestamp, 'bad timestamp') + + const recordActive = await getChunkForVersion(projectId, 0) + // projectId must be valid if getChunkForVersion did not throw + projectId = parseInt(projectId, 10) + if (recordActive && recordActive.endTimestamp <= timestamp) { + return recordActive + } + + // fallback to deleted chunk + const recordDeleted = await knex('old_chunks') + .where('doc_id', projectId) + .where('start_version', '=', 0) + .where('end_timestamp', '<=', timestamp) + .orderBy('end_version', 'desc') + .first() + if (recordDeleted) { + return chunkFromRecord(recordDeleted) + } + throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp) +} + +/** + * Get the metadata for the chunk that contains the version that was current at + * the given timestamp. + */ +async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) { + assert.date(timestamp, 'bad timestamp') + assert.postgresId(projectId, 'bad projectId') + projectId = parseInt(projectId, 10) + + const query = knex('chunks') + .where('doc_id', projectId) + .where(function () { + this.where('end_timestamp', '<=', timestamp).orWhere( + 'end_timestamp', + null + ) + }) + .orderBy('end_version', 'desc', 'last') + + const record = await query.first() + + if (!record) { + throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp) + } + return chunkFromRecord(record) +} + /** * Get the metadata for the chunk that contains the version that was current at * the given timestamp. @@ -280,6 +333,8 @@ async function generateProjectId() { module.exports = { getLatestChunk, + getFirstChunkBeforeTimestamp, + getLastActiveChunkBeforeTimestamp, getChunkForVersion, getChunkForTimestamp, getProjectChunkIds, diff --git a/services/history-v1/storage/scripts/verify_project.mjs b/services/history-v1/storage/scripts/verify_project.mjs new file mode 100644 index 0000000000..6e1cb9de89 --- /dev/null +++ b/services/history-v1/storage/scripts/verify_project.mjs @@ -0,0 +1,33 @@ +import commandLineArgs from 'command-line-args' +import { verifyProjectWithErrorContext } from '../lib/backupVerifier.mjs' +import knex from '../lib/knex.js' +import { client } from '../lib/mongodb.js' +import { setTimeout } from 'node:timers/promises' +import { loadGlobalBlobs } from '../lib/blob_store/index.js' + +const { historyId } = commandLineArgs([{ name: 'historyId', type: String }]) + +async function gracefulShutdown(code = process.exitCode) { + await knex.destroy() + await client.close() + await setTimeout(1_000) + process.exit(code) +} + +if (!historyId) { + console.error('missing --historyId') + process.exitCode = 1 + await gracefulShutdown() +} + +await loadGlobalBlobs() + +try { + await verifyProjectWithErrorContext(historyId) + console.log('OK') +} catch (error) { + console.error('error verifying', error) + process.exitCode = 1 +} finally { + await gracefulShutdown() +} diff --git a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs index 54a801a919..8b6836fab5 100644 --- a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs +++ b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs @@ -6,23 +6,63 @@ import { expect } from 'chai' import testProjects from './support/test_projects.js' import { backupPersistor, + chunksBucket, projectBlobsBucket, } from '../../../../storage/lib/backupPersistor.mjs' import { BlobStore, makeProjectKey, } from '../../../../storage/lib/blob_store/index.js' -import Stream from 'stream' +import Stream from 'node:stream' import * as zlib from 'node:zlib' import { promisify } from 'node:util' import { execFile } from 'node:child_process' import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js' +import { chunkStore } from '../../../../storage/index.js' +import { Change, File, Operation } from 'overleaf-editor-core' +import Crypto from 'node:crypto' +import path from 'node:path' +import projectKey from '../../../../storage/lib/project_key.js' +import { historyStore } from '../../../../storage/lib/history_store.js' /** * @typedef {import("node-fetch").Response} Response * @typedef {import("overleaf-editor-core").Blob} Blob */ +async function verifyProjectScript(historyId) { + try { + const result = await promisify(execFile)( + process.argv0, + ['storage/scripts/verify_project.mjs', `--historyId=${historyId}`], + { + encoding: 'utf-8', + timeout: 5_000, + env: { + ...process.env, + LOG_LEVEL: 'warn', + }, + } + ) + return { status: 0, stdout: result.stdout, stderr: result.stderr } + } catch (err) { + if ( + err && + typeof err === 'object' && + 'stdout' in err && + 'code' in err && + 'stderr' in err + ) { + return { + stdout: typeof err.stdout === 'string' ? err.stdout : '', + status: typeof err.code === 'number' ? err.code : -1, + stderr: typeof err.stdout === 'string' ? err.stderr : '', + } + } + throw err + } +} + /** * @param {string} historyId * @param {string} hash @@ -69,22 +109,84 @@ async function verifyBlobHTTP(historyId, hash) { ) } +async function backupChunk(historyId) { + const newChunk = await chunkStore.loadLatestRaw(historyId) + const { buffer: chunkBuffer } = await historyStore.loadRawWithBuffer( + historyId, + newChunk.id + ) + const md5 = Crypto.createHash('md5').update(chunkBuffer) + await backupPersistor.sendStream( + chunksBucket, + path.join( + projectKey.format(historyId), + projectKey.pad(newChunk.startVersion) + ), + Stream.Readable.from([chunkBuffer]), + { + contentType: 'application/json', + contentEncoding: 'gzip', + contentLength: chunkBuffer.byteLength, + sourceMd5: md5.digest('hex'), + } + ) +} + +const FIFTEEN_MINUTES_IN_MS = 900_000 + +async function addFileInNewChunk( + fileContents, + filePath, + historyId, + { creationDate = new Date() } +) { + const chunk = await chunkStore.loadLatest(historyId) + const operation = Operation.addFile( + `${historyId}.txt`, + File.fromString(fileContents) + ) + const changes = [new Change([operation], creationDate, [])] + chunk.pushChanges(changes) + await chunkStore.update(historyId, 0, chunk) +} + /** * @param {string} historyId + * @param {Object} [backup] * @return {Promise} */ -async function prepareProjectAndBlob(historyId) { +async function prepareProjectAndBlob( + historyId, + { shouldBackupBlob, shouldBackupChunk, shouldCreateChunk } = { + shouldBackupBlob: true, + shouldBackupChunk: true, + shouldCreateChunk: true, + } +) { await testProjects.createEmptyProject(historyId) const blobStore = new BlobStore(historyId) - const blob = await blobStore.putString(historyId) - const gzipped = zlib.gzipSync(Buffer.from(historyId)) - await backupPersistor.sendStream( - projectBlobsBucket, - makeProjectKey(historyId, blob.getHash()), - Stream.Readable.from([gzipped]), - { contentLength: gzipped.byteLength, contentEncoding: 'gzip' } - ) - await checkDEKExists(historyId) + const fileContents = historyId + const blob = await blobStore.putString(fileContents) + if (shouldCreateChunk) { + await addFileInNewChunk(fileContents, `${historyId}.txt`, historyId, { + creationDate: new Date(new Date().getTime() - FIFTEEN_MINUTES_IN_MS), + }) + } + + if (shouldBackupBlob) { + const gzipped = zlib.gzipSync(Buffer.from(historyId)) + await backupPersistor.sendStream( + projectBlobsBucket, + makeProjectKey(historyId, blob.getHash()), + Stream.Readable.from([gzipped]), + { contentLength: gzipped.byteLength, contentEncoding: 'gzip' } + ) + await checkDEKExists(historyId) + } + if (shouldCreateChunk && shouldBackupChunk) { + await backupChunk(historyId) + } + return blob.getHash() } @@ -123,6 +225,53 @@ describe('backupVerifier', function () { const response = await fetch(testServer.testUrl('/health_check')) expect(response.status).to.equal(200) }) + describe('storage/scripts/verify_project.mjs', function () { + describe('when the project is appropriately backed up', function () { + it('should return 0', async function () { + const response = await verifyProjectScript(historyIdPostgres) + expect(response.status).to.equal(0) + }) + }) + describe('when the project chunk is not backed up', function () { + let response + beforeEach(async function () { + await prepareProjectAndBlob('000000000000000000000043', { + shouldBackupChunk: false, + shouldBackupBlob: true, + shouldCreateChunk: true, + }) + response = await verifyProjectScript('000000000000000000000043') + }) + it('should return 1', async function () { + expect(response.status).to.equal(1) + }) + it('should emit an error message referring to a missing chunk', async function () { + const stderr = response.stderr + expect(stderr).to.include('NotFoundError: no such file') + expect(stderr).to.include("bucketName: 'overleaf-test-history-chunks'") + expect(stderr).to.include("key: '340/000/000000000000000000/000000000'") + }) + }) + describe('when a project blob is not backed up', function () { + let response + beforeEach(async function () { + await prepareProjectAndBlob('43', { + shouldBackupChunk: true, + shouldBackupBlob: false, + shouldCreateChunk: true, + }) + response = await verifyProjectScript('43') + }) + + it('should return 1', function () { + expect(response.status).to.equal(1) + }) + + it('includes a BackupCorruptedError in stderr', function () { + expect(response.stderr).to.include('BackupCorruptedError: missing blob') + }) + }) + }) describe('storage/scripts/verify_backup_blob.mjs', function () { it('throws and does not create DEK if missing', async function () { const historyId = '404'