From 8e74d3c58c7db76eb07ba7e27542dabefb3ea028 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 19 Nov 2024 10:46:48 +0100 Subject: [PATCH] Merge pull request #21947 from overleaf/bg-jpa-back-fill-script-tweaks [history-v1] back_fill_file_hash: performance tweaks GitOrigin-RevId: c3d0c7906707fc902addcde64eaf41c24ceeece7 --- libraries/object-persistor/src/S3Persistor.js | 40 +++++--- .../test/unit/S3PersistorTests.js | 20 +++- .../storage/scripts/back_fill_file_hash.mjs | 97 +++++++++++++++++-- .../js/storage/back_fill_file_hash.test.mjs | 66 +++++++++++-- 4 files changed, 195 insertions(+), 28 deletions(-) diff --git a/libraries/object-persistor/src/S3Persistor.js b/libraries/object-persistor/src/S3Persistor.js index 0a9ff6d260..2835a271ff 100644 --- a/libraries/object-persistor/src/S3Persistor.js +++ b/libraries/object-persistor/src/S3Persistor.js @@ -61,6 +61,9 @@ class SSECOptions { } class S3Persistor extends AbstractPersistor { + /** @type {Map} */ + #clients = new Map() + constructor(settings = {}) { super() @@ -131,19 +134,19 @@ class S3Persistor extends AbstractPersistor { // if we have an md5 hash, pass this to S3 to verify the upload - otherwise // we rely on the S3 client's checksum calculation to validate the upload - const clientOptions = {} + let computeChecksums = false if (opts.sourceMd5) { uploadOptions.ContentMD5 = PersistorHelper.hexToBase64(opts.sourceMd5) } else { - clientOptions.computeChecksums = true + computeChecksums = true } if (this.settings.disableMultiPartUpload) { - await this._getClientForBucket(bucketName, clientOptions) + await this._getClientForBucket(bucketName, computeChecksums) .putObject(uploadOptions) .promise() } else { - await this._getClientForBucket(bucketName, clientOptions) + await this._getClientForBucket(bucketName, computeChecksums) .upload(uploadOptions, { partSize: this.settings.partSize }) .promise() } @@ -517,23 +520,34 @@ class S3Persistor extends AbstractPersistor { /** * @param {string} bucket - * @param {Object} [clientOptions] + * @param {boolean} computeChecksums * @return {S3} * @private */ - _getClientForBucket(bucket, clientOptions) { - return new S3( - this._buildClientOptions( - this.settings.bucketCreds?.[bucket], - clientOptions + _getClientForBucket(bucket, computeChecksums = false) { + /** @type {S3.Types.ClientConfiguration} */ + const clientOptions = {} + const cacheKey = `${bucket}:${computeChecksums}` + if (computeChecksums) { + clientOptions.computeChecksums = true + } + let client = this.#clients.get(cacheKey) + if (!client) { + client = new S3( + this._buildClientOptions( + this.settings.bucketCreds?.[bucket], + clientOptions + ) ) - ) + this.#clients.set(cacheKey, client) + } + return client } /** * @param {Object} bucketCredentials - * @param {Object} clientOptions - * @return {Object} + * @param {S3.Types.ClientConfiguration} clientOptions + * @return {S3.Types.ClientConfiguration} * @private */ _buildClientOptions(bucketCredentials, clientOptions) { diff --git a/libraries/object-persistor/test/unit/S3PersistorTests.js b/libraries/object-persistor/test/unit/S3PersistorTests.js index 2f2ba3ea5c..822a4ac811 100644 --- a/libraries/object-persistor/test/unit/S3PersistorTests.js +++ b/libraries/object-persistor/test/unit/S3PersistorTests.js @@ -147,7 +147,7 @@ describe('S3PersistorTests', function () { deleteObjects: sinon.stub().returns(EmptyPromise), getSignedUrlPromise: sinon.stub().resolves(redirectUrl), } - S3 = sinon.stub().returns(S3Client) + S3 = sinon.stub().callsFake(() => Object.assign({}, S3Client)) Hash = { end: sinon.stub(), @@ -1027,4 +1027,22 @@ describe('S3PersistorTests', function () { }) }) }) + + describe('_getClientForBucket', function () { + it('should return same instance for same bucket', function () { + const a = S3Persistor._getClientForBucket('foo') + const b = S3Persistor._getClientForBucket('foo') + expect(a).to.equal(b) + }) + it('should return different instance for different bucket', function () { + const a = S3Persistor._getClientForBucket('foo') + const b = S3Persistor._getClientForBucket('bar') + expect(a).to.not.equal(b) + }) + it('should return different instance for same bucket different computeChecksums', function () { + const a = S3Persistor._getClientForBucket('foo', false) + const b = S3Persistor._getClientForBucket('foo', true) + expect(a).to.not.equal(b) + }) + }) }) diff --git a/services/history-v1/storage/scripts/back_fill_file_hash.mjs b/services/history-v1/storage/scripts/back_fill_file_hash.mjs index 9b73680c94..dcd50f8dee 100644 --- a/services/history-v1/storage/scripts/back_fill_file_hash.mjs +++ b/services/history-v1/storage/scripts/back_fill_file_hash.mjs @@ -3,6 +3,7 @@ import Crypto from 'node:crypto' import Events from 'node:events' import fs from 'node:fs' import Path from 'node:path' +import { performance } from 'node:perf_hooks' import Stream from 'node:stream' import zLib from 'node:zlib' import { setTimeout } from 'node:timers/promises' @@ -37,6 +38,7 @@ ObjectId.cacheHexString = true /** * @typedef {import("overleaf-editor-core").Blob} Blob + * @typedef {import("perf_hooks").EventLoopUtilization} EventLoopUtilization * @typedef {import("mongodb").Collection} Collection * @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor */ @@ -105,6 +107,12 @@ const RETRY_FILESTORE_404 = process.env.RETRY_FILESTORE_404 === 'true' const BUFFER_DIR = fs.mkdtempSync( process.env.BUFFER_DIR_PREFIX || '/tmp/back_fill_file_hash-' ) +// https://nodejs.org/api/stream.html#streamgetdefaulthighwatermarkobjectmode +const STREAM_HIGH_WATER_MARK = parseInt( + process.env.STREAM_HIGH_WATER_MARK || (64 * 1024).toString(), + 10 +) +const LOGGING_INTERVAL = parseInt(process.env.LOGGING_INTERVAL || '60000', 10) const projectsCollection = db.collection('projects') const deletedProjectsCollection = db.collection('deletedProjects') @@ -127,20 +135,81 @@ const STATS = { deduplicatedWriteToAWSLocalEgress: 0, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, + readFromGCSCount: 0, + readFromGCSIngress: 0, writeToAWSCount: 0, writeToAWSEgress: 0, } +const processStart = performance.now() +let lastLogTS = processStart +let lastLog = Object.assign({}, STATS) +let lastEventLoopStats = performance.eventLoopUtilization() + +/** + * @param {number} v + * @param {number} ms + */ +function toMiBPerSecond(v, ms) { + const ONE_MiB = 1024 * 1024 + return v / ONE_MiB / (ms / 1000) +} + +/** + * @param {any} stats + * @param {number} ms + * @return {{writeToAWSThroughputMiBPerSecond: number, readFromGCSThroughputMiBPerSecond: number}} + */ +function bandwidthStats(stats, ms) { + return { + readFromGCSThroughputMiBPerSecond: toMiBPerSecond( + stats.readFromGCSIngress, + ms + ), + writeToAWSThroughputMiBPerSecond: toMiBPerSecond( + stats.writeToAWSEgress, + ms + ), + } +} + +/** + * @param {EventLoopUtilization} nextEventLoopStats + * @param {number} now + * @return {Object} + */ +function computeDiff(nextEventLoopStats, now) { + const ms = now - lastLogTS + lastLogTS = now + const diff = { + eventLoop: performance.eventLoopUtilization( + nextEventLoopStats, + lastEventLoopStats + ), + } + for (const [name, v] of Object.entries(STATS)) { + diff[name] = v - lastLog[name] + } + return Object.assign(diff, bandwidthStats(diff, ms)) +} + function printStats() { + const now = performance.now() + const nextEventLoopStats = performance.eventLoopUtilization() console.log( JSON.stringify({ time: new Date(), ...STATS, + ...bandwidthStats(STATS, now - processStart), + eventLoop: nextEventLoopStats, + diff: computeDiff(nextEventLoopStats, now), }) ) + lastEventLoopStats = nextEventLoopStats + lastLog = Object.assign({}, STATS) } -setInterval(printStats, 60_000) +setInterval(printStats, LOGGING_INTERVAL) /** * @param {QueueEntry} entry @@ -187,12 +256,19 @@ async function processFileOnce(entry) { BUFFER_DIR, projectId.toString() + fileId.toString() ) - const dst = fs.createWriteStream(filePath) + const dst = fs.createWriteStream(filePath, { + highWaterMark: STREAM_HIGH_WATER_MARK, + }) + STATS.readFromGCSCount++ const src = await filestorePersistor.getObjectStream( USER_FILES_BUCKET_NAME, `${projectId}/${fileId}` ) - await Stream.promises.pipeline(src, dst) + try { + await Stream.promises.pipeline(src, dst) + } finally { + STATS.readFromGCSIngress += dst.bytesWritten + } const blobStore = new BlobStore(historyId) const blob = await blobStore.putFile(filePath) @@ -221,7 +297,7 @@ async function processFileOnce(entry) { contentEncoding = 'gzip' size = 0 await Stream.promises.pipeline( - fs.createReadStream(filePath), + fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }), zLib.createGzip(), async function* (source) { for await (const chunk of source) { @@ -230,12 +306,17 @@ async function processFileOnce(entry) { yield chunk } }, - fs.createWriteStream(filePathCompressed) + fs.createWriteStream(filePathCompressed, { + highWaterMark: STREAM_HIGH_WATER_MARK, + }) ) } else { backupSource = filePath size = blob.getByteLength() - await Stream.promises.pipeline(fs.createReadStream(filePath), md5) + await Stream.promises.pipeline( + fs.createReadStream(filePath, { highWaterMark: STREAM_HIGH_WATER_MARK }), + md5 + ) } const backendKeyPath = makeProjectKey(historyId, blob.getHash()) const persistor = await entry.ctx.getCachedPersistor(backendKeyPath) @@ -244,7 +325,9 @@ async function processFileOnce(entry) { await persistor.sendStream( projectBlobsBucket, backendKeyPath, - fs.createReadStream(backupSource), + fs.createReadStream(backupSource, { + highWaterMark: STREAM_HIGH_WATER_MARK, + }), { contentEncoding, contentType: 'application/octet-stream', diff --git a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs index 9281cefc2d..899da8893e 100644 --- a/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/back_fill_file_hash.test.mjs @@ -127,7 +127,8 @@ describe('back_fill_file_hash script', function () { const deleteProjectsRecordId2 = new ObjectId() const deleteProjectsRecordId3 = new ObjectId() const deleteProjectsRecordId4 = new ObjectId() - const contentFile7 = Buffer.alloc(11_000_000) + const twoByteUTF8Symbol = 'รถ' + const contentFile7 = Buffer.alloc(4_000_000, twoByteUTF8Symbol) const hashFile7 = gitBlobHashBuffer(contentFile7) const writtenBlobs = [ { projectId: projectId0, historyId: historyId0, fileId: fileId0 }, @@ -441,7 +442,12 @@ describe('back_fill_file_hash script', function () { ) }) - async function tryRunScript(env = {}) { + /** + * @param {Record} env + * @param {boolean} shouldHaveWritten + * @return {Promise<{result, stats: any}>} + */ + async function tryRunScript(env = {}, shouldHaveWritten) { let result try { result = await promisify(execFile)( @@ -471,14 +477,35 @@ describe('back_fill_file_hash script', function () { } result = { stdout, stderr, status: code } } + const extraStatsKeys = [ + 'eventLoop', + 'readFromGCSThroughputMiBPerSecond', + 'writeToAWSThroughputMiBPerSecond', + ] const stats = JSON.parse(result.stdout.trimEnd().split('\n').pop()) + expect(Object.keys(stats.diff).sort()).to.deep.equal( + [...extraStatsKeys, ...Object.keys(STATS_ALL)].sort() + ) + delete stats.diff expect(new Date(stats.time).toISOString()).to.equal(stats.time) delete stats.time + if (shouldHaveWritten) { + expect(stats.readFromGCSThroughputMiBPerSecond).to.be.greaterThan(0) + expect(stats.writeToAWSThroughputMiBPerSecond).to.be.greaterThan(0) + } + for (const key of extraStatsKeys) { + delete stats[key] + } return { stats, result } } - async function runScript(env = {}) { - const { stats, result } = await tryRunScript(env) + /** + * @param {Record} env + * @param {boolean} shouldHaveWritten + * @return {Promise<{result, stats: any}>} + */ + async function runScript(env = {}, shouldHaveWritten = true) { + const { stats, result } = await tryRunScript(env, shouldHaveWritten) if (result.status !== 0) { console.log(result) expect(result).to.have.property('status', 0) @@ -718,7 +745,7 @@ describe('back_fill_file_hash script', function () { ]) }) it('should process nothing on re-run', async function () { - const rerun = await runScript() + const rerun = await runScript({}, false) expect(rerun.stats).deep.equal({ ...STATS_ALL_ZERO, // We still need to iterate over all the projects. @@ -814,6 +841,8 @@ describe('back_fill_file_hash script', function () { deduplicatedWriteToAWSLocalEgress: 0, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, + readFromGCSCount: 0, + readFromGCSIngress: 0, writeToAWSCount: 0, writeToAWSEgress: 0, } @@ -834,8 +863,10 @@ describe('back_fill_file_hash script', function () { deduplicatedWriteToAWSLocalEgress: 0, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, + readFromGCSCount: 6, + readFromGCSIngress: 4000120, writeToAWSCount: 5, - writeToAWSEgress: 11000118, + writeToAWSEgress: 4032, } const STATS_UP_FROM_PROJECT1_ONWARD = { projects: 2, @@ -854,6 +885,8 @@ describe('back_fill_file_hash script', function () { deduplicatedWriteToAWSLocalEgress: 30, deduplicatedWriteToAWSRemoteCount: 0, deduplicatedWriteToAWSRemoteEgress: 0, + readFromGCSCount: 3, + readFromGCSIngress: 72, writeToAWSCount: 2, writeToAWSEgress: 58, } @@ -884,6 +917,7 @@ describe('back_fill_file_hash script', function () { sumStats(STATS_ALL, { ...STATS_ALL_ZERO, filesFailed: 1, + readFromGCSIngress: -24, writeToAWSCount: -1, writeToAWSEgress: -28, }) @@ -921,8 +955,13 @@ describe('back_fill_file_hash script', function () { ]) expectNotFoundError(result, 'failed to process file, trying again') expect(result.status).to.equal(0) - expect({ ...stats, filesRetries: 0 }).to.deep.equal(STATS_ALL) + expect({ ...stats, filesRetries: 0, readFromGCSCount: 0 }).to.deep.equal({ + ...STATS_ALL, + filesRetries: 0, + readFromGCSCount: 0, + }) expect(stats.filesRetries).to.be.greaterThan(0) + expect(stats.filesRetries).to.be.greaterThan(STATS_ALL.readFromGCSCount) }) describe('full run CONCURRENCY=1', function () { @@ -952,6 +991,19 @@ describe('back_fill_file_hash script', function () { commonAssertions() }) + describe('full run STREAM_HIGH_WATER_MARK=1MB', function () { + let output + beforeEach('run script', async function () { + output = await runScript({ + STREAM_HIGH_WATER_MARK: (1024 * 1024).toString(), + }) + }) + it('should print stats', function () { + expect(output.stats).deep.equal(STATS_ALL) + }) + commonAssertions() + }) + describe('with something in the bucket already', function () { beforeEach('create a file in s3', async function () { const buf = Buffer.from(fileId0.toString())