From 850da3477856ddfab7908ff178a7fdc703549492 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 28 Apr 2025 10:50:28 +0100 Subject: [PATCH] Merge pull request #25086 from overleaf/bg-history-buffer-use-expire-time add expire time to redis buffer in history-v1 GitOrigin-RevId: 3d74957c341e62e536dc60869a7ca71ac173e380 --- .../storage/lib/chunk_store/redis.js | 104 +++++++++++++--- services/history-v1/storage/lib/scan.js | 17 ++- .../storage/scripts/expire_redis_chunks.js | 98 +++++++++++++++ .../storage/chunk_store_redis_backend.test.js | 115 ++++++++++++++++++ 4 files changed, 317 insertions(+), 17 deletions(-) create mode 100644 services/history-v1/storage/scripts/expire_redis_chunks.js diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index 7e542a2b01..7a8beb3556 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -16,6 +16,9 @@ const keySchema = { changes({ projectId }) { return `changes:{${projectId}}` }, + expireTime({ projectId }) { + return `expire-time:{${projectId}}` + }, } rclient.defineCommand('get_current_chunk', { @@ -122,6 +125,9 @@ rclient.defineCommand('get_current_chunk_metadata', { numberOfKeys: 2, lua: ` local startVersionValue = redis.call('GET', KEYS[1]) + if not startVersionValue then + return nil -- this is a cache-miss + end local changesCount = redis.call('LLEN', KEYS[2]) return {startVersionValue, changesCount} `, @@ -152,17 +158,19 @@ async function getCurrentChunkMetadata(projectId) { } rclient.defineCommand('set_current_chunk', { - numberOfKeys: 3, + numberOfKeys: 4, lua: ` local snapshotValue = ARGV[1] local startVersionValue = ARGV[2] - redis.call('SETEX', KEYS[1], ${TEMPORARY_CACHE_LIFETIME}, snapshotValue) - redis.call('SETEX', KEYS[2], ${TEMPORARY_CACHE_LIFETIME}, startVersionValue) - redis.call('DEL', KEYS[3]) -- clear the old changes list - if #ARGV >= 3 then - redis.call('RPUSH', KEYS[3], unpack(ARGV, 3)) - redis.call('EXPIRE', KEYS[3], ${TEMPORARY_CACHE_LIFETIME}) + local expireTime = ARGV[3] + redis.call('SET', KEYS[1], snapshotValue) + redis.call('SET', KEYS[2], startVersionValue) + redis.call('SET', KEYS[3], expireTime) + redis.call('DEL', KEYS[4]) -- clear the old changes list + if #ARGV >= 4 then + redis.call('RPUSH', KEYS[4], unpack(ARGV, 4)) end + `, }) @@ -178,24 +186,28 @@ async function setCurrentChunk(projectId, chunk) { const snapshotKey = keySchema.snapshot({ projectId }) const startVersionKey = keySchema.startVersion({ projectId }) const changesKey = keySchema.changes({ projectId }) + const expireTimeKey = keySchema.expireTime({ projectId }) const snapshot = chunk.history.snapshot const startVersion = chunk.startVersion const changes = chunk.history.changes + const expireTime = Date.now() + TEMPORARY_CACHE_LIFETIME * 1000 await rclient.set_current_chunk( - snapshotKey, - startVersionKey, - changesKey, - JSON.stringify(snapshot.toRaw()), - startVersion, - ...changes.map(c => JSON.stringify(c.toRaw())) + snapshotKey, // KEYS[1] + startVersionKey, // KEYS[2] + expireTimeKey, // KEYS[3] + changesKey, // KEYS[4] + JSON.stringify(snapshot.toRaw()), // ARGV[1] + startVersion, // ARGV[2] + expireTime, // ARGV[3] + ...changes.map(c => JSON.stringify(c.toRaw())) // ARGV[4..] ) metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'success' }) } catch (err) { logger.error( { err, projectId, chunk }, - 'error setting current chunk inredis' + 'error setting current chunk in redis' ) metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'error' }) return null // while testing we will suppress any errors @@ -266,14 +278,67 @@ function compareChunks(projectId, cachedChunk, currentChunk) { return identical } +// Define Lua script for atomic cache clearing +rclient.defineCommand('expire_chunk_cache', { + numberOfKeys: 4, + lua: ` + local currentTime = tonumber(ARGV[1]) + local expireTimeValue = redis.call('GET', KEYS[4]) + if not expireTimeValue then + return nil -- this is a cache-miss + end + local expireTime = tonumber(expireTimeValue) + if currentTime < expireTime then + return nil -- cache is still valid + end + -- Cache is expired, proceed to delete the keys atomically + redis.call('DEL', KEYS[1]) -- snapshot key + redis.call('DEL', KEYS[2]) -- startVersion key + redis.call('DEL', KEYS[3]) -- changes key + redis.call('DEL', KEYS[4]) -- expireTime key + return 1 + `, +}) + +/** + * Expire cache entries for a project's chunk data if needed + * @param {string} projectId - The ID of the project whose cache should be cleared + * @returns {Promise} A promise that resolves to true if successful, false on error + */ +async function expireCurrentChunk(projectId, currentTime) { + try { + const snapshotKey = keySchema.snapshot({ projectId }) + const startVersionKey = keySchema.startVersion({ projectId }) + const changesKey = keySchema.changes({ projectId }) + const expireTimeKey = keySchema.expireTime({ projectId }) + const result = await rclient.expire_chunk_cache( + snapshotKey, + startVersionKey, + changesKey, + expireTimeKey, + currentTime || Date.now() + ) + if (!result) { + return false // not expired + } + metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'success' }) + return true + } catch (err) { + logger.error({ err, projectId }, 'error clearing chunk cache from redis') + metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'error' }) + return false + } +} + // Define Lua script for atomic cache clearing rclient.defineCommand('clear_chunk_cache', { - numberOfKeys: 3, + numberOfKeys: 4, lua: ` -- Delete all keys related to a project's chunk cache atomically redis.call('DEL', KEYS[1]) -- snapshot key redis.call('DEL', KEYS[2]) -- startVersion key redis.call('DEL', KEYS[3]) -- changes key + redis.call('DEL', KEYS[4]) -- expireTime key return 1 `, }) @@ -288,8 +353,14 @@ async function clearCache(projectId) { const snapshotKey = keySchema.snapshot({ projectId }) const startVersionKey = keySchema.startVersion({ projectId }) const changesKey = keySchema.changes({ projectId }) + const expireTimeKey = keySchema.expireTime({ projectId }) - await rclient.clear_chunk_cache(snapshotKey, startVersionKey, changesKey) + await rclient.clear_chunk_cache( + snapshotKey, + startVersionKey, + changesKey, + expireTimeKey + ) metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' }) return true } catch (err) { @@ -307,5 +378,6 @@ module.exports = { checkCacheValidity, checkCacheValidityWithMetadata, compareChunks, + expireCurrentChunk, clearCache, } diff --git a/services/history-v1/storage/lib/scan.js b/services/history-v1/storage/lib/scan.js index 6527331479..45d0c327fe 100644 --- a/services/history-v1/storage/lib/scan.js +++ b/services/history-v1/storage/lib/scan.js @@ -34,4 +34,19 @@ async function* scanRedisCluster(redisClient, pattern, count = BATCH_SIZE) { } } -module.exports = { scanRedisCluster } +/** + * Extracts the content within the first pair of curly braces {} from a string. + * This is used to extract a user ID or project ID from a Redis key. + * + * @param {string} key - The input string containing content within curly braces. + * @returns {string | null} The extracted content (the key ID) if found, otherwise null. + */ +function extractKeyId(key) { + const match = key.match(/\{(.*?)\}/) + if (match && match[1]) { + return match[1] + } + return null +} + +module.exports = { scanRedisCluster, extractKeyId } diff --git a/services/history-v1/storage/scripts/expire_redis_chunks.js b/services/history-v1/storage/scripts/expire_redis_chunks.js new file mode 100644 index 0000000000..11b34101da --- /dev/null +++ b/services/history-v1/storage/scripts/expire_redis_chunks.js @@ -0,0 +1,98 @@ +const logger = require('@overleaf/logger') +const commandLineArgs = require('command-line-args') // Add this line +const redis = require('../lib/redis') +const { scanRedisCluster, extractKeyId } = require('../lib/scan') +const { expireCurrentChunk } = require('../lib/chunk_store/redis') + +const rclient = redis.rclientHistory +const EXPIRE_TIME_KEY_PATTERN = `expire-time:{*}` + +const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }] +const options = commandLineArgs(optionDefinitions) +const DRY_RUN = options['dry-run'] || false + +logger.initialize('expire-redis-chunks') + +function isExpiredKey(expireTimestamp, currentTime) { + const expireTime = parseInt(expireTimestamp, 10) + if (isNaN(expireTime)) { + return false + } + logger.debug( + { + expireTime, + currentTime, + expireIn: expireTime - currentTime, + expired: currentTime > expireTime, + }, + 'Checking if key is expired' + ) + return currentTime > expireTime +} + +async function processKeysBatch(keysBatch, rclient) { + let clearedKeyCount = 0 + if (keysBatch.length === 0) { + return 0 + } + // For efficiency, we use MGET to fetch all the timestamps in a single request + const expireTimestamps = await rclient.mget(keysBatch) + const currentTime = Date.now() + for (let i = 0; i < keysBatch.length; i++) { + const key = keysBatch[i] + // For each key, do a quick check to see if the key is expired before calling + // the LUA script to expire the chunk atomically. + if (isExpiredKey(expireTimestamps[i], currentTime)) { + const projectId = extractKeyId(key) + if (DRY_RUN) { + logger.info({ projectId }, '[Dry Run] Would expire chunk for project') + } else { + await expireCurrentChunk(projectId) + } + clearedKeyCount++ + } + } + return clearedKeyCount +} + +async function expireRedisChunks() { + let scannedKeyCount = 0 + let clearedKeyCount = 0 + const START_TIME = Date.now() + + if (DRY_RUN) { + // Use global DRY_RUN + logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode') + } else { + logger.info({}, 'starting expireRedisChunks scan') + } + + for await (const keysBatch of scanRedisCluster( + rclient, + EXPIRE_TIME_KEY_PATTERN + )) { + scannedKeyCount += keysBatch.length + clearedKeyCount += await processKeysBatch(keysBatch, rclient) + if (scannedKeyCount % 1000 === 0) { + logger.info( + { scannedKeyCount, clearedKeyCount }, + 'expireRedisChunks scan progress' + ) + } + } + logger.info( + { + scannedKeyCount, + clearedKeyCount, + elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000), + dryRun: DRY_RUN, + }, + 'expireRedisChunks scan complete' + ) + await redis.disconnect() +} + +expireRedisChunks().catch(err => { + logger.fatal({ err }, 'unhandled error in expireRedisChunks') + process.exit(1) +}) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index 4195632b50..a9aea55a97 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -731,4 +731,119 @@ describe('chunk store Redis backend', function () { expect(validChunk).to.be.null }) }) + + describe('getCurrentChunkMetadata', function () { + it('should return metadata for a cached chunk', async function () { + // Cache a chunk + const snapshot = new Snapshot() + const history = new History(snapshot, [ + new Change( + [new AddFileOperation('test.tex', File.fromString('Hello'))], + new Date(), + [] + ), + new Change( + [new AddFileOperation('other.tex', File.fromString('Bonjour'))], + new Date(), + [] + ), + ]) + const chunk = new Chunk(history, 10) + await redisBackend.setCurrentChunk(projectId, chunk) + + const metadata = await redisBackend.getCurrentChunkMetadata(projectId) + expect(metadata).to.deep.equal({ startVersion: 10, changesCount: 2 }) + }) + + it('should return null if no chunk is cached for the project', async function () { + const metadata = await redisBackend.getCurrentChunkMetadata( + 'non-existent-project-id' + ) + expect(metadata).to.be.null + }) + + it('should return metadata with zero changes for a zero-change chunk', async function () { + // Cache a chunk with no changes + const snapshot = new Snapshot() + const history = new History(snapshot, []) + const chunk = new Chunk(history, 5) + await redisBackend.setCurrentChunk(projectId, chunk) + + const metadata = await redisBackend.getCurrentChunkMetadata(projectId) + expect(metadata).to.deep.equal({ startVersion: 5, changesCount: 0 }) + }) + }) + + describe('expireCurrentChunk', function () { + const TEMPORARY_CACHE_LIFETIME_MS = 300 * 1000 // Match the value in redis.js + + it('should return false and not expire a non-expired chunk', async function () { + // Cache a chunk + const snapshot = new Snapshot() + const history = new History(snapshot, []) + const chunk = new Chunk(history, 10) + await redisBackend.setCurrentChunk(projectId, chunk) + + // Attempt to expire immediately (should not be expired yet) + const expired = await redisBackend.expireCurrentChunk(projectId) + expect(expired).to.be.false + + // Verify the chunk still exists + const cachedChunk = await redisBackend.getCurrentChunk(projectId) + expect(cachedChunk).to.not.be.null + expect(cachedChunk.getStartVersion()).to.equal(10) + }) + + it('should return true and expire an expired chunk using currentTime', async function () { + // Cache a chunk + const snapshot = new Snapshot() + const history = new History(snapshot, []) + const chunk = new Chunk(history, 10) + await redisBackend.setCurrentChunk(projectId, chunk) + + // Calculate a time far enough in the future to ensure expiry + const futureTime = Date.now() + TEMPORARY_CACHE_LIFETIME_MS + 5000 // 5 seconds past expiry + + // Attempt to expire using the future time + const expired = await redisBackend.expireCurrentChunk( + projectId, + futureTime + ) + expect(expired).to.be.true + + // Verify the chunk is gone + const cachedChunk = await redisBackend.getCurrentChunk(projectId) + expect(cachedChunk).to.be.null + + // Verify metadata is also gone + const metadata = await redisBackend.getCurrentChunkMetadata(projectId) + expect(metadata).to.be.null + }) + + it('should return false if no chunk is cached for the project', async function () { + const expired = await redisBackend.expireCurrentChunk( + 'non-existent-project' + ) + expect(expired).to.be.false + }) + + it('should return false if called with a currentTime before the expiry time', async function () { + // Cache a chunk + const snapshot = new Snapshot() + const history = new History(snapshot, []) + const chunk = new Chunk(history, 10) + await redisBackend.setCurrentChunk(projectId, chunk) + + // Use a time *before* the cache would normally expire + const pastTime = Date.now() - 10000 // 10 seconds ago + + // Attempt to expire using the past time + const expired = await redisBackend.expireCurrentChunk(projectId, pastTime) + expect(expired).to.be.false + + // Verify the chunk still exists + const cachedChunk = await redisBackend.getCurrentChunk(projectId) + expect(cachedChunk).to.not.be.null + }) + }) })