From 386e1337664ced06ecdb98ce23f0484684299b30 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 12 Sep 2025 08:31:52 +0100 Subject: [PATCH] Merge pull request #28436 from overleaf/bg-load-changes-directly-from-redis-buffer load changes directly from redis buffer for getChanges requests GitOrigin-RevId: 99673c47a137ff4222d331fa88eb6e5103270551 --- .../history-v1/api/controllers/projects.js | 17 +- .../storage/lib/chunk_store/index.js | 48 +++ .../storage/lib/chunk_store/redis.js | 8 +- .../acceptance/js/storage/chunk_store.test.js | 403 ++++++++++++++++++ .../storage/chunk_store_redis_backend.test.js | 12 +- 5 files changed, 466 insertions(+), 22 deletions(-) diff --git a/services/history-v1/api/controllers/projects.js b/services/history-v1/api/controllers/projects.js index 1f27379259..e24ee81ca1 100644 --- a/services/history-v1/api/controllers/projects.js +++ b/services/history-v1/api/controllers/projects.js @@ -156,11 +156,12 @@ async function getChanges(req, res, next) { }) } - let chunk try { - chunk = await chunkStore.loadAtVersion(projectId, since, { - preferNewer: true, - }) + const { changes, hasMore } = await chunkStore.getChangesSinceVersion( + projectId, + since + ) + res.json({ changes: changes.map(change => change.toRaw()), hasMore }) } catch (err) { if (err instanceof Chunk.VersionNotFoundError) { return res.status(400).json({ @@ -169,14 +170,6 @@ async function getChanges(req, res, next) { } throw err } - - const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId) - - // Extract the relevant changes from the chunk that contains the start version - const changes = chunk.getChanges().slice(since - chunk.getStartVersion()) - const hasMore = latestChunkMetadata.endVersion > chunk.getEndVersion() - - res.json({ changes: changes.map(change => change.toRaw()), hasMore }) } async function getZip(req, res, next) { diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js 
index f387b68d90..ca46d85964 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -24,6 +24,7 @@ const config = require('config') const OError = require('@overleaf/o-error') +const metrics = require('@overleaf/metrics') const { Chunk, History, Snapshot } = require('overleaf-editor-core') const assert = require('../assert') @@ -229,6 +230,52 @@ async function loadAtTimestamp(projectId, timestamp, opts = {}) { return new Chunk(history, startVersion) } +/** Get the changes since a given version (since), including non-persisted changes. + * Note that if there are multiple chunks since the given version, the changes from + * the first chunk will be returned with a hasMore flag to indicate that there are + * more changes available. The 'since' version is exclusive. + * @param {string} projectId + * @param {number} since - version to get changes since (exclusive) + * @return {Promise<{changes: Change[], hasMore: boolean}>} - object with array of changes and boolean indicating if there are more changes available + */ +async function getChangesSinceVersion(projectId, since) { + assert.projectId(projectId, 'bad projectId') + assert.integer(since, 'bad since version') + + // First try to get changes directly from Redis buffer + const result = await redisBackend.getChangesSinceVersion(projectId, since) + if (result.status === 'ok') { + // Successfully got changes from Redis, no more changes available beyond what Redis has + metrics.inc('chunk_store.get_changes_since_version', 1, { source: 'redis' }) + return { changes: result.changes || [], hasMore: false } + } + + // If status is 'not_found' or 'out_of_bounds', fall through to chunk-based approach + const chunk = await loadAtVersion(projectId, since, { + preferNewer: true, + }) + + // Validate that 'since' is within the bounds of the chunk + if (since < chunk.getStartVersion()) { + throw new VersionOutOfBoundsError('Chunk does not include since 
version', { + projectId, + since, + }) + } + // Extract the changes after 'since' from the chunk + const changes = chunk.getChanges().slice(since - chunk.getStartVersion()) + + // Check if there are more changes beyond the current chunk + const latestChunkMetadata = await getLatestChunkMetadata(projectId) + const hasMore = latestChunkMetadata.endVersion > chunk.getEndVersion() + metrics.inc('chunk_store.get_changes_since_version', 1, { + source: 'gcs', + hasMore: hasMore ? 'true' : 'false', + status: result.status, + }) + return { changes, hasMore } +} + /** * Store the chunk and insert corresponding records in the database. * @@ -581,6 +628,7 @@ module.exports = { getProjectChunkIds, getProjectChunks, getProjectChunksFromVersion, + getChangesSinceVersion, deleteProjectChunks, deleteOldChunks, AlreadyInitialized, diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index b8a79b498d..af84bd6410 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -364,9 +364,11 @@ async function getChangesSinceVersion(projectId, version) { if (status === 'ok') { // If status is OK, parse the changes const changes = result[1] - ? result[1].map(rawChange => - typeof rawChange === 'string' ? JSON.parse(rawChange) : rawChange - ) + ? result[1] + .map(rawChange => + typeof rawChange === 'string' ? 
JSON.parse(rawChange) : rawChange + ) + .map(Change.fromRaw) : [] metrics.inc('chunk_store.redis.get_changes_since_version', 1, { diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js index df1e36e9cd..7452fd4fdf 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js @@ -741,6 +741,409 @@ describe('chunkStore', function () { }) }) + describe('getChangesSinceVersion', function () { + describe('single chunk scenarios', function () { + let singleChunk + + beforeEach(async function () { + // Create a single chunk with start version 0, end version 2 + const blob = await blobStore.putString('single chunk content') + singleChunk = makeChunk( + [ + makeChange( + Operation.addFile( + 'file1.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2020-01-01T00:00:00') + ), + makeChange( + Operation.addFile( + 'file2.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2020-01-01T00:01:00') + ), + ], + 0 + ) + await chunkStore.update(projectId, singleChunk) + singleChunk = await chunkStore.loadLatest(projectId) + }) + + describe('without Redis changes', function () { + it('returns empty changes when since equals latest version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 2 + ) + expect(result.changes).to.have.length(0) + expect(result.hasMore).to.be.false + }) + + it('returns all changes when since is 0', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 0 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal(singleChunk.getChanges()) + }) + + it('returns subset of changes when since is 1', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 1 + ) + 
expect(result.changes).to.have.length(1) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal([ + singleChunk.getChanges()[1], + ]) + }) + + it('throws error when since is negative', async function () { + await expect( + chunkStore.getChangesSinceVersion(projectId, -1) + ).to.be.rejectedWith(VersionNotFoundError) + }) + + it('throws VersionNotFoundError when since is beyond latest version', async function () { + await expect( + chunkStore.getChangesSinceVersion(projectId, 10) + ).to.be.rejectedWith(VersionNotFoundError) + }) + }) + + describe('with Redis changes', function () { + let queuedChanges + + beforeEach(async function () { + const snapshot = singleChunk.getSnapshot() + snapshot.applyAll(singleChunk.getChanges()) + const blob = await blobStore.putString('redis content') + queuedChanges = [ + makeChange( + Operation.addFile( + 'redis1.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2020-01-01T00:02:00') + ), + makeChange( + Operation.addFile( + 'redis2.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2020-01-01T00:03:00') + ), + ] + await redisBackend.queueChanges( + projectId, + snapshot, + singleChunk.getEndVersion(), + queuedChanges + ) + }) + + it('returns Redis changes when since equals chunk end version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 2 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal(queuedChanges) + }) + + it('returns partial Redis changes when since is within Redis buffer', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 3 + ) + expect(result.changes).to.have.length(1) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal([queuedChanges[1]]) + }) + + it('returns chunk changes plus Redis changes when since is within chunk', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, 
+ 1 + ) + expect(result.changes).to.have.length(3) + expect(result.hasMore).to.be.false + // Should contain the second chunk change plus Redis changes + const expectedChanges = [singleChunk.getChanges()[1]].concat( + queuedChanges + ) + expect(result.changes).to.deep.equal(expectedChanges) + }) + + it('returns empty changes when since equals current head version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 4 + ) + expect(result.changes).to.have.length(0) + expect(result.hasMore).to.be.false + }) + }) + }) + + describe('multiple chunks scenarios', function () { + let firstChunk, secondChunk, thirdChunk + + beforeEach(async function () { + // Reuse the existing multiple chunks setup + const blob = await blobStore.putString('') + firstChunk = makeChunk( + [ + makeChange( + Operation.addFile('foo.tex', File.createLazyFromBlobs(blob)), + new Date('2015-01-01T00:00:00') + ), + makeChange( + Operation.addFile('bar.tex', File.createLazyFromBlobs(blob)), + new Date('2015-01-01T00:01:00') + ), + ], + 0 + ) + await chunkStore.update(projectId, firstChunk) + + secondChunk = makeChunk( + [ + makeChange( + Operation.addFile('baz.tex', File.createLazyFromBlobs(blob)), + new Date('2016-01-01T00:00:00') + ), + makeChange( + Operation.addFile('qux.tex', File.createLazyFromBlobs(blob)), + new Date('2016-01-01T00:01:00') + ), + ], + 2 + ) + await chunkStore.create(projectId, secondChunk) + + thirdChunk = makeChunk( + [ + makeChange( + Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)), + new Date('2017-01-01T00:00:00') + ), + makeChange( + Operation.addFile( + 'barbar.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2017-01-01T00:01:00') + ), + ], + 4 + ) + await chunkStore.create(projectId, thirdChunk) + + // Load the actual chunks for comparison + firstChunk = await chunkStore.loadAtVersion(projectId, 1) + secondChunk = await chunkStore.loadAtVersion(projectId, 3) + thirdChunk = await 
chunkStore.loadAtVersion(projectId, 5) + }) + + describe('without Redis changes', function () { + it('returns changes from first chunk when since is 0', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 0 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.true + expect(result.changes).to.deep.equal(firstChunk.getChanges()) + }) + + it('returns changes from second chunk when since is 2', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 2 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.true + expect(result.changes).to.deep.equal(secondChunk.getChanges()) + }) + + it('returns partial changes from second chunk when since is 3', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 3 + ) + expect(result.changes).to.have.length(1) + expect(result.hasMore).to.be.true + expect(result.changes).to.deep.equal([ + secondChunk.getChanges()[1], + ]) + }) + + it('returns changes from third chunk when since is 4', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 4 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal(thirdChunk.getChanges()) + }) + + it('returns empty changes when since equals final version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 6 + ) + expect(result.changes).to.have.length(0) + expect(result.hasMore).to.be.false + }) + + it('returns partial changes from third chunk when since is 5', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 5 + ) + expect(result.changes).to.have.length(1) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal([thirdChunk.getChanges()[1]]) + }) + }) + + describe('with Redis changes', function () { + let queuedChanges + + 
beforeEach(async function () { + // Add Redis changes after the third chunk + const latestChunk = await chunkStore.loadLatest(projectId) + const snapshot = latestChunk.getSnapshot() + snapshot.applyAll(latestChunk.getChanges()) + const blob = await blobStore.putString('redis multi content') + queuedChanges = [ + makeChange( + Operation.addFile( + 'redis-multi1.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2017-01-01T00:02:00') + ), + makeChange( + Operation.addFile( + 'redis-multi2.tex', + File.createLazyFromBlobs(blob) + ), + new Date('2017-01-01T00:03:00') + ), + ] + await redisBackend.queueChanges( + projectId, + snapshot, + latestChunk.getEndVersion(), + queuedChanges + ) + }) + + it('returns changes from second chunk when since is 2', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 2 + ) + // Current implementation limitation: when Redis doesn't have the version, + // it falls back to chunk-based approach which only returns changes from + // the single chunk that contains the start version, not subsequent chunks or Redis + expect(result.changes).to.have.length(2) // Only from second chunk + expect(result.hasMore).to.be.true // There are more chunks after this one + expect(result.changes).to.deep.equal(secondChunk.getChanges()) + }) + + it('returns changes from third chunk including Redis changes when since is 4', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 4 + ) + // When requesting changes from the latest chunk, Redis changes are included + // because loadAtVersion for the latest chunk includes non-persisted changes + expect(result.changes).to.have.length(4) // 2 from third chunk + 2 from Redis + expect(result.hasMore).to.be.false // Third chunk is the latest chunk, so nothing follows it + const expectedChanges = thirdChunk + .getChanges() + .concat(queuedChanges) + expect(result.changes).to.deep.equal(expectedChanges) + }) + + it('returns Redis changes when since equals chunk end 
version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 6 + ) + expect(result.changes).to.have.length(2) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal(queuedChanges) + }) + + it('returns partial Redis changes when since is within Redis buffer', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 7 + ) + expect(result.changes).to.have.length(1) + expect(result.hasMore).to.be.false + expect(result.changes).to.deep.equal([queuedChanges[1]]) + }) + + it('returns empty changes when since equals current head version', async function () { + const result = await chunkStore.getChangesSinceVersion( + projectId, + 8 + ) + expect(result.changes).to.have.length(0) + expect(result.hasMore).to.be.false + }) + + it('iterates through all the changes using the hasMore parameter', async function () { + const allChanges = [] + let currentVersion = 0 + let hasMore = true + + while (hasMore) { + const result = await chunkStore.getChangesSinceVersion( + projectId, + currentVersion + ) + allChanges.push(...result.changes) + hasMore = result.hasMore + + if (hasMore) { + // Move to the next version after the last change we received + currentVersion += result.changes.length + } + } + + // Should have collected the changes from all chunks plus Redis + const expectedTotalChanges = + firstChunk.getChanges().length + + secondChunk.getChanges().length + + thirdChunk.getChanges().length + + queuedChanges.length + expect(allChanges).to.have.length(expectedTotalChanges) + + // Verify we got the expected changes in order + const expectedChanges = [] + .concat(firstChunk.getChanges()) + .concat(secondChunk.getChanges()) + .concat(thirdChunk.getChanges()) + .concat(queuedChanges) + expect(allChanges).to.deep.equal(expectedChanges) + }) + }) + }) + }) + describe('version checks', function () { beforeEach(async function () { // Create a chunk with start version 0, end version 3 diff --git 
a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index fc176de192..a9600531a9 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -470,10 +470,8 @@ describe('chunk buffer Redis backend', function () { expect(result.status).to.equal('ok') expect(result.changes).to.be.an('array').with.lengthOf(2) - // The changes array should contain the raw changes - // Note: We're comparing raw objects, not the Change instances - expect(result.changes[0]).to.deep.equal(change2.toRaw()) - expect(result.changes[1]).to.deep.equal(change3.toRaw()) + expect(result.changes[0]).to.deep.equal(change2) + expect(result.changes[1]).to.deep.equal(change3) }) it('should return all changes when requested version is earliest available', async function () { @@ -503,9 +501,9 @@ describe('chunk buffer Redis backend', function () { expect(result.status).to.equal('ok') expect(result.changes).to.be.an('array').with.lengthOf(3) - expect(result.changes[0]).to.deep.equal(change1.toRaw()) - expect(result.changes[1]).to.deep.equal(change2.toRaw()) - expect(result.changes[2]).to.deep.equal(change3.toRaw()) + expect(result.changes[0]).to.deep.equal(change1) + expect(result.changes[1]).to.deep.equal(change2) + expect(result.changes[2]).to.deep.equal(change3) }) })