diff --git a/services/history-v1/api/controllers/projects.js b/services/history-v1/api/controllers/projects.js index d42e29ab81..67707589fd 100644 --- a/services/history-v1/api/controllers/projects.js +++ b/services/history-v1/api/controllers/projects.js @@ -18,6 +18,7 @@ const { HashCheckBlobStore, ProjectArchive, zipStore, + chunkBuffer, } = require('../../storage') const render = require('./render') @@ -44,7 +45,7 @@ async function initializeProject(req, res, next) { async function getLatestContent(req, res, next) { const projectId = req.swagger.params.project_id.value const blobStore = new BlobStore(projectId) - const chunk = await chunkStore.loadLatest(projectId) + const chunk = await chunkBuffer.loadLatest(projectId) const snapshot = chunk.getSnapshot() snapshot.applyAll(chunk.getChanges()) await snapshot.loadFiles('eager', blobStore) @@ -63,7 +64,7 @@ async function getContentAtVersion(req, res, next) { async function getLatestHashedContent(req, res, next) { const projectId = req.swagger.params.project_id.value const blobStore = new HashCheckBlobStore(new BlobStore(projectId)) - const chunk = await chunkStore.loadLatest(projectId) + const chunk = await chunkBuffer.loadLatest(projectId) const snapshot = chunk.getSnapshot() snapshot.applyAll(chunk.getChanges()) await snapshot.loadFiles('eager', blobStore) @@ -74,7 +75,7 @@ async function getLatestHashedContent(req, res, next) { async function getLatestHistory(req, res, next) { const projectId = req.swagger.params.project_id.value try { - const chunk = await chunkStore.loadLatest(projectId) + const chunk = await chunkBuffer.loadLatest(projectId) const chunkResponse = new ChunkResponse(chunk) res.json(chunkResponse.toRaw()) } catch (err) { @@ -153,7 +154,7 @@ async function getChanges(req, res, next) { } const changes = [] - let chunk = await chunkStore.loadLatest(projectId) + let chunk = await chunkBuffer.loadLatest(projectId) if (since > chunk.getEndVersion()) { return res.status(400).json({ diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index 9e2f7a61dc..009a867148 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -1,6 +1,7 @@ exports.BatchBlobStore = require('./lib/batch_blob_store') exports.blobHash = require('./lib/blob_hash') exports.HashCheckBlobStore = require('./lib/hash_check_blob_store') +exports.chunkBuffer = require('./lib/chunk_buffer') exports.chunkStore = require('./lib/chunk_store') exports.historyStore = require('./lib/history_store').historyStore exports.knex = require('./lib/knex') diff --git a/services/history-v1/storage/lib/chunk_buffer/index.js b/services/history-v1/storage/lib/chunk_buffer/index.js new file mode 100644 index 0000000000..fe30b993d7 --- /dev/null +++ b/services/history-v1/storage/lib/chunk_buffer/index.js @@ -0,0 +1,40 @@ +'use strict' + +/** + * @module storage/lib/chunk_buffer + */ + +const chunkStore = require('../chunk_store') +const redisBackend = require('../chunk_store/redis') +const metrics = require('@overleaf/metrics') +/** + * Load the latest Chunk stored for a project, including blob metadata. + * + * @param {string} projectId + * @return {Promise.} + */ +async function loadLatest(projectId) { + const cachedChunk = await redisBackend.getCurrentChunk(projectId) + const chunkRecord = await chunkStore.loadLatestRaw(projectId) + const cachedChunkIsValid = redisBackend.checkCacheValidityWithMetadata( + cachedChunk, + chunkRecord + ) + if (cachedChunkIsValid) { + metrics.inc('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + return cachedChunk + } else { + metrics.inc('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + const chunk = await chunkStore.loadLatest(projectId) + await redisBackend.setCurrentChunk(projectId, chunk) + return chunk + } +} + +module.exports = { + loadLatest, +} diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index bae99f020b..c1fbb9d607 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -30,7 +30,6 @@ const { BlobStore } = require('../blob_store') const { historyStore } = require('../history_store') const mongoBackend = require('./mongo') const postgresBackend = require('./postgres') -const redisBackend = require('./redis') const { ChunkVersionConflictError } = require('./errors') const DEFAULT_DELETE_BATCH_SIZE = parseInt(config.get('maxDeleteKeys'), 10) @@ -105,23 +104,13 @@ async function loadLatestRaw(projectId, opts) { * @return {Promise.} */ async function loadLatest(projectId) { - // Test out the redis caching backend - not in use yet - const cachedChunk = await redisBackend.getCurrentChunk(projectId) const chunkRecord = await loadLatestRaw(projectId) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) const blobStore = new BlobStore(projectId) const batchBlobStore = new BatchBlobStore(blobStore) await lazyLoadHistoryFiles(history, batchBlobStore) - const chunk = new Chunk(history, chunkRecord.startVersion) - // if the cached chunk is no longer valid, update it - const cachedChunkIsValid = redisBackend.checkCacheValidity(cachedChunk, chunk) - if (!cachedChunkIsValid) { - await redisBackend.setCurrentChunk(projectId, chunk) - } else { - await redisBackend.compareChunks(projectId, cachedChunk, chunk) - } - return chunk + return new Chunk(history, chunkRecord.startVersion) } /** diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index e13fa358f3..55f36ffa3b 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -148,7 +148,7 @@ async function setCurrentChunk(projectId, chunk) { /** * Checks whether a cached chunk's version metadata matches the current chunk's metadata - * @param {Chunk}} cachedChunk - The chunk retrieved from cache + * @param {Chunk} cachedChunk - The chunk retrieved from cache * @param {Chunk} currentChunk - The current chunk to compare against * @returns {boolean} - Returns true if the chunks have matching start and end versions, false otherwise */ @@ -160,6 +160,22 @@ function checkCacheValidity(cachedChunk, currentChunk) { ) } +/** + * Validates if a cached chunk matches the current chunk metadata by comparing versions + * @param {Object} cachedChunk - The cached chunk object to validate + * @param {Object} currentChunkMetadata - The current chunk metadata to compare against + * @param {number} currentChunkMetadata.startVersion - The starting version number + * @param {number} currentChunkMetadata.endVersion - The ending version number + * @returns {boolean} - True if the cached chunk is valid, false otherwise + */ +function checkCacheValidityWithMetadata(cachedChunk, currentChunkMetadata) { + return Boolean( + cachedChunk && + cachedChunk.getStartVersion() === currentChunkMetadata.startVersion && + cachedChunk.getEndVersion() === currentChunkMetadata.endVersion + ) +} + /** * Compares two chunks for equality using stringified JSON comparison * @param {string} projectId - The ID of the project @@ -194,10 +210,45 @@ function compareChunks(projectId, cachedChunk, currentChunk) { return identical } +// Define Lua script for atomic cache clearing +rclient.defineCommand('clear_chunk_cache', { + numberOfKeys: 3, + lua: ` + -- Delete all keys related to a project's chunk cache atomically + redis.call('DEL', KEYS[1]) -- snapshot key + redis.call('DEL', KEYS[2]) -- startVersion key + redis.call('DEL', KEYS[3]) -- changes key + return 1 + `, +}) + +/** + * Clears all cache entries for a project's chunk data + * @param {string} projectId - The ID of the project whose cache should be cleared + * @returns {Promise} A promise that resolves to true if successful, false on error + */ +async function clearCache(projectId) { + try { + const snapshotKey = keySchema.snapshot({ projectId }) + const startVersionKey = keySchema.startVersion({ projectId }) + const changesKey = keySchema.changes({ projectId }) + + await rclient.clear_chunk_cache(snapshotKey, startVersionKey, changesKey) + metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' }) + return true + } catch (err) { + logger.error({ err, projectId }, 'error clearing chunk cache from redis') + metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'error' }) + return false + } +} + module.exports = { getCurrentChunk, setCurrentChunk, getCurrentChunkMetadata, checkCacheValidity, + checkCacheValidityWithMetadata, compareChunks, + clearCache, } diff --git a/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js b/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js new file mode 100644 index 0000000000..377a8da0dc --- /dev/null +++ b/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js @@ -0,0 +1,210 @@ +'use strict' + +const { expect } = require('chai') +const sinon = require('sinon') +const { + Chunk, + Snapshot, + History, + File, + AddFileOperation, + Change, +} = require('overleaf-editor-core') +const cleanup = require('./support/cleanup') +const fixtures = require('./support/fixtures') +const chunkBuffer = require('../../../../storage/lib/chunk_buffer') +const chunkStore = require('../../../../storage/lib/chunk_store') +const redisBackend = require('../../../../storage/lib/chunk_store/redis') +const metrics = require('@overleaf/metrics') + +describe('chunk buffer', function () { + beforeEach(cleanup.everything) + beforeEach(fixtures.create) + beforeEach(function () { + sinon.spy(metrics, 'inc') + }) + afterEach(function () { + metrics.inc.restore() + }) + + const projectId = '123456' + + describe('loadLatest', function () { + // Initialize project and create a test chunk + beforeEach(async function () { + // Initialize project in chunk store + await chunkStore.initializeProject(projectId) + + // Create a sample chunk with some content + const snapshot = new Snapshot() + const changes = [ + new Change( + [new AddFileOperation('test.tex', File.fromString('Hello World'))], + new Date(), + [] + ), + ] + const history = new History(snapshot, changes) + const chunk = new Chunk(history, 1) // startVersion 1 + + // Store the chunk directly in the chunk store using create method + // which internally calls uploadChunk + await chunkStore.create(projectId, chunk) + + // Clear any existing cache + await redisBackend.clearCache(projectId) + }) + + it('should load from chunk store and update cache on first access (cache miss)', async function () { + // First access should load from chunk store and populate cache + const firstResult = await chunkBuffer.loadLatest(projectId) + + // Verify the chunk is correct + expect(firstResult).to.not.be.null + expect(firstResult.getStartVersion()).to.equal(1) + expect(firstResult.getEndVersion()).to.equal(2) + + // Verify that we got a cache miss metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true + + // Reset the metrics spy + metrics.inc.resetHistory() + + // Second access should hit the cache + const secondResult = await chunkBuffer.loadLatest(projectId) + + // Verify we got the same chunk + expect(secondResult).to.not.be.null + expect(secondResult.getStartVersion()).to.equal(1) + expect(secondResult.getEndVersion()).to.equal(2) + + // Verify that we got a cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + + // Verify both chunks are equivalent + expect(secondResult.getStartVersion()).to.equal( + firstResult.getStartVersion() + ) + expect(secondResult.getEndVersion()).to.equal(firstResult.getEndVersion()) + }) + + it('should refresh the cache when chunk changes in the store', async function () { + // First access to load into cache + const firstResult = await chunkBuffer.loadLatest(projectId) + expect(firstResult.getStartVersion()).to.equal(1) + + // Reset metrics spy + metrics.inc.resetHistory() + + // Create a new chunk with different content + const newSnapshot = new Snapshot() + const newChanges = [ + new Change( + [ + new AddFileOperation( + 'updated.tex', + File.fromString('Updated content') + ), + ], + new Date(), + [] + ), + ] + const newHistory = new History(newSnapshot, newChanges) + const newChunk = new Chunk(newHistory, 5) // Different start version + + // Store the new chunk directly in the chunk store + await chunkStore.create(projectId, newChunk) + + // Access again - should detect the change and refresh cache + const secondResult = await chunkBuffer.loadLatest(projectId) + + // Verify we got the updated chunk + expect(secondResult.getStartVersion()).to.equal(5) + expect(secondResult.getEndVersion()).to.equal(6) + + // Verify that we got a cache miss metric (since the cached chunk was invalidated) + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true + }) + + it('should continue using cache when chunk in store has not changed', async function () { + // First access to load into cache + await chunkBuffer.loadLatest(projectId) + + // Reset metrics spy + metrics.inc.resetHistory() + + // Access again without changing the underlying chunk + const result = await chunkBuffer.loadLatest(projectId) + + // Verify we got the same chunk + expect(result.getStartVersion()).to.equal(1) + expect(result.getEndVersion()).to.equal(2) + + // Verify that we got a cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + }) + + it('should handle a case with empty chunks (no changes)', async function () { + // Replace with an empty chunk + const emptySnapshot = new Snapshot() + const emptyHistory = new History(emptySnapshot, []) + const emptyChunk = new Chunk(emptyHistory, 10) + + // Store the empty chunk + await chunkStore.create(projectId, emptyChunk) + + // Clear the cache + await redisBackend.clearCache(projectId) + + // Load the chunk via buffer + const result = await chunkBuffer.loadLatest(projectId) + + // Verify we got the empty chunk + expect(result.getStartVersion()).to.equal(10) + expect(result.getEndVersion()).to.equal(10) // Start equals end for empty chunks + expect(result.history.changes.length).to.equal(0) + + // Verify cache miss metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true + + // Reset metrics + metrics.inc.resetHistory() + + // Second access should hit the cache + const secondResult = await chunkBuffer.loadLatest(projectId) + + // Verify we got the same empty chunk + expect(secondResult.getStartVersion()).to.equal(10) + expect(secondResult.getEndVersion()).to.equal(10) + expect(secondResult.history.changes.length).to.equal(0) + + // Verify cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + }) + }) +})