Merge pull request #26270 from overleaf/bg-history-redis-commit-change-manager

replace redis logic in persistChanges with new commitChanges method

GitOrigin-RevId: e06f9477b9d5548fa92ef87fb6e1f4f672001a35
This commit is contained in:
Brian Gough
2025-06-10 17:45:02 +01:00
committed by Copybot
parent 99a47cfde8
commit b960e7f76e
4 changed files with 97 additions and 68 deletions
@@ -21,7 +21,7 @@ const BatchBlobStore = storage.BatchBlobStore
const BlobStore = storage.BlobStore
const chunkStore = storage.chunkStore
const HashCheckBlobStore = storage.HashCheckBlobStore
const persistChanges = storage.persistChanges
const commitChanges = storage.commitChanges
const InvalidChangeError = storage.InvalidChangeError
const render = require('./render')
@@ -110,7 +110,7 @@ async function importChanges(req, res, next) {
let result
try {
result = await persistChanges(projectId, changes, limits, endVersion, {
result = await commitChanges(projectId, changes, limits, endVersion, {
queueChangesInRedis: true,
})
} catch (err) {
+1
View File
@@ -9,6 +9,7 @@ exports.redis = require('./lib/redis')
exports.persistChanges = require('./lib/persist_changes')
exports.persistor = require('./lib/persistor')
exports.persistBuffer = require('./lib/persist_buffer')
exports.commitChanges = require('./lib/commit_changes')
exports.queueChanges = require('./lib/queue_changes')
exports.ProjectArchive = require('./lib/project_archive')
exports.streams = require('./lib/streams')
@@ -0,0 +1,93 @@
// @ts-check
'use strict'
const metrics = require('@overleaf/metrics')
const redisBackend = require('./chunk_store/redis')
const logger = require('@overleaf/logger')
const queueChanges = require('./queue_changes')
const persistChanges = require('./persist_changes')
/**
* @typedef {import('overleaf-editor-core').Change} Change
*/
/**
* Handle incoming changes by processing them according to the specified options.
* @param {string} projectId
* @param {Change[]} changes
* @param {Object} limits
* @param {number} endVersion
* @param {Object} options
* @param {Boolean} [options.queueChangesInRedis]
* If true, queue the changes in Redis for testing purposes.
* @return {Promise.<Object?>}
*/
async function commitChanges(
projectId,
changes,
limits,
endVersion,
options = {}
) {
if (options.queueChangesInRedis) {
try {
await queueChanges(projectId, changes, endVersion)
await fakePersistRedisChanges(projectId, changes, endVersion)
} catch (err) {
logger.error({ err }, 'Chunk buffer verification failed')
}
}
const result = await persistChanges(projectId, changes, limits, endVersion)
return result
}
/**
* Simulates the persistence of changes by verifying a given set of changes against
* what is currently stored as non-persisted in Redis, and then updates the
* persisted version number in Redis.
*
* @async
* @param {string} projectId - The ID of the project.
* @param {Change[]} changesToPersist - An array of changes that are expected to be
* persisted. These are used for verification
* against the changes currently in Redis.
* @param {number} baseVersion - The base version number from which to calculate
* the new persisted version.
* @returns {Promise<void>} A promise that resolves when the persisted version
* in Redis has been updated.
*/
async function fakePersistRedisChanges(
projectId,
changesToPersist,
baseVersion
) {
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
projectId,
baseVersion
)
if (
serializeChanges(nonPersistedChanges) === serializeChanges(changesToPersist)
) {
metrics.inc('persist_redis_changes_verification', 1, { status: 'match' })
} else {
logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis')
metrics.inc('persist_redis_changes_verification', 1, {
status: 'mismatch',
})
}
const persistedVersion = baseVersion + nonPersistedChanges.length
await redisBackend.setPersistedVersion(projectId, persistedVersion)
}
/**
* @param {Change[]} changes
*/
function serializeChanges(changes) {
return JSON.stringify(changes.map(change => change.toRaw()))
}
module.exports = commitChanges
@@ -4,7 +4,6 @@
const _ = require('lodash')
const logger = require('@overleaf/logger')
const metrics = require('@overleaf/metrics')
const core = require('overleaf-editor-core')
const Chunk = core.Chunk
@@ -15,7 +14,6 @@ const chunkStore = require('./chunk_store')
const { BlobStore } = require('./blob_store')
const { InvalidChangeError } = require('./errors')
const { getContentHash } = require('./content_hash')
const redisBackend = require('./chunk_store/redis')
function countChangeBytes(change) {
// Note: This is not quite accurate, because the raw change may contain raw
@@ -57,18 +55,9 @@ Timer.prototype.elapsed = function () {
* @param {core.Change[]} allChanges
* @param {Object} limits
* @param {number} clientEndVersion
* @param {Object} options
* @param {Boolean} [options.queueChangesInRedis]
* If true, queue the changes in Redis for testing purposes.
* @return {Promise.<Object?>}
*/
async function persistChanges(
projectId,
allChanges,
limits,
clientEndVersion,
options = {}
) {
async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
assert.projectId(projectId)
assert.array(allChanges)
assert.maybe.object(limits)
@@ -211,45 +200,6 @@ async function persistChanges(
currentSnapshot.applyAll(currentChunk.getChanges())
}
async function queueChangesInRedis() {
const hollowSnapshot = currentSnapshot.clone()
// We're transforming a lazy snapshot to a hollow snapshot, so loadFiles()
// doesn't really need a blobStore, but its signature still requires it.
const blobStore = new BlobStore(projectId)
await hollowSnapshot.loadFiles('hollow', blobStore)
hollowSnapshot.applyAll(changesToPersist, { strict: true })
const baseVersion = currentChunk.getEndVersion()
await redisBackend.queueChanges(
projectId,
hollowSnapshot,
baseVersion,
changesToPersist
)
}
async function fakePersistRedisChanges() {
const baseVersion = currentChunk.getEndVersion()
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
projectId,
baseVersion
)
if (
serializeChanges(nonPersistedChanges) ===
serializeChanges(changesToPersist)
) {
metrics.inc('persist_redis_changes_verification', 1, { status: 'match' })
} else {
logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis')
metrics.inc('persist_redis_changes_verification', 1, {
status: 'mismatch',
})
}
const persistedVersion = baseVersion + nonPersistedChanges.length
await redisBackend.setPersistedVersion(projectId, persistedVersion)
}
async function extendLastChunkIfPossible() {
const timer = new Timer()
const changesPushed = await fillChunk(currentChunk, changesToPersist)
@@ -298,14 +248,6 @@ async function persistChanges(
const numberOfChangesToPersist = oldChanges.length
await loadLatestChunk()
if (options.queueChangesInRedis) {
try {
await queueChangesInRedis()
await fakePersistRedisChanges()
} catch (err) {
logger.error({ err }, 'Chunk buffer verification failed')
}
}
await extendLastChunkIfPossible()
await createNewChunksAsNeeded()
@@ -320,11 +262,4 @@ async function persistChanges(
}
}
/**
* @param {core.Change[]} changes
*/
function serializeChanges(changes) {
return JSON.stringify(changes.map(change => change.toRaw()))
}
module.exports = persistChanges