From 6f516b25afb7db9307ae8e2fbcd67736202a874f Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Fri, 13 Jun 2025 15:39:59 +0200 Subject: [PATCH] [project-history] add metrics for compression of updates (#26307) * [project-history] add metrics for compression of updates * [project-history] sample compression metrics GitOrigin-RevId: 1cd67dc4ec7b44285afb436c62392b464f007f97 --- package-lock.json | 1 + .../app/js/UpdateCompressor.js | 67 +++++++++++++++++++ .../app/js/UpdatesProcessor.js | 22 +++--- .../config/settings.defaults.cjs | 4 ++ services/project-history/package.json | 1 + .../UpdatesManager/UpdatesProcessorTests.js | 9 ++- 6 files changed, 90 insertions(+), 14 deletions(-) diff --git a/package-lock.json b/package-lock.json index ce75c110c4..d41c60b6a5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -44692,6 +44692,7 @@ "@overleaf/promise-utils": "*", "@overleaf/redis-wrapper": "*", "@overleaf/settings": "*", + "@overleaf/stream-utils": "*", "async": "^3.2.5", "aws-sdk": "^2.650.0", "body-parser": "^1.20.3", diff --git a/services/project-history/app/js/UpdateCompressor.js b/services/project-history/app/js/UpdateCompressor.js index 471fc791ab..a6b3789b56 100644 --- a/services/project-history/app/js/UpdateCompressor.js +++ b/services/project-history/app/js/UpdateCompressor.js @@ -1,8 +1,15 @@ // @ts-check +import Metrics from '@overleaf/metrics' import OError from '@overleaf/o-error' import DMP from 'diff-match-patch' import { EditOperationBuilder } from 'overleaf-editor-core' +import zlib from 'node:zlib' +import { ReadableString, WritableBuffer } from '@overleaf/stream-utils' +import Stream from 'node:stream' +import logger from '@overleaf/logger' +import { callbackify } from '@overleaf/promise-utils' +import Settings from '@overleaf/settings' /** * @import { DeleteOp, InsertOp, Op, Update } from './types' @@ -180,6 +187,66 @@ export function concatUpdatesWithSameVersion(updates) { return concattedUpdates } +async function 
estimateStorage(updates) { + const blob = JSON.stringify(updates) + const bytes = Buffer.from(blob).byteLength + const read = new ReadableString(blob) + const compress = zlib.createGzip() + const write = new WritableBuffer() + await Stream.promises.pipeline(read, compress, write) + const bytesGz = write.size() + return { bytes, bytesGz, nUpdates: updates.length } +} + +/** + * @param {Update[]} rawUpdates + * @param {string} projectId + * @param {import("./Profiler").Profiler} profile + * @return {Promise<Update[]>} + */ +async function compressRawUpdatesWithMetrics(rawUpdates, projectId, profile) { + if (100 * Math.random() > Settings.estimateCompressionSample) { + return compressRawUpdatesWithProfile(rawUpdates, projectId, profile) + } + const before = await estimateStorage(rawUpdates) + profile.log('estimateRawUpdatesSize') + const updates = compressRawUpdatesWithProfile(rawUpdates, projectId, profile) + const after = await estimateStorage(updates) + for (const [path, values] of Object.entries({ before, after })) { + for (const [method, v] of Object.entries(values)) { + Metrics.summary('updates_compression_estimate', v, { path, method }) + } + } + for (const method of Object.keys(before)) { + const percentage = Math.ceil(100 * (after[method] / before[method])) + Metrics.summary('updates_compression_percentage', percentage, { method }) + } + profile.log('estimateCompressedUpdatesSize') + return updates +} + +export const compressRawUpdatesWithMetricsCb = callbackify( + compressRawUpdatesWithMetrics +) + +/** + * @param {Update[]} rawUpdates + * @param {string} projectId + * @param {import("./Profiler").Profiler} profile + * @return {Update[]} + */ +function compressRawUpdatesWithProfile(rawUpdates, projectId, profile) { + const updates = compressRawUpdates(rawUpdates) + const timeTaken = profile.log('compressRawUpdates').getTimeDelta() + if (timeTaken >= 1000) { + logger.debug( + { projectId, updates: rawUpdates, timeTaken }, + 'slow compression of raw updates' + ) + } + 
return updates +} + export function compressRawUpdates(rawUpdates) { let updates = convertToSingleOpUpdates(rawUpdates) updates = compressUpdates(updates) diff --git a/services/project-history/app/js/UpdatesProcessor.js b/services/project-history/app/js/UpdatesProcessor.js index a76241d7ca..112600ec6e 100644 --- a/services/project-history/app/js/UpdatesProcessor.js +++ b/services/project-history/app/js/UpdatesProcessor.js @@ -593,17 +593,17 @@ export function _processUpdates( return cb(err) } profile.log('skipAlreadyAppliedUpdates') - const compressedUpdates = - UpdateCompressor.compressRawUpdates(unappliedUpdates) - const timeTaken = profile - .log('compressRawUpdates') - .getTimeDelta() - if (timeTaken >= 1000) { - logger.debug( - { projectId, updates: unappliedUpdates, timeTaken }, - 'slow compression of raw updates' - ) - } + cb(null, unappliedUpdates) + }, + (unappliedUpdates, cb) => { + UpdateCompressor.compressRawUpdatesWithMetricsCb( + unappliedUpdates, + projectId, + profile, + cb + ) + }, + (compressedUpdates, cb) => { cb = profile.wrap('createBlobs', cb) BlobManager.createBlobsForUpdates( projectId, diff --git a/services/project-history/config/settings.defaults.cjs b/services/project-history/config/settings.defaults.cjs index d259d070b9..d767cddd96 100644 --- a/services/project-history/config/settings.defaults.cjs +++ b/services/project-history/config/settings.defaults.cjs @@ -110,4 +110,8 @@ module.exports = { shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '') .split(',') .filter(s => !!s), + estimateCompressionSample: parseInt( + process.env.ESTIMATE_COMPRESSION_SAMPLE || '0', + 10 + ), } diff --git a/services/project-history/package.json b/services/project-history/package.json index 2a54a807d3..f293037ec1 100644 --- a/services/project-history/package.json +++ b/services/project-history/package.json @@ -25,6 +25,7 @@ "@overleaf/promise-utils": "*", "@overleaf/redis-wrapper": "*", "@overleaf/settings": "*", + "@overleaf/stream-utils": "*", 
"async": "^3.2.5", "aws-sdk": "^2.650.0", "body-parser": "^1.20.3", diff --git a/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js b/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js index 6f148e5a8d..c6e1811977 100644 --- a/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js +++ b/services/project-history/test/unit/js/UpdatesManager/UpdatesProcessorTests.js @@ -22,7 +22,7 @@ describe('UpdatesProcessor', function () { } this.RedisManager = {} this.UpdateCompressor = { - compressRawUpdates: sinon.stub(), + compressRawUpdatesWithMetricsCb: sinon.stub(), } this.UpdateTranslator = { convertToChanges: sinon.stub(), @@ -299,7 +299,10 @@ describe('UpdatesProcessor', function () { null, this.expandedUpdates ) - this.UpdateCompressor.compressRawUpdates.returns(this.compressedUpdates) + this.UpdateCompressor.compressRawUpdatesWithMetricsCb.yields( + null, + this.compressedUpdates + ) this.BlobManager.createBlobsForUpdates.callsArgWith( 4, null, @@ -347,7 +350,7 @@ describe('UpdatesProcessor', function () { }) it('should compress updates', function () { - this.UpdateCompressor.compressRawUpdates.should.have.been.calledWith( + this.UpdateCompressor.compressRawUpdatesWithMetricsCb.should.have.been.calledWith( this.expandedUpdates ) })