Merge pull request #22466 from overleaf/ar-backup-files-when-inserting

[history-v1] backup files when inserting

GitOrigin-RevId: e636bce178604978c6d41c083bf671795d20b5a1
This commit is contained in:
Andrew Rumble
2024-12-12 10:44:13 +00:00
committed by Copybot
parent b165e71ba9
commit a92a37bc3c
5 changed files with 364 additions and 1 deletions

View File

@@ -194,7 +194,14 @@ async function createProjectBlob(req, res, next) {
}
const blobStore = new BlobStore(projectId)
await blobStore.putFile(tmpPath)
const newBlob = await blobStore.putFile(tmpPath)
try {
const { backupBlob } = await import('../../storage/lib/backupBlob.mjs')
await backupBlob(projectId, newBlob, tmpPath)
} catch (error) {
logger.warn({ error, projectId, hash }, 'Failed to backup blob')
}
res.status(HTTPStatus.CREATED).end()
})
}

View File

@@ -0,0 +1,167 @@
// @ts-check
import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs'
import { GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
import Stream from 'node:stream'
import fs from 'node:fs'
import Crypto from 'node:crypto'
import assert from './assert.js'
import { backedUpBlobs, projects } from './mongodb.js'
import { Binary, ObjectId } from 'mongodb'
import logger from '@overleaf/logger/logging-manager.js'
import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js'
import metrics from '@overleaf/metrics'
const HIGHWATER_MARK = 1024 * 1024
/**
* @typedef {import("overleaf-editor-core").Blob} Blob
*/
/**
* Increment a metric to record the outcome of a backup operation.
*
* @param {"success"|"failure"|"skipped"} status
* @param {"global"|"already_backed_up"|"none"} reason
*/
function recordBackupConclusion(status, reason = 'none') {
metrics.inc('blob_backed_up', 1, { status, reason })
}
/**
* Performs the actual upload of the blob to the backup storage.
*
* @param {string} projectId - The project ID of the project the blob belongs to (should have been converted from a postgres ID already if necessary)
* @param {Blob} blob - The blob being uploaded
* @param {string} path - The path to the file to upload (should have been stored on disk already)
* @return {Promise<void>}
*/
export async function uploadBlobToBackup(projectId, blob, path) {
const md5 = Crypto.createHash('md5')
await Stream.promises.pipeline(fs.createReadStream(path), md5)
const key = makeProjectKey(projectId, blob.getHash())
const persistor = await backupPersistor.forProject(projectBlobsBucket, key)
await persistor.sendStream(
projectBlobsBucket,
key,
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
{
contentType: 'application/octet-stream',
contentLength: blob.getByteLength(),
sourceMd5: md5.digest('hex'),
ifNoneMatch: '*',
}
)
}
/**
* Converts a legacy (postgres) historyId to a mongo projectId
*
* @param {string} historyId
* @return {Promise<string>}
* @private
*/
async function _convertLegacyHistoryIdToProjectId(historyId) {
const project = await projects.findOne(
{ 'overleaf.history.id': parseInt(historyId) },
{ projection: { _id: 1 } }
)
if (!project?._id) {
throw new Error('Did not find project for history id')
}
return project?._id?.toString()
}
/**
* Records that a blob was backed up for a project.
*
* @param {string} projectId - projectId for a project (mongo format)
* @param {string} hash
* @return {Promise<void>}
*/
async function storeBlobBackup(projectId, hash) {
await backedUpBlobs.updateOne(
{ _id: new ObjectId(projectId) },
{ $addToSet: { blobs: new Binary(Buffer.from(hash, 'hex')) } },
{ upsert: true }
)
}
/**
* Determine whether a specific blob has been backed up in this project.
*
* @param {string} projectId
* @param {string} hash
* @return {Promise<*>}
* @private
*/
export async function _blobIsBackedUp(projectId, hash) {
const blobs = await backedUpBlobs.findOne(
{
_id: new ObjectId(projectId),
blobs: new Binary(Buffer.from(hash, 'hex')),
},
{ projection: { _id: 1 } }
)
return blobs?._id
}
/**
* Back up a blob to the global storage and record that it was backed up.
*
* @param {string} historyId - history ID for a project (can be postgres format or mongo format)
* @param {Blob} blob - The blob that is being backed up
* @param {string} tmpPath - The path to a temporary file storing the contents of the blob.
* @return {Promise<void>}
*/
export async function backupBlob(historyId, blob, tmpPath) {
const hash = blob.getHash()
let projectId = historyId
if (assert.POSTGRES_ID_REGEXP.test(historyId)) {
projectId = await _convertLegacyHistoryIdToProjectId(historyId)
}
const globalBlob = GLOBAL_BLOBS.get(hash)
if (globalBlob && !globalBlob.demoted) {
recordBackupConclusion('skipped', 'global')
logger.debug({ projectId, hash }, 'Blob is global - skipping backup')
return
}
try {
if (await _blobIsBackedUp(projectId, hash)) {
recordBackupConclusion('skipped', 'already_backed_up')
logger.debug(
{ projectId, hash },
'Blob already backed up - skipping backup'
)
return
}
} catch (error) {
logger.warn({ error }, 'Failed to check if blob is backed up')
// We'll try anyway - we'll catch the error if it was backed up
}
try {
logger.debug({ projectId, hash }, 'Starting blob backup')
await uploadBlobToBackup(projectId, blob, tmpPath)
await storeBlobBackup(projectId, hash)
recordBackupConclusion('success')
} catch (error) {
if (error instanceof AlreadyWrittenError) {
logger.debug({ error, projectId, hash }, 'Blob already backed up')
// record that we backed it up already
await storeBlobBackup(projectId, hash)
recordBackupConclusion('failure', 'already_backed_up')
return
}
// eventually queue this for retry - for now this will be fixed by running the script
recordBackupConclusion('failure')
logger.warn({ error, projectId, hash }, 'Failed to upload blob to backup')
} finally {
logger.debug({ projectId, hash }, 'Ended blob backup')
}
}

View File

@@ -10,6 +10,7 @@ const chunks = db.collection('projectHistoryChunks')
const blobs = db.collection('projectHistoryBlobs')
const globalBlobs = db.collection('projectHistoryGlobalBlobs')
const shardedBlobs = db.collection('projectHistoryShardedBlobs')
const projects = db.collection('projects')
// Temporary collection for tracking progress of backed up old blobs (without a hash).
// The initial sync process will be able to skip over these.
// Schema: _id: projectId, blobs: [Binary]
@@ -23,6 +24,7 @@ module.exports = {
chunks,
blobs,
globalBlobs,
projects,
shardedBlobs,
backedUpBlobs,
}

View File

@@ -0,0 +1,45 @@
// @ts-check
import { backedUpBlobs } from '../lib/mongodb.js'
import { mongoId } from '../lib/assert.js'
import { ObjectId } from 'mongodb'
import commandLineArgs from 'command-line-args'
const STATS = {
total: 0,
replaced: 0,
skipped: 0,
}
const config = commandLineArgs([
{ name: 'commit', type: Boolean, defaultValue: false },
])
async function processRecord(record) {
STATS.total++
try {
mongoId(record._id)
const newId = new ObjectId(record._id)
if (config.commit) {
await backedUpBlobs.insertOne({ _id: newId, blobs: record.blobs })
await backedUpBlobs.deleteOne({ _id: record._id })
}
STATS.replaced++
} catch (error) {
console.log(error)
STATS.skipped++
}
}
const cursor = backedUpBlobs
.find({ _id: { $type: 'string' } })
.project({ _id: 1, blobs: 1 })
while (await cursor.hasNext()) {
const record = await cursor.next()
await processRecord(record)
}
console.log(
`${!config.commit ? 'DRY RUN' : ''} ${STATS.total} records ${STATS.replaced} replaced, ${STATS.skipped} skipped`
)
process.exit()

View File

@@ -0,0 +1,142 @@
import { expect } from 'chai'
import { makeBlobForFile } from '../../../../storage/lib/blob_store/index.js'
import { backupBlob } from '../../../../storage/lib/backupBlob.mjs'
import fs from 'node:fs'
import path from 'node:path'
import os from 'node:os'
import fsExtra from 'fs-extra'
import { backedUpBlobs, projects } from '../../../../storage/lib/mongodb.js'
import { Binary, ObjectId } from 'mongodb'
import {
backupPersistor,
projectBlobsBucket,
} from '../../../../storage/lib/backupPersistor.mjs'
async function listS3Bucket(bucket, wantStorageClass) {
const client = backupPersistor._getClientForBucket(bucket)
const response = await client.listObjectsV2({ Bucket: bucket }).promise()
for (const object of response.Contents || []) {
if (wantStorageClass) {
expect(object).to.have.property('StorageClass', wantStorageClass)
}
}
return (response.Contents || []).map(item => item.Key || '')
}
describe('backupBlob', function () {
let filePath
let tmpDir
before(async function () {
tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'temp-test-'))
filePath = path.join(tmpDir, 'test.txt')
await fs.promises.writeFile(filePath, 'test')
})
after(async function () {
try {
fsExtra.remove(tmpDir)
} catch (err) {
if (err.code !== 'ENOENT') {
console.log('failed to delete temporary file')
}
}
})
beforeEach(async function () {
await backupPersistor.deleteDirectory(projectBlobsBucket, '')
expect(await listS3Bucket(projectBlobsBucket)).to.have.length(0)
})
describe('when the blob is already backed up', function () {
let blob
let historyId
beforeEach(async function () {
blob = await makeBlobForFile(filePath)
historyId = 'abc123def456abc789def123'
await backedUpBlobs.updateOne(
{
_id: new ObjectId(historyId),
},
{
$set: { blobs: [new Binary(Buffer.from(blob.getHash(), 'hex'))] },
},
{ upsert: true }
)
})
it('does not upload the blob', async function () {
await backupBlob(historyId, blob, filePath)
const bucketContents = await listS3Bucket(projectBlobsBucket)
expect(bucketContents).to.have.lengthOf(0)
})
it('does not store the backup', function () {})
})
describe('when the historyId is for a postgres project', function () {
let blob
let historyId
const projectId = new ObjectId()
beforeEach(async function () {
blob = await makeBlobForFile(filePath)
historyId = '123'
await projects.insertOne({
_id: projectId,
overleaf: { history: { id: 123 } },
})
await backedUpBlobs.deleteOne({ _id: projectId })
})
afterEach(async function () {
await projects.deleteOne({
_id: projectId,
})
})
it('uploads the blob to the backup', async function () {
await backupBlob(historyId, blob, filePath)
const bucketContents = await listS3Bucket(projectBlobsBucket)
expect(bucketContents).to.have.lengthOf(1)
})
it('stores the backup', function () {
expect(
backedUpBlobs.findOne({
_id: projectId,
blobs: {
$elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) },
},
})
).to.exist
})
})
describe('when the blob is not already backed up', function () {
let blob
let historyId
beforeEach(async function () {
blob = await makeBlobForFile(filePath)
historyId = 'abc123def456abc789def123'
await backedUpBlobs.deleteOne({ _id: new ObjectId(historyId) })
})
it('uploads the blob to the backup', async function () {
await backupBlob(historyId, blob, filePath)
const bucketContents = await listS3Bucket(projectBlobsBucket)
expect(bucketContents).to.have.lengthOf(1)
})
it('stores the backup', function () {
expect(
backedUpBlobs.findOne({
_id: new ObjectId(historyId),
blobs: {
$elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) },
},
})
).to.exist
})
})
})