mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-06-04 14:49:01 +02:00
Merge pull request #26271 from overleaf/bg-history-redis-deployment-refactor
introduce history-v1 buffering levels GitOrigin-RevId: 7709935a5ceb19ef6c5723ded647217b7399759a
This commit is contained in:
@@ -26,6 +26,29 @@ const InvalidChangeError = storage.InvalidChangeError
|
||||
|
||||
const render = require('./render')
|
||||
|
||||
const config = require('config')
|
||||
// The history buffer level is used to determine whether to queue changes
|
||||
// in Redis or persist them directly to the chunk store.
|
||||
// If defaults to 0 (no queuing) if not set.
|
||||
const historyBufferLevel = config.has('historyBufferLevel')
|
||||
? parseInt(config.historyBufferLevel, 10)
|
||||
: 0
|
||||
// The forcePersistBuffer flag will ensure the buffer is fully persisted before
|
||||
// any persist operation. Set this to true if you want to make the persisted-version
|
||||
// in Redis match the endVersion of the latest chunk. This should be set to true
|
||||
// when downgrading from a history buffer level that queues changes in Redis
|
||||
// without persisting them immediately.
|
||||
const forcePersistBuffer = config.has('forcePersistBuffer')
|
||||
? config.get('forcePersistBuffer') === 'true'
|
||||
: false
|
||||
|
||||
logger.info(
|
||||
{ historyBufferLevel, forcePersistBuffer },
|
||||
historyBufferLevel > 0 || forcePersistBuffer
|
||||
? 'using history buffer'
|
||||
: 'history buffer disabled'
|
||||
)
|
||||
|
||||
async function importSnapshot(req, res) {
|
||||
const projectId = req.swagger.params.project_id.value
|
||||
const rawSnapshot = req.swagger.params.snapshot.value
|
||||
@@ -111,7 +134,8 @@ async function importChanges(req, res, next) {
|
||||
let result
|
||||
try {
|
||||
result = await commitChanges(projectId, changes, limits, endVersion, {
|
||||
queueChangesInRedis: true,
|
||||
historyBufferLevel,
|
||||
forcePersistBuffer,
|
||||
})
|
||||
} catch (err) {
|
||||
if (
|
||||
|
||||
@@ -84,6 +84,8 @@
|
||||
"maxFileUploadSize": "MAX_FILE_UPLOAD_SIZE",
|
||||
"httpsOnly": "HTTPS_ONLY",
|
||||
"httpRequestTimeout": "HTTP_REQUEST_TIMEOUT",
|
||||
"historyBufferLevel": "HISTORY_BUFFER_LEVEL",
|
||||
"forcePersistBuffer": "FORCE_PERSIST_BUFFER",
|
||||
"redis": {
|
||||
"queue": {
|
||||
"host": "QUEUES_REDIS_HOST",
|
||||
|
||||
@@ -650,6 +650,7 @@ async function expireProject(projectId) {
|
||||
metrics.inc('chunk_store.redis.set_persisted_version', 1, {
|
||||
status,
|
||||
})
|
||||
return status
|
||||
} catch (err) {
|
||||
metrics.inc('chunk_store.redis.set_persisted_version', 1, {
|
||||
status: 'error',
|
||||
|
||||
@@ -7,6 +7,7 @@ const redisBackend = require('./chunk_store/redis')
|
||||
const logger = require('@overleaf/logger')
|
||||
const queueChanges = require('./queue_changes')
|
||||
const persistChanges = require('./persist_changes')
|
||||
const persistBuffer = require('./persist_buffer')
|
||||
|
||||
/**
|
||||
* @typedef {import('overleaf-editor-core').Change} Change
|
||||
@@ -19,8 +20,8 @@ const persistChanges = require('./persist_changes')
|
||||
* @param {Object} limits
|
||||
* @param {number} endVersion
|
||||
* @param {Object} options
|
||||
* @param {Boolean} [options.queueChangesInRedis]
|
||||
* If true, queue the changes in Redis for testing purposes.
|
||||
* @param {number} [options.historyBufferLevel] - The history buffer level to use for processing changes.
|
||||
* @param {Boolean} [options.forcePersistBuffer] - If true, forces the buffer to be persisted before any operation.
|
||||
* @return {Promise.<Object?>}
|
||||
*/
|
||||
|
||||
@@ -31,16 +32,94 @@ async function commitChanges(
|
||||
endVersion,
|
||||
options = {}
|
||||
) {
|
||||
if (options.queueChangesInRedis) {
|
||||
const { historyBufferLevel, forcePersistBuffer } = options
|
||||
|
||||
// Force the buffer to be persisted if specified.
|
||||
if (forcePersistBuffer) {
|
||||
try {
|
||||
await queueChanges(projectId, changes, endVersion)
|
||||
await fakePersistRedisChanges(projectId, changes, endVersion)
|
||||
const status = await redisBackend.expireProject(projectId) // clear the project from Redis if it is persisted, returns 'not-persisted' if it was not persisted
|
||||
if (status === 'not-persisted') {
|
||||
await persistBuffer(projectId, limits)
|
||||
await redisBackend.expireProject(projectId) // clear the project from Redis after persisting
|
||||
metrics.inc('persist_buffer_force', 1, { status: 'persisted' })
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Chunk buffer verification failed')
|
||||
metrics.inc('persist_buffer_force', 1, { status: 'error' })
|
||||
logger.error(
|
||||
{ err, projectId },
|
||||
'failed to persist buffer before committing changes'
|
||||
)
|
||||
}
|
||||
}
|
||||
const result = await persistChanges(projectId, changes, limits, endVersion)
|
||||
return result
|
||||
|
||||
metrics.inc('commit_changes', 1, {
|
||||
history_buffer_level: historyBufferLevel || 0,
|
||||
})
|
||||
|
||||
// Now handle the changes based on the configured history buffer level.
|
||||
switch (historyBufferLevel) {
|
||||
case 4: // Queue changes and only persist them in the background
|
||||
await queueChanges(projectId, changes, endVersion)
|
||||
return {}
|
||||
case 3: // Queue changes and immediately persist with persistBuffer
|
||||
await queueChanges(projectId, changes, endVersion)
|
||||
return await persistBuffer(projectId, limits)
|
||||
case 2: // Equivalent to queueChangesInRedis:true
|
||||
await queueChangesFake(projectId, changes, limits, endVersion)
|
||||
return await persistChanges(projectId, changes, limits, endVersion)
|
||||
case 1: // Queue changes with fake persist only for projects in redis already
|
||||
await queueChangesFakeOnlyIfExists(projectId, changes, limits, endVersion)
|
||||
return await persistChanges(projectId, changes, limits, endVersion)
|
||||
case 0: // Persist changes directly to the chunk store
|
||||
return await persistChanges(projectId, changes, limits, endVersion)
|
||||
default:
|
||||
throw new Error(`Invalid history buffer level: ${historyBufferLevel}`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a set of changes in redis as if they had been persisted, ignoring any errors.
|
||||
* @param {string} projectId
|
||||
* @param {Change[]} changes
|
||||
* @param {Object} limits
|
||||
* @param {number} endVersion
|
||||
* @param {Object} [options]
|
||||
* @param {boolean} [options.onlyIfExists] - If true, only queue changes if the project
|
||||
* already exists in Redis.
|
||||
*/
|
||||
|
||||
async function queueChangesFake(
|
||||
projectId,
|
||||
changes,
|
||||
limits,
|
||||
endVersion,
|
||||
options = {}
|
||||
) {
|
||||
try {
|
||||
await queueChanges(projectId, changes, limits, endVersion)
|
||||
await fakePersistRedisChanges(projectId, changes, endVersion)
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Chunk buffer verification failed')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues changes in Redis, simulating persistence, but only if the project already exists.
|
||||
* @param {string} projectId - The ID of the project.
|
||||
* @param {Change[]} changes - An array of changes to be queued.
|
||||
* @param {Object} limits - Limits for the changes.
|
||||
* @param {number} endVersion - The expected version of the project before these changes are applied.
|
||||
*/
|
||||
|
||||
async function queueChangesFakeOnlyIfExists(
|
||||
projectId,
|
||||
changes,
|
||||
limits,
|
||||
endVersion
|
||||
) {
|
||||
await queueChangesFake(projectId, changes, limits, endVersion, {
|
||||
onlyIfExists: true,
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
'use strict'
|
||||
|
||||
const logger = require('@overleaf/logger')
|
||||
const metrics = require('@overleaf/metrics')
|
||||
const OError = require('@overleaf/o-error')
|
||||
const assert = require('./assert')
|
||||
const chunkStore = require('./chunk_store')
|
||||
@@ -54,6 +55,7 @@ async function persistBuffer(projectId, limits) {
|
||||
{ projectId, endVersion },
|
||||
'no new changes in Redis buffer to persist'
|
||||
)
|
||||
metrics.inc('persist_buffer', 1, { status: 'no_changes' })
|
||||
// No changes to persist, update the persisted version in Redis
|
||||
// to match the current endVersion. This shouldn't be needed
|
||||
// unless a worker failed to update the persisted version.
|
||||
@@ -113,6 +115,7 @@ async function persistBuffer(projectId, limits) {
|
||||
)
|
||||
|
||||
if (!persistResult || !persistResult.currentChunk) {
|
||||
metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' })
|
||||
throw new OError(
|
||||
'persistChanges did not produce a new chunk for non-empty changes',
|
||||
{
|
||||
@@ -127,6 +130,7 @@ async function persistBuffer(projectId, limits) {
|
||||
const newEndVersion = newPersistedChunk.getEndVersion()
|
||||
|
||||
if (newEndVersion <= endVersion) {
|
||||
metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' })
|
||||
throw new OError(
|
||||
'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
|
||||
{
|
||||
@@ -154,6 +158,7 @@ async function persistBuffer(projectId, limits) {
|
||||
)
|
||||
|
||||
if (status !== 'ok') {
|
||||
metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' })
|
||||
throw new OError('failed to update persisted version in Redis', {
|
||||
projectId,
|
||||
newEndVersion,
|
||||
@@ -171,6 +176,8 @@ async function persistBuffer(projectId, limits) {
|
||||
'persistBuffer operation completed successfully'
|
||||
)
|
||||
|
||||
metrics.inc('persist_buffer', 1, { status: 'persisted' })
|
||||
|
||||
return persistResult
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user