mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
[project-history] add metrics for compression of updates (#26307)
* [project-history] add metrics for compression of updates * [project-history] sample compression metrics GitOrigin-RevId: 1cd67dc4ec7b44285afb436c62392b464f007f97
This commit is contained in:
1
package-lock.json
generated
1
package-lock.json
generated
@@ -44692,6 +44692,7 @@
|
||||
"@overleaf/promise-utils": "*",
|
||||
"@overleaf/redis-wrapper": "*",
|
||||
"@overleaf/settings": "*",
|
||||
"@overleaf/stream-utils": "*",
|
||||
"async": "^3.2.5",
|
||||
"aws-sdk": "^2.650.0",
|
||||
"body-parser": "^1.20.3",
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
// @ts-check
|
||||
|
||||
import Metrics from '@overleaf/metrics'
|
||||
import OError from '@overleaf/o-error'
|
||||
import DMP from 'diff-match-patch'
|
||||
import { EditOperationBuilder } from 'overleaf-editor-core'
|
||||
import zlib from 'node:zlib'
|
||||
import { ReadableString, WritableBuffer } from '@overleaf/stream-utils'
|
||||
import Stream from 'node:stream'
|
||||
import logger from '@overleaf/logger'
|
||||
import { callbackify } from '@overleaf/promise-utils'
|
||||
import Settings from '@overleaf/settings'
|
||||
|
||||
/**
|
||||
* @import { DeleteOp, InsertOp, Op, Update } from './types'
|
||||
@@ -180,6 +187,66 @@ export function concatUpdatesWithSameVersion(updates) {
|
||||
return concattedUpdates
|
||||
}
|
||||
|
||||
async function estimateStorage(updates) {
|
||||
const blob = JSON.stringify(updates)
|
||||
const bytes = Buffer.from(blob).byteLength
|
||||
const read = new ReadableString(blob)
|
||||
const compress = zlib.createGzip()
|
||||
const write = new WritableBuffer()
|
||||
await Stream.promises.pipeline(read, compress, write)
|
||||
const bytesGz = write.size()
|
||||
return { bytes, bytesGz, nUpdates: updates.length }
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Update[]} rawUpdates
|
||||
* @param {string} projectId
|
||||
* @param {import("./Profiler").Profiler} profile
|
||||
* @return {Promise<Update[]>}
|
||||
*/
|
||||
async function compressRawUpdatesWithMetrics(rawUpdates, projectId, profile) {
|
||||
if (100 * Math.random() > Settings.estimateCompressionSample) {
|
||||
return compressRawUpdatesWithProfile(rawUpdates, projectId, profile)
|
||||
}
|
||||
const before = await estimateStorage(rawUpdates)
|
||||
profile.log('estimateRawUpdatesSize')
|
||||
const updates = compressRawUpdatesWithProfile(rawUpdates, projectId, profile)
|
||||
const after = await estimateStorage(updates)
|
||||
for (const [path, values] of Object.entries({ before, after })) {
|
||||
for (const [method, v] of Object.entries(values)) {
|
||||
Metrics.summary('updates_compression_estimate', v, { path, method })
|
||||
}
|
||||
}
|
||||
for (const method of Object.keys(before)) {
|
||||
const percentage = Math.ceil(100 * (after[method] / before[method]))
|
||||
Metrics.summary('updates_compression_percentage', percentage, { method })
|
||||
}
|
||||
profile.log('estimateCompressedUpdatesSize')
|
||||
return updates
|
||||
}
|
||||
|
||||
export const compressRawUpdatesWithMetricsCb = callbackify(
|
||||
compressRawUpdatesWithMetrics
|
||||
)
|
||||
|
||||
/**
|
||||
* @param {Update[]} rawUpdates
|
||||
* @param {string} projectId
|
||||
* @param {import("./Profiler").Profiler} profile
|
||||
* @return {Update[]}
|
||||
*/
|
||||
function compressRawUpdatesWithProfile(rawUpdates, projectId, profile) {
|
||||
const updates = compressRawUpdates(rawUpdates)
|
||||
const timeTaken = profile.log('compressRawUpdates').getTimeDelta()
|
||||
if (timeTaken >= 1000) {
|
||||
logger.debug(
|
||||
{ projectId, updates: rawUpdates, timeTaken },
|
||||
'slow compression of raw updates'
|
||||
)
|
||||
}
|
||||
return updates
|
||||
}
|
||||
|
||||
export function compressRawUpdates(rawUpdates) {
|
||||
let updates = convertToSingleOpUpdates(rawUpdates)
|
||||
updates = compressUpdates(updates)
|
||||
|
||||
@@ -593,17 +593,17 @@ export function _processUpdates(
|
||||
return cb(err)
|
||||
}
|
||||
profile.log('skipAlreadyAppliedUpdates')
|
||||
const compressedUpdates =
|
||||
UpdateCompressor.compressRawUpdates(unappliedUpdates)
|
||||
const timeTaken = profile
|
||||
.log('compressRawUpdates')
|
||||
.getTimeDelta()
|
||||
if (timeTaken >= 1000) {
|
||||
logger.debug(
|
||||
{ projectId, updates: unappliedUpdates, timeTaken },
|
||||
'slow compression of raw updates'
|
||||
)
|
||||
}
|
||||
cb(null, unappliedUpdates)
|
||||
},
|
||||
(unappliedUpdates, cb) => {
|
||||
UpdateCompressor.compressRawUpdatesWithMetricsCb(
|
||||
unappliedUpdates,
|
||||
projectId,
|
||||
profile,
|
||||
cb
|
||||
)
|
||||
},
|
||||
(compressedUpdates, cb) => {
|
||||
cb = profile.wrap('createBlobs', cb)
|
||||
BlobManager.createBlobsForUpdates(
|
||||
projectId,
|
||||
|
||||
@@ -110,4 +110,8 @@ module.exports = {
|
||||
shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '')
|
||||
.split(',')
|
||||
.filter(s => !!s),
|
||||
estimateCompressionSample: parseInt(
|
||||
process.env.ESTIMATE_COMPRESSION_SAMPLE || '0',
|
||||
10
|
||||
),
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
"@overleaf/promise-utils": "*",
|
||||
"@overleaf/redis-wrapper": "*",
|
||||
"@overleaf/settings": "*",
|
||||
"@overleaf/stream-utils": "*",
|
||||
"async": "^3.2.5",
|
||||
"aws-sdk": "^2.650.0",
|
||||
"body-parser": "^1.20.3",
|
||||
|
||||
@@ -22,7 +22,7 @@ describe('UpdatesProcessor', function () {
|
||||
}
|
||||
this.RedisManager = {}
|
||||
this.UpdateCompressor = {
|
||||
compressRawUpdates: sinon.stub(),
|
||||
compressRawUpdatesWithMetricsCb: sinon.stub(),
|
||||
}
|
||||
this.UpdateTranslator = {
|
||||
convertToChanges: sinon.stub(),
|
||||
@@ -299,7 +299,10 @@ describe('UpdatesProcessor', function () {
|
||||
null,
|
||||
this.expandedUpdates
|
||||
)
|
||||
this.UpdateCompressor.compressRawUpdates.returns(this.compressedUpdates)
|
||||
this.UpdateCompressor.compressRawUpdatesWithMetricsCb.yields(
|
||||
null,
|
||||
this.compressedUpdates
|
||||
)
|
||||
this.BlobManager.createBlobsForUpdates.callsArgWith(
|
||||
4,
|
||||
null,
|
||||
@@ -347,7 +350,7 @@ describe('UpdatesProcessor', function () {
|
||||
})
|
||||
|
||||
it('should compress updates', function () {
|
||||
this.UpdateCompressor.compressRawUpdates.should.have.been.calledWith(
|
||||
this.UpdateCompressor.compressRawUpdatesWithMetricsCb.should.have.been.calledWith(
|
||||
this.expandedUpdates
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user