From dbf6537dd1dcb38d764a79fbd992e10532e61685 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 7 Jan 2025 10:36:44 +0000 Subject: [PATCH] Merge pull request #22711 from overleaf/jpa-gzip [history-v1] compress blobs before sending them to AWS GitOrigin-RevId: 1ca1dda6f36738fbabbf00fdab62b86230b9e4f9 --- .../history-v1/storage/lib/backupBlob.mjs | 65 +++++++++++++---- .../acceptance/js/storage/backupBlob.test.mjs | 69 +++++++++++++++++-- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/services/history-v1/storage/lib/backupBlob.mjs b/services/history-v1/storage/lib/backupBlob.mjs index fb65bf6575..2602c0818f 100644 --- a/services/history-v1/storage/lib/backupBlob.mjs +++ b/services/history-v1/storage/lib/backupBlob.mjs @@ -10,6 +10,7 @@ import { Binary, ObjectId } from 'mongodb' import logger from '@overleaf/logger/logging-manager.js' import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js' import metrics from '@overleaf/metrics' +import zLib from 'node:zlib' const HIGHWATER_MARK = 1024 * 1024 @@ -37,20 +38,58 @@ function recordBackupConclusion(status, reason = 'none') { */ export async function uploadBlobToBackup(historyId, blob, path) { const md5 = Crypto.createHash('md5') - await Stream.promises.pipeline(fs.createReadStream(path), md5) - const key = makeProjectKey(historyId, blob.getHash()) - const persistor = await backupPersistor.forProject(projectBlobsBucket, key) - await persistor.sendStream( - projectBlobsBucket, - key, - fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }), - { - contentType: 'application/octet-stream', - contentLength: blob.getByteLength(), - sourceMd5: md5.digest('hex'), - ifNoneMatch: '*', + const filePathCompressed = path + '.gz' + let backupSource + let contentEncoding + let size + try { + if (blob.getStringLength()) { + backupSource = filePathCompressed + contentEncoding = 'gzip' + size = 0 + await Stream.promises.pipeline( + fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }), + zLib.createGzip(), + async function* (source) { + for await (const chunk of source) { + size += chunk.byteLength + md5.update(chunk) + yield chunk + } + }, + fs.createWriteStream(filePathCompressed, { + highWaterMark: HIGHWATER_MARK, + }) + ) + } else { + backupSource = path + size = blob.getByteLength() + await Stream.promises.pipeline( + fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }), + md5 + ) } - ) + const key = makeProjectKey(historyId, blob.getHash()) + const persistor = await backupPersistor.forProject(projectBlobsBucket, key) + await persistor.sendStream( + projectBlobsBucket, + key, + fs.createReadStream(backupSource, { highWaterMark: HIGHWATER_MARK }), + { + contentEncoding, + contentType: 'application/octet-stream', + contentLength: size, + sourceMd5: md5.digest('hex'), + ifNoneMatch: '*', + } + ) + } finally { + if (backupSource === filePathCompressed) { + try { + await fs.promises.rm(filePathCompressed, { force: true }) + } catch {} + } + } } /** diff --git a/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs b/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs index 0f2d0254b0..161acb7a55 100644 --- a/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/backupBlob.test.mjs @@ -1,5 +1,11 @@ import { expect } from 'chai' -import { makeBlobForFile } from '../../../../storage/lib/blob_store/index.js' +import Crypto from 'node:crypto' +import Stream from 'node:stream' +import { + makeBlobForFile, + getStringLengthOfFile, + makeProjectKey, +} from '../../../../storage/lib/blob_store/index.js' import { backupBlob } from '../../../../storage/lib/backupBlob.mjs' import fs from 'node:fs' import path from 'node:path' @@ -11,11 +17,15 @@ import { backupPersistor, projectBlobsBucket, } from '../../../../storage/lib/backupPersistor.mjs' +import { WritableBuffer } from '@overleaf/stream-utils' + +async function listS3BucketRaw(bucket) { + const client = backupPersistor._getClientForBucket(bucket) + return await client.listObjectsV2({ Bucket: bucket }).promise() +} async function listS3Bucket(bucket, wantStorageClass) { - const client = backupPersistor._getClientForBucket(bucket) - const response = await client.listObjectsV2({ Bucket: bucket }).promise() - + const response = await listS3BucketRaw(bucket) for (const object of response.Contents || []) { if (wantStorageClass) { expect(object).to.have.property('StorageClass', wantStorageClass) @@ -140,4 +150,55 @@ describe('backupBlob', function () { ).to.exist }) }) + + const cases = [ + { + name: 'text file', + content: Buffer.from('x'.repeat(1000)), + storedSize: 29, // zlib.gzipSync(content).byteLength + }, + { + name: 'binary file', + content: Buffer.from([0, 1, 2, 3]), + storedSize: 4, + }, + { + name: 'large binary file', + content: Crypto.randomBytes(10 * 1024 * 1024), + storedSize: 10 * 1024 * 1024, + }, + ] + for (const { name, content, storedSize } of cases) { + describe(name, function () { + let blob + let key + let historyId + beforeEach(async function () { + historyId = 'abc123def456abc789def123' + await fs.promises.writeFile(filePath, content) + blob = await makeBlobForFile(filePath) + blob.setStringLength( + await getStringLengthOfFile(blob.getByteLength(), filePath) + ) + key = makeProjectKey(historyId, blob.getHash()) + await backupBlob(historyId, blob, filePath) + }) + it('should upload the blob', async function () { + const response = await listS3BucketRaw(projectBlobsBucket) + expect(response.Contents).to.have.length(1) + expect(response.Contents[0].Key).to.equal(key) + expect(response.Contents[0].Size).to.equal(storedSize) + }) + it('should read back the same content', async function () { + const buf = new WritableBuffer() + await Stream.promises.pipeline( + await backupPersistor.getObjectStream(projectBlobsBucket, key, { + autoGunzip: true, + }), + buf + ) + expect(buf.getContents()).to.deep.equal(content) + }) + }) + } })