From 4c697a56593fbf96f97f23593a8fb358e13e7962 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 3 Jun 2025 11:51:07 +0100 Subject: [PATCH] Merge pull request #25993 from overleaf/bg-history-refactor-persist-buffer-limits refactor persist buffer to add limits GitOrigin-RevId: 4a40a7a8812acf5bb7f98bfd7b94d81ebe19fc57 --- .../history-v1/storage/lib/persist_buffer.js | 12 +- .../storage/scripts/persist_redis_chunks.js | 10 +- .../js/storage/persist_buffer.test.mjs | 107 +++++++++++++++++- 3 files changed, 115 insertions(+), 14 deletions(-) diff --git a/services/history-v1/storage/lib/persist_buffer.js b/services/history-v1/storage/lib/persist_buffer.js index 0dfeb9a38c..4cfd7ecab3 100644 --- a/services/history-v1/storage/lib/persist_buffer.js +++ b/services/history-v1/storage/lib/persist_buffer.js @@ -23,21 +23,13 @@ const redisBackend = require('./chunk_store/redis') * 6. Set the new persisted version (endVersion of the latest persisted chunk) in Redis. * * @param {string} projectId + * @param {Object} limits * @throws {Error | OError} If a critical error occurs during persistence. */ -async function persistBuffer(projectId) { +async function persistBuffer(projectId, limits) { assert.projectId(projectId) logger.debug({ projectId }, 'starting persistBuffer operation') - // Set limits to force us to persist all of the changes. - const farFuture = new Date() - farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000) - const limits = { - maxChanges: 0, - minChangeTimestamp: farFuture, - maxChangeTimestamp: farFuture, - } - // 1. Get the latest chunk's endVersion from GCS/main store let endVersion const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId) diff --git a/services/history-v1/storage/scripts/persist_redis_chunks.js b/services/history-v1/storage/scripts/persist_redis_chunks.js index 88964bac69..9d64964f81 100644 --- a/services/history-v1/storage/scripts/persist_redis_chunks.js +++ b/services/history-v1/storage/scripts/persist_redis_chunks.js @@ -18,7 +18,15 @@ logger.initialize('persist-redis-chunks') async function persistProjectAction(projectId) { const job = await claimPersistJob(projectId) - await persistBuffer(projectId) + // Set limits to force us to persist all of the changes. + const farFuture = new Date() + farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000) + const limits = { + maxChanges: 0, + minChangeTimestamp: farFuture, + maxChangeTimestamp: farFuture, + } + await persistBuffer(projectId, limits) if (job && job.close) { await job.close() } diff --git a/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs index 64772c4b70..496d16cd1e 100644 --- a/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs @@ -92,7 +92,7 @@ describe('persistBuffer', function () { await redisBackend.setPersistedVersion(projectId, initialVersion) // Persist the changes from Redis to the chunk store - await persistBuffer(projectId) + await persistBuffer(projectId, limitsToPersistImmediately) const latestChunk = await chunkStore.loadLatest(projectId) expect(latestChunk).to.exist @@ -196,7 +196,7 @@ describe('persistBuffer', function () { persistedChunkEndVersion ) - await persistBuffer(projectId) + await persistBuffer(projectId, limitsToPersistImmediately) const latestChunk = await chunkStore.loadLatest(projectId) expect(latestChunk).to.exist @@ -287,7 +287,8 @@ describe('persistBuffer', function () { const chunksBefore = await chunkStore.getProjectChunks(projectId) - await persistBuffer(projectId) + // Persist buffer (which should do nothing as there are no new changes) + await persistBuffer(projectId, limitsToPersistImmediately) const chunksAfter = await chunkStore.getProjectChunks(projectId) expect(chunksAfter.length).to.equal(chunksBefore.length) @@ -335,4 +336,104 @@ describe('persistBuffer', function () { expect(finalPersistedVersionInRedis).to.equal(persistedChunkEndVersion) }) }) + + describe('when limits restrict the number of changes to persist', function () { + it('should persist only a subset of changes and update persistedVersion accordingly', async function () { + const now = Date.now() + const oneDayAgo = now - 1000 * 60 * 60 * 24 + const oneHourAgo = now - 1000 * 60 * 60 + const twoHoursAgo = now - 1000 * 60 * 60 * 2 + const threeHoursAgo = now - 1000 * 60 * 60 * 3 + + // Create an initial file with some content + const initialContent = 'Initial content.' + const addInitialFileChange = new Change( + [new AddFileOperation('main.tex', File.fromString(initialContent))], + new Date(oneDayAgo), + [] + ) + + await persistChanges( + projectId, + [addInitialFileChange], + limitsToPersistImmediately, + initialVersion + ) + const versionAfterInitialSetup = initialVersion + 1 // Version is 1 + + // Queue three additional changes in Redis + const op1 = new TextOperation() + .retain(initialContent.length) + .insert(' Change 1.') + const change1 = new Change( + [new EditFileOperation('main.tex', op1)], + new Date(threeHoursAgo) + ) + const contentAfterC1 = initialContent + ' Change 1.' + + const op2 = new TextOperation() + .retain(contentAfterC1.length) + .insert(' Change 2.') + const change2 = new Change( + [new EditFileOperation('main.tex', op2)], + new Date(twoHoursAgo) + ) + const contentAfterC2 = contentAfterC1 + ' Change 2.' + + const op3 = new TextOperation() + .retain(contentAfterC2.length) + .insert(' Change 3.') + const change3 = new Change( + [new EditFileOperation('main.tex', op3)], + new Date(oneHourAgo) + ) + + const changesToQueue = [change1, change2, change3] + await redisBackend.queueChanges( + projectId, + new Snapshot(), // dummy snapshot + versionAfterInitialSetup, // startVersion for queued changes + changesToQueue, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + await redisBackend.setPersistedVersion( + projectId, + versionAfterInitialSetup + ) + + // Define limits to only persist 2 additional changes (on top of the initial file creation), + // which should leave the final change (change3) in the redis buffer. + const restrictiveLimits = { + minChangeTimestamp: new Date(oneHourAgo), // only changes more than 1 hour old are considered + maxChangeTimestamp: new Date(twoHoursAgo), // they will be persisted if any change is older than 2 hours + } + + await persistBuffer(projectId, restrictiveLimits) + + // Check the latest persisted chunk, it should only have the initial file and the first two changes + const latestChunk = await chunkStore.loadLatest(projectId, { + persistedOnly: true, + }) + expect(latestChunk).to.exist + expect(latestChunk.getChanges().length).to.equal(3) // addInitialFileChange + change1 + change2 + expect(latestChunk.getStartVersion()).to.equal(initialVersion) + const expectedEndVersion = versionAfterInitialSetup + 2 // Persisted two changes from the queue + expect(latestChunk.getEndVersion()).to.equal(expectedEndVersion) + + // Check persisted version in Redis + const state = await redisBackend.getState(projectId) + expect(state.persistedVersion).to.equal(expectedEndVersion) + + // Check non-persisted changes in Redis + const nonPersisted = await redisBackend.getNonPersistedChanges( + projectId, + expectedEndVersion + ) + expect(nonPersisted).to.be.an('array').with.lengthOf(1) // change3 should remain + expect(nonPersisted).to.deep.equal([change3]) + }) + }) })