From 24caafa6f01c0a19c2987a2ee08b5b4d7ab0444f Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 11 Jun 2025 15:43:12 +0100 Subject: [PATCH] Merge pull request #26271 from overleaf/bg-history-redis-deployment-refactor introduce history-v1 buffering levels GitOrigin-RevId: 7709935a5ceb19ef6c5723ded647217b7399759a --- .../api/controllers/project_import.js | 26 ++++- .../config/custom-environment-variables.json | 2 + .../storage/lib/chunk_store/redis.js | 1 + .../history-v1/storage/lib/commit_changes.js | 95 +++++++++++++++++-- .../history-v1/storage/lib/persist_buffer.js | 7 ++ 5 files changed, 122 insertions(+), 9 deletions(-) diff --git a/services/history-v1/api/controllers/project_import.js b/services/history-v1/api/controllers/project_import.js index e71eee93f9..7ec1c9aa4f 100644 --- a/services/history-v1/api/controllers/project_import.js +++ b/services/history-v1/api/controllers/project_import.js @@ -26,6 +26,29 @@ const InvalidChangeError = storage.InvalidChangeError const render = require('./render') +const config = require('config') +// The history buffer level is used to determine whether to queue changes +// in Redis or persist them directly to the chunk store. +// If defaults to 0 (no queuing) if not set. +const historyBufferLevel = config.has('historyBufferLevel') + ? parseInt(config.historyBufferLevel, 10) + : 0 +// The forcePersistBuffer flag will ensure the buffer is fully persisted before +// any persist operation. Set this to true if you want to make the persisted-version +// in Redis match the endVersion of the latest chunk. This should be set to true +// when downgrading from a history buffer level that queues changes in Redis +// without persisting them immediately. +const forcePersistBuffer = config.has('forcePersistBuffer') + ? config.get('forcePersistBuffer') === 'true' + : false + +logger.info( + { historyBufferLevel, forcePersistBuffer }, + historyBufferLevel > 0 || forcePersistBuffer + ? 'using history buffer' + : 'history buffer disabled' +) + async function importSnapshot(req, res) { const projectId = req.swagger.params.project_id.value const rawSnapshot = req.swagger.params.snapshot.value @@ -111,7 +134,8 @@ async function importChanges(req, res, next) { let result try { result = await commitChanges(projectId, changes, limits, endVersion, { - queueChangesInRedis: true, + historyBufferLevel, + forcePersistBuffer, }) } catch (err) { if ( diff --git a/services/history-v1/config/custom-environment-variables.json b/services/history-v1/config/custom-environment-variables.json index d07ae2925a..7fc2114699 100644 --- a/services/history-v1/config/custom-environment-variables.json +++ b/services/history-v1/config/custom-environment-variables.json @@ -84,6 +84,8 @@ "maxFileUploadSize": "MAX_FILE_UPLOAD_SIZE", "httpsOnly": "HTTPS_ONLY", "httpRequestTimeout": "HTTP_REQUEST_TIMEOUT", + "historyBufferLevel": "HISTORY_BUFFER_LEVEL", + "forcePersistBuffer": "FORCE_PERSIST_BUFFER", "redis": { "queue": { "host": "QUEUES_REDIS_HOST", diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index b43bdf8117..59bfd81e39 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -650,6 +650,7 @@ async function expireProject(projectId) { metrics.inc('chunk_store.redis.set_persisted_version', 1, { status, }) + return status } catch (err) { metrics.inc('chunk_store.redis.set_persisted_version', 1, { status: 'error', diff --git a/services/history-v1/storage/lib/commit_changes.js b/services/history-v1/storage/lib/commit_changes.js index fa22e05bbf..a9aca73008 100644 --- a/services/history-v1/storage/lib/commit_changes.js +++ b/services/history-v1/storage/lib/commit_changes.js @@ -7,6 +7,7 @@ const redisBackend = require('./chunk_store/redis') const logger = require('@overleaf/logger') const queueChanges = require('./queue_changes') const persistChanges = require('./persist_changes') +const persistBuffer = require('./persist_buffer') /** * @typedef {import('overleaf-editor-core').Change} Change @@ -19,8 +20,8 @@ const persistChanges = require('./persist_changes') * @param {Object} limits * @param {number} endVersion * @param {Object} options - * @param {Boolean} [options.queueChangesInRedis] - * If true, queue the changes in Redis for testing purposes. + * @param {number} [options.historyBufferLevel] - The history buffer level to use for processing changes. + * @param {Boolean} [options.forcePersistBuffer] - If true, forces the buffer to be persisted before any operation. * @return {Promise.} */ @@ -31,16 +32,94 @@ async function commitChanges( endVersion, options = {} ) { - if (options.queueChangesInRedis) { + const { historyBufferLevel, forcePersistBuffer } = options + + // Force the buffer to be persisted if specified. + if (forcePersistBuffer) { try { - await queueChanges(projectId, changes, endVersion) - await fakePersistRedisChanges(projectId, changes, endVersion) + const status = await redisBackend.expireProject(projectId) // clear the project from Redis if it is persisted, returns 'not-persisted' if it was not persisted + if (status === 'not-persisted') { + await persistBuffer(projectId, limits) + await redisBackend.expireProject(projectId) // clear the project from Redis after persisting + metrics.inc('persist_buffer_force', 1, { status: 'persisted' }) + } } catch (err) { - logger.error({ err }, 'Chunk buffer verification failed') + metrics.inc('persist_buffer_force', 1, { status: 'error' }) + logger.error( + { err, projectId }, + 'failed to persist buffer before committing changes' + ) } } - const result = await persistChanges(projectId, changes, limits, endVersion) - return result + + metrics.inc('commit_changes', 1, { + history_buffer_level: historyBufferLevel || 0, + }) + + // Now handle the changes based on the configured history buffer level. + switch (historyBufferLevel) { + case 4: // Queue changes and only persist them in the background + await queueChanges(projectId, changes, endVersion) + return {} + case 3: // Queue changes and immediately persist with persistBuffer + await queueChanges(projectId, changes, endVersion) + return await persistBuffer(projectId, limits) + case 2: // Equivalent to queueChangesInRedis:true + await queueChangesFake(projectId, changes, limits, endVersion) + return await persistChanges(projectId, changes, limits, endVersion) + case 1: // Queue changes with fake persist only for projects in redis already + await queueChangesFakeOnlyIfExists(projectId, changes, limits, endVersion) + return await persistChanges(projectId, changes, limits, endVersion) + case 0: // Persist changes directly to the chunk store + return await persistChanges(projectId, changes, limits, endVersion) + default: + throw new Error(`Invalid history buffer level: ${historyBufferLevel}`) + } +} + +/** + * Queues a set of changes in redis as if they had been persisted, ignoring any errors. + * @param {string} projectId + * @param {Change[]} changes + * @param {Object} limits + * @param {number} endVersion + * @param {Object} [options] + * @param {boolean} [options.onlyIfExists] - If true, only queue changes if the project + * already exists in Redis. + */ + +async function queueChangesFake( + projectId, + changes, + limits, + endVersion, + options = {} +) { + try { + await queueChanges(projectId, changes, limits, endVersion) + await fakePersistRedisChanges(projectId, changes, endVersion) + } catch (err) { + logger.error({ err }, 'Chunk buffer verification failed') + } +} + +/** + * Queues changes in Redis, simulating persistence, but only if the project already exists. + * @param {string} projectId - The ID of the project. + * @param {Change[]} changes - An array of changes to be queued. + * @param {Object} limits - Limits for the changes. + * @param {number} endVersion - The expected version of the project before these changes are applied. + */ + +async function queueChangesFakeOnlyIfExists( + projectId, + changes, + limits, + endVersion +) { + await queueChangesFake(projectId, changes, limits, endVersion, { + onlyIfExists: true, + }) } /** diff --git a/services/history-v1/storage/lib/persist_buffer.js b/services/history-v1/storage/lib/persist_buffer.js index 9534e5834a..0cf1a81bd8 100644 --- a/services/history-v1/storage/lib/persist_buffer.js +++ b/services/history-v1/storage/lib/persist_buffer.js @@ -2,6 +2,7 @@ 'use strict' const logger = require('@overleaf/logger') +const metrics = require('@overleaf/metrics') const OError = require('@overleaf/o-error') const assert = require('./assert') const chunkStore = require('./chunk_store') @@ -54,6 +55,7 @@ async function persistBuffer(projectId, limits) { { projectId, endVersion }, 'no new changes in Redis buffer to persist' ) + metrics.inc('persist_buffer', 1, { status: 'no_changes' }) // No changes to persist, update the persisted version in Redis // to match the current endVersion. This shouldn't be needed // unless a worker failed to update the persisted version. @@ -113,6 +115,7 @@ async function persistBuffer(projectId, limits) { ) if (!persistResult || !persistResult.currentChunk) { + metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' }) throw new OError( 'persistChanges did not produce a new chunk for non-empty changes', { @@ -127,6 +130,7 @@ async function persistBuffer(projectId, limits) { const newEndVersion = newPersistedChunk.getEndVersion() if (newEndVersion <= endVersion) { + metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' }) throw new OError( 'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes', { @@ -154,6 +158,7 @@ async function persistBuffer(projectId, limits) { ) if (status !== 'ok') { + metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' }) throw new OError('failed to update persisted version in Redis', { projectId, newEndVersion, @@ -171,6 +176,8 @@ async function persistBuffer(projectId, limits) { 'persistBuffer operation completed successfully' ) + metrics.inc('persist_buffer', 1, { status: 'persisted' }) + return persistResult }