diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index 5fe283a34c..2aa492f46e 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -1,7 +1,6 @@ exports.BatchBlobStore = require('./lib/batch_blob_store') exports.blobHash = require('./lib/blob_hash') exports.HashCheckBlobStore = require('./lib/hash_check_blob_store') -exports.chunkBuffer = require('./lib/chunk_buffer') exports.chunkStore = require('./lib/chunk_store') exports.historyStore = require('./lib/history_store').historyStore exports.knex = require('./lib/knex') diff --git a/services/history-v1/storage/lib/chunk_buffer/index.js b/services/history-v1/storage/lib/chunk_buffer/index.js deleted file mode 100644 index 5ef533ddba..0000000000 --- a/services/history-v1/storage/lib/chunk_buffer/index.js +++ /dev/null @@ -1,39 +0,0 @@ -'use strict' - -/** - * @module storage/lib/chunk_buffer - */ - -const chunkStore = require('../chunk_store') -const redisBackend = require('../chunk_store/redis') -const metrics = require('@overleaf/metrics') -/** - * Load the latest Chunk stored for a project, including blob metadata. - * - * @param {string} projectId - * @return {Promise.} - */ -async function loadLatest(projectId) { - const chunkRecord = await chunkStore.loadLatestRaw(projectId) - const cachedChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - if (cachedChunk) { - metrics.inc('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - return cachedChunk - } else { - metrics.inc('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - const chunk = await chunkStore.loadLatest(projectId) - await redisBackend.setCurrentChunk(projectId, chunk) - return chunk - } -} - -module.exports = { - loadLatest, -} diff --git a/services/history-v1/storage/lib/chunk_store/errors.js b/services/history-v1/storage/lib/chunk_store/errors.js index 5f0eba6aac..fc37dbe2a1 100644 --- a/services/history-v1/storage/lib/chunk_store/errors.js +++ b/services/history-v1/storage/lib/chunk_store/errors.js @@ -1,7 +1,13 @@ const OError = require('@overleaf/o-error') class ChunkVersionConflictError extends OError {} +class BaseVersionConflictError extends OError {} +class JobNotFoundError extends OError {} +class JobNotReadyError extends OError {} module.exports = { ChunkVersionConflictError, + BaseVersionConflictError, + JobNotFoundError, + JobNotReadyError, } diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index d9c423861d..5c62db5387 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -1,20 +1,28 @@ const metrics = require('@overleaf/metrics') -const logger = require('@overleaf/logger') +const OError = require('@overleaf/o-error') const redis = require('../redis') const rclient = redis.rclientHistory // -const { Snapshot, Change, History, Chunk } = require('overleaf-editor-core') +const { Change, Snapshot } = require('overleaf-editor-core') +const { + BaseVersionConflictError, + JobNotFoundError, + JobNotReadyError, +} = require('./errors') -const TEMPORARY_CACHE_LIFETIME = 300 // 5 minutes +const MAX_PERSISTED_CHANGES = 100 // Maximum number of persisted changes to keep in the buffer for clients that need to catch up. +const PROJECT_TTL_MS = 3600 * 1000 // Amount of time a project can stay inactive before it gets expired +const MAX_PERSIST_DELAY_MS = 300 * 1000 // Maximum amount of time before a change is persisted +const RETRY_DELAY_MS = 120 * 1000 // Time before a claimed job is considered stale and a worker can retry it. const keySchema = { - snapshot({ projectId }) { - return `snapshot:{${projectId}}` + head({ projectId }) { + return `head:{${projectId}}` }, - startVersion({ projectId }) { - return `snapshot-version:{${projectId}}` + headVersion({ projectId }) { + return `head-version:{${projectId}}` }, - changes({ projectId }) { - return `changes:{${projectId}}` + persistedVersion({ projectId }) { + return `persisted-version:{${projectId}}` }, expireTime({ projectId }) { return `expire-time:{${projectId}}` @@ -22,457 +30,689 @@ const keySchema = { persistTime({ projectId }) { return `persist-time:{${projectId}}` }, + changes({ projectId }) { + return `changes:{${projectId}}` + }, } -rclient.defineCommand('get_current_chunk', { - numberOfKeys: 3, - lua: ` - local startVersionValue = redis.call('GET', KEYS[2]) - if not startVersionValue then - return nil -- this is a cache-miss - end - local snapshotValue = redis.call('GET', KEYS[1]) - local changesValues = redis.call('LRANGE', KEYS[3], 0, -1) - return {snapshotValue, startVersionValue, changesValues} - `, -}) - -/** - * Retrieves the current chunk of project history from Redis storage - * @param {string} projectId - The unique identifier of the project - * @returns {Promise} A Promise that resolves to a Chunk object containing project history, - * or null if retrieval fails - * @throws {Error} If Redis operations fail - */ -async function getCurrentChunk(projectId) { - try { - const result = await rclient.get_current_chunk( - keySchema.snapshot({ projectId }), - keySchema.startVersion({ projectId }), - keySchema.changes({ projectId }) - ) - if (!result) { - return null // cache-miss - } - const snapshot = Snapshot.fromRaw(JSON.parse(result[0])) - const startVersion = JSON.parse(result[1]) - const changes = result[2].map(c => Change.fromRaw(JSON.parse(c))) - const history = new History(snapshot, changes) - const chunk = new Chunk(history, startVersion) - metrics.inc('chunk_store.redis.get_current_chunk', 1, { status: 'success' }) - return chunk - } catch (err) { - logger.error({ err, projectId }, 'error getting current chunk from redis') - metrics.inc('chunk_store.redis.get_current_chunk', 1, { status: 'error' }) - return null - } -} - -rclient.defineCommand('get_current_chunk_if_valid', { - numberOfKeys: 3, - lua: ` - local expectedStartVersion = ARGV[1] - local expectedChangesCount = tonumber(ARGV[2]) - local startVersionValue = redis.call('GET', KEYS[2]) - if not startVersionValue then - return nil -- this is a cache-miss - end - if startVersionValue ~= expectedStartVersion then - return nil -- this is a cache-miss - end - local changesCount = redis.call('LLEN', KEYS[3]) - if changesCount ~= expectedChangesCount then - return nil -- this is a cache-miss - end - local snapshotValue = redis.call('GET', KEYS[1]) - local changesValues = redis.call('LRANGE', KEYS[3], 0, -1) - return {snapshotValue, startVersionValue, changesValues} - `, -}) - -async function getCurrentChunkIfValid(projectId, chunkRecord) { - try { - const changesCount = chunkRecord.endVersion - chunkRecord.startVersion - const result = await rclient.get_current_chunk_if_valid( - keySchema.snapshot({ projectId }), - keySchema.startVersion({ projectId }), - keySchema.changes({ projectId }), - chunkRecord.startVersion, - changesCount - ) - if (!result) { - return null // cache-miss - } - const snapshot = Snapshot.fromRaw(JSON.parse(result[0])) - const startVersion = parseInt(result[1], 10) - const changes = result[2].map(c => Change.fromRaw(JSON.parse(c))) - const history = new History(snapshot, changes) - const chunk = new Chunk(history, startVersion) - metrics.inc('chunk_store.redis.get_current_chunk_if_valid', 1, { - status: 'success', - }) - return chunk - } catch (err) { - logger.error( - { err, projectId, chunkRecord }, - 'error getting current chunk from redis' - ) - metrics.inc('chunk_store.redis.get_current_chunk_if_valid', 1, { - status: 'error', - }) - return null - } -} - -rclient.defineCommand('get_current_chunk_metadata', { +rclient.defineCommand('get_head_snapshot', { numberOfKeys: 2, lua: ` - local startVersionValue = redis.call('GET', KEYS[1]) - if not startVersionValue then - return nil -- this is a cache-miss + local headSnapshotKey = KEYS[1] + local headVersionKey = KEYS[2] + + -- Check if the head version exists. If not, consider it a cache miss. + local version = redis.call('GET', headVersionKey) + if not version then + return nil end - local changesCount = redis.call('LLEN', KEYS[2]) - return {startVersionValue, changesCount} + + -- Retrieve the snapshot value + local snapshot = redis.call('GET', headSnapshotKey) + return {snapshot, version} `, }) /** - * Retrieves the current chunk metadata for a given project from Redis - * @param {string} projectId - The ID of the project to get metadata for - * @returns {Promise} Object containing startVersion and changesCount if found, null on error or cache miss - * @property {number} startVersion - The starting version information - * @property {number} changesCount - The number of changes in the chunk + * Retrieves the head snapshot from Redis storage + * @param {string} projectId - The unique identifier of the project + * @returns {Promise<{version: number, snapshot: Snapshot}|null>} A Promise that resolves to an object containing the version and Snapshot, + * or null if retrieval fails or cache miss + * @throws {Error} If Redis operations fail */ -async function getCurrentChunkMetadata(projectId) { +async function getHeadSnapshot(projectId) { try { - const result = await rclient.get_current_chunk_metadata( - keySchema.startVersion({ projectId }), - keySchema.changes({ projectId }) + const result = await rclient.get_head_snapshot( + keySchema.head({ projectId }), + keySchema.headVersion({ projectId }) ) if (!result) { + metrics.inc('chunk_store.redis.get_head_snapshot', 1, { + status: 'cache-miss', + }) return null // cache-miss } - const startVersion = JSON.parse(result[0]) - const changesCount = parseInt(result[1], 10) - return { startVersion, changesCount } + const snapshot = Snapshot.fromRaw(JSON.parse(result[0])) + const version = parseInt(result[1], 10) + metrics.inc('chunk_store.redis.get_head_snapshot', 1, { + status: 'success', + }) + return { version, snapshot } } catch (err) { - return null - } -} - -rclient.defineCommand('set_current_chunk', { - numberOfKeys: 4, - lua: ` - local snapshotValue = ARGV[1] - local startVersionValue = ARGV[2] - local expireTime = ARGV[3] - redis.call('SET', KEYS[1], snapshotValue) - redis.call('SET', KEYS[2], startVersionValue) - redis.call('SET', KEYS[3], expireTime) - redis.call('DEL', KEYS[4]) -- clear the old changes list - if #ARGV >= 4 then - redis.call('RPUSH', KEYS[4], unpack(ARGV, 4)) - end - - `, -}) - -/** - * Stores the current chunk of project history in Redis - * @param {string} projectId - The ID of the project - * @param {Chunk} chunk - The chunk object containing history data - * @returns {Promise<*>} Returns the result of the Redis operation, or null if an error occurs - * @throws {Error} May throw Redis-related errors which are caught internally - */ -async function setCurrentChunk(projectId, chunk) { - try { - const snapshotKey = keySchema.snapshot({ projectId }) - const startVersionKey = keySchema.startVersion({ projectId }) - const changesKey = keySchema.changes({ projectId }) - const expireTimeKey = keySchema.expireTime({ projectId }) - - const snapshot = chunk.history.snapshot - const startVersion = chunk.startVersion - const changes = chunk.history.changes - const expireTime = Date.now() + TEMPORARY_CACHE_LIFETIME * 1000 - - await rclient.set_current_chunk( - snapshotKey, // KEYS[1] - startVersionKey, // KEYS[2] - expireTimeKey, // KEYS[3] - changesKey, // KEYS[4] - JSON.stringify(snapshot.toRaw()), // ARGV[1] - startVersion, // ARGV[2] - expireTime, // ARGV[3] - ...changes.map(c => JSON.stringify(c.toRaw())) // ARGV[4..] - ) - metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'success' }) - } catch (err) { - logger.error( - { err, projectId, chunk }, - 'error setting current chunk in redis' - ) - metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'error' }) - return null // while testing we will suppress any errors - } -} - -/** - * Checks whether a cached chunk's version metadata matches the current chunk's metadata - * @param {Chunk} cachedChunk - The chunk retrieved from cache - * @param {Chunk} currentChunk - The current chunk to compare against - * @returns {boolean} - Returns true if the chunks have matching start and end versions, false otherwise - */ -function checkCacheValidity(cachedChunk, currentChunk) { - return Boolean( - cachedChunk && - cachedChunk.getStartVersion() === currentChunk.getStartVersion() && - cachedChunk.getEndVersion() === currentChunk.getEndVersion() - ) -} - -/** - * Validates if a cached chunk matches the current chunk metadata by comparing versions - * @param {Object} cachedChunk - The cached chunk object to validate - * @param {Object} currentChunkMetadata - The current chunk metadata to compare against - * @param {number} currentChunkMetadata.startVersion - The starting version number - * @param {number} currentChunkMetadata.endVersion - The ending version number - * @returns {boolean} - True if the cached chunk is valid, false otherwise - */ -function checkCacheValidityWithMetadata(cachedChunk, currentChunkMetadata) { - return Boolean( - cachedChunk && - cachedChunk.getStartVersion() === currentChunkMetadata.startVersion && - cachedChunk.getEndVersion() === currentChunkMetadata.endVersion - ) -} - -/** - * Compares two chunks for equality using stringified JSON comparison - * @param {string} projectId - The ID of the project - * @param {Chunk} cachedChunk - The cached chunk to compare - * @param {Chunk} currentChunk - The current chunk to compare against - * @returns {boolean} - Returns false if either chunk is null/undefined, otherwise returns the comparison result - */ -function compareChunks(projectId, cachedChunk, currentChunk) { - if (!cachedChunk || !currentChunk) { - return false - } - const identical = JSON.stringify(cachedChunk) === JSON.stringify(currentChunk) - if (!identical) { - try { - logger.error( - { - projectId, - cachedChunkStartVersion: cachedChunk.getStartVersion(), - cachedChunkEndVersion: cachedChunk.getEndVersion(), - currentChunkStartVersion: currentChunk.getStartVersion(), - currentChunkEndVersion: currentChunk.getEndVersion(), - }, - 'chunk cache mismatch' - ) - } catch (err) { - // ignore errors while logging - } - } - metrics.inc('chunk_store.redis.compare_chunks', 1, { - status: identical ? 'success' : 'fail', - }) - return identical -} - -// Define Lua script for atomic cache clearing -rclient.defineCommand('expire_chunk_cache', { - numberOfKeys: 5, - lua: ` - local persistTimeExists = redis.call('EXISTS', KEYS[5]) - if persistTimeExists == 1 then - return nil -- chunk has changes pending, do not expire - end - local currentTime = tonumber(ARGV[1]) - local expireTimeValue = redis.call('GET', KEYS[4]) - if not expireTimeValue then - return nil -- this is a cache-miss - end - local expireTime = tonumber(expireTimeValue) - if currentTime < expireTime then - return nil -- cache is still valid - end - -- Cache is expired and all changes are persisted, proceed to delete the keys atomically - redis.call('DEL', KEYS[1]) -- snapshot key - redis.call('DEL', KEYS[2]) -- startVersion key - redis.call('DEL', KEYS[3]) -- changes key - redis.call('DEL', KEYS[4]) -- expireTime key - return 1 - `, -}) - -/** - * Expire cache entries for a project's chunk data if needed - * @param {string} projectId - The ID of the project whose cache should be cleared - * @returns {Promise} A promise that resolves to true if successful, false on error - */ -async function expireCurrentChunk(projectId, currentTime) { - try { - const snapshotKey = keySchema.snapshot({ projectId }) - const startVersionKey = keySchema.startVersion({ projectId }) - const changesKey = keySchema.changes({ projectId }) - const expireTimeKey = keySchema.expireTime({ projectId }) - const persistTimeKey = keySchema.persistTime({ projectId }) - const result = await rclient.expire_chunk_cache( - snapshotKey, - startVersionKey, - changesKey, - expireTimeKey, - persistTimeKey, - currentTime || Date.now() - ) - if (!result) { - logger.debug( - { projectId }, - 'chunk cache not expired due to pending changes' - ) - metrics.inc('chunk_store.redis.expire_cache', 1, { - status: 'skip-due-to-pending-changes', - }) - return false // not expired - } - metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'success' }) - return true - } catch (err) { - logger.error({ err, projectId }, 'error clearing chunk cache from redis') - metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'error' }) - return false - } -} - -// Define Lua script for atomic cache clearing -rclient.defineCommand('clear_chunk_cache', { - numberOfKeys: 5, - lua: ` - local persistTimeExists = redis.call('EXISTS', KEYS[5]) - if persistTimeExists == 1 then - return nil -- chunk has changes pending, do not clear - end - -- Delete all keys related to a project's chunk cache atomically - redis.call('DEL', KEYS[1]) -- snapshot key - redis.call('DEL', KEYS[2]) -- startVersion key - redis.call('DEL', KEYS[3]) -- changes key - redis.call('DEL', KEYS[4]) -- expireTime key - return 1 - `, -}) - -/** - * Clears all cache entries for a project's chunk data - * @param {string} projectId - The ID of the project whose cache should be cleared - * @returns {Promise} A promise that resolves to true if successful, false on error - */ -async function clearCache(projectId) { - try { - const snapshotKey = keySchema.snapshot({ projectId }) - const startVersionKey = keySchema.startVersion({ projectId }) - const changesKey = keySchema.changes({ projectId }) - const expireTimeKey = keySchema.expireTime({ projectId }) - const persistTimeKey = keySchema.persistTime({ projectId }) // Add persistTimeKey - - const result = await rclient.clear_chunk_cache( - snapshotKey, - startVersionKey, - changesKey, - expireTimeKey, - persistTimeKey - ) - if (result === null) { - logger.debug( - { projectId }, - 'chunk cache not cleared due to pending changes' - ) - metrics.inc('chunk_store.redis.clear_cache', 1, { - status: 'skip-due-to-pending-changes', - }) - return false - } - metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' }) - return true - } catch (err) { - logger.error({ err, projectId }, 'error clearing chunk cache from redis') - metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'error' }) - return false - } -} - -// Define Lua script for getting chunk status -rclient.defineCommand('get_chunk_status', { - numberOfKeys: 2, // expireTimeKey, persistTimeKey - lua: ` - local expireTimeValue = redis.call('GET', KEYS[1]) - local persistTimeValue = redis.call('GET', KEYS[2]) - return {expireTimeValue, persistTimeValue} - `, -}) - -/** - * Retrieves the current chunk status for a given project from Redis - * @param {string} projectId - The ID of the project to get status for - * @returns {Promise} Object containing expireTime and persistTime, or nulls on error - * @property {number|null} expireTime - The expiration time of the chunk - * @property {number|null} persistTime - The persistence time of the chunk - */ -async function getCurrentChunkStatus(projectId) { - try { - const expireTimeKey = keySchema.expireTime({ projectId }) - const persistTimeKey = keySchema.persistTime({ projectId }) - - const result = await rclient.get_chunk_status(expireTimeKey, persistTimeKey) - - // Lua script returns an array [expireTimeValue, persistTimeValue] - // Redis nil replies are converted to null by ioredis - const [expireTime, persistTime] = result - - return { - expireTime: expireTime ? parseInt(expireTime, 10) : null, // Parse to number or null - persistTime: persistTime ? parseInt(persistTime, 10) : null, // Parse to number or null - } - } catch (err) { - logger.warn({ err, projectId }, 'error getting chunk status from redis') - return { expireTime: null, persistTime: null } // Return nulls on error - } -} - -/** - * Sets the persist time for a project's chunk cache. - * This is primarily intended for testing purposes. - * @param {string} projectId - The ID of the project. - * @param {number} timestamp - The timestamp to set as the persist time. - * @returns {Promise} - */ -async function setPersistTime(projectId, timestamp) { - try { - const persistTimeKey = keySchema.persistTime({ projectId }) - await rclient.set(persistTimeKey, timestamp) - metrics.inc('chunk_store.redis.set_persist_time', 1, { status: 'success' }) - } catch (err) { - logger.error( - { err, projectId, timestamp }, - 'error setting persist time in redis' - ) - metrics.inc('chunk_store.redis.set_persist_time', 1, { status: 'error' }) - // Re-throw the error so the test fails if setting fails + metrics.inc('chunk_store.redis.get_head_snapshot', 1, { status: 'error' }) throw err } } -module.exports = { - getCurrentChunk, - getCurrentChunkIfValid, - setCurrentChunk, - getCurrentChunkMetadata, - checkCacheValidity, - checkCacheValidityWithMetadata, - compareChunks, - expireCurrentChunk, - clearCache, - getCurrentChunkStatus, - setPersistTime, // Export the new function +rclient.defineCommand('queue_changes', { + numberOfKeys: 5, + lua: ` + local headSnapshotKey = KEYS[1] + local headVersionKey = KEYS[2] + local changesKey = KEYS[3] + local expireTimeKey = KEYS[4] + local persistTimeKey = KEYS[5] + + local baseVersion = tonumber(ARGV[1]) + local head = ARGV[2] + local persistTime = tonumber(ARGV[3]) + local expireTime = tonumber(ARGV[4]) + -- Changes start from ARGV[5] + + local headVersion = tonumber(redis.call('GET', headVersionKey)) + if headVersion and headVersion ~= baseVersion then + return 'conflict' + end + + -- Check if there are any changes to queue + if #ARGV < 5 then + return 'no_changes_provided' + end + + -- Store the changes + -- RPUSH changesKey change1 change2 ... + redis.call('RPUSH', changesKey, unpack(ARGV, 5, #ARGV)) + + -- Update head snapshot only if changes were successfully pushed + redis.call('SET', headSnapshotKey, head) + + -- Update the head version + local numChanges = #ARGV - 4 + local newHeadVersion = baseVersion + numChanges + redis.call('SET', headVersionKey, newHeadVersion) + + -- Update the persist time if the new time is sooner + local currentPersistTime = tonumber(redis.call('GET', persistTimeKey)) + if not currentPersistTime or persistTime < currentPersistTime then + redis.call('SET', persistTimeKey, persistTime) + end + + -- Update the expire time + redis.call('SET', expireTimeKey, expireTime) + + return 'ok' + `, +}) + +/** + * Atomically queues changes to the project history in Redis if the baseVersion matches. + * Updates head snapshot, version, persist time, and expire time. + * + * @param {string} projectId - The project identifier. + * @param {Snapshot} headSnapshot - The new head snapshot after applying changes. + * @param {number} baseVersion - The expected current head version. + * @param {Change[]} changes - An array of Change objects to queue. + * @param {number} persistTime - Timestamp (ms since epoch) when the oldest change in the buffer should be persisted. + * @param {number} expireTime - Timestamp (ms since epoch) when the project buffer should expire if inactive. + * @returns {Promise} Resolves on success. + * @throws {BaseVersionConflictError} If the baseVersion does not match the current head version in Redis. + * @throws {Error} If changes array is empty or if Redis operations fail. + */ +async function queueChanges( + projectId, + headSnapshot, + baseVersion, + changes, + persistTime, + expireTime +) { + if (!changes || changes.length === 0) { + throw new Error('Cannot queue empty changes array') + } + + try { + const keys = [ + keySchema.head({ projectId }), + keySchema.headVersion({ projectId }), + keySchema.changes({ projectId }), + keySchema.expireTime({ projectId }), + keySchema.persistTime({ projectId }), + ] + + const args = [ + baseVersion.toString(), + JSON.stringify(headSnapshot.toRaw()), + persistTime.toString(), + expireTime.toString(), + ...changes.map(change => JSON.stringify(change.toRaw())), // Serialize changes + ] + + const status = await rclient.queue_changes(keys, args) + metrics.inc('chunk_store.redis.queue_changes', 1, { status }) + if (status === 'ok') { + return + } + if (status === 'conflict') { + throw new BaseVersionConflictError('base version mismatch', { + projectId, + baseVersion, + }) + } else { + throw new Error(`unexpected result queuing changes: ${status}`) + } + } catch (err) { + if (err instanceof BaseVersionConflictError) { + // Re-throw conflict errors directly + throw err + } + metrics.inc('chunk_store.redis.queue_changes', 1, { status: 'error' }) + throw err + } +} + +rclient.defineCommand('get_state', { + numberOfKeys: 6, // Number of keys defined in keySchema + lua: ` + local headSnapshotKey = KEYS[1] + local headVersionKey = KEYS[2] + local persistedVersionKey = KEYS[3] + local expireTimeKey = KEYS[4] + local persistTimeKey = KEYS[5] + local changesKey = KEYS[6] + + local headSnapshot = redis.call('GET', headSnapshotKey) + local headVersion = redis.call('GET', headVersionKey) + local persistedVersion = redis.call('GET', persistedVersionKey) + local expireTime = redis.call('GET', expireTimeKey) + local persistTime = redis.call('GET', persistTimeKey) + local changes = redis.call('LRANGE', changesKey, 0, -1) -- Get all changes in the list + + return {headSnapshot, headVersion, persistedVersion, expireTime, persistTime, changes} + `, +}) + +/** + * Retrieves the entire state associated with a project from Redis atomically. + * @param {string} projectId - The unique identifier of the project. + * @returns {Promise} A Promise that resolves to an object containing the project state, + * or null if the project state does not exist (e.g., head version is missing). + * @throws {Error} If Redis operations fail. + */ +async function getState(projectId) { + const keys = [ + keySchema.head({ projectId }), + keySchema.headVersion({ projectId }), + keySchema.persistedVersion({ projectId }), + keySchema.expireTime({ projectId }), + keySchema.persistTime({ projectId }), + keySchema.changes({ projectId }), + ] + + // Pass keys individually, not as an array + const result = await rclient.get_state(...keys) + + const [ + rawHeadSnapshot, + rawHeadVersion, + rawPersistedVersion, + rawExpireTime, + rawPersistTime, + rawChanges, + ] = result + + // Safely parse values, providing defaults or nulls if necessary + const headSnapshot = rawHeadSnapshot + ? JSON.parse(rawHeadSnapshot) + : rawHeadSnapshot + const headVersion = rawHeadVersion ? parseInt(rawHeadVersion, 10) : null // Should always exist if result is not null + const persistedVersion = rawPersistedVersion + ? parseInt(rawPersistedVersion, 10) + : null + const expireTime = rawExpireTime ? parseInt(rawExpireTime, 10) : null + const persistTime = rawPersistTime ? parseInt(rawPersistTime, 10) : null + const changes = rawChanges ? rawChanges.map(JSON.parse) : null + + return { + headSnapshot, + headVersion, + persistedVersion, + expireTime, + persistTime, + changes, + } +} + +rclient.defineCommand('get_changes_since_version', { + numberOfKeys: 2, + lua: ` + local headVersionKey = KEYS[1] + local changesKey = KEYS[2] + + local requestedVersion = tonumber(ARGV[1]) + + -- Check if head version exists + local headVersion = tonumber(redis.call('GET', headVersionKey)) + if not headVersion then + return {'not_found'} + end + + -- If requested version equals head version, return empty array + if requestedVersion == headVersion then + return {'ok', {}} + end + + -- If requested version is greater than head version, return error + if requestedVersion > headVersion then + return {'out_of_bounds'} + end + + -- Get length of changes list + local changesCount = redis.call('LLEN', changesKey) + + -- Check if requested version is too old (changes already removed from buffer) + if requestedVersion < (headVersion - changesCount) then + return {'out_of_bounds'} + end + + -- Calculate the starting index, using negative indexing to count backwards + -- from the end of the list + local startIndex = requestedVersion - headVersion + + -- Get changes using LRANGE + local changes = redis.call('LRANGE', changesKey, startIndex, -1) + + return {'ok', changes} + `, +}) + +/** + * Retrieves changes since a specific version for a project from Redis. + * + * @param {string} projectId - The unique identifier of the project. + * @param {number} version - The version number to retrieve changes since. + * @returns {Promise<{status: string, changes?: Array}>} A Promise that resolves to an object containing: + * - status: 'OK', 'NOT_FOUND', or 'OUT_OF_BOUNDS' + * - changes: Array of Change objects (only when status is 'OK') + * @throws {Error} If Redis operations fail. + */ +async function getChangesSinceVersion(projectId, version) { + try { + const keys = [ + keySchema.headVersion({ projectId }), + keySchema.changes({ projectId }), + ] + + const args = [version.toString()] + + const result = await rclient.get_changes_since_version(keys, args) + const status = result[0] + + if (status === 'ok') { + // If status is OK, parse the changes + const changes = result[1] + ? result[1].map(rawChange => + typeof rawChange === 'string' ? JSON.parse(rawChange) : rawChange + ) + : [] + + metrics.inc('chunk_store.redis.get_changes_since_version', 1, { + status: 'success', + }) + return { status, changes } + } else { + // For other statuses, just return the status + metrics.inc('chunk_store.redis.get_changes_since_version', 1, { + status, + }) + return { status } + } + } catch (err) { + metrics.inc('chunk_store.redis.get_changes_since_version', 1, { + status: 'error', + }) + throw err + } +} + +rclient.defineCommand('get_non_persisted_changes', { + numberOfKeys: 3, + lua: ` + local headVersionKey = KEYS[1] + local persistedVersionKey = KEYS[2] + local changesKey = KEYS[3] + + -- Check if head version exists + local headVersion = tonumber(redis.call('GET', headVersionKey)) + if not headVersion then + return {} + end + + -- Check if persisted version exists + local persistedVersion = tonumber(redis.call('GET', persistedVersionKey)) + + local startIndex + if not persistedVersion then + -- None of the changes in Redis have been persisted + startIndex = 0 + elseif persistedVersion > headVersion then + -- This should never happen + return redis.error_reply('HEAD_VERSION_BEHIND_PERSISTED_VERSION') + elseif persistedVersion == headVersion then + return {} + else + -- startIndex is negative and counts from the end of the list of changes + startIndex = persistedVersion - headVersion + end + + -- Get changes using LRANGE + local changes = redis.call('LRANGE', changesKey, startIndex, -1) + + return changes + `, +}) + +/** + * Retrieves non-persisted changes for a project from Redis. + * + * @param {string} projectId - The unique identifier of the project. + * @returns {Promise} A Promise that resolves to an array of non-persisted Change objects. + * @throws {Error} If Redis operations fail. + */ +async function getNonPersistedChanges(projectId) { + try { + const keys = [ + keySchema.headVersion({ projectId }), + keySchema.persistedVersion({ projectId }), + keySchema.changes({ projectId }), + ] + + const result = await rclient.get_non_persisted_changes(keys) + + // Parse the changes + const changes = result?.map(json => Change.fromRaw(JSON.parse(json))) ?? [] + + metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { + status: 'success', + }) + return changes + } catch (err) { + metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { + status: 'error', + }) + throw err + } +} + +rclient.defineCommand('set_persisted_version', { + numberOfKeys: 3, + lua: ` + local headVersionKey = KEYS[1] + local persistedVersionKey = KEYS[2] + local changesKey = KEYS[3] + + local newPersistedVersion = tonumber(ARGV[1]) + local maxPersistedChanges = tonumber(ARGV[2]) + + -- Check if head version exists + local headVersion = tonumber(redis.call('GET', headVersionKey)) + if not headVersion then + return 'not_found' + end + + -- Set the persisted version + redis.call('SET', persistedVersionKey, newPersistedVersion) + + -- Calculate the starting index, to keep only maxPersistedChanges beyond the persisted version + -- Using negative indexing to count backwards from the end of the list + local startIndex = newPersistedVersion - headVersion - maxPersistedChanges + + -- Trim the changes list to keep only the specified number of changes beyond persisted version + if startIndex < 0 then + redis.call('LTRIM', changesKey, startIndex, -1) + end + + return 'ok' + `, +}) + +/** + * Sets the persisted version for a project in Redis and trims the changes list. + * + * @param {string} projectId - The unique identifier of the project. + * @param {number} persistedVersion - The version number to set as persisted. + * @returns {Promise} A Promise that resolves to 'OK' or 'NOT_FOUND'. + * @throws {Error} If Redis operations fail. + */ +async function setPersistedVersion(projectId, persistedVersion) { + try { + const keys = [ + keySchema.headVersion({ projectId }), + keySchema.persistedVersion({ projectId }), + keySchema.changes({ projectId }), + ] + + const args = [persistedVersion.toString(), MAX_PERSISTED_CHANGES.toString()] + + const status = await rclient.set_persisted_version(keys, args) + + metrics.inc('chunk_store.redis.set_persisted_version', 1, { + status, + }) + + return status + } catch (err) { + metrics.inc('chunk_store.redis.set_persisted_version', 1, { + status: 'error', + }) + throw err + } +} + +rclient.defineCommand('set_expire_time', { + numberOfKeys: 2, + lua: ` + local expireTimeKey = KEYS[1] + local headVersionKey = KEYS[2] + local expireTime = tonumber(ARGV[1]) + + -- Only set the expire time if the project is loaded in Redis + local headVersion = redis.call('GET', headVersionKey) + if headVersion then + redis.call('SET', expireTimeKey, expireTime) + end + `, +}) + +/** + * Sets the expire version for a project in Redis + * + * @param {string} projectId + * @param {number} expireTime - Timestamp (ms since epoch) when the project + * buffer should expire if inactive + */ +async function setExpireTime(projectId, expireTime) { + try { + await rclient.set_expire_time( + keySchema.expireTime({ projectId }), + keySchema.headVersion({ projectId }), + expireTime.toString() + ) + metrics.inc('chunk_store.redis.set_expire_time', 1, { status: 'success' }) + } catch (err) { + metrics.inc('chunk_store.redis.set_expire_time', 1, { status: 'error' }) + throw err + } +} + +rclient.defineCommand('expire_project', { + numberOfKeys: 6, + lua: ` + local headKey = KEYS[1] + local headVersionKey = KEYS[2] + local changesKey = KEYS[3] + local persistedVersionKey = KEYS[4] + local persistTimeKey = KEYS[5] + local expireTimeKey = KEYS[6] + + local headVersion = tonumber(redis.call('GET', headVersionKey)) + if not headVersion then + return 'not-found' + end + + local persistedVersion = tonumber(redis.call('GET', persistedVersionKey)) + if not persistedVersion or persistedVersion ~= headVersion then + return 'not-persisted' + end + + redis.call('DEL', + headKey, + headVersionKey, + changesKey, + persistedVersionKey, + persistTimeKey, + expireTimeKey + ) + return 'success' + `, +}) + +async function expireProject(projectId) { + try { + const status = await rclient.expire_project( + keySchema.head({ projectId }), + keySchema.headVersion({ projectId }), + keySchema.changes({ projectId }), + keySchema.persistedVersion({ projectId }), + keySchema.persistTime({ projectId }), + keySchema.expireTime({ projectId }) + ) + metrics.inc('chunk_store.redis.set_persisted_version', 1, { + status, + }) + } catch (err) { + metrics.inc('chunk_store.redis.set_persisted_version', 1, { + status: 'error', + }) + throw err + } +} + +rclient.defineCommand('claim_job', { + numberOfKeys: 1, + lua: ` + local jobTimeKey = KEYS[1] + local currentTime = tonumber(ARGV[1]) + local retryDelay = tonumber(ARGV[2]) + + local jobTime = tonumber(redis.call('GET', jobTimeKey)) + if not jobTime then + return {'no-job'} + end + + local msUntilReady = jobTime - currentTime + if msUntilReady <= 0 then + local retryTime = currentTime + retryDelay + redis.call('SET', jobTimeKey, retryTime) + return {'ok', retryTime} + else + return {'wait', msUntilReady} + end + `, +}) + +rclient.defineCommand('close_job', { + numberOfKeys: 1, + lua: ` + local jobTimeKey = KEYS[1] + local expectedJobTime = tonumber(ARGV[1]) + + local jobTime = tonumber(redis.call('GET', jobTimeKey)) + if jobTime and jobTime == expectedJobTime then + redis.call('DEL', jobTimeKey) + end + `, +}) + +/** + * Claim an expire job + * + * @param {string} projectId + * @return {Promise} + */ +async function claimExpireJob(projectId) { + return await claimJob(keySchema.expireTime({ projectId })) +} + +/** + * Claim a persist job + * + * @param {string} projectId + * @return {Promise} + */ +async function claimPersistJob(projectId) { + return await claimJob(keySchema.persistTime({ projectId })) +} + +/** + * Claim a persist or expire job + * + * @param {string} jobKey - the Redis key containing the time at which the job + * is ready + * @return {Promise} + */ +async function claimJob(jobKey) { + let result, status + try { + result = await rclient.claim_job(jobKey, Date.now(), RETRY_DELAY_MS) + status = result[0] + metrics.inc('chunk_store.redis.claim_job', 1, { status }) + } catch (err) { + metrics.inc('chunk_store.redis.claim_job', 1, { status: 'error' }) + throw err + } + + if (status === 'ok') { + return new Job(jobKey, parseInt(result[1], 10)) + } else if (status === 'wait') { + throw new JobNotReadyError('job not ready', { + jobKey, + retryTime: result[1], + }) + } else if (status === 'no-job') { + throw new JobNotFoundError('job not found', { jobKey }) + } else { + throw new OError('unknown status for claim_job', { jobKey, status }) + } +} + +/** + * Handle for a claimed job + */ +class Job { + /** + * @param {string} redisKey + * @param {number} claimTimestamp + */ + constructor(redisKey, claimTimestamp) { + this.redisKey = redisKey + this.claimTimestamp = claimTimestamp + } + + async close() { + try { + await rclient.close_job(this.redisKey, this.claimTimestamp.toString()) + metrics.inc('chunk_store.redis.close_job', 1, { status: 'success' }) + } catch (err) { + metrics.inc('chunk_store.redis.close_job', 1, { status: 'error' }) + throw err + } + } +} + +module.exports = { + getHeadSnapshot, + queueChanges, + getState, + getChangesSinceVersion, + getNonPersistedChanges, + setPersistedVersion, + setExpireTime, + expireProject, + claimExpireJob, + claimPersistJob, + MAX_PERSISTED_CHANGES, + MAX_PERSIST_DELAY_MS, + PROJECT_TTL_MS, + RETRY_DELAY_MS, + keySchema, } diff --git a/services/history-v1/storage/scripts/expire_redis_chunks.js b/services/history-v1/storage/scripts/expire_redis_chunks.js index 11b34101da..de4e130ed4 100644 --- a/services/history-v1/storage/scripts/expire_redis_chunks.js +++ b/services/history-v1/storage/scripts/expire_redis_chunks.js @@ -2,7 +2,7 @@ const logger = require('@overleaf/logger') const commandLineArgs = require('command-line-args') // Add this line const redis = require('../lib/redis') const { scanRedisCluster, extractKeyId } = require('../lib/scan') -const { expireCurrentChunk } = require('../lib/chunk_store/redis') +const { expireProject, claimExpireJob } = require('../lib/chunk_store/redis') const rclient = redis.rclientHistory const EXPIRE_TIME_KEY_PATTERN = `expire-time:{*}` @@ -30,24 +30,42 @@ function isExpiredKey(expireTimestamp, currentTime) { return currentTime > expireTime } -async function processKeysBatch(keysBatch, rclient) { +async function fetchTimestamps(projectIds, rclient) { + const expireTimeKeys = projectIds.map(id => `expire-time:{${id}}`) + // For efficiency, we use MGET to fetch all the timestamps in a single request + const expireTimestamps = await rclient.mget(expireTimeKeys) + // Return an array of objects with projectId and expireTimestamp + const results = projectIds.map((projectId, index) => ({ + projectId, + expireTimestamp: expireTimestamps[index], + })) + return results +} + +async function processKeysBatch(projectIds, rclient) { let clearedKeyCount = 0 - if (keysBatch.length === 0) { + if (projectIds.length === 0) { return 0 } - // For efficiency, we use MGET to fetch all the timestamps in a single request - const expireTimestamps = await rclient.mget(keysBatch) + const projects = await fetchTimestamps(projectIds, rclient) const currentTime = Date.now() - for (let i = 0; i < keysBatch.length; i++) { - const key = keysBatch[i] + + for (const project of projects) { + const { projectId, expireTimestamp } = project // For each key, do a quick check to see if the key is expired before calling // the LUA script to expire the chunk atomically. - if (isExpiredKey(expireTimestamps[i], currentTime)) { - const projectId = extractKeyId(key) + if (isExpiredKey(expireTimestamp, currentTime)) { if (DRY_RUN) { logger.info({ projectId }, '[Dry Run] Would expire chunk for project') } else { - await expireCurrentChunk(projectId) + try { + const job = await claimExpireJob(projectId) + await expireProject(projectId) + await job.close() + } catch (err) { + logger.error({ projectId, err }, 'error expiring chunk for project') + continue + } } clearedKeyCount++ } @@ -61,7 +79,6 @@ async function expireRedisChunks() { const START_TIME = Date.now() if (DRY_RUN) { - // Use global DRY_RUN logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode') } else { logger.info({}, 'starting expireRedisChunks scan') @@ -72,7 +89,10 @@ async function expireRedisChunks() { EXPIRE_TIME_KEY_PATTERN )) { scannedKeyCount += keysBatch.length - clearedKeyCount += await processKeysBatch(keysBatch, rclient) + clearedKeyCount += await processKeysBatch( + keysBatch.map(extractKeyId), + rclient + ) if (scannedKeyCount % 1000 === 0) { logger.info( { scannedKeyCount, clearedKeyCount }, @@ -92,7 +112,13 @@ async function expireRedisChunks() { await redis.disconnect() } -expireRedisChunks().catch(err => { - logger.fatal({ err }, 'unhandled error in expireRedisChunks') - process.exit(1) -}) +// Check if the script is being run directly +if (require.main === module) { + expireRedisChunks().catch(err => { + logger.fatal({ err }, 'unhandled error in expireRedisChunks') + process.exit(1) + }) +} else { + // Export the function for module usage + module.exports = { expireRedisChunks } +} diff --git a/services/history-v1/test/acceptance/js/api/project_updates.test.js b/services/history-v1/test/acceptance/js/api/project_updates.test.js index eb7b1703a7..f50f3677b5 100644 --- a/services/history-v1/test/acceptance/js/api/project_updates.test.js +++ b/services/history-v1/test/acceptance/js/api/project_updates.test.js @@ -22,7 +22,6 @@ const TextOperation = core.TextOperation const V2DocVersions = core.V2DocVersions const knex = require('../../../../storage').knex -const redis = require('../../../../storage/lib/chunk_store/redis') describe('history import', function () { beforeEach(cleanup.everything) @@ -595,10 +594,6 @@ describe('history import', function () { testFiles.NULL_CHARACTERS_TXT_BYTE_LENGTH ) }) - .then(() => { - // Now clear the cache because we have changed the string length in the database - return redis.clearCache(testProjectId) - }) .then(importChanges) .then(getLatestContent) .then(response => { diff --git a/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js b/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js deleted file mode 100644 index 841282a8e4..0000000000 --- a/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js +++ /dev/null @@ -1,351 +0,0 @@ -'use strict' - -const { expect } = require('chai') -const sinon = require('sinon') -const { - Chunk, - Snapshot, - History, - File, - AddFileOperation, - EditFileOperation, - AddCommentOperation, - TextOperation, - Range, - TrackingProps, - Change, -} = require('overleaf-editor-core') -const cleanup = require('./support/cleanup') -const fixtures = require('./support/fixtures') -const chunkBuffer = require('../../../../storage/lib/chunk_buffer') -const chunkStore = require('../../../../storage/lib/chunk_store') -const redisBackend = require('../../../../storage/lib/chunk_store/redis') -const metrics = require('@overleaf/metrics') - -describe('chunk buffer', function () { - beforeEach(cleanup.everything) - beforeEach(fixtures.create) - beforeEach(function () { - sinon.spy(metrics, 'inc') - }) - afterEach(function () { - metrics.inc.restore() - }) - - const projectId = '123456' - - describe('loadLatest', function () { - // Initialize project and create a test chunk - beforeEach(async function () { - // Initialize project in chunk store - await chunkStore.initializeProject(projectId) - }) - - describe('with an existing chunk', function () { - beforeEach(async function () { - // Create a sample chunk with some content - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 1) // startVersion 1 - - // Store the chunk directly in the chunk store using create method - // which internally calls uploadChunk - await chunkStore.create(projectId, chunk) - - // Clear any existing cache - await redisBackend.clearCache(projectId) - }) - - it('should load from chunk store and update cache on first access (cache miss)', async function () { - // Load the underlying chunk from the chunk store for verification - const storedChunk = await chunkStore.loadLatest(projectId) - - // First access should load from chunk store and populate cache - const firstResult = await chunkBuffer.loadLatest(projectId) - - // Verify the chunk is correct - expect(firstResult).to.not.be.null - expect(firstResult.getStartVersion()).to.equal(1) - expect(firstResult.getEndVersion()).to.equal(2) - - // Verify the chunk is the same as the one in the store - expect(firstResult).to.deep.equal(storedChunk) - - // Verify that we got a cache miss metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - - // Reset the metrics spy - metrics.inc.resetHistory() - - // Second access should hit the cache - const secondResult = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same chunk - expect(secondResult).to.not.be.null - expect(secondResult.getStartVersion()).to.equal(1) - expect(secondResult.getEndVersion()).to.equal(2) - - // Verify the chunk is the same as the one in the store - expect(secondResult).to.deep.equal(storedChunk) - - // Verify that we got a cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true - - // Verify both chunks are equivalent - expect(secondResult.getStartVersion()).to.equal( - firstResult.getStartVersion() - ) - expect(secondResult.getEndVersion()).to.equal( - firstResult.getEndVersion() - ) - expect(secondResult).to.deep.equal(firstResult) - }) - - it('should refresh the cache when chunk changes in the store', async function () { - // First access to load into cache - const firstResult = await chunkBuffer.loadLatest(projectId) - expect(firstResult.getStartVersion()).to.equal(1) - - // Reset metrics spy - metrics.inc.resetHistory() - - // Create a new chunk with different content - const newSnapshot = new Snapshot() - const newChanges = [ - new Change( - [ - new AddFileOperation( - 'updated.tex', - File.fromString('Updated content') - ), - ], - new Date(), - [] - ), - ] - const newHistory = new History(newSnapshot, newChanges) - const newChunk = new Chunk(newHistory, 2) // Different start version - - // Store the new chunk directly in the chunk store - await chunkStore.create(projectId, newChunk) - - // Load the underlying chunk from the chunk store for verification - const storedChunk = await chunkStore.loadLatest(projectId) - - // Access again - should detect the change and refresh cache - const secondResult = await chunkBuffer.loadLatest(projectId) - - // Verify we got the updated chunk - expect(secondResult.getStartVersion()).to.equal(2) - expect(secondResult.getEndVersion()).to.equal(3) - // Verify that the chunk content is the same - expect(secondResult).to.deep.equal(storedChunk) - - // Verify that we got a cache miss metric (since the cached chunk was invalidated) - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - }) - - it('should continue using cache when chunk in store has not changed', async function () { - // Load the underlying chunk from the chunk store for verification - const storedChunk = await chunkStore.loadLatest(projectId) - - // First access to load into cache - await chunkBuffer.loadLatest(projectId) - - // Reset metrics spy - metrics.inc.resetHistory() - - // Access again without changing the underlying chunk - const result = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same chunk - expect(result.getStartVersion()).to.equal(1) - expect(result.getEndVersion()).to.equal(2) - expect(result).to.deep.equal(storedChunk) - - // Verify that we got a cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true - }) - }) - - it('should handle a chunk with metadata, comments and tracked changes', async function () { - // Create a snapshot and initial file - const snapshot = new Snapshot() - const initialFileOp = new AddFileOperation( - 'test.tex', - File.fromString('Initial line.\\nSecond line.', { - meta1: 'abc', - meta2: 'def', - }) - ) - const initialChange = new Change([initialFileOp], new Date(), []) - - // Add a comment - const commentOp = new AddCommentOperation( - 'comment1', - [new Range(0, 7)] // Range for "Initial" - ) - const commentChange = new Change( - [new EditFileOperation('test.tex', commentOp)], - new Date(), - [] - ) - - // Tracked insert - const trackedInsertOp = new TextOperation() - .retain(14) - .insert('Hello', { - commentIds: ['comment1'], - tracking: TrackingProps.fromRaw({ - ts: '2024-01-01T00:00:00.000Z', - type: 'insert', - userId: 'user1', - }), - }) - .retain(12) - const insertChange = new Change( - [new EditFileOperation('test.tex', trackedInsertOp)], - new Date(), - [] - ) - - // Tracked delete - const trackedDeleteOp = new TextOperation().retain(14, { - tracking: TrackingProps.fromRaw({ - ts: '2024-01-01T00:00:00.000Z', - type: 'delete', - userId: 'user1', - }), - }) - const deleteChange = new Change( - [new EditFileOperation('test.tex', trackedDeleteOp)], - new Date(), - [] - ) - - // Combine changes into history and create chunk - const history = new History(snapshot, [ - initialChange, - commentChange, - insertChange, - deleteChange, - ]) - const chunk = new Chunk(history, 1) // Start version 0 - // Store the chunk - await chunkStore.create(projectId, chunk) - // Clear the cache - await redisBackend.clearCache(projectId) - metrics.inc.resetHistory() - - // Load the underlying chunk from the chunk store for verification - const storedChunk = await chunkStore.loadLatest(projectId) - - // Load the chunk via buffer (cache miss) - const firstResult = await chunkBuffer.loadLatest(projectId) - - // Verify chunk details - expect(firstResult.getStartVersion()).to.equal(1) - expect(firstResult.getEndVersion()).to.equal(5) // 4 changes - expect(firstResult.history.changes.length).to.equal(4) - expect(firstResult).to.deep.equal(storedChunk) - - // Verify cache miss metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - - // Reset metrics - metrics.inc.resetHistory() - - // Second access should hit the cache - const secondResult = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same chunk - expect(secondResult.getStartVersion()).to.equal(1) - expect(secondResult.getEndVersion()).to.equal(5) - expect(secondResult.history.changes.length).to.equal(4) - expect(secondResult).to.deep.equal(storedChunk) - - // Verify cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true - }) - - describe('with an empty project', function () { - it('should handle a case with empty chunks (no changes)', async function () { - // Clear the cache - await redisBackend.clearCache(projectId) - - // Load the underlying chunk from the chunk store for verification - const storedChunk = await chunkStore.loadLatest(projectId) - - // Load the initial empty chunk via buffer - const result = await chunkBuffer.loadLatest(projectId) - - // Verify we got the empty chunk - expect(result.getStartVersion()).to.equal(0) - expect(result.getEndVersion()).to.equal(0) // Start equals end for empty chunks - expect(result.history.changes.length).to.equal(0) - - // Verify that the chunk is the same as the one in the store - expect(result).to.deep.equal(storedChunk) - - // Verify cache miss metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - - // Reset metrics - metrics.inc.resetHistory() - - // Second access should hit the cache - const secondResult = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same empty chunk - expect(secondResult.getStartVersion()).to.equal(0) - expect(secondResult.getEndVersion()).to.equal(0) - expect(secondResult.history.changes.length).to.equal(0) - - // Verify that the chunk is the same as the one in the store - expect(secondResult).to.deep.equal(storedChunk) - - // Verify cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true - }) - }) - }) -}) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index 612e802ff1..514ae617cf 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -2,923 +2,1031 @@ const { expect } = require('chai') const { - Chunk, Snapshot, - History, - File, - AddFileOperation, - Origin, Change, - V2DocVersions, + AddFileOperation, + File, } = require('overleaf-editor-core') const cleanup = require('./support/cleanup') const redisBackend = require('../../../../storage/lib/chunk_store/redis') +const { + JobNotReadyError, + JobNotFoundError, +} = require('../../../../storage/lib/chunk_store/errors') +const redis = require('../../../../storage/lib/redis') +const rclient = redis.rclientHistory +const keySchema = redisBackend.keySchema -describe('chunk store Redis backend', function () { +describe('chunk buffer Redis backend', function () { beforeEach(cleanup.everything) - const projectId = '123456' + const projectId = 'project123' - describe('getCurrentChunk', function () { + describe('getHeadSnapshot', function () { it('should return null on cache miss', async function () { - const chunk = await redisBackend.getCurrentChunk(projectId) - expect(chunk).to.be.null + const result = await redisBackend.getHeadSnapshot(projectId) + expect(result).to.be.null }) - it('should return the cached chunk', async function () { - // Create a sample chunk + it('should return the cached head snapshot and version', async function () { + // Create a sample snapshot and version const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 5) // startVersion 5 + const version = 42 + const rawSnapshot = JSON.stringify(snapshot.toRaw()) - // Cache the chunk - await redisBackend.setCurrentChunk(projectId, chunk) + // Manually set the data in Redis + await rclient.set(keySchema.head({ projectId }), rawSnapshot) + await rclient.set( + keySchema.headVersion({ projectId }), + version.toString() + ) - // Retrieve the cached chunk - const cachedChunk = await redisBackend.getCurrentChunk(projectId) + // Retrieve the cached snapshot + const result = await redisBackend.getHeadSnapshot(projectId) - expect(cachedChunk).to.not.be.null - expect(cachedChunk.getStartVersion()).to.equal(5) - expect(cachedChunk.getEndVersion()).to.equal(6) - expect(cachedChunk).to.deep.equal(chunk) + expect(result).to.not.be.null + expect(result.version).to.equal(version) + expect(result.snapshot).to.deep.equal(snapshot) // Use deep equal for object comparison + }) + + it('should return null if the version is missing', async function () { + // Create a sample snapshot + const snapshot = new Snapshot() + const rawSnapshot = JSON.stringify(snapshot.toRaw()) + + // Manually set only the snapshot data in Redis + await rclient.set(keySchema.head({ projectId }), rawSnapshot) + + // Attempt to retrieve the snapshot + const result = await redisBackend.getHeadSnapshot(projectId) + + expect(result).to.be.null }) }) - describe('setCurrentChunk', function () { - it('should successfully cache a chunk', async function () { - // Create a sample chunk - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 5) // startVersion 5 + describe('queueChanges', function () { + it('should queue changes when the base version matches head version', async function () { + // Create base version + const baseVersion = 0 - // Cache the chunk - await redisBackend.setCurrentChunk(projectId, chunk) + // Create a new head snapshot that will be set after changes + const headSnapshot = new Snapshot() - // Verify the chunk was cached correctly by retrieving it - const cachedChunk = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunk).to.not.be.null - expect(cachedChunk.getStartVersion()).to.equal(5) - expect(cachedChunk.getEndVersion()).to.equal(6) - expect(cachedChunk).to.deep.equal(chunk) + // Create changes + const timestamp = new Date() + const change = new Change([], timestamp, []) - // Verify that the chunk was stored correctly using the chunk metadata - const chunkMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(chunkMetadata).to.not.be.null - expect(chunkMetadata.startVersion).to.equal(5) - expect(chunkMetadata.changesCount).to.equal(1) + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 // 30 seconds from now + const expireTime = now + 60 * 60 * 1000 // 1 hour from now + + // Queue the changes + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change], + persistTime, + expireTime + ) + + // Get the state to verify the changes + const state = await redisBackend.getState(projectId) + + // Verify the result + expect(state).to.exist + expect(state.headVersion).to.equal(baseVersion + 1) + expect(state.headSnapshot).to.deep.equal(headSnapshot.toRaw()) + expect(state.persistTime).to.equal(persistTime) + expect(state.expireTime).to.equal(expireTime) }) - it('should correctly handle a chunk with zero changes', async function () { - // Create a sample chunk with no changes - const snapshot = new Snapshot() - const changes = [] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 10) // startVersion 10 + it('should throw BaseVersionConflictError when base version does not match head version', async function () { + // Create a mismatch scenario + const headSnapshot = new Snapshot() + const baseVersion = 0 - // Cache the chunk - await redisBackend.setCurrentChunk(projectId, chunk) + // Manually set a different head version in Redis + await rclient.set(keySchema.headVersion({ projectId }), '5') - // Retrieve the cached chunk - const cachedChunk = await redisBackend.getCurrentChunk(projectId) + // Create changes + const timestamp = new Date() + const change = new Change([], timestamp, []) - expect(cachedChunk).to.not.be.null - expect(cachedChunk.getStartVersion()).to.equal(10) - expect(cachedChunk.getEndVersion()).to.equal(10) // End version should equal start version with no changes - expect(cachedChunk.history.changes.length).to.equal(0) - expect(cachedChunk).to.deep.equal(chunk) + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Attempt to queue the changes with a mismatched base version + // This should throw a BaseVersionConflictError + try { + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change], + persistTime, + expireTime + ) + // If we get here, the test should fail + expect.fail('Expected BaseVersionConflictError but no error was thrown') + } catch (err) { + expect(err.name).to.equal('BaseVersionConflictError') + expect(err.info).to.deep.include({ + projectId, + baseVersion, + }) + } + }) + + it('should throw error when given an empty changes array', async function () { + // Create a valid scenario but with empty changes + const headSnapshot = new Snapshot() + const baseVersion = 0 + + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Attempt to queue with empty changes array + try { + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [], // Empty changes array + persistTime, + expireTime + ) + // If we get here, the test should fail + expect.fail('Expected Error but no error was thrown') + } catch (err) { + expect(err.message).to.equal('Cannot queue empty changes array') + } + }) + + it('should queue multiple changes and increment version correctly', async function () { + // Create base version + const baseVersion = 0 + + // Create a new head snapshot + const headSnapshot = new Snapshot() + + // Create multiple changes + const timestamp = new Date() + const change1 = new Change([], timestamp) + const change2 = new Change([], timestamp) + const change3 = new Change([], timestamp) + + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Queue the changes + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change1, change2, change3], // Multiple changes + persistTime, + expireTime + ) + + // Get the state to verify the changes + const state = await redisBackend.getState(projectId) + + // Verify that version was incremented by the number of changes + expect(state.headVersion).to.equal(baseVersion + 3) + expect(state.headSnapshot).to.deep.equal(headSnapshot.toRaw()) + }) + + it('should use the provided persistTime only if it is sooner than existing time', async function () { + // Create base version + const baseVersion = 0 + + // Create a new head snapshot + const headSnapshot = new Snapshot() + + // Create changes + const timestamp = new Date() + const change = new Change([], timestamp) + + // Set times + const now = Date.now() + const earlierPersistTime = now + 15 * 1000 // 15 seconds from now + const laterPersistTime = now + 30 * 1000 // 30 seconds from now + const expireTime = now + 60 * 60 * 1000 // 1 hour from now + + // First queue changes with the later persist time + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change], + laterPersistTime, + expireTime + ) + + // Get the state to verify the first persist time was set + let state = await redisBackend.getState(projectId) + expect(state.persistTime).to.equal(laterPersistTime) + + // Queue more changes with an earlier persist time + const newerHeadSnapshot = new Snapshot() + await redisBackend.queueChanges( + projectId, + newerHeadSnapshot, + baseVersion + 1, // Updated base version + [change], + earlierPersistTime, // Earlier time should replace the later one + expireTime + ) + + // Get the state to verify the persist time was updated to the earlier time + state = await redisBackend.getState(projectId) + expect(state.persistTime).to.equal(earlierPersistTime) + + // Queue more changes with another later persist time + const evenNewerHeadSnapshot = new Snapshot() + await redisBackend.queueChanges( + projectId, + evenNewerHeadSnapshot, + baseVersion + 2, // Updated base version + [change], + laterPersistTime, // Later time should not replace the earlier one + expireTime + ) + + // Get the state to verify the persist time remains at the earlier time + state = await redisBackend.getState(projectId) + expect(state.persistTime).to.equal(earlierPersistTime) // Should still be the earlier time }) }) - describe('updating already cached chunks', function () { - it('should replace a chunk with a longer chunk', async function () { - // Set initial chunk with one change - const snapshotA = new Snapshot() - const changesA = [ - new Change( - [ - new AddFileOperation( - 'test.tex', - File.fromString('Initial content') - ), - ], - new Date(), - [] - ), - ] - const historyA = new History(snapshotA, changesA) - const chunkA = new Chunk(historyA, 10) - - await redisBackend.setCurrentChunk(projectId, chunkA) - - // Verify the initial chunk was cached - const cachedChunkA = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkA.getStartVersion()).to.equal(10) - expect(cachedChunkA.getEndVersion()).to.equal(11) - expect(cachedChunkA.history.changes.length).to.equal(1) - - // Create a longer chunk (with more changes) - const snapshotB = new Snapshot() - const changesB = [ - new Change( - [new AddFileOperation('test1.tex', File.fromString('Content 1'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('test2.tex', File.fromString('Content 2'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('test3.tex', File.fromString('Content 3'))], - new Date(), - [] - ), - ] - const historyB = new History(snapshotB, changesB) - const chunkB = new Chunk(historyB, 15) - - // Replace the cached chunk - await redisBackend.setCurrentChunk(projectId, chunkB) - - // Verify the new chunk replaced the old one - const cachedChunkB = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkB).to.not.be.null - expect(cachedChunkB.getStartVersion()).to.equal(15) - expect(cachedChunkB.getEndVersion()).to.equal(18) - expect(cachedChunkB.history.changes.length).to.equal(3) - expect(cachedChunkB).to.deep.equal(chunkB) - - // Verify the metadata was updated - const updatedMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(updatedMetadata.startVersion).to.equal(15) - expect(updatedMetadata.changesCount).to.equal(3) + describe('getChangesSinceVersion', function () { + it('should return not_found when project does not exist', async function () { + const result = await redisBackend.getChangesSinceVersion(projectId, 1) + expect(result.status).to.equal('not_found') }) - it('should replace a chunk with a shorter chunk', async function () { - // Set initial chunk with three changes - const snapshotA = new Snapshot() - const changesA = [ - new Change( - [new AddFileOperation('file1.tex', File.fromString('Content 1'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('file2.tex', File.fromString('Content 2'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('file3.tex', File.fromString('Content 3'))], - new Date(), - [] - ), - ] - const historyA = new History(snapshotA, changesA) - const chunkA = new Chunk(historyA, 20) + it('should return empty array when requested version equals head version', async function () { + // Set head version + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) - await redisBackend.setCurrentChunk(projectId, chunkA) + // Request changes since the current head version + const result = await redisBackend.getChangesSinceVersion( + projectId, + headVersion + ) - // Verify the initial chunk was cached - const cachedChunkA = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkA.getStartVersion()).to.equal(20) - expect(cachedChunkA.getEndVersion()).to.equal(23) - expect(cachedChunkA.history.changes.length).to.equal(3) - - // Create a shorter chunk (with fewer changes) - const snapshotB = new Snapshot() - const changesB = [ - new Change( - [new AddFileOperation('new.tex', File.fromString('New content'))], - new Date(), - [] - ), - ] - const historyB = new History(snapshotB, changesB) - const chunkB = new Chunk(historyB, 30) - - // Replace the cached chunk - await redisBackend.setCurrentChunk(projectId, chunkB) - - // Verify the new chunk replaced the old one - const cachedChunkB = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkB).to.not.be.null - expect(cachedChunkB.getStartVersion()).to.equal(30) - expect(cachedChunkB.getEndVersion()).to.equal(31) - expect(cachedChunkB.history.changes.length).to.equal(1) - expect(cachedChunkB).to.deep.equal(chunkB) - - // Verify the metadata was updated - const updatedMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(updatedMetadata.startVersion).to.equal(30) - expect(updatedMetadata.changesCount).to.equal(1) + expect(result.status).to.equal('ok') + expect(result.changes).to.be.an('array').that.is.empty }) - it('should replace a chunk with a zero-length chunk', async function () { - // Set initial chunk with changes - const snapshotA = new Snapshot() - const changesA = [ - new Change( - [new AddFileOperation('file1.tex', File.fromString('Content 1'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('file2.tex', File.fromString('Content 2'))], - new Date(), - [] - ), - ] - const historyA = new History(snapshotA, changesA) - const chunkA = new Chunk(historyA, 25) + it('should return out_of_bounds when requested version is greater than head version', async function () { + // Set head version + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) - await redisBackend.setCurrentChunk(projectId, chunkA) + // Request changes with version larger than head + const result = await redisBackend.getChangesSinceVersion( + projectId, + headVersion + 1 + ) - // Verify the initial chunk was cached - const cachedChunkA = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkA.getStartVersion()).to.equal(25) - expect(cachedChunkA.getEndVersion()).to.equal(27) - expect(cachedChunkA.history.changes.length).to.equal(2) - - // Create a zero-length chunk (with no changes) - const snapshotB = new Snapshot() - const changesB = [] - const historyB = new History(snapshotB, changesB) - const chunkB = new Chunk(historyB, 40) - - // Replace the cached chunk - await redisBackend.setCurrentChunk(projectId, chunkB) - - // Verify the new chunk replaced the old one - const cachedChunkB = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkB).to.not.be.null - expect(cachedChunkB.getStartVersion()).to.equal(40) - expect(cachedChunkB.getEndVersion()).to.equal(40) // Start version equals end version with no changes - expect(cachedChunkB.history.changes.length).to.equal(0) - expect(cachedChunkB).to.deep.equal(chunkB) - - // Verify the metadata was updated - const updatedMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(updatedMetadata.startVersion).to.equal(40) - expect(updatedMetadata.changesCount).to.equal(0) + expect(result.status).to.equal('out_of_bounds') }) - it('should replace a zero-length chunk with a non-empty chunk', async function () { - // Set initial empty chunk - const snapshotA = new Snapshot() - const changesA = [] - const historyA = new History(snapshotA, changesA) - const chunkA = new Chunk(historyA, 50) + it('should return out_of_bounds when requested version is too old', async function () { + // Set head version + const headVersion = 10 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) - await redisBackend.setCurrentChunk(projectId, chunkA) + // Create a few changes but less than what we'd need to reach requested version + const timestamp = new Date() + const change1 = new Change([], timestamp) + const change2 = new Change([], timestamp) + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()) + ) - // Verify the initial chunk was cached - const cachedChunkA = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkA.getStartVersion()).to.equal(50) - expect(cachedChunkA.getEndVersion()).to.equal(50) - expect(cachedChunkA.history.changes.length).to.equal(0) + // Request changes from version 5, which is too old (headVersion - changesCount = 10 - 2 = 8) + const result = await redisBackend.getChangesSinceVersion(projectId, 5) - // Create a non-empty chunk - const snapshotB = new Snapshot() - const changesB = [ - new Change( - [new AddFileOperation('newfile.tex', File.fromString('New content'))], - new Date(), - [] - ), - new Change( - [ - new AddFileOperation( - 'another.tex', - File.fromString('Another file') - ), - ], - new Date(), - [] - ), - ] - const historyB = new History(snapshotB, changesB) - const chunkB = new Chunk(historyB, 60) + expect(result.status).to.equal('out_of_bounds') + }) - // Replace the cached chunk - await redisBackend.setCurrentChunk(projectId, chunkB) + it('should return changes since requested version', async function () { + // Set head version + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) - // Verify the new chunk replaced the old one - const cachedChunkB = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunkB).to.not.be.null - expect(cachedChunkB.getStartVersion()).to.equal(60) - expect(cachedChunkB.getEndVersion()).to.equal(62) - expect(cachedChunkB.history.changes.length).to.equal(2) - expect(cachedChunkB).to.deep.equal(chunkB) + // Create changes + const timestamp = new Date() + const change1 = new Change([], timestamp) + const change2 = new Change([], timestamp) + const change3 = new Change([], timestamp) - // Verify the metadata was updated - const updatedMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(updatedMetadata.startVersion).to.equal(60) - expect(updatedMetadata.changesCount).to.equal(2) + // Push changes to Redis (representing versions 3, 4, and 5) + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()), + JSON.stringify(change3.toRaw()) + ) + + // Request changes since version 3 (should return changes for versions 4 and 5) + const result = await redisBackend.getChangesSinceVersion(projectId, 3) + + expect(result.status).to.equal('ok') + expect(result.changes).to.be.an('array').with.lengthOf(2) + + // The changes array should contain the raw changes + // Note: We're comparing raw objects, not the Change instances + expect(result.changes[0]).to.deep.equal(change2.toRaw()) + expect(result.changes[1]).to.deep.equal(change3.toRaw()) + }) + + it('should return all changes when requested version is earliest available', async function () { + // Set head version to 5 + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + + // Create changes + const timestamp = new Date() + const change1 = new Change([], timestamp) + const change2 = new Change([], timestamp) + const change3 = new Change([], timestamp) + + // Push changes to Redis (representing versions 3, 4, and 5) + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()), + JSON.stringify(change3.toRaw()) + ) + + // Request changes since version 2 (should return all 3 changes) + const result = await redisBackend.getChangesSinceVersion(projectId, 2) + + expect(result.status).to.equal('ok') + expect(result.changes).to.be.an('array').with.lengthOf(3) + expect(result.changes[0]).to.deep.equal(change1.toRaw()) + expect(result.changes[1]).to.deep.equal(change2.toRaw()) + expect(result.changes[2]).to.deep.equal(change3.toRaw()) }) }) - describe('checkCacheValidity', function () { - it('should return true when versions match', function () { - const snapshotA = new Snapshot() - const historyA = new History(snapshotA, []) - const chunkA = new Chunk(historyA, 10) - chunkA.pushChanges([ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello'))], - new Date(), - [] - ), - ]) - - const snapshotB = new Snapshot() - const historyB = new History(snapshotB, []) - const chunkB = new Chunk(historyB, 10) - chunkB.pushChanges([ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello'))], - new Date(), - [] - ), - ]) - - const isValid = redisBackend.checkCacheValidity(chunkA, chunkB) - expect(isValid).to.be.true + describe('getNonPersistedChanges', function () { + it('should return empty array when project does not exist', async function () { + const changes = await redisBackend.getNonPersistedChanges(projectId) + expect(changes).to.be.an('array').that.is.empty }) - it('should return false when start versions differ', function () { - const snapshotA = new Snapshot() - const historyA = new History(snapshotA, []) - const chunkA = new Chunk(historyA, 10) + it('should return all changes when persisted version is not set', async function () { + const changes = [makeChange(), makeChange(), makeChange()] + queueChanges(projectId, changes) - const snapshotB = new Snapshot() - const historyB = new History(snapshotB, []) - const chunkB = new Chunk(historyB, 11) - - const isValid = redisBackend.checkCacheValidity(chunkA, chunkB) - expect(isValid).to.be.false + const nonPersistedChanges = + await redisBackend.getNonPersistedChanges(projectId) + expect(nonPersistedChanges.map(change => change.toRaw())).to.deep.equal( + changes.map(change => change.toRaw()) + ) }) - it('should return false when end versions differ', function () { - const snapshotA = new Snapshot() - const historyA = new History(snapshotA, []) - const chunkA = new Chunk(historyA, 10) - chunkA.pushChanges([ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello'))], - new Date(), - [] - ), - ]) + it('should return empty array when persisted version equals head version', async function () { + // Set both head and persisted versions to be equal + const version = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + version.toString() + ) + await rclient.set( + keySchema.persistedVersion({ projectId }), + version.toString() + ) - const snapshotB = new Snapshot() - const historyB = new History(snapshotB, []) - const chunkB = new Chunk(historyB, 10) - chunkB.pushChanges([ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('other.tex', File.fromString('World'))], - new Date(), - [] - ), - ]) - - const isValid = redisBackend.checkCacheValidity(chunkA, chunkB) - expect(isValid).to.be.false + const changes = await redisBackend.getNonPersistedChanges(projectId) + expect(changes).to.be.an('array').that.is.empty }) - it('should return false when cached chunk is null', function () { - const snapshotB = new Snapshot() - const historyB = new History(snapshotB, []) - const chunkB = new Chunk(historyB, 10) + it('should return all non-persisted changes', async function () { + // Set head version to 5 and persisted version to 2 + const headVersion = 5 + const persistedVersion = 2 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() + ) - const isValid = redisBackend.checkCacheValidity(null, chunkB) - expect(isValid).to.be.false + // Create changes for versions 3, 4, 5 + const timestamp = new Date() + const change1 = new Change([], timestamp) // Version 3 + const change2 = new Change([], timestamp) // Version 4 + const change3 = new Change([], timestamp) // Version 5 + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()), + JSON.stringify(change3.toRaw()) + ) + + // Get non-persisted changes + const nonPersistedChanges = + await redisBackend.getNonPersistedChanges(projectId) + + // Should return changes for versions 3, 4, 5 + expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3) + expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change1.toRaw()) + expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change2.toRaw()) + expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change3.toRaw()) + }) + + it('should return a subset of changes when some are persisted', async function () { + // Set head version to 5 and persisted version to 3 + // This means versions 4 and 5 are not persisted + const headVersion = 5 + const persistedVersion = 3 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() + ) + + // Create changes for versions 1, 2, 3, 4, 5 + const timestamp = new Date() + const change1 = new Change([], timestamp) // Version 1 + const change2 = new Change([], timestamp) // Version 2 + const change3 = new Change([], timestamp) // Version 3 + const change4 = new Change([], timestamp) // Version 4 + const change5 = new Change([], timestamp) // Version 5 + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()), + JSON.stringify(change3.toRaw()), + JSON.stringify(change4.toRaw()), + JSON.stringify(change5.toRaw()) + ) + + // Get non-persisted changes + const nonPersistedChanges = + await redisBackend.getNonPersistedChanges(projectId) + + // Should return only changes for versions 4 and 5 + expect(nonPersistedChanges).to.be.an('array').with.lengthOf(2) + expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change4.toRaw()) + expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change5.toRaw()) + }) + + it('should throw an error when persisted version is higher than head version', async function () { + // This is an unusual case that should not happen in practice + // The system should throw an error to indicate this abnormal state + const headVersion = 3 + const persistedVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() + ) + + // Create changes + const timestamp = new Date() + const change1 = new Change([], timestamp) + const change2 = new Change([], timestamp) + const change3 = new Change([], timestamp) + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change1.toRaw()), + JSON.stringify(change2.toRaw()), + JSON.stringify(change3.toRaw()) + ) + + // Use chai-as-promised for cleaner async error assertion + await expect( + redisBackend.getNonPersistedChanges(projectId) + ).to.be.rejectedWith(/HEAD_VERSION_BEHIND_PERSISTED_VERSION/) + }) + + it('should handle case where persisted version is before start of changes list', async function () { + // Setup: head version is 5, persisted version is 1 + // But changes list only starts from version 3 + const headVersion = 5 + const persistedVersion = 1 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() + ) + + // Create changes for versions 3, 4, 5 only + const timestamp = new Date() + const change3 = new Change([], timestamp) // Version 3 + const change4 = new Change([], timestamp) // Version 4 + const change5 = new Change([], timestamp) // Version 5 + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + JSON.stringify(change3.toRaw()), + JSON.stringify(change4.toRaw()), + JSON.stringify(change5.toRaw()) + ) + + // Get non-persisted changes + const nonPersistedChanges = + await redisBackend.getNonPersistedChanges(projectId) + + // Should return all changes since the persisted version is before the start of the list + expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3) + expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change3.toRaw()) + expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change4.toRaw()) + expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change5.toRaw()) }) }) - describe('compareChunks', function () { - it('should return true when chunks are identical', function () { - // Create two identical chunks - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), // Using fixed date for consistent comparison - [] - ), - ] - const history1 = new History(snapshot, changes) - const chunk1 = new Chunk(history1, 5) - - // Create a separate but identical chunk - const snapshot2 = new Snapshot() - const changes2 = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), // Using same fixed date - [] - ), - ] - const history2 = new History(snapshot2, changes2) - const chunk2 = new Chunk(history2, 5) - - const result = redisBackend.compareChunks(projectId, chunk1, chunk2) - expect(result).to.be.true + describe('setPersistedVersion', function () { + it('should return not_found when project does not exist', async function () { + const result = await redisBackend.setPersistedVersion(projectId, 5) + expect(result).to.equal('not_found') }) - it('should return false when chunks differ', function () { - // Create first chunk - const snapshot1 = new Snapshot() - const changes1 = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), - [] - ), - ] - const history1 = new History(snapshot1, changes1) - const chunk1 = new Chunk(history1, 5) - - // Create a different chunk (different content) - const snapshot2 = new Snapshot() - const changes2 = [ - new Change( - [ - new AddFileOperation( - 'test.tex', - File.fromString('Different content') - ), - ], - new Date('2025-04-10T12:00:00Z'), - [] - ), - ] - const history2 = new History(snapshot2, changes2) - const chunk2 = new Chunk(history2, 5) - - const result = redisBackend.compareChunks(projectId, chunk1, chunk2) - expect(result).to.be.false - }) - - it('should return false when one chunk is null', function () { - // Create a chunk - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 5) - - const resultWithNullCached = redisBackend.compareChunks( - projectId, - null, - chunk + it('should set the persisted version', async function () { + // Set head version + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() ) - expect(resultWithNullCached).to.be.false - const resultWithNullCurrent = redisBackend.compareChunks( + // Set persisted version + const persistedVersion = 3 + const result = await redisBackend.setPersistedVersion( projectId, - chunk, - null + persistedVersion ) - expect(resultWithNullCurrent).to.be.false + + expect(result).to.equal('ok') + + // Verify the persisted version was set + const persistedVersionRedis = await rclient.get( + keySchema.persistedVersion({ projectId }) + ) + expect(parseInt(persistedVersionRedis, 10)).to.equal(persistedVersion) }) - it('should return false when chunks have different start versions', function () { - // Create first chunk with start version 5 - const snapshot1 = new Snapshot() - const changes1 = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), - [] - ), - ] - const history1 = new History(snapshot1, changes1) - const chunk1 = new Chunk(history1, 5) + it('should trim the changes list to keep only MAX_PERSISTED_CHANGES beyond persisted version', async function () { + // Get MAX_PERSISTED_CHANGES to ensure our test data is larger + const maxPersistedChanges = redisBackend.MAX_PERSISTED_CHANGES - // Create second chunk with identical content but different start version (10) - const snapshot2 = new Snapshot() - const changes2 = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date('2025-04-10T12:00:00Z'), - [] - ), - ] - const history2 = new History(snapshot2, changes2) - const chunk2 = new Chunk(history2, 10) + // Create a larger number of changes for the test + // Using MAX_PERSISTED_CHANGES + 10 to ensure we have enough changes to trigger trimming + const totalChanges = maxPersistedChanges + 10 - const result = redisBackend.compareChunks(projectId, chunk1, chunk2) - expect(result).to.be.false + // Set head version to match total number of changes + const headVersion = totalChanges + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + + // Create changes for versions 1 through totalChanges + const timestamp = new Date() + const changes = Array.from( + { length: totalChanges }, + (_, idx) => + new Change( + [new AddFileOperation(`file${idx}.tex`, File.fromString('hello'))], + timestamp + ) + ) + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + ...changes.map(change => JSON.stringify(change.toRaw())) + ) + + // Set persisted version to somewhere near the head version + const persistedVersion = headVersion - 5 + + // Set the persisted version + const result = await redisBackend.setPersistedVersion( + projectId, + persistedVersion + ) + expect(result).to.equal('ok') + + // Get all changes that remain in Redis + const remainingChanges = await rclient.lrange( + keySchema.changes({ projectId }), + 0, + -1 + ) + + // Calculate the expected number of changes to remain + expect(remainingChanges).to.have.lengthOf( + maxPersistedChanges + (headVersion - persistedVersion) + ) + + // Check that remaining changes are the expected ones + const expectedChanges = changes.slice( + persistedVersion - maxPersistedChanges, + totalChanges + ) + expect(remainingChanges).to.deep.equal( + expectedChanges.map(change => JSON.stringify(change.toRaw())) + ) + }) + + it('should keep all changes when there are fewer than MAX_PERSISTED_CHANGES', async function () { + // Set head version to 5 + const headVersion = 5 + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + + // Create changes for versions 1 through 5 + const timestamp = new Date() + const changes = Array.from({ length: 5 }, () => new Change([], timestamp)) + + // Push changes to Redis + await rclient.rpush( + keySchema.changes({ projectId }), + ...changes.map(change => JSON.stringify(change.toRaw())) + ) + + // Set persisted version to 3 + // All changes should remain since total count is small + const persistedVersion = 3 + + // Ensure MAX_PERSISTED_CHANGES is larger than our test dataset + expect(redisBackend.MAX_PERSISTED_CHANGES).to.be.greaterThan( + 5, + 'MAX_PERSISTED_CHANGES should be greater than 5 for this test' + ) + + // Set the persisted version + const result = await redisBackend.setPersistedVersion( + projectId, + persistedVersion + ) + expect(result).to.equal('ok') + + // Get all changes that remain in Redis + const remainingChanges = await rclient.lrange( + keySchema.changes({ projectId }), + 0, + -1 + ) + + // All changes should remain + expect(remainingChanges).to.have.lengthOf(5) }) }) - describe('integration with redis', function () { - it('should store and retrieve complex chunks correctly', async function () { - // Create a more complex chunk + describe('getState', function () { + it('should return complete project state from Redis', async function () { + // Set up the test data in Redis const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('file1.tex', File.fromString('Content 1'))], - new Date(), - [1234] - ), - new Change( - [new AddFileOperation('file2.tex', File.fromString('Content 2'))], - new Date(), - null, - new Origin('test-origin'), - ['5a296963ad5e82432674c839', null], - '123.4', - new V2DocVersions({ - 'random-doc-id': { pathname: 'file2.tex', v: 123 }, - }) - ), - new Change( - [new AddFileOperation('file3.tex', File.fromString('Content 3'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 20) + const rawSnapshot = JSON.stringify(snapshot.toRaw()) + const headVersion = 42 + const persistedVersion = 40 + const now = Date.now() + const expireTime = now + 60 * 60 * 1000 // 1 hour from now + const persistTime = now + 30 * 1000 // 30 seconds from now - // Cache the chunk - await redisBackend.setCurrentChunk(projectId, chunk) + // Create a change + const timestamp = new Date() + const change = new Change([], timestamp) + const serializedChange = JSON.stringify(change.toRaw()) - // Retrieve the cached chunk - const cachedChunk = await redisBackend.getCurrentChunk(projectId) - - expect(cachedChunk.getStartVersion()).to.equal(20) - expect(cachedChunk.getEndVersion()).to.equal(23) - expect(cachedChunk).to.deep.equal(chunk) - expect(cachedChunk.history.changes.length).to.equal(3) - - // Check that the operations were preserved correctly - const retrievedChanges = cachedChunk.history.changes - expect(retrievedChanges[0].getOperations()[0].getPathname()).to.equal( - 'file1.tex' + // Set everything in Redis + await rclient.set(keySchema.head({ projectId }), rawSnapshot) + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() ) - expect(retrievedChanges[1].getOperations()[0].getPathname()).to.equal( - 'file2.tex' + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() ) - expect(retrievedChanges[2].getOperations()[0].getPathname()).to.equal( - 'file3.tex' + await rclient.set( + keySchema.expireTime({ projectId }), + expireTime.toString() + ) + await rclient.set( + keySchema.persistTime({ projectId }), + persistTime.toString() + ) + await rclient.rpush(keySchema.changes({ projectId }), serializedChange) + + // Get the state + const state = await redisBackend.getState(projectId) + + // Verify everything matches + expect(state).to.exist + expect(state.headSnapshot).to.deep.equal(snapshot.toRaw()) + expect(state.headVersion).to.equal(headVersion) + expect(state.persistedVersion).to.equal(persistedVersion) + expect(state.expireTime).to.equal(expireTime) + expect(state.persistTime).to.equal(persistTime) + }) + + it('should return proper defaults for missing fields', async function () { + // Only set the head snapshot and version, leave others unset + const snapshot = new Snapshot() + const rawSnapshot = JSON.stringify(snapshot.toRaw()) + const headVersion = 42 + + await rclient.set(keySchema.head({ projectId }), rawSnapshot) + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() ) - // Check that the chunk was stored correctly using the chunk metadata - const chunkMetadata = - await redisBackend.getCurrentChunkMetadata(projectId) - expect(chunkMetadata).to.not.be.null - expect(chunkMetadata.startVersion).to.equal(20) - expect(chunkMetadata.changesCount).to.equal(3) + // Get the state + const state = await redisBackend.getState(projectId) + + // Verify only what we set exists, and other fields have correct defaults + expect(state).to.exist + expect(state.headSnapshot).to.deep.equal(snapshot.toRaw()) + expect(state.headVersion).to.equal(headVersion) + expect(state.persistedVersion).to.be.null + expect(state.expireTime).to.be.null + expect(state.persistTime).to.be.null }) }) - describe('getCurrentChunkIfValid', function () { - it('should return the chunk when versions and changes count match', async function () { - // Create and cache a sample chunk - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Valid content'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 7) // startVersion 7, endVersion 8 - await redisBackend.setCurrentChunk(projectId, chunk) + describe('setExpireTime', function () { + it('should set the expire time on an active project', async function () { + // Load a fake project in Redis + const change = makeChange() + await queueChanges(projectId, [change], { expireTime: 123 }) - // Prepare chunkRecord matching the cached chunk - const chunkRecord = { startVersion: 7, endVersion: 8 } + // Check that the right expire time was recorded + let state = await redisBackend.getState(projectId) + expect(state.expireTime).to.equal(123) - // Retrieve using getCurrentChunkIfValid - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - - expect(validChunk).to.not.be.null - expect(validChunk.getStartVersion()).to.equal(7) - expect(validChunk.getEndVersion()).to.equal(8) - expect(validChunk).to.deep.equal(chunk) + // Set the expire time to something else + await redisBackend.setExpireTime(projectId, 456) + state = await redisBackend.getState(projectId) + expect(state.expireTime).to.equal(456) }) - it('should return null when no chunk is cached', async function () { - // No chunk is cached for this projectId yet - const chunkRecord = { startVersion: 1, endVersion: 2 } - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - expect(validChunk).to.be.null - }) + it('should not set an expire time on an inactive project', async function () { + let state = await redisBackend.getState(projectId) + expect(state.expireTime).to.be.null - it('should return null when start version mismatches', async function () { - // Cache a chunk with startVersion 10 - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Content'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 10) // startVersion 10, endVersion 11 - await redisBackend.setCurrentChunk(projectId, chunk) - - // Attempt to retrieve with a different startVersion - const chunkRecord = { startVersion: 9, endVersion: 10 } // Incorrect startVersion - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - expect(validChunk).to.be.null - }) - - it('should return null when changes count mismatches', async function () { - // Cache a chunk with one change (startVersion 15, endVersion 16) - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Content'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 15) - await redisBackend.setCurrentChunk(projectId, chunk) - - // Attempt to retrieve with correct startVersion but incorrect endVersion (implying wrong changes count) - const chunkRecord = { startVersion: 15, endVersion: 17 } // Incorrect endVersion (implies 2 changes) - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - expect(validChunk).to.be.null - }) - - it('should return the chunk when versions and changes count match for a zero-change chunk', async function () { - // Cache a chunk with zero changes - const snapshot = new Snapshot() - const changes = [] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 20) // startVersion 20, endVersion 20 - await redisBackend.setCurrentChunk(projectId, chunk) - - // Prepare chunkRecord matching the zero-change chunk - const chunkRecord = { startVersion: 20, endVersion: 20 } - - // Retrieve using getCurrentChunkIfValid - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - - expect(validChunk).to.not.be.null - expect(validChunk.getStartVersion()).to.equal(20) - expect(validChunk.getEndVersion()).to.equal(20) - expect(validChunk.history.changes.length).to.equal(0) - expect(validChunk).to.deep.equal(chunk) - }) - - it('should return null when start version matches but changes count is wrong for zero-change chunk', async function () { - // Cache a chunk with zero changes - const snapshot = new Snapshot() - const changes = [] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 25) // startVersion 25, endVersion 25 - await redisBackend.setCurrentChunk(projectId, chunk) - - // Attempt to retrieve with correct startVersion but incorrect endVersion - const chunkRecord = { startVersion: 25, endVersion: 26 } // Incorrect endVersion (implies 1 change) - const validChunk = await redisBackend.getCurrentChunkIfValid( - projectId, - chunkRecord - ) - expect(validChunk).to.be.null + await redisBackend.setExpireTime(projectId, 456) + state = await redisBackend.getState(projectId) + expect(state.expireTime).to.be.null }) }) - describe('getCurrentChunkMetadata', function () { - it('should return metadata for a cached chunk', async function () { - // Cache a chunk - const snapshot = new Snapshot() - const history = new History(snapshot, [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello'))], - new Date(), - [] - ), - new Change( - [new AddFileOperation('other.tex', File.fromString('Bonjour'))], - new Date(), - [] - ), - ]) - const chunk = new Chunk(history, 10) - await redisBackend.setCurrentChunk(projectId, chunk) + describe('expireProject', function () { + it('should expire a persisted project', async function () { + // Load and persist a project in Redis + const change = makeChange() + await queueChanges(projectId, [change]) + await redisBackend.setPersistedVersion(projectId, 1) - const metadata = await redisBackend.getCurrentChunkMetadata(projectId) - expect(metadata).to.deep.equal({ startVersion: 10, changesCount: 2 }) + // Check that the project is loaded + let state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(1) + expect(state.persistedVersion).to.equal(1) + + // Expire the project + await redisBackend.expireProject(projectId) + state = await redisBackend.getState(projectId) + expect(state.headVersion).to.be.null }) - it('should return null if no chunk is cached for the project', async function () { - const metadata = await redisBackend.getCurrentChunkMetadata( - 'non-existent-project-id' - ) - expect(metadata).to.be.null + it('should not expire a non-persisted project', async function () { + // Load a project in Redis + const change = makeChange() + await queueChanges(projectId, [change]) + + // Check that the project is loaded + let state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(1) + expect(state.persistedVersion).to.equal(null) + + // Expire the project + await redisBackend.expireProject(projectId) + state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(1) }) - it('should return metadata with zero changes for a zero-change chunk', async function () { - // Cache a chunk with no changes - const snapshot = new Snapshot() - const history = new History(snapshot, []) - const chunk = new Chunk(history, 5) - await redisBackend.setCurrentChunk(projectId, chunk) + it('should not expire a partially persisted project', async function () { + // Load a fake project in Redis + const change1 = makeChange() + const change2 = makeChange() + await queueChanges(projectId, [change1, change2]) - const metadata = await redisBackend.getCurrentChunkMetadata(projectId) - expect(metadata).to.deep.equal({ startVersion: 5, changesCount: 0 }) + // Persist the first change + await redisBackend.setPersistedVersion(projectId, 1) + + // Check that the project is loaded + let state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(2) + expect(state.persistedVersion).to.equal(1) + + // Expire the project + await redisBackend.expireProject(projectId) + state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(2) + }) + + it('should handle a project that is not loaded', async function () { + // Check that the project is not loaded + let state = await redisBackend.getState(projectId) + expect(state.headVersion).to.be.null + + // Expire the project + await redisBackend.expireProject(projectId) + state = await redisBackend.getState(projectId) + expect(state.headVersion).to.be.null }) }) - describe('expireCurrentChunk', function () { - const TEMPORARY_CACHE_LIFETIME_MS = 300 * 1000 // Match the value in redis.js + describe('claimExpireJob', function () { + it("should claim the expire job when it's ready", async function () { + // Load a project in Redis + const change = makeChange() + const now = Date.now() + const expireTime = now - 1000 + await queueChanges(projectId, [change], { expireTime }) - it('should return false and not expire a non-expired chunk', async function () { - // Cache a chunk - const snapshot = new Snapshot() - const history = new History(snapshot, []) - const chunk = new Chunk(history, 10) - await redisBackend.setCurrentChunk(projectId, chunk) + // Check that the expire time has been set correctly + let state = await redisBackend.getState(projectId) + expect(state.expireTime).to.equal(expireTime) - // Attempt to expire immediately (should not be expired yet) - const expired = await redisBackend.expireCurrentChunk(projectId) - expect(expired).to.be.false + // Claim the job + await redisBackend.claimExpireJob(projectId) - // Verify the chunk still exists - const cachedChunk = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunk).to.not.be.null - expect(cachedChunk.getStartVersion()).to.equal(10) + // Check the job expires in the future + state = await redisBackend.getState(projectId) + expect(state.expireTime).to.satisfy(time => time > now) }) - it('should return true and expire an expired chunk using currentTime', async function () { - // Cache a chunk - const snapshot = new Snapshot() - const history = new History(snapshot, []) - const chunk = new Chunk(history, 10) - await redisBackend.setCurrentChunk(projectId, chunk) + it('should throw an error when the job is not ready', async function () { + // Load a project in Redis + const change = makeChange() + const now = Date.now() + const expireTime = now + 100_000 + await queueChanges(projectId, [change], { expireTime }) - // Calculate a time far enough in the future to ensure expiry - const futureTime = Date.now() + TEMPORARY_CACHE_LIFETIME_MS + 5000 // 5 seconds past expiry - - // Attempt to expire using the future time - const expired = await redisBackend.expireCurrentChunk( - projectId, - futureTime + // Claim the job + await expect(redisBackend.claimExpireJob(projectId)).to.be.rejectedWith( + JobNotReadyError ) - expect(expired).to.be.true - - // Verify the chunk is gone - const cachedChunk = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunk).to.be.null - - // Verify metadata is also gone - const metadata = await redisBackend.getCurrentChunkMetadata(projectId) - expect(metadata).to.be.null }) - it('should return false if no chunk is cached for the project', async function () { - const expired = await redisBackend.expireCurrentChunk( - 'non-existent-project' + it('should throw an error when the job is not found', async function () { + // Claim a job on a project that is not loaded + await expect(redisBackend.claimExpireJob(projectId)).to.be.rejectedWith( + JobNotFoundError ) - expect(expired).to.be.false - }) - - it('should return false if called with a currentTime before the expiry time', async function () { - // Cache a chunk - const snapshot = new Snapshot() - const history = new History(snapshot, []) - const chunk = new Chunk(history, 10) - await redisBackend.setCurrentChunk(projectId, chunk) - - // Use a time *before* the cache would normally expire - const pastTime = Date.now() - 10000 // 10 seconds ago - - // Attempt to expire using the past time - const expired = await redisBackend.expireCurrentChunk(projectId, pastTime) - expect(expired).to.be.false - - // Verify the chunk still exists - const cachedChunk = await redisBackend.getCurrentChunk(projectId) - expect(cachedChunk).to.not.be.null }) }) - describe('with a persist-time timestamp', function () { - const persistTimestamp = Date.now() + 1000 * 60 * 60 // 1 hour in the future + describe('claimPersistJob', function () { + it("should claim the persist job when it's ready", async function () { + // Load a project in Redis + const change = makeChange() + const now = Date.now() + const persistTime = now - 1000 + await queueChanges(projectId, [change], { persistTime }) + + // Check that the persist time has been set correctly + let state = await redisBackend.getState(projectId) + expect(state.persistTime).to.equal(persistTime) + + // Claim the job + await redisBackend.claimPersistJob(projectId) + + // Check the job is not ready + state = await redisBackend.getState(projectId) + expect(state.persistTime).to.satisfy(time => time > now) + }) + + it('should throw an error when the job is not ready', async function () { + // Load a project in Redis + const change = makeChange() + const now = Date.now() + const persistTime = now + 100_000 + await queueChanges(projectId, [change], { persistTime }) + + // Claim the job + await expect(redisBackend.claimPersistJob(projectId)).to.be.rejectedWith( + JobNotReadyError + ) + }) + + it('should throw an error when the job is not found', async function () { + // Claim a job on a project that is not loaded + await expect(redisBackend.claimExpireJob(projectId)).to.be.rejectedWith( + JobNotFoundError + ) + }) + }) + + describe('closing a job', function () { + let job beforeEach(async function () { - // Ensure a chunk exists before each test in this block - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Persist Test'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 100) - await redisBackend.setCurrentChunk(projectId, chunk) + // Load a project in Redis + const change = makeChange() + const now = Date.now() + const expireTime = now - 1000 + await queueChanges(projectId, [change], { expireTime }) + + // Check that the expire time has been set correctly + const state = await redisBackend.getState(projectId) + expect(state.expireTime).to.equal(expireTime) + + // Claim the job + job = await redisBackend.claimExpireJob(projectId) }) - it('should not clear a chunk if persist-time is set', async function () { - // Set persist time - await redisBackend.setPersistTime(projectId, persistTimestamp) - - // Attempt to clear the cache - const cleared = await redisBackend.clearCache(projectId) - expect(cleared).to.be.false // Expect clearCache to return false - - // Verify the chunk still exists - const chunk = await redisBackend.getCurrentChunk(projectId) - expect(chunk).to.not.be.null - expect(chunk.getStartVersion()).to.equal(100) + it("should delete the key if it hasn't changed", async function () { + await job.close() + const state = await redisBackend.getState(projectId) + expect(state.expireTime).to.be.null }) - it('should not expire a chunk if persist-time is set, even if expire-time has passed', async function () { - // Set persist time - await redisBackend.setPersistTime(projectId, persistTimestamp) - - // Attempt to expire the chunk with a time far in the future - const farFutureTime = Date.now() + 1000 * 60 * 60 * 24 // 24 hours in the future - const expired = await redisBackend.expireCurrentChunk( - projectId, - farFutureTime - ) - expect(expired).to.be.false // Expect expireCurrentChunk to return false - - // Verify the chunk still exists - const chunk = await redisBackend.getCurrentChunk(projectId) - expect(chunk).to.not.be.null - expect(chunk.getStartVersion()).to.equal(100) - }) - - it('getCurrentChunkStatus should return persist-time when set', async function () { - // Set persist time - await redisBackend.setPersistTime(projectId, persistTimestamp) - - const status = await redisBackend.getCurrentChunkStatus(projectId) - expect(status.persistTime).to.equal(persistTimestamp) - expect(status.expireTime).to.be.a('number') // expireTime is set by setCurrentChunk - }) - - it('getCurrentChunkStatus should return null for persist-time when not set', async function () { - const status = await redisBackend.getCurrentChunkStatus(projectId) - expect(status.persistTime).to.be.null - expect(status.expireTime).to.be.a('number') - }) - - it('getCurrentChunkStatus should return nulls after cache is cleared (without persist-time)', async function () { - // Clear cache (persistTime is not set here) - await redisBackend.clearCache(projectId) - - const status = await redisBackend.getCurrentChunkStatus(projectId) - expect(status.persistTime).to.be.null - expect(status.expireTime).to.be.null + it('should keep the key if it has changed', async function () { + const newTimestamp = job.claimTimestamp + 1000 + await redisBackend.setExpireTime(projectId, newTimestamp) + await job.close() + const state = await redisBackend.getState(projectId) + expect(state.expireTime).to.equal(newTimestamp) }) }) }) + +async function queueChanges(projectId, changes, opts = {}) { + const baseVersion = 0 + const headSnapshot = new Snapshot() + + // Set times + const now = Date.now() + const persistTime = opts.persistTime ?? now + 30 * 1000 // 30 seconds from now + const expireTime = opts.expireTime ?? now + 60 * 60 * 1000 // 1 hour from now + + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + changes, + persistTime, + expireTime + ) +} + +function makeChange() { + const timestamp = new Date() + return new Change([], timestamp) +} diff --git a/services/history-v1/test/acceptance/js/storage/expire_redis_chunks.test.js b/services/history-v1/test/acceptance/js/storage/expire_redis_chunks.test.js new file mode 100644 index 0000000000..b657991dda --- /dev/null +++ b/services/history-v1/test/acceptance/js/storage/expire_redis_chunks.test.js @@ -0,0 +1,209 @@ +'use strict' + +const { expect } = require('chai') +const { promisify } = require('node:util') +const { execFile } = require('node:child_process') +const { Snapshot, Author, Change } = require('overleaf-editor-core') +const cleanup = require('./support/cleanup') +const redisBackend = require('../../../../storage/lib/chunk_store/redis') +const redis = require('../../../../storage/lib/redis') +const rclient = redis.rclientHistory +const keySchema = redisBackend.keySchema + +const SCRIPT_PATH = 'storage/scripts/expire_redis_chunks.js' + +async function runExpireScript() { + const TIMEOUT = 10 * 1000 // 10 seconds + let result + try { + result = await promisify(execFile)('node', [SCRIPT_PATH], { + encoding: 'utf-8', + timeout: TIMEOUT, + env: { + ...process.env, + LOG_LEVEL: 'debug', // Override LOG_LEVEL for script output + }, + }) + result.status = 0 + } catch (err) { + const { stdout, stderr, code } = err + if (typeof code !== 'number') { + console.error('Error running expire script:', err) + throw err + } + result = { stdout, stderr, status: code } + } + // The script might exit with status 1 if it finds no keys to process, which is ok + if (result.status !== 0 && result.status !== 1) { + console.error('Expire script failed:', result.stderr) + throw new Error(`expire script failed with status ${result.status}`) + } + return result +} + +// Helper to set up a basic project state in Redis +async function setupProjectState( + projectId, + { + headVersion = 0, + persistedVersion = null, + expireTime = null, + persistTime = null, + changes = [], + } +) { + const headSnapshot = new Snapshot() + await rclient.set( + keySchema.head({ projectId }), + JSON.stringify(headSnapshot.toRaw()) + ) + await rclient.set( + keySchema.headVersion({ projectId }), + headVersion.toString() + ) + + if (persistedVersion !== null) { + await rclient.set( + keySchema.persistedVersion({ projectId }), + persistedVersion.toString() + ) + } + if (expireTime !== null) { + await rclient.set( + keySchema.expireTime({ projectId }), + expireTime.toString() + ) + } + if (persistTime !== null) { + await rclient.set( + keySchema.persistTime({ projectId }), + persistTime.toString() + ) + } + if (changes.length > 0) { + const rawChanges = changes.map(c => JSON.stringify(c.toRaw())) + await rclient.rpush(keySchema.changes({ projectId }), ...rawChanges) + } +} + +function makeChange() { + const timestamp = new Date() + const author = new Author(123, 'test@example.com', 'Test User') + return new Change([], timestamp, [author]) +} + +describe('expire_redis_chunks script', function () { + beforeEach(cleanup.everything) + + let now, past, future + + // Setup all projects and run the script once before tests + beforeEach(async function () { + now = Date.now() + past = now - 10000 // 10 seconds ago + future = now + 60000 // 1 minute in the future + + // Setup all project states explicitly + await setupProjectState('expired_persisted', { + headVersion: 2, + persistedVersion: 2, + expireTime: past, + }) + await setupProjectState('expired_initial_state', { + headVersion: 0, + persistedVersion: 0, + expireTime: past, + }) + await setupProjectState('expired_persisted_with_job', { + headVersion: 2, + persistedVersion: 2, + expireTime: past, + persistTime: future, + }) + await setupProjectState('expired_not_persisted', { + headVersion: 3, + persistedVersion: 2, + expireTime: past, + changes: [makeChange()], + }) + await setupProjectState('expired_no_persisted_version', { + headVersion: 1, + persistedVersion: null, + expireTime: past, + changes: [makeChange()], + }) + await setupProjectState('future_expired_persisted', { + headVersion: 2, + persistedVersion: 2, + expireTime: future, + }) + await setupProjectState('future_expired_not_persisted', { + headVersion: 3, + persistedVersion: 2, + expireTime: future, + changes: [makeChange()], + }) + await setupProjectState('no_expire_time', { + headVersion: 1, + persistedVersion: 1, + expireTime: null, + }) + + // Run the expire script once after all projects are set up + await runExpireScript() + }) + + async function checkProjectStatus(projectId) { + const exists = + (await rclient.exists(keySchema.headVersion({ projectId }))) === 1 + return exists ? 'exists' : 'deleted' + } + + it('should expire a project when expireTime is past and it is fully persisted', async function () { + const projectId = 'expired_persisted' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('deleted') + }) + + it('should expire a project when expireTime is past and it has no changes (initial state)', async function () { + const projectId = 'expired_initial_state' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('deleted') + }) + + it('should expire a project when expireTime is past and it is fully persisted even if persistTime is set', async function () { + const projectId = 'expired_persisted_with_job' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('deleted') + }) + + it('should not expire a project when expireTime is past but it is not fully persisted', async function () { + const projectId = 'expired_not_persisted' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('exists') + }) + + it('should not expire a project when expireTime is past but persistedVersion is not set', async function () { + const projectId = 'expired_no_persisted_version' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('exists') + }) + + it('should not expire a project when expireTime is in the future (even if fully persisted)', async function () { + const projectId = 'future_expired_persisted' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('exists') + }) + + it('should not expire a project when expireTime is in the future (if not fully persisted)', async function () { + const projectId = 'future_expired_not_persisted' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('exists') + }) + + it('should not expire a project when expireTime is not set', async function () { + const projectId = 'no_expire_time' + const status = await checkProjectStatus(projectId) + expect(status).to.equal('exists') + }) +})