mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
Merge pull request #22392 from overleaf/ar-backup-files-when-inserting
[history-v1] backup files when inserting GitOrigin-RevId: 1649b2828899d67ee37c0ac331917c6d5424c803
This commit is contained in:
@@ -194,7 +194,14 @@ async function createProjectBlob(req, res, next) {
|
||||
}
|
||||
|
||||
const blobStore = new BlobStore(projectId)
|
||||
await blobStore.putFile(tmpPath)
|
||||
const newBlob = await blobStore.putFile(tmpPath)
|
||||
|
||||
try {
|
||||
const { backupBlob } = await import('../../storage/lib/backupBlob.mjs')
|
||||
await backupBlob(projectId, newBlob, tmpPath)
|
||||
} catch (error) {
|
||||
logger.warn({ error, projectId, hash }, 'Failed to backup blob')
|
||||
}
|
||||
res.status(HTTPStatus.CREATED).end()
|
||||
})
|
||||
}
|
||||
|
||||
132
services/history-v1/storage/lib/backupBlob.mjs
Normal file
132
services/history-v1/storage/lib/backupBlob.mjs
Normal file
@@ -0,0 +1,132 @@
|
||||
import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs'
|
||||
import { makeProjectKey } from './blob_store/index.js'
|
||||
import Stream from 'node:stream'
|
||||
import fs from 'node:fs'
|
||||
import Crypto from 'node:crypto'
|
||||
import assert from './assert.js'
|
||||
import { backedUpBlobs, projects } from './mongodb.js'
|
||||
import { Binary } from 'mongodb'
|
||||
import logger from '@overleaf/logger/logging-manager.js'
|
||||
import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
|
||||
const HIGHWATER_MARK = 1024 * 1024
|
||||
|
||||
/**
|
||||
* Performs the actual upload of the blob to the backup storage.
|
||||
*
|
||||
* @param {string} projectId - The project ID of the project the blob belongs to (should have been converted from a postgres ID already if necessary)
|
||||
* @param {Blob} blob - The blob being uploaded
|
||||
* @param {string} path - The path to the file to upload (should have been stored on disk already)
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function uploadBlobToBackup(projectId, blob, path) {
|
||||
const md5 = Crypto.createHash('md5')
|
||||
await Stream.promises.pipeline(fs.createReadStream(path), md5)
|
||||
const key = makeProjectKey(projectId, blob.getHash())
|
||||
const persistor = await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
await persistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
key,
|
||||
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
|
||||
{
|
||||
contentType: 'application/octet-stream',
|
||||
contentLength: blob.getByteLength(),
|
||||
sourceMd5: md5.digest('hex'),
|
||||
ifNoneMatch: '*',
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a legacy (postgres) historyId to a mongo projectId
|
||||
*
|
||||
* @param {string} historyId
|
||||
* @return {Promise<string>}
|
||||
* @private
|
||||
*/
|
||||
async function _convertLegacyHistoryIdToProjectId(historyId) {
|
||||
const project = await projects.findOne(
|
||||
{ 'overleaf.history.id': parseInt(historyId) },
|
||||
{ projection: { _id: 1 } }
|
||||
)
|
||||
|
||||
if (!project?._id) {
|
||||
throw new Error('Did not find project for history id')
|
||||
}
|
||||
|
||||
return project?._id?.toString()
|
||||
}
|
||||
|
||||
/**
|
||||
* Records that a blob was backed up for a project.
|
||||
*
|
||||
* @param {string} projectId - projectId for a project (mongo format)
|
||||
* @param {string} hash
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async function storeBlobBackup(projectId, hash) {
|
||||
await backedUpBlobs.updateOne(
|
||||
{ _id: projectId },
|
||||
{ $addToSet: { blobs: new Binary(Buffer.from(hash, 'hex')) } },
|
||||
{ upsert: true }
|
||||
)
|
||||
}
|
||||
|
||||
export async function _blobIsBackedUp(projectId, hash) {
|
||||
const backedUpBlobsForProject = await backedUpBlobs.findOne(
|
||||
{
|
||||
_id: projectId,
|
||||
},
|
||||
{ blobs: 1 }
|
||||
)
|
||||
return backedUpBlobsForProject?.blobs?.some(b =>
|
||||
b.buffer.equals(Buffer.from(hash, 'hex'))
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Back up a blob to the global storage and record that it was backed up.
|
||||
*
|
||||
* @param {string} historyId - history ID for a project (can be postgres format or mongo format)
|
||||
* @param {Blob} blob - The blob that is being backed up
|
||||
* @param {string} tmpPath - The path to a temporary file storing the contents of the blob.
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export async function backupBlob(historyId, blob, tmpPath) {
|
||||
const hash = blob.getHash()
|
||||
|
||||
let projectId = historyId
|
||||
if (assert.POSTGRES_ID_REGEXP.test(historyId)) {
|
||||
projectId = await _convertLegacyHistoryIdToProjectId(historyId)
|
||||
}
|
||||
|
||||
try {
|
||||
if (await _blobIsBackedUp(projectId, hash)) {
|
||||
logger.debug(
|
||||
{ projectId, hash },
|
||||
'Blob already backed up - skipping backup'
|
||||
)
|
||||
return
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn({ error }, 'Failed to check if blob is backed up')
|
||||
// We'll try anyway - we'll catch the error if it was backed up
|
||||
}
|
||||
|
||||
try {
|
||||
logger.debug({ projectId, hash }, 'Starting blob backup')
|
||||
await uploadBlobToBackup(projectId, blob, tmpPath)
|
||||
await storeBlobBackup(projectId, hash)
|
||||
} catch (error) {
|
||||
if (error instanceof AlreadyWrittenError) {
|
||||
logger.debug({ error, projectId, hash }, 'Blob already backed up')
|
||||
// record that we backed it up already
|
||||
await storeBlobBackup(projectId, hash)
|
||||
return
|
||||
}
|
||||
// eventually queue this for retry - for now this will be fixed by running the script
|
||||
logger.warn({ error, projectId, hash }, 'Failed to upload blob to backup')
|
||||
} finally {
|
||||
logger.debug({ projectId, hash }, 'Ended blob backup')
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ const chunks = db.collection('projectHistoryChunks')
|
||||
const blobs = db.collection('projectHistoryBlobs')
|
||||
const globalBlobs = db.collection('projectHistoryGlobalBlobs')
|
||||
const shardedBlobs = db.collection('projectHistoryShardedBlobs')
|
||||
const projects = db.collection('projects')
|
||||
// Temporary collection for tracking progress of backed up old blobs (without a hash).
|
||||
// The initial sync process will be able to skip over these.
|
||||
// Schema: _id: projectId, blobs: [Binary]
|
||||
@@ -23,6 +24,7 @@ module.exports = {
|
||||
chunks,
|
||||
blobs,
|
||||
globalBlobs,
|
||||
projects,
|
||||
shardedBlobs,
|
||||
backedUpBlobs,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
import { expect } from 'chai'
|
||||
import { makeBlobForFile } from '../../../../storage/lib/blob_store/index.js'
|
||||
import { backupBlob } from '../../../../storage/lib/backupBlob.mjs'
|
||||
import fs from 'node:fs'
|
||||
import path from 'node:path'
|
||||
import os from 'node:os'
|
||||
import fsExtra from 'fs-extra'
|
||||
import { backedUpBlobs, projects } from '../../../../storage/lib/mongodb.js'
|
||||
import { Binary, ObjectId } from 'mongodb'
|
||||
import {
|
||||
backupPersistor,
|
||||
projectBlobsBucket,
|
||||
} from '../../../../storage/lib/backupPersistor.mjs'
|
||||
|
||||
async function listS3Bucket(bucket, wantStorageClass) {
|
||||
const client = backupPersistor._getClientForBucket(bucket)
|
||||
const response = await client.listObjectsV2({ Bucket: bucket }).promise()
|
||||
|
||||
for (const object of response.Contents || []) {
|
||||
if (wantStorageClass) {
|
||||
expect(object).to.have.property('StorageClass', wantStorageClass)
|
||||
}
|
||||
}
|
||||
|
||||
return (response.Contents || []).map(item => item.Key || '')
|
||||
}
|
||||
|
||||
describe('backupBlob', function () {
|
||||
let filePath
|
||||
let tmpDir
|
||||
|
||||
before(async function () {
|
||||
tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'temp-test-'))
|
||||
filePath = path.join(tmpDir, 'test.txt')
|
||||
await fs.promises.writeFile(filePath, 'test')
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
try {
|
||||
fsExtra.remove(tmpDir)
|
||||
} catch (err) {
|
||||
if (err.code !== 'ENOENT') {
|
||||
console.log('failed to delete temporary file')
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
beforeEach(async function () {
|
||||
await backupPersistor.deleteDirectory(projectBlobsBucket, '')
|
||||
expect(await listS3Bucket(projectBlobsBucket)).to.have.length(0)
|
||||
})
|
||||
|
||||
describe('when the blob is already backed up', function () {
|
||||
let blob
|
||||
let historyId
|
||||
|
||||
beforeEach(async function () {
|
||||
blob = await makeBlobForFile(filePath)
|
||||
historyId = 'abc123def456abc789def123'
|
||||
await backedUpBlobs.updateOne(
|
||||
{
|
||||
_id: historyId,
|
||||
},
|
||||
{
|
||||
$set: { blobs: [new Binary(Buffer.from(blob.getHash(), 'hex'))] },
|
||||
},
|
||||
{ upsert: true }
|
||||
)
|
||||
})
|
||||
|
||||
it('does not upload the blob', async function () {
|
||||
await backupBlob(historyId, blob, filePath)
|
||||
const bucketContents = await listS3Bucket(projectBlobsBucket)
|
||||
expect(bucketContents).to.have.lengthOf(0)
|
||||
})
|
||||
it('does not store the backup', function () {})
|
||||
})
|
||||
|
||||
describe('when the historyId is for a postgres project', function () {
|
||||
let blob
|
||||
let historyId
|
||||
const projectId = new ObjectId()
|
||||
|
||||
beforeEach(async function () {
|
||||
blob = await makeBlobForFile(filePath)
|
||||
historyId = '123'
|
||||
await projects.insertOne({
|
||||
_id: projectId,
|
||||
overleaf: { history: { id: 123 } },
|
||||
})
|
||||
await backedUpBlobs.deleteOne({ _id: projectId })
|
||||
})
|
||||
|
||||
afterEach(async function () {
|
||||
await projects.deleteOne({
|
||||
_id: projectId,
|
||||
})
|
||||
})
|
||||
|
||||
it('uploads the blob to the backup', async function () {
|
||||
await backupBlob(historyId, blob, filePath)
|
||||
const bucketContents = await listS3Bucket(projectBlobsBucket)
|
||||
expect(bucketContents).to.have.lengthOf(1)
|
||||
})
|
||||
it('stores the backup', function () {
|
||||
expect(
|
||||
backedUpBlobs.findOne({
|
||||
_id: projectId,
|
||||
blobs: {
|
||||
$elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) },
|
||||
},
|
||||
})
|
||||
).to.exist
|
||||
})
|
||||
})
|
||||
|
||||
describe('when the blob is not already backed up', function () {
|
||||
let blob
|
||||
let historyId
|
||||
beforeEach(async function () {
|
||||
blob = await makeBlobForFile(filePath)
|
||||
historyId = 'abc123def456abc789def123'
|
||||
await backedUpBlobs.deleteOne({ _id: historyId })
|
||||
})
|
||||
|
||||
it('uploads the blob to the backup', async function () {
|
||||
await backupBlob(historyId, blob, filePath)
|
||||
const bucketContents = await listS3Bucket(projectBlobsBucket)
|
||||
expect(bucketContents).to.have.lengthOf(1)
|
||||
})
|
||||
it('stores the backup', function () {
|
||||
expect(
|
||||
backedUpBlobs.findOne({
|
||||
_id: historyId,
|
||||
blobs: {
|
||||
$elemMatch: { $eq: new Binary(Buffer.from(blob.getHash(), 'hex')) },
|
||||
},
|
||||
})
|
||||
).to.exist
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user