From b960e7f76e048e3303c03984385e2cab86140f5e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 10 Jun 2025 17:45:02 +0100 Subject: [PATCH] Merge pull request #26270 from overleaf/bg-history-redis-commit-change-manager replace redis logic in persistChanges with new commitChanges method GitOrigin-RevId: e06f9477b9d5548fa92ef87fb6e1f4f672001a35 --- .../api/controllers/project_import.js | 4 +- services/history-v1/storage/index.js | 1 + .../history-v1/storage/lib/commit_changes.js | 93 +++++++++++++++++++ .../history-v1/storage/lib/persist_changes.js | 67 +------------ 4 files changed, 97 insertions(+), 68 deletions(-) create mode 100644 services/history-v1/storage/lib/commit_changes.js diff --git a/services/history-v1/api/controllers/project_import.js b/services/history-v1/api/controllers/project_import.js index dee45efce8..e71eee93f9 100644 --- a/services/history-v1/api/controllers/project_import.js +++ b/services/history-v1/api/controllers/project_import.js @@ -21,7 +21,7 @@ const BatchBlobStore = storage.BatchBlobStore const BlobStore = storage.BlobStore const chunkStore = storage.chunkStore const HashCheckBlobStore = storage.HashCheckBlobStore -const persistChanges = storage.persistChanges +const commitChanges = storage.commitChanges const InvalidChangeError = storage.InvalidChangeError const render = require('./render') @@ -110,7 +110,7 @@ async function importChanges(req, res, next) { let result try { - result = await persistChanges(projectId, changes, limits, endVersion, { + result = await commitChanges(projectId, changes, limits, endVersion, { queueChangesInRedis: true, }) } catch (err) { diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index a07c98c026..82a51583be 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -9,6 +9,7 @@ exports.redis = require('./lib/redis') exports.persistChanges = require('./lib/persist_changes') exports.persistor = require('./lib/persistor') exports.persistBuffer = require('./lib/persist_buffer') +exports.commitChanges = require('./lib/commit_changes') exports.queueChanges = require('./lib/queue_changes') exports.ProjectArchive = require('./lib/project_archive') exports.streams = require('./lib/streams') diff --git a/services/history-v1/storage/lib/commit_changes.js b/services/history-v1/storage/lib/commit_changes.js new file mode 100644 index 0000000000..fa22e05bbf --- /dev/null +++ b/services/history-v1/storage/lib/commit_changes.js @@ -0,0 +1,93 @@ +// @ts-check + +'use strict' + +const metrics = require('@overleaf/metrics') +const redisBackend = require('./chunk_store/redis') +const logger = require('@overleaf/logger') +const queueChanges = require('./queue_changes') +const persistChanges = require('./persist_changes') + +/** + * @typedef {import('overleaf-editor-core').Change} Change + */ + +/** + * Handle incoming changes by processing them according to the specified options. + * @param {string} projectId + * @param {Change[]} changes + * @param {Object} limits + * @param {number} endVersion + * @param {Object} options + * @param {Boolean} [options.queueChangesInRedis] + * If true, queue the changes in Redis for testing purposes. + * @return {Promise.} + */ + +async function commitChanges( + projectId, + changes, + limits, + endVersion, + options = {} +) { + if (options.queueChangesInRedis) { + try { + await queueChanges(projectId, changes, endVersion) + await fakePersistRedisChanges(projectId, changes, endVersion) + } catch (err) { + logger.error({ err }, 'Chunk buffer verification failed') + } + } + const result = await persistChanges(projectId, changes, limits, endVersion) + return result +} + +/** + * Simulates the persistence of changes by verifying a given set of changes against + * what is currently stored as non-persisted in Redis, and then updates the + * persisted version number in Redis. + * + * @async + * @param {string} projectId - The ID of the project. + * @param {Change[]} changesToPersist - An array of changes that are expected to be + * persisted. These are used for verification + * against the changes currently in Redis. + * @param {number} baseVersion - The base version number from which to calculate + * the new persisted version. + * @returns {Promise} A promise that resolves when the persisted version + * in Redis has been updated. + */ +async function fakePersistRedisChanges( + projectId, + changesToPersist, + baseVersion +) { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + baseVersion + ) + + if ( + serializeChanges(nonPersistedChanges) === serializeChanges(changesToPersist) + ) { + metrics.inc('persist_redis_changes_verification', 1, { status: 'match' }) + } else { + logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis') + metrics.inc('persist_redis_changes_verification', 1, { + status: 'mismatch', + }) + } + + const persistedVersion = baseVersion + nonPersistedChanges.length + await redisBackend.setPersistedVersion(projectId, persistedVersion) +} + +/** + * @param {Change[]} changes + */ +function serializeChanges(changes) { + return JSON.stringify(changes.map(change => change.toRaw())) +} + +module.exports = commitChanges diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js index 95ffdc67d2..d2ca00053f 100644 --- a/services/history-v1/storage/lib/persist_changes.js +++ b/services/history-v1/storage/lib/persist_changes.js @@ -4,7 +4,6 @@ const _ = require('lodash') const logger = require('@overleaf/logger') -const metrics = require('@overleaf/metrics') const core = require('overleaf-editor-core') const Chunk = core.Chunk @@ -15,7 +14,6 @@ const chunkStore = require('./chunk_store') const { BlobStore } = require('./blob_store') const { InvalidChangeError } = require('./errors') const { getContentHash } = require('./content_hash') -const redisBackend = require('./chunk_store/redis') function countChangeBytes(change) { // Note: This is not quite accurate, because the raw change may contain raw @@ -57,18 +55,9 @@ Timer.prototype.elapsed = function () { * @param {core.Change[]} allChanges * @param {Object} limits * @param {number} clientEndVersion - * @param {Object} options - * @param {Boolean} [options.queueChangesInRedis] - * If true, queue the changes in Redis for testing purposes. * @return {Promise.} */ -async function persistChanges( - projectId, - allChanges, - limits, - clientEndVersion, - options = {} -) { +async function persistChanges(projectId, allChanges, limits, clientEndVersion) { assert.projectId(projectId) assert.array(allChanges) assert.maybe.object(limits) @@ -211,45 +200,6 @@ async function persistChanges( currentSnapshot.applyAll(currentChunk.getChanges()) } - async function queueChangesInRedis() { - const hollowSnapshot = currentSnapshot.clone() - // We're transforming a lazy snapshot to a hollow snapshot, so loadFiles() - // doesn't really need a blobStore, but its signature still requires it. - const blobStore = new BlobStore(projectId) - await hollowSnapshot.loadFiles('hollow', blobStore) - hollowSnapshot.applyAll(changesToPersist, { strict: true }) - const baseVersion = currentChunk.getEndVersion() - await redisBackend.queueChanges( - projectId, - hollowSnapshot, - baseVersion, - changesToPersist - ) - } - - async function fakePersistRedisChanges() { - const baseVersion = currentChunk.getEndVersion() - const nonPersistedChanges = await redisBackend.getNonPersistedChanges( - projectId, - baseVersion - ) - - if ( - serializeChanges(nonPersistedChanges) === - serializeChanges(changesToPersist) - ) { - metrics.inc('persist_redis_changes_verification', 1, { status: 'match' }) - } else { - logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis') - metrics.inc('persist_redis_changes_verification', 1, { - status: 'mismatch', - }) - } - - const persistedVersion = baseVersion + nonPersistedChanges.length - await redisBackend.setPersistedVersion(projectId, persistedVersion) - } - async function extendLastChunkIfPossible() { const timer = new Timer() const changesPushed = await fillChunk(currentChunk, changesToPersist) @@ -298,14 +248,6 @@ async function persistChanges( const numberOfChangesToPersist = oldChanges.length await loadLatestChunk() - if (options.queueChangesInRedis) { - try { - await queueChangesInRedis() - await fakePersistRedisChanges() - } catch (err) { - logger.error({ err }, 'Chunk buffer verification failed') - } - } await extendLastChunkIfPossible() await createNewChunksAsNeeded() @@ -320,11 +262,4 @@ async function persistChanges( } } -/** - * @param {core.Change[]} changes - */ -function serializeChanges(changes) { - return JSON.stringify(changes.map(change => change.toRaw())) -} - module.exports = persistChanges