mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-06-11 07:00:47 +02:00
Merge pull request #25086 from overleaf/bg-history-buffer-use-expire-time
add expire time to redis buffer in history-v1

GitOrigin-RevId: 3d74957c341e62e536dc60869a7ca71ac173e380
This commit is contained in:
@@ -16,6 +16,9 @@ const keySchema = {
|
||||
changes({ projectId }) {
|
||||
return `changes:{${projectId}}`
|
||||
},
|
||||
expireTime({ projectId }) {
|
||||
return `expire-time:{${projectId}}`
|
||||
},
|
||||
}
|
||||
|
||||
rclient.defineCommand('get_current_chunk', {
|
||||
@@ -122,6 +125,9 @@ rclient.defineCommand('get_current_chunk_metadata', {
|
||||
numberOfKeys: 2,
|
||||
lua: `
|
||||
local startVersionValue = redis.call('GET', KEYS[1])
|
||||
if not startVersionValue then
|
||||
return nil -- this is a cache-miss
|
||||
end
|
||||
local changesCount = redis.call('LLEN', KEYS[2])
|
||||
return {startVersionValue, changesCount}
|
||||
`,
|
||||
@@ -152,17 +158,19 @@ async function getCurrentChunkMetadata(projectId) {
|
||||
}
|
||||
|
||||
// Lua script that replaces a project's cached chunk in one atomic step.
// KEYS: [1] snapshot, [2] startVersion, [3] expireTime, [4] changes list
// ARGV: [1] snapshot value, [2] start version, [3] expire time,
//       [4..] the changes, one list element each (may be absent)
// The values are written with plain SET (no Redis TTL); the expire time is
// stored as data under KEYS[3] instead.
rclient.defineCommand('set_current_chunk', {
  numberOfKeys: 4,
  lua: `
    local snapshotValue = ARGV[1]
    local startVersionValue = ARGV[2]
    local expireTime = ARGV[3]
    redis.call('SET', KEYS[1], snapshotValue)
    redis.call('SET', KEYS[2], startVersionValue)
    redis.call('SET', KEYS[3], expireTime)
    redis.call('DEL', KEYS[4]) -- clear the old changes list
    -- RPUSH needs at least one element, so skip it for a zero-change chunk
    if #ARGV >= 4 then
      redis.call('RPUSH', KEYS[4], unpack(ARGV, 4))
    end
  `,
})
|
||||
|
||||
@@ -178,24 +186,28 @@ async function setCurrentChunk(projectId, chunk) {
|
||||
const snapshotKey = keySchema.snapshot({ projectId })
|
||||
const startVersionKey = keySchema.startVersion({ projectId })
|
||||
const changesKey = keySchema.changes({ projectId })
|
||||
const expireTimeKey = keySchema.expireTime({ projectId })
|
||||
|
||||
const snapshot = chunk.history.snapshot
|
||||
const startVersion = chunk.startVersion
|
||||
const changes = chunk.history.changes
|
||||
const expireTime = Date.now() + TEMPORARY_CACHE_LIFETIME * 1000
|
||||
|
||||
await rclient.set_current_chunk(
|
||||
snapshotKey,
|
||||
startVersionKey,
|
||||
changesKey,
|
||||
JSON.stringify(snapshot.toRaw()),
|
||||
startVersion,
|
||||
...changes.map(c => JSON.stringify(c.toRaw()))
|
||||
snapshotKey, // KEYS[1]
|
||||
startVersionKey, // KEYS[2]
|
||||
expireTimeKey, // KEYS[3]
|
||||
changesKey, // KEYS[4]
|
||||
JSON.stringify(snapshot.toRaw()), // ARGV[1]
|
||||
startVersion, // ARGV[2]
|
||||
expireTime, // ARGV[3]
|
||||
...changes.map(c => JSON.stringify(c.toRaw())) // ARGV[4..]
|
||||
)
|
||||
metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'success' })
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ err, projectId, chunk },
|
||||
'error setting current chunk inredis'
|
||||
'error setting current chunk in redis'
|
||||
)
|
||||
metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'error' })
|
||||
return null // while testing we will suppress any errors
|
||||
@@ -266,14 +278,67 @@ function compareChunks(projectId, cachedChunk, currentChunk) {
|
||||
return identical
|
||||
}
|
||||
|
||||
// Lua script that removes all of a project's chunk cache keys, but only when
// the stored expire time is no longer in the future relative to ARGV[1].
// KEYS: [1] snapshot, [2] startVersion, [3] changes, [4] expireTime
// ARGV: [1] current time, compared numerically with the stored expire time
// Returns 1 when the keys were deleted, nil otherwise.
rclient.defineCommand('expire_chunk_cache', {
  numberOfKeys: 4,
  lua: `
    local now = tonumber(ARGV[1])
    local storedExpireTime = redis.call('GET', KEYS[4])
    if not storedExpireTime then
      return nil -- this is a cache-miss
    end
    if now < tonumber(storedExpireTime) then
      return nil -- cache is still valid
    end
    -- Cache is expired: drop snapshot, startVersion, changes and expireTime
    -- keys together in a single DEL
    redis.call('DEL', KEYS[1], KEYS[2], KEYS[3], KEYS[4])
    return 1
  `,
})
|
||||
|
||||
/**
 * Delete a project's cached chunk data from Redis if its stored expire time
 * has been reached.
 *
 * @param {string} projectId - The ID of the project whose cache should be expired
 * @param {number} [currentTime] - Time to compare with the stored expire time;
 *   Date.now() is used when this is omitted (or otherwise falsy)
 * @returns {Promise<boolean>} A promise that resolves to true if the cache
 *   entries were deleted, and to false if nothing was deleted (no cached
 *   entry, not yet expired) or if an error occurred
 */
async function expireCurrentChunk(projectId, currentTime) {
  try {
    const cacheKeys = [
      keySchema.snapshot({ projectId }),
      keySchema.startVersion({ projectId }),
      keySchema.changes({ projectId }),
      keySchema.expireTime({ projectId }),
    ]
    const now = currentTime || Date.now()
    const deleted = await rclient.expire_chunk_cache(...cacheKeys, now)
    if (deleted) {
      metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'success' })
      return true
    }
    return false // not expired
  } catch (err) {
    // Errors are logged and counted, then reported to the caller as false
    logger.error({ err, projectId }, 'error clearing chunk cache from redis')
    metrics.inc('chunk_store.redis.expire_cache', 1, { status: 'error' })
    return false
  }
}
|
||||
|
||||
// Lua script that unconditionally deletes every key of a project's chunk
// cache in one atomic step.
// KEYS: [1] snapshot, [2] startVersion, [3] changes, [4] expireTime
rclient.defineCommand('clear_chunk_cache', {
  numberOfKeys: 4,
  lua: `
    -- Delete all keys related to a project's chunk cache atomically
    redis.call('DEL', KEYS[1]) -- snapshot key
    redis.call('DEL', KEYS[2]) -- startVersion key
    redis.call('DEL', KEYS[3]) -- changes key
    redis.call('DEL', KEYS[4]) -- expireTime key
    return 1
  `,
})
|
||||
@@ -288,8 +353,14 @@ async function clearCache(projectId) {
|
||||
const snapshotKey = keySchema.snapshot({ projectId })
|
||||
const startVersionKey = keySchema.startVersion({ projectId })
|
||||
const changesKey = keySchema.changes({ projectId })
|
||||
const expireTimeKey = keySchema.expireTime({ projectId })
|
||||
|
||||
await rclient.clear_chunk_cache(snapshotKey, startVersionKey, changesKey)
|
||||
await rclient.clear_chunk_cache(
|
||||
snapshotKey,
|
||||
startVersionKey,
|
||||
changesKey,
|
||||
expireTimeKey
|
||||
)
|
||||
metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' })
|
||||
return true
|
||||
} catch (err) {
|
||||
@@ -307,5 +378,6 @@ module.exports = {
|
||||
checkCacheValidity,
|
||||
checkCacheValidityWithMetadata,
|
||||
compareChunks,
|
||||
expireCurrentChunk,
|
||||
clearCache,
|
||||
}
|
||||
|
||||
@@ -34,4 +34,19 @@ async function* scanRedisCluster(redisClient, pattern, count = BATCH_SIZE) {
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { scanRedisCluster }
|
||||
/**
 * Returns the text enclosed by the first `{...}` pair in a Redis key, which
 * is where the user ID or project ID is stored.
 *
 * @param {string} key - The Redis key to inspect.
 * @returns {string | null} The non-empty text between the braces, or null
 *   when the key has no brace pair or the braces are empty.
 */
function extractKeyId(key) {
  // Lazy match so that only the first brace pair is captured
  const braced = key.match(/\{(.*?)\}/)
  const keyId = braced ? braced[1] : ''
  return keyId || null
}
|
||||
|
||||
module.exports = { scanRedisCluster, extractKeyId }
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
const logger = require('@overleaf/logger')
|
||||
const commandLineArgs = require('command-line-args') // Add this line
|
||||
const redis = require('../lib/redis')
|
||||
const { scanRedisCluster, extractKeyId } = require('../lib/scan')
|
||||
const { expireCurrentChunk } = require('../lib/chunk_store/redis')
|
||||
|
||||
// Redis client used for the key scan and the timestamp reads below
const rclient = redis.rclientHistory
// Glob pattern selecting the expire-time keys to scan
const EXPIRE_TIME_KEY_PATTERN = 'expire-time:{*}'

// Command line: --dry-run (-d) only logs what would be expired
const optionDefinitions = [{ name: 'dry-run', alias: 'd', type: Boolean }]
const options = commandLineArgs(optionDefinitions)
const DRY_RUN = options['dry-run'] || false

logger.initialize('expire-redis-chunks')
|
||||
|
||||
/**
 * Cheap client-side check of whether a stored expire timestamp has passed.
 *
 * @param {string | null} expireTimestamp - Raw value read from an expire-time key
 * @param {number} currentTime - Time to compare against
 * @returns {boolean} true when currentTime is strictly greater than the parsed
 *   timestamp; false otherwise, including when the value is not an integer
 */
function isExpiredKey(expireTimestamp, currentTime) {
  const expiresAt = Number.parseInt(expireTimestamp, 10)
  if (Number.isNaN(expiresAt)) {
    return false
  }
  const expired = currentTime > expiresAt
  logger.debug(
    {
      expireTime: expiresAt,
      currentTime,
      expireIn: expiresAt - currentTime,
      expired,
    },
    'Checking if key is expired'
  )
  return expired
}
|
||||
|
||||
/**
 * Expire the cached chunks behind one batch of expire-time keys.
 *
 * @param {string[]} keysBatch - expire-time keys returned by the scan
 * @param {object} rclient - Redis client used to read the timestamps
 * @returns {Promise<number>} Number of chunks actually expired, or, in dry-run
 *   mode, the number that would have been expired
 */
async function processKeysBatch(keysBatch, rclient) {
  if (keysBatch.length === 0) {
    return 0
  }
  // For efficiency, we use MGET to fetch all the timestamps in a single request
  // NOTE(review): a multi-key MGET may be rejected on a Redis Cluster when the
  // keys hash to different slots - confirm this is safe for the client in use.
  const expireTimestamps = await rclient.mget(keysBatch)
  const currentTime = Date.now()
  let clearedKeyCount = 0
  for (let i = 0; i < keysBatch.length; i++) {
    // Quick local check first, so the atomic Lua script is only invoked for
    // keys that already look expired.
    if (!isExpiredKey(expireTimestamps[i], currentTime)) {
      continue
    }
    const key = keysBatch[i]
    const projectId = extractKeyId(key)
    if (!projectId) {
      // Without an id we would operate on the keys of a bogus "null" project
      logger.warn({ key }, 'could not extract project id from expire-time key')
      continue
    }
    if (DRY_RUN) {
      logger.info({ projectId }, '[Dry Run] Would expire chunk for project')
      clearedKeyCount++
    } else if (await expireCurrentChunk(projectId)) {
      // expireCurrentChunk resolves to false when nothing was deleted or it
      // failed, so only count confirmed deletions.
      clearedKeyCount++
    }
  }
  return clearedKeyCount
}
|
||||
|
||||
/**
 * Scan Redis for expire-time keys, expire every chunk whose time has passed
 * (or only report them in dry-run mode), log a summary and disconnect.
 *
 * @returns {Promise<void>}
 */
async function expireRedisChunks() {
  const PROGRESS_LOG_INTERVAL = 1000
  const START_TIME = Date.now()
  let scannedKeyCount = 0
  let clearedKeyCount = 0
  // Scanned-key count at which the next progress message is due
  let nextProgressLogAt = PROGRESS_LOG_INTERVAL

  if (DRY_RUN) {
    // Use global DRY_RUN
    logger.info({}, 'starting expireRedisChunks scan in DRY RUN mode')
  } else {
    logger.info({}, 'starting expireRedisChunks scan')
  }

  for await (const keysBatch of scanRedisCluster(
    rclient,
    EXPIRE_TIME_KEY_PATTERN
  )) {
    scannedKeyCount += keysBatch.length
    clearedKeyCount += await processKeysBatch(keysBatch, rclient)
    // Batch sizes vary, so the running total rarely lands exactly on a
    // multiple of the interval; log whenever a boundary has been crossed.
    if (scannedKeyCount >= nextProgressLogAt) {
      logger.info(
        { scannedKeyCount, clearedKeyCount },
        'expireRedisChunks scan progress'
      )
      nextProgressLogAt =
        (Math.floor(scannedKeyCount / PROGRESS_LOG_INTERVAL) + 1) *
        PROGRESS_LOG_INTERVAL
    }
  }
  logger.info(
    {
      scannedKeyCount,
      clearedKeyCount,
      elapsedTimeInSeconds: Math.floor((Date.now() - START_TIME) / 1000),
      dryRun: DRY_RUN,
    },
    'expireRedisChunks scan complete'
  )
  await redis.disconnect()
}
|
||||
|
||||
// Script entry point: run the scan; on any unhandled failure log it and exit
// with a non-zero status.
async function main() {
  try {
    await expireRedisChunks()
  } catch (err) {
    logger.fatal({ err }, 'unhandled error in expireRedisChunks')
    process.exit(1)
  }
}

main()
|
||||
@@ -731,4 +731,119 @@ describe('chunk store Redis backend', function () {
|
||||
expect(validChunk).to.be.null
|
||||
})
|
||||
})
|
||||
|
||||
describe('getCurrentChunkMetadata', function () {
  // Build a change that adds a single file with the given content
  function addFileChange(pathname, content) {
    return new Change(
      [new AddFileOperation(pathname, File.fromString(content))],
      new Date(),
      []
    )
  }

  // Cache a chunk made of the given changes, then read back its metadata
  async function cacheAndReadMetadata(changes, startVersion) {
    const history = new History(new Snapshot(), changes)
    await redisBackend.setCurrentChunk(
      projectId,
      new Chunk(history, startVersion)
    )
    return await redisBackend.getCurrentChunkMetadata(projectId)
  }

  it('should return metadata for a cached chunk', async function () {
    const metadata = await cacheAndReadMetadata(
      [
        addFileChange('test.tex', 'Hello'),
        addFileChange('other.tex', 'Bonjour'),
      ],
      10
    )
    expect(metadata).to.deep.equal({ startVersion: 10, changesCount: 2 })
  })

  it('should return null if no chunk is cached for the project', async function () {
    const metadata = await redisBackend.getCurrentChunkMetadata(
      'non-existent-project-id'
    )
    expect(metadata).to.be.null
  })

  it('should return metadata with zero changes for a zero-change chunk', async function () {
    const metadata = await cacheAndReadMetadata([], 5)
    expect(metadata).to.deep.equal({ startVersion: 5, changesCount: 0 })
  })
})
|
||||
|
||||
describe('expireCurrentChunk', function () {
  const TEMPORARY_CACHE_LIFETIME_MS = 300 * 1000 // Match the value in redis.js

  // Cache a chunk with an empty snapshot and no changes at start version 10
  async function cacheEmptyChunk() {
    const history = new History(new Snapshot(), [])
    await redisBackend.setCurrentChunk(projectId, new Chunk(history, 10))
  }

  it('should return false and not expire a non-expired chunk', async function () {
    await cacheEmptyChunk()

    // Expiring right away must be a no-op: the lifetime has not elapsed
    const expired = await redisBackend.expireCurrentChunk(projectId)
    expect(expired).to.be.false

    // The chunk must still be readable
    const cachedChunk = await redisBackend.getCurrentChunk(projectId)
    expect(cachedChunk).to.not.be.null
    expect(cachedChunk.getStartVersion()).to.equal(10)
  })

  it('should return true and expire an expired chunk using currentTime', async function () {
    await cacheEmptyChunk()

    // Pretend it is 5 seconds after the cache lifetime has run out
    const futureTime = Date.now() + TEMPORARY_CACHE_LIFETIME_MS + 5000

    const expired = await redisBackend.expireCurrentChunk(
      projectId,
      futureTime
    )
    expect(expired).to.be.true

    // Both the chunk and its metadata must be gone
    const cachedChunk = await redisBackend.getCurrentChunk(projectId)
    expect(cachedChunk).to.be.null
    const metadata = await redisBackend.getCurrentChunkMetadata(projectId)
    expect(metadata).to.be.null
  })

  it('should return false if no chunk is cached for the project', async function () {
    const expired = await redisBackend.expireCurrentChunk(
      'non-existent-project'
    )
    expect(expired).to.be.false
  })

  it('should return false if called with a currentTime before the expiry time', async function () {
    await cacheEmptyChunk()

    // A timestamp 10 seconds in the past is before the stored expire time
    const pastTime = Date.now() - 10000

    const expired = await redisBackend.expireCurrentChunk(projectId, pastTime)
    expect(expired).to.be.false

    // The chunk must still be readable
    const cachedChunk = await redisBackend.getCurrentChunk(projectId)
    expect(cachedChunk).to.not.be.null
  })
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user