mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-25 02:00:10 +02:00
Merge pull request #22711 from overleaf/jpa-gzip
[history-v1] compress blobs before sending them to AWS GitOrigin-RevId: 1ca1dda6f36738fbabbf00fdab62b86230b9e4f9
This commit is contained in:
@@ -10,6 +10,7 @@ import { Binary, ObjectId } from 'mongodb'
|
||||
import logger from '@overleaf/logger/logging-manager.js'
|
||||
import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js'
|
||||
import metrics from '@overleaf/metrics'
|
||||
import zLib from 'node:zlib'
|
||||
|
||||
const HIGHWATER_MARK = 1024 * 1024
|
||||
|
||||
@@ -37,20 +38,58 @@ function recordBackupConclusion(status, reason = 'none') {
|
||||
*/
|
||||
export async function uploadBlobToBackup(historyId, blob, path) {
|
||||
const md5 = Crypto.createHash('md5')
|
||||
await Stream.promises.pipeline(fs.createReadStream(path), md5)
|
||||
const key = makeProjectKey(historyId, blob.getHash())
|
||||
const persistor = await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
await persistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
key,
|
||||
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
|
||||
{
|
||||
contentType: 'application/octet-stream',
|
||||
contentLength: blob.getByteLength(),
|
||||
sourceMd5: md5.digest('hex'),
|
||||
ifNoneMatch: '*',
|
||||
const filePathCompressed = path + '.gz'
|
||||
let backupSource
|
||||
let contentEncoding
|
||||
let size
|
||||
try {
|
||||
if (blob.getStringLength()) {
|
||||
backupSource = filePathCompressed
|
||||
contentEncoding = 'gzip'
|
||||
size = 0
|
||||
await Stream.promises.pipeline(
|
||||
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
|
||||
zLib.createGzip(),
|
||||
async function* (source) {
|
||||
for await (const chunk of source) {
|
||||
size += chunk.byteLength
|
||||
md5.update(chunk)
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
fs.createWriteStream(filePathCompressed, {
|
||||
highWaterMark: HIGHWATER_MARK,
|
||||
})
|
||||
)
|
||||
} else {
|
||||
backupSource = path
|
||||
size = blob.getByteLength()
|
||||
await Stream.promises.pipeline(
|
||||
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
|
||||
md5
|
||||
)
|
||||
}
|
||||
)
|
||||
const key = makeProjectKey(historyId, blob.getHash())
|
||||
const persistor = await backupPersistor.forProject(projectBlobsBucket, key)
|
||||
await persistor.sendStream(
|
||||
projectBlobsBucket,
|
||||
key,
|
||||
fs.createReadStream(backupSource, { highWaterMark: HIGHWATER_MARK }),
|
||||
{
|
||||
contentEncoding,
|
||||
contentType: 'application/octet-stream',
|
||||
contentLength: size,
|
||||
sourceMd5: md5.digest('hex'),
|
||||
ifNoneMatch: '*',
|
||||
}
|
||||
)
|
||||
} finally {
|
||||
if (backupSource === filePathCompressed) {
|
||||
try {
|
||||
await fs.promises.rm(filePathCompressed, { force: true })
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
import { expect } from 'chai'
|
||||
import { makeBlobForFile } from '../../../../storage/lib/blob_store/index.js'
|
||||
import Crypto from 'node:crypto'
|
||||
import Stream from 'node:stream'
|
||||
import {
|
||||
makeBlobForFile,
|
||||
getStringLengthOfFile,
|
||||
makeProjectKey,
|
||||
} from '../../../../storage/lib/blob_store/index.js'
|
||||
import { backupBlob } from '../../../../storage/lib/backupBlob.mjs'
|
||||
import fs from 'node:fs'
|
||||
import path from 'node:path'
|
||||
@@ -11,11 +17,15 @@ import {
|
||||
backupPersistor,
|
||||
projectBlobsBucket,
|
||||
} from '../../../../storage/lib/backupPersistor.mjs'
|
||||
import { WritableBuffer } from '@overleaf/stream-utils'
|
||||
|
||||
async function listS3BucketRaw(bucket) {
|
||||
const client = backupPersistor._getClientForBucket(bucket)
|
||||
return await client.listObjectsV2({ Bucket: bucket }).promise()
|
||||
}
|
||||
|
||||
async function listS3Bucket(bucket, wantStorageClass) {
|
||||
const client = backupPersistor._getClientForBucket(bucket)
|
||||
const response = await client.listObjectsV2({ Bucket: bucket }).promise()
|
||||
|
||||
const response = await listS3BucketRaw(bucket)
|
||||
for (const object of response.Contents || []) {
|
||||
if (wantStorageClass) {
|
||||
expect(object).to.have.property('StorageClass', wantStorageClass)
|
||||
@@ -140,4 +150,55 @@ describe('backupBlob', function () {
|
||||
).to.exist
|
||||
})
|
||||
})
|
||||
|
||||
const cases = [
|
||||
{
|
||||
name: 'text file',
|
||||
content: Buffer.from('x'.repeat(1000)),
|
||||
storedSize: 29, // zlib.gzipSync(content).byteLength
|
||||
},
|
||||
{
|
||||
name: 'binary file',
|
||||
content: Buffer.from([0, 1, 2, 3]),
|
||||
storedSize: 4,
|
||||
},
|
||||
{
|
||||
name: 'large binary file',
|
||||
content: Crypto.randomBytes(10 * 1024 * 1024),
|
||||
storedSize: 10 * 1024 * 1024,
|
||||
},
|
||||
]
|
||||
for (const { name, content, storedSize } of cases) {
|
||||
describe(name, function () {
|
||||
let blob
|
||||
let key
|
||||
let historyId
|
||||
beforeEach(async function () {
|
||||
historyId = 'abc123def456abc789def123'
|
||||
await fs.promises.writeFile(filePath, content)
|
||||
blob = await makeBlobForFile(filePath)
|
||||
blob.setStringLength(
|
||||
await getStringLengthOfFile(blob.getByteLength(), filePath)
|
||||
)
|
||||
key = makeProjectKey(historyId, blob.getHash())
|
||||
await backupBlob(historyId, blob, filePath)
|
||||
})
|
||||
it('should upload the blob', async function () {
|
||||
const response = await listS3BucketRaw(projectBlobsBucket)
|
||||
expect(response.Contents).to.have.length(1)
|
||||
expect(response.Contents[0].Key).to.equal(key)
|
||||
expect(response.Contents[0].Size).to.equal(storedSize)
|
||||
})
|
||||
it('should read back the same content', async function () {
|
||||
const buf = new WritableBuffer()
|
||||
await Stream.promises.pipeline(
|
||||
await backupPersistor.getObjectStream(projectBlobsBucket, key, {
|
||||
autoGunzip: true,
|
||||
}),
|
||||
buf
|
||||
)
|
||||
expect(buf.getContents()).to.deep.equal(content)
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user