From 6404e3047d6724728d39960a0f892ce41a62b54b Mon Sep 17 00:00:00 2001 From: Andrew Rumble Date: Wed, 11 Dec 2024 13:17:18 +0000 Subject: [PATCH] Merge pull request #22392 from overleaf/ar-backup-files-when-inserting [history-v1] backup files when inserting GitOrigin-RevId: 1649b2828899d67ee37c0ac331917c6d5424c803 --- .../history-v1/api/controllers/projects.js | 9 +- .../history-v1/storage/lib/backupBlob.mjs | 132 ++++++++++++++++ services/history-v1/storage/lib/mongodb.js | 2 + .../acceptance/js/storage/backupBlob.test.mjs | 142 ++++++++++++++++++ 4 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 services/history-v1/storage/lib/backupBlob.mjs create mode 100644 services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs diff --git a/services/history-v1/api/controllers/projects.js b/services/history-v1/api/controllers/projects.js index 80f9cdc660..2478052a43 100644 --- a/services/history-v1/api/controllers/projects.js +++ b/services/history-v1/api/controllers/projects.js @@ -194,7 +194,14 @@ async function createProjectBlob(req, res, next) { } const blobStore = new BlobStore(projectId) - await blobStore.putFile(tmpPath) + const newBlob = await blobStore.putFile(tmpPath) + + try { + const { backupBlob } = await import('../../storage/lib/backupBlob.mjs') + await backupBlob(projectId, newBlob, tmpPath) + } catch (error) { + logger.warn({ error, projectId, hash }, 'Failed to backup blob') + } res.status(HTTPStatus.CREATED).end() }) } diff --git a/services/history-v1/storage/lib/backupBlob.mjs b/services/history-v1/storage/lib/backupBlob.mjs new file mode 100644 index 0000000000..3b2cf86c4c --- /dev/null +++ b/services/history-v1/storage/lib/backupBlob.mjs @@ -0,0 +1,132 @@ +import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs' +import { makeProjectKey } from './blob_store/index.js' +import Stream from 'node:stream' +import fs from 'node:fs' +import Crypto from 'node:crypto' +import assert from './assert.js' +import { backedUpBlobs, projects } from './mongodb.js' +import { Binary } from 'mongodb' +import logger from '@overleaf/logger/logging-manager.js' +import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js' + +const HIGHWATER_MARK = 1024 * 1024 + +/** + * Performs the actual upload of the blob to the backup storage. + * + * @param {string} projectId - The project ID of the project the blob belongs to (should have been converted from a postgres ID already if necessary) + * @param {Blob} blob - The blob being uploaded + * @param {string} path - The path to the file to upload (should have been stored on disk already) + * @return {Promise} + */ +export async function uploadBlobToBackup(projectId, blob, path) { + const md5 = Crypto.createHash('md5') + await Stream.promises.pipeline(fs.createReadStream(path), md5) + const key = makeProjectKey(projectId, blob.getHash()) + const persistor = await backupPersistor.forProject(projectBlobsBucket, key) + await persistor.sendStream( + projectBlobsBucket, + key, + fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }), + { + contentType: 'application/octet-stream', + contentLength: blob.getByteLength(), + sourceMd5: md5.digest('hex'), + ifNoneMatch: '*', + } + ) +} + +/** + * Converts a legacy (postgres) historyId to a mongo projectId + * + * @param {string} historyId + * @return {Promise} + * @private + */ +async function _convertLegacyHistoryIdToProjectId(historyId) { + const project = await projects.findOne( + { 'overleaf.history.id': parseInt(historyId) }, + { projection: { _id: 1 } } + ) + + if (!project?._id) { + throw new Error('Did not find project for history id') + } + + return project?._id?.toString() +} + +/** + * Records that a blob was backed up for a project. + * + * @param {string} projectId - projectId for a project (mongo format) + * @param {string} hash + * @return {Promise} + */ +async function storeBlobBackup(projectId, hash) { + await backedUpBlobs.updateOne( + { _id: projectId }, + { $addToSet: { blobs: new Binary(Buffer.from(hash, 'hex')) } }, + { upsert: true } + ) +} + +export async function _blobIsBackedUp(projectId, hash) { + const backedUpBlobsForProject = await backedUpBlobs.findOne( + { + _id: projectId, + }, + { blobs: 1 } + ) + return backedUpBlobsForProject?.blobs?.some(b => + b.buffer.equals(Buffer.from(hash, 'hex')) + ) +} + +/** + * Back up a blob to the global storage and record that it was backed up. + * + * @param {string} historyId - history ID for a project (can be postgres format or mongo format) + * @param {Blob} blob - The blob that is being backed up + * @param {string} tmpPath - The path to a temporary file storing the contents of the blob. + * @return {Promise} + */ +export async function backupBlob(historyId, blob, tmpPath) { + const hash = blob.getHash() + + let projectId = historyId + if (assert.POSTGRES_ID_REGEXP.test(historyId)) { + projectId = await _convertLegacyHistoryIdToProjectId(historyId) + } + + try { + if (await _blobIsBackedUp(projectId, hash)) { + logger.debug( + { projectId, hash }, + 'Blob already backed up - skipping backup' + ) + return + } + } catch (error) { + logger.warn({ error }, 'Failed to check if blob is backed up') + // We'll try anyway - we'll catch the error if it was backed up + } + + try { + logger.debug({ projectId, hash }, 'Starting blob backup') + await uploadBlobToBackup(projectId, blob, tmpPath) + await storeBlobBackup(projectId, hash) + } catch (error) { + if (error instanceof AlreadyWrittenError) { + logger.debug({ error, projectId, hash }, 'Blob already backed up') + // record that we backed it up already + await storeBlobBackup(projectId, hash) + return + } + // eventually queue this for retry - for now this will be fixed by running the script + logger.warn({ error, projectId, hash }, 'Failed to upload blob to backup') + } finally { + logger.debug({ projectId, hash }, 'Ended blob backup') + } +} diff --git a/services/history-v1/storage/lib/mongodb.js b/services/history-v1/storage/lib/mongodb.js index 938e9555c2..e887bc25a5 100644 --- a/services/history-v1/storage/lib/mongodb.js +++ b/services/history-v1/storage/lib/mongodb.js @@ -10,6 +10,7 @@ const chunks = db.collection('projectHistoryChunks') const blobs = db.collection('projectHistoryBlobs') const globalBlobs = db.collection('projectHistoryGlobalBlobs') const shardedBlobs = db.collection('projectHistoryShardedBlobs') +const projects = db.collection('projects') // Temporary collection for tracking progress of backed up old blobs (without a hash). // The initial sync process will be able to skip over these. // Schema: _id: projectId, blobs: [Binary] @@ -23,6 +24,7 @@ module.exports = { chunks, blobs, globalBlobs, + projects, shardedBlobs, backedUpBlobs, } diff --git a/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs b/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs new file mode 100644 index 0000000000..1da907e6ec --- /dev/null +++ b/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs @@ -0,0 +1,142 @@ +import { expect } from 'chai' +import { makeBlobForFile } from '../../../../storage/lib/blob_store/index.js' +import { backupBlob } from '../../../../storage/lib/backupBlob.mjs' +import fs from 'node:fs' +import path from 'node:path' +import os from 'node:os' +import fsExtra from 'fs-extra' +import { backedUpBlobs, projects } from '../../../../storage/lib/mongodb.js' +import { Binary, ObjectId } from 'mongodb' +import { + backupPersistor, + projectBlobsBucket, +} from '../../../../storage/lib/backupPersistor.mjs' + +async function listS3Bucket(bucket, wantStorageClass) { + const client = backupPersistor._getClientForBucket(bucket) + const response = await client.listObjectsV2({ Bucket: bucket }).promise() + + for (const object of response.Contents || []) { + if (wantStorageClass) { + expect(object).to.have.property('StorageClass', wantStorageClass) + } + } + + return (response.Contents || []).map(item => item.Key || '') +} + +describe('backupBlob', function () { + let filePath + let tmpDir + + before(async function () { + tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'temp-test-')) + filePath = path.join(tmpDir, 'test.txt') + await fs.promises.writeFile(filePath, 'test') + }) + + after(async function () { + try { + fsExtra.remove(tmpDir) + } catch (err) { + if (err.code !== 'ENOENT') { + console.log('failed to delete temporary file') + } + } + }) + + beforeEach(async function () { + await backupPersistor.deleteDirectory(projectBlobsBucket, '') + expect(await listS3Bucket(projectBlobsBucket)).to.have.length(0) + }) + + describe('when the blob is already backed up', function () { + let blob + let historyId + + beforeEach(async function () { + blob = await makeBlobForFile(filePath) + historyId = 'abc123def456abc789def123' + await backedUpBlobs.updateOne( + { + _id: historyId, + }, + { + $set: { blobs: [new Binary(Buffer.from(blob.getHash(), 'hex'))] }, + }, + { upsert: true } + ) + }) + + it('does not upload the blob', async function () { + await backupBlob(historyId, blob, filePath) + const bucketContents = await listS3Bucket(projectBlobsBucket) + expect(bucketContents).to.have.lengthOf(0) + }) + it('does not store the backup', function () {}) + }) + + describe('when the historyId is for a postgres project', function () { + let blob + let historyId + const projectId = new ObjectId() + + beforeEach(async function () { + blob = await makeBlobForFile(filePath) + historyId = '123' + await projects.insertOne({ + _id: projectId, + overleaf: { history: { id: 123 } }, + }) + await backedUpBlobs.deleteOne({ _id: projectId }) + }) + + afterEach(async function () { + await projects.deleteOne({ + _id: projectId, + }) + }) + + it('uploads the blob to the backup', async function () { + await backupBlob(historyId, blob, filePath) + const bucketContents = await listS3Bucket(projectBlobsBucket) + expect(bucketContents).to.have.lengthOf(1) + }) + it('stores the backup', function () { + expect( + backedUpBlobs.findOne({ + _id: projectId, + blobs: { + $elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) }, + }, + }) + ).to.exist + }) + }) + + describe('when the blob is not already backed up', function () { + let blob + let historyId + beforeEach(async function () { + blob = await makeBlobForFile(filePath) + historyId = 'abc123def456abc789def123' + await backedUpBlobs.deleteOne({ _id: historyId }) + }) + + it('uploads the blob to the backup', async function () { + await backupBlob(historyId, blob, filePath) + const bucketContents = await listS3Bucket(projectBlobsBucket) + expect(bucketContents).to.have.lengthOf(1) + }) + it('stores the backup', function () { + expect( + backedUpBlobs.findOne({ + _id: historyId, + blobs: { + $elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) }, + }, + }) + ).to.exist + }) + }) +})