[project-history] add metrics for compression of updates (#26307)

* [project-history] add metrics for compression of updates

* [project-history] sample compression metrics

GitOrigin-RevId: 1cd67dc4ec7b44285afb436c62392b464f007f97
This commit is contained in:
Jakob Ackermann
2025-06-13 15:39:59 +02:00
committed by Copybot
parent a1591e8b0c
commit 6f516b25af
6 changed files with 90 additions and 14 deletions

1
package-lock.json generated
View File

@@ -44692,6 +44692,7 @@
"@overleaf/promise-utils": "*",
"@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*",
"@overleaf/stream-utils": "*",
"async": "^3.2.5",
"aws-sdk": "^2.650.0",
"body-parser": "^1.20.3",

View File

@@ -1,8 +1,15 @@
// @ts-check
import Metrics from '@overleaf/metrics'
import OError from '@overleaf/o-error'
import DMP from 'diff-match-patch'
import { EditOperationBuilder } from 'overleaf-editor-core'
import zlib from 'node:zlib'
import { ReadableString, WritableBuffer } from '@overleaf/stream-utils'
import Stream from 'node:stream'
import logger from '@overleaf/logger'
import { callbackify } from '@overleaf/promise-utils'
import Settings from '@overleaf/settings'
/**
* @import { DeleteOp, InsertOp, Op, Update } from './types'
@@ -180,6 +187,66 @@ export function concatUpdatesWithSameVersion(updates) {
return concattedUpdates
}
/**
 * Estimate how much storage a list of updates takes up, both as plain JSON
 * and after piping that JSON through gzip.
 *
 * @param {Update[]} updates - updates to measure; must be JSON-serializable
 * @return {Promise<{bytes: number, bytesGz: number, nUpdates: number}>}
 *   `bytes` is the UTF-8 byte length of the JSON blob, `bytesGz` is the size
 *   reported by the WritableBuffer sink after the gzip pipeline finished, and
 *   `nUpdates` is the number of entries in `updates`.
 */
async function estimateStorage(updates) {
  const blob = JSON.stringify(updates)
  // Measure the UTF-8 length directly rather than allocating a throwaway
  // Buffer copy of a potentially large blob just to read its length.
  const bytes = Buffer.byteLength(blob)
  const read = new ReadableString(blob)
  const compress = zlib.createGzip()
  const write = new WritableBuffer()
  // pipeline() resolves once the sink has finished, so size() is final below.
  await Stream.promises.pipeline(read, compress, write)
  const bytesGz = write.size()
  return { bytes, bytesGz, nUpdates: updates.length }
}
/**
 * Compress raw updates and, for a random sample of calls, record metrics
 * comparing the estimated storage size before and after compression.
 *
 * The sample rate is read from Settings.estimateCompressionSample and is
 * compared against a random draw in [0, 100), i.e. it acts as a percentage:
 * 0 never samples, 100 samples every call.
 *
 * @param {Update[]} rawUpdates
 * @param {string} projectId
 * @param {import("./Profiler").Profiler} profile
 * @return {Promise<Update[]>}
 */
async function compressRawUpdatesWithMetrics(rawUpdates, projectId, profile) {
  // Sample only when the draw is strictly below the configured rate. With
  // this form a rate of 0 never samples (Math.random() may return exactly 0)
  // and a NaN rate (e.g. an unparsable setting) disables sampling instead of
  // turning it on for every call.
  const isSampled = 100 * Math.random() < Settings.estimateCompressionSample
  if (!isSampled) {
    return compressRawUpdatesWithProfile(rawUpdates, projectId, profile)
  }
  const before = await estimateStorage(rawUpdates)
  profile.log('estimateRawUpdatesSize')
  const updates = compressRawUpdatesWithProfile(rawUpdates, projectId, profile)
  const after = await estimateStorage(updates)
  // Absolute numbers, labelled by stage (before/after) and by measure.
  for (const [path, values] of Object.entries({ before, after })) {
    for (const [method, v] of Object.entries(values)) {
      Metrics.summary('updates_compression_estimate', v, { path, method })
    }
  }
  // Relative size after compression, as a rounded-up percentage per measure.
  for (const method of Object.keys(before)) {
    // An empty batch has nUpdates === 0; skip the ratio rather than
    // reporting 0/0 = NaN (or a division by zero) as a percentage.
    if (!(before[method] > 0)) continue
    const percentage = Math.ceil(100 * (after[method] / before[method]))
    Metrics.summary('updates_compression_percentage', percentage, { method })
  }
  profile.log('estimateCompressedUpdatesSize')
  return updates
}
/**
 * Callback-style variant of compressRawUpdatesWithMetrics, produced by
 * wrapping the async function with callbackify.
 *
 * Takes the same arguments (rawUpdates, projectId, profile) followed by a
 * trailing callback.
 * NOTE(review): the callback presumably receives (err, updates) in the usual
 * Node.js style; confirm against callbackify in @overleaf/promise-utils.
 */
export const compressRawUpdatesWithMetricsCb = callbackify(
compressRawUpdatesWithMetrics
)
/**
 * Run compressRawUpdates and record the step on the profiler, emitting a
 * debug log entry when the time delta reported for the step reaches 1000.
 *
 * @param {Update[]} rawUpdates
 * @param {string} projectId
 * @param {import("./Profiler").Profiler} profile
 * @return {Update[]}
 */
function compressRawUpdatesWithProfile(rawUpdates, projectId, profile) {
  const compressed = compressRawUpdates(rawUpdates)
  const step = profile.log('compressRawUpdates')
  const elapsed = step.getTimeDelta()
  // Kept as a ">=" test so a non-numeric delta never counts as slow.
  const isSlow = elapsed >= 1000
  if (!isSlow) {
    return compressed
  }
  const context = { projectId, updates: rawUpdates, timeTaken: elapsed }
  logger.debug(context, 'slow compression of raw updates')
  return compressed
}
export function compressRawUpdates(rawUpdates) {
let updates = convertToSingleOpUpdates(rawUpdates)
updates = compressUpdates(updates)

View File

@@ -593,17 +593,17 @@ export function _processUpdates(
return cb(err)
}
profile.log('skipAlreadyAppliedUpdates')
const compressedUpdates =
UpdateCompressor.compressRawUpdates(unappliedUpdates)
const timeTaken = profile
.log('compressRawUpdates')
.getTimeDelta()
if (timeTaken >= 1000) {
logger.debug(
{ projectId, updates: unappliedUpdates, timeTaken },
'slow compression of raw updates'
)
}
cb(null, unappliedUpdates)
},
(unappliedUpdates, cb) => {
UpdateCompressor.compressRawUpdatesWithMetricsCb(
unappliedUpdates,
projectId,
profile,
cb
)
},
(compressedUpdates, cb) => {
cb = profile.wrap('createBlobs', cb)
BlobManager.createBlobsForUpdates(
projectId,

View File

@@ -110,4 +110,8 @@ module.exports = {
shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '')
.split(',')
.filter(s => !!s),
estimateCompressionSample: parseInt(
process.env.ESTIMATE_COMPRESSION_SAMPLE || '0',
10
),
}

View File

@@ -25,6 +25,7 @@
"@overleaf/promise-utils": "*",
"@overleaf/redis-wrapper": "*",
"@overleaf/settings": "*",
"@overleaf/stream-utils": "*",
"async": "^3.2.5",
"aws-sdk": "^2.650.0",
"body-parser": "^1.20.3",

View File

@@ -22,7 +22,7 @@ describe('UpdatesProcessor', function () {
}
this.RedisManager = {}
this.UpdateCompressor = {
compressRawUpdates: sinon.stub(),
compressRawUpdatesWithMetricsCb: sinon.stub(),
}
this.UpdateTranslator = {
convertToChanges: sinon.stub(),
@@ -299,7 +299,10 @@ describe('UpdatesProcessor', function () {
null,
this.expandedUpdates
)
this.UpdateCompressor.compressRawUpdates.returns(this.compressedUpdates)
this.UpdateCompressor.compressRawUpdatesWithMetricsCb.yields(
null,
this.compressedUpdates
)
this.BlobManager.createBlobsForUpdates.callsArgWith(
4,
null,
@@ -347,7 +350,7 @@ describe('UpdatesProcessor', function () {
})
it('should compress updates', function () {
this.UpdateCompressor.compressRawUpdates.should.have.been.calledWith(
this.UpdateCompressor.compressRawUpdatesWithMetricsCb.should.have.been.calledWith(
this.expandedUpdates
)
})