From 626416ed028d64e82ba85abfbfa15a366a1db1e4 Mon Sep 17 00:00:00 2001
From: Brian Gough
Date: Wed, 23 Apr 2025 12:37:31 +0100
Subject: [PATCH] Merge pull request #24945 from overleaf/bg-redis-buffer-stats-script

add script for listing buffer stats from redis

GitOrigin-RevId: 7836563d51a5c6ded264d0e709d5cfcda70596e3
---
 services/history-v1/storage/lib/scan.js       |  37 +++++
 .../scripts/list_redis_buffer_stats.js        | 176 ++++++++++++++++++
 2 files changed, 213 insertions(+)
 create mode 100644 services/history-v1/storage/lib/scan.js
 create mode 100644 services/history-v1/storage/scripts/list_redis_buffer_stats.js

diff --git a/services/history-v1/storage/lib/scan.js b/services/history-v1/storage/lib/scan.js
new file mode 100644
index 0000000000..6527331479
--- /dev/null
+++ b/services/history-v1/storage/lib/scan.js
@@ -0,0 +1,37 @@
+const BATCH_SIZE = 1000 // Default batch size for SCAN
+
+/**
+ * Asynchronously scans a Redis instance or cluster for keys matching a pattern.
+ *
+ * This function handles both standalone Redis instances and Redis clusters.
+ * For clusters, it iterates over all master nodes. It yields keys in batches
+ * as they are found by the SCAN command.
+ *
+ * @param {object} redisClient - The Redis client instance (from @overleaf/redis-wrapper).
+ * @param {string} pattern - The pattern to match keys against (e.g., 'user:*').
+ * @param {number} [count=BATCH_SIZE] - Optional hint for Redis SCAN count per iteration.
+ * @yields {string[]} A batch of matching keys.
+ */
+async function* scanRedisCluster(redisClient, pattern, count = BATCH_SIZE) {
+  const nodes = redisClient.nodes ? redisClient.nodes('master') : [redisClient]
+
+  for (const node of nodes) {
+    let cursor = '0'
+    do {
+      // redisClient from @overleaf/redis-wrapper uses ioredis style commands
+      const [nextCursor, keys] = await node.scan(
+        cursor,
+        'MATCH',
+        pattern,
+        'COUNT',
+        count
+      )
+      cursor = nextCursor
+      if (keys.length > 0) {
+        yield keys
+      }
+    } while (cursor !== '0')
+  }
+}
+
+module.exports = { scanRedisCluster }
diff --git a/services/history-v1/storage/scripts/list_redis_buffer_stats.js b/services/history-v1/storage/scripts/list_redis_buffer_stats.js
new file mode 100644
index 0000000000..a53a939e44
--- /dev/null
+++ b/services/history-v1/storage/scripts/list_redis_buffer_stats.js
@@ -0,0 +1,176 @@
+const { rclientHistory, disconnect } = require('../lib/redis')
+const { scanRedisCluster } = require('../lib/scan')
+
+// Lua script to get snapshot length, change lengths, and change timestamps
+// Assumes snapshot key is a string and changes key is a list.
+const LUA_SCRIPT = `
+  local snapshotKey = KEYS[1]
+  local changesKey = KEYS[2]
+
+  -- Get snapshot length (returns 0 if key does not exist)
+  local snapshotLen = redis.call('STRLEN', snapshotKey)
+
+  -- Return nil if snapshot is empty
+  if snapshotLen == 0 then
+    return nil
+  end
+
+  local changeLengths = {}
+  local changeTimestamps = {}
+
+  -- Get all changes (returns empty list if key does not exist)
+  local changes = redis.call('LRANGE', changesKey, 0, -1)
+
+  -- FIXME: it would be better to send all the changes back and do the processing
+  -- in JS to avoid blocking redis, if we need to run this script regularly
+  for i, change in ipairs(changes) do
+    -- Calculate length
+    table.insert(changeLengths, string.len(change))
+
+    -- Attempt to decode JSON and extract timestamp
+    local ok, decoded = pcall(cjson.decode, change)
+    if ok and type(decoded) == 'table' and decoded.timestamp then
+      table.insert(changeTimestamps, decoded.timestamp)
+    else
+      -- Decoding failed or the timestamp is missing. A Lua array cannot hold
+      -- nil (table.insert(t, nil) appends nothing), which would shift every
+      -- later timestamp onto the wrong change. Insert false instead: Redis
+      -- converts it to a nil reply, so JS sees null at the matching index.
+      table.insert(changeTimestamps, false)
+    end
+  end
+
+  -- Return snapshot length, list of change lengths, and list of change timestamps
+  return {snapshotLen, changeLengths, changeTimestamps}
+`
+
+// Define the command if it doesn't exist
+if (!rclientHistory.getProjectBufferStats) {
+  rclientHistory.defineCommand('getProjectBufferStats', {
+    numberOfKeys: 2,
+    lua: LUA_SCRIPT,
+  })
+}
+
+/**
+ * Computes the age of a change in whole seconds.
+ *
+ * @param {number} nowMs Reference time in milliseconds since the epoch.
+ * @param {string|number|null|undefined} timestamp Change timestamp returned by
+ *   the Lua script; null or undefined when the change had no timestamp.
+ * @returns {number|string} Age in seconds, or '' when the timestamp is missing
+ *   or unparseable, so that the CSV age column is left empty.
+ */
+function getAgeInSeconds(nowMs, timestamp) {
+  // new Date(null) is the Unix epoch, which would report a bogus, huge age
+  if (timestamp == null) {
+    return ''
+  }
+  const timestampMs = new Date(timestamp).getTime()
+  if (Number.isNaN(timestampMs)) {
+    return ''
+  }
+  return Math.floor((nowMs - timestampMs) / 1000)
+}
+
+/**
+ * Processes a single project ID: fetches its buffer stats from Redis
+ * and writes the results to the output stream in CSV format.
+ *
+ * Writes a snapshotSize row, a changeCount row, one change row per buffered
+ * change (size and age in seconds) and a totalChangeSize row. Errors are
+ * logged and not rethrown, so one failing project does not stop the scan.
+ *
+ * @param {string} projectId The project ID to process.
+ * @param {WritableStream} outputStream The stream to write CSV output to.
+ * @returns {Promise<void>}
+ */
+async function processProject(projectId, outputStream) {
+  try {
+    // Get current time in milliseconds *before* fetching data
+    const nowMs = Date.now()
+
+    // Execute the Lua script
+    const result = await rclientHistory.getProjectBufferStats(
+      `snapshot:${projectId}`,
+      `changes:${projectId}`
+    )
+
+    // Check if the result is null (e.g., snapshot is empty)
+    if (result === null) {
+      // Diagnostics go to stderr so that stdout contains only CSV rows
+      console.error(
+        `Skipping project ${projectId}: Snapshot is empty or does not exist.`
+      )
+      return
+    }
+
+    const [snapshotSize, changeSizes, changeTimestamps] = result
+
+    // Output snapshot size and the number of buffered changes
+    outputStream.write(`${projectId},snapshotSize,${snapshotSize}\n`)
+    outputStream.write(`${projectId},changeCount,${changeSizes.length}\n`)
+
+    let totalChangeSize = 0
+    // Output the size and age of each change
+    for (const [index, changeSize] of changeSizes.entries()) {
+      totalChangeSize += parseInt(changeSize, 10)
+      const ageInSeconds = getAgeInSeconds(nowMs, changeTimestamps[index])
+      outputStream.write(`${projectId},change,${changeSize},${ageInSeconds}\n`)
+    }
+    outputStream.write(`${projectId},totalChangeSize,${totalChangeSize}\n`)
+  } catch (err) {
+    // Log error for this specific project but continue with others
+    console.error(`Error processing project ${projectId}:`, err)
+  }
+}
+
+/**
+ * Scans Redis for snapshot keys and writes the buffer stats of each project
+ * to stdout as CSV. Progress and error messages are written to stderr so
+ * that they are not mixed into the CSV output.
+ *
+ * @returns {Promise<void>}
+ */
+async function main() {
+  const outputStream = process.stdout
+
+  // Write CSV header
+  outputStream.write('projectId,type,size,age\n')
+
+  try {
+    const scanPattern = 'snapshot:*'
+    console.error(`Scanning Redis for keys matching "${scanPattern}"...`)
+
+    for await (const keysBatch of scanRedisCluster(
+      rclientHistory,
+      scanPattern
+    )) {
+      for (const key of keysBatch) {
+        const parts = key.split(':')
+        if (parts.length !== 2 || parts[0] !== 'snapshot') {
+          console.warn(`Skipping malformed key: ${key}`)
+          continue
+        }
+        const projectId = parts[1]
+
+        // Call processProject directly and await it sequentially
+        await processProject(projectId, outputStream)
+      }
+    }
+
+    console.error('Finished processing keys.')
+  } catch (error) {
+    console.error('Error during Redis scan:', error)
+    // Signal the failure to the caller through the exit status
+    process.exitCode = 1
+  } finally {
+    await disconnect()
+    console.error('Redis connections closed.')
+  }
+}
+
+main().catch(err => {
+  console.error('Unhandled error in main:', err)
+  process.exit(1)
+})