From f904933d6855c614b0ac98325ba57d7e27f93ff2 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 10 Jun 2025 10:41:44 +0100 Subject: [PATCH] Merge pull request #26180 from overleaf/bg-history-redis-add-queueChanges add queueChanges method to history-v1 GitOrigin-RevId: fb6da79bd5ca40e7cbdcb077ad3a036cc5509ced --- services/history-v1/storage/index.js | 1 + .../history-v1/storage/lib/queue_changes.js | 75 ++++ .../js/storage/queue_changes.test.js | 416 ++++++++++++++++++ 3 files changed, 492 insertions(+) create mode 100644 services/history-v1/storage/lib/queue_changes.js create mode 100644 services/history-v1/test/acceptance/js/storage/queue_changes.test.js diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index 46fa63b689..a07c98c026 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -9,6 +9,7 @@ exports.redis = require('./lib/redis') exports.persistChanges = require('./lib/persist_changes') exports.persistor = require('./lib/persistor') exports.persistBuffer = require('./lib/persist_buffer') +exports.queueChanges = require('./lib/queue_changes') exports.ProjectArchive = require('./lib/project_archive') exports.streams = require('./lib/streams') exports.temp = require('./lib/temp') diff --git a/services/history-v1/storage/lib/queue_changes.js b/services/history-v1/storage/lib/queue_changes.js new file mode 100644 index 0000000000..6b8d4b22b4 --- /dev/null +++ b/services/history-v1/storage/lib/queue_changes.js @@ -0,0 +1,75 @@ +// @ts-check + +'use strict' + +const redisBackend = require('./chunk_store/redis') +const { BlobStore } = require('./blob_store') +const chunkStore = require('./chunk_store') +const core = require('overleaf-editor-core') +const Chunk = core.Chunk + +/** + * Queues an incoming set of changes after validating them against the current snapshot. + * + * @async + * @function queueChanges + * @param {string} projectId - The project to queue changes for. + * @param {Array} changesToQueue - An array of change objects to be applied and queued. + * @param {number} endVersion - The expected version of the project before these changes are applied. + * This is used for optimistic concurrency control. + * @param {Object} [opts] - Additional options for queuing changes. + * @throws {Chunk.ConflictingEndVersion} If the provided `endVersion` does not match the + * current version of the project. + * @returns {Promise} A promise that resolves with the status returned by the + * `redisBackend.queueChanges` operation. + */ +async function queueChanges(projectId, changesToQueue, endVersion, opts) { + const result = await redisBackend.getHeadSnapshot(projectId) + let currentSnapshot = null + let currentVersion = null + if (result) { + // If we have a snapshot in redis, we can use it to check the current state + // of the project and apply changes to it. + currentSnapshot = result.snapshot + currentVersion = result.version + } else { + // Otherwise, load the latest chunk from the chunk store. + const latestChunk = await chunkStore.loadLatest(projectId, { + persistedOnly: true, + }) + // Throw an error if no latest chunk is found, indicating the project has not been initialised. + if (!latestChunk) { + throw new Chunk.NotFoundError(projectId) + } + currentSnapshot = latestChunk.getSnapshot() + currentSnapshot.applyAll(latestChunk.getChanges()) + currentVersion = latestChunk.getEndVersion() + } + + // Ensure the endVersion matches the current version of the project. + if (endVersion !== currentVersion) { + throw new Chunk.ConflictingEndVersion(endVersion, currentVersion) + } + + // Compute the new hollow snapshot to be saved to redis. + const hollowSnapshot = currentSnapshot + const blobStore = new BlobStore(projectId) + await hollowSnapshot.loadFiles('hollow', blobStore) + // Clone the changes to avoid modifying the original ones when computing the hollow snapshot. + const hollowChanges = changesToQueue.map(change => change.clone()) + for (const change of hollowChanges) { + await change.loadFiles('hollow', blobStore) + } + hollowSnapshot.applyAll(hollowChanges, { strict: true }) + const baseVersion = currentVersion + const status = await redisBackend.queueChanges( + projectId, + hollowSnapshot, + baseVersion, + changesToQueue, + opts + ) + return status +} + +module.exports = queueChanges diff --git a/services/history-v1/test/acceptance/js/storage/queue_changes.test.js b/services/history-v1/test/acceptance/js/storage/queue_changes.test.js new file mode 100644 index 0000000000..dbfe8c7e56 --- /dev/null +++ b/services/history-v1/test/acceptance/js/storage/queue_changes.test.js @@ -0,0 +1,416 @@ +'use strict' + +const { expect } = require('chai') +const sinon = require('sinon') + +const cleanup = require('./support/cleanup') +const fixtures = require('./support/fixtures') +const testFiles = require('./support/test_files.js') +const storage = require('../../../../storage') +const chunkStore = storage.chunkStore +const queueChanges = storage.queueChanges +const redisBackend = require('../../../../storage/lib/chunk_store/redis') + +const core = require('overleaf-editor-core') +const AddFileOperation = core.AddFileOperation +const EditFileOperation = core.EditFileOperation +const TextOperation = core.TextOperation +const Change = core.Change +const Chunk = core.Chunk +const File = core.File +const Snapshot = core.Snapshot +const BlobStore = storage.BlobStore +const persistChanges = storage.persistChanges + +describe('queueChanges', function () { + let limitsToPersistImmediately + before(function () { + // Used to provide a limit which forces us to persist all of the changes + const farFuture = new Date() + farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000) + limitsToPersistImmediately = { + minChangeTimestamp: farFuture, + maxChangeTimestamp: farFuture, + maxChanges: 10, + maxChunkChanges: 10, + } + }) + + beforeEach(cleanup.everything) + beforeEach(fixtures.create) + afterEach(function () { + sinon.restore() + }) + + it('queues changes when redis has no snapshot (falls back to chunkStore with an empty chunk)', async function () { + // Start with an empty chunk store for the project + const projectId = fixtures.docs.uninitializedProject.id + await chunkStore.initializeProject(projectId) + + // Ensure that the initial state in redis is empty + const initialRedisState = await redisBackend.getState(projectId) + expect(initialRedisState.headVersion).to.be.null + expect(initialRedisState.headSnapshot).to.be.null + expect(initialRedisState.changes).to.be.an('array').that.is.empty + + // Add a test file to the blob store + const blobStore = new BlobStore(projectId) + await blobStore.putFile(testFiles.path('hello.txt')) + + // Prepare an initial change to add a single file to an empty project + const change = new Change( + [ + new AddFileOperation( + 'test.tex', + File.fromHash(testFiles.HELLO_TXT_HASH) + ), + ], + new Date(), + [] + ) + const changesToQueue = [change] + const endVersion = 0 + + // Queue the changes to add the test file + const status = await queueChanges(projectId, changesToQueue, endVersion) + expect(status).to.equal('ok') + + // Verify that we now have some state in redis + const redisState = await redisBackend.getState(projectId) + expect(redisState).to.not.be.null + + // Compute the expected snapshot after applying the changes + const expectedSnapshot = new Snapshot() + await expectedSnapshot.loadFiles('hollow', blobStore) + for (const change of changesToQueue) { + const hollowChange = change.clone() + await hollowChange.loadFiles('hollow', blobStore) + hollowChange.applyTo(expectedSnapshot, { strict: true }) + } + + // Confirm that state in redis matches the expected snapshot and changes queue + const expectedVersionInRedis = endVersion + changesToQueue.length + expect(redisState.headVersion).to.equal(expectedVersionInRedis) + expect(redisState.headSnapshot).to.deep.equal(expectedSnapshot.toRaw()) + expect(redisState.changes).to.deep.equal(changesToQueue.map(c => c.toRaw())) + }) + + it('queues changes when redis has no snapshot (falls back to chunkStore with an existing chunk)', async function () { + const projectId = fixtures.docs.uninitializedProject.id + + // Initialise the project in the chunk store using the "Hello World" test file + await chunkStore.initializeProject(projectId) + const blobStore = new BlobStore(projectId) + await blobStore.putFile(testFiles.path('hello.txt')) + const change = new Change( + [ + new AddFileOperation( + 'hello.tex', + File.fromHash(testFiles.HELLO_TXT_HASH) + ), + ], + new Date(), + [] + ) + const initialChanges = [change] + const initialVersion = 0 + + const result = await persistChanges( + projectId, + initialChanges, + limitsToPersistImmediately, + initialVersion + ) + // Compute the state after the initial changes are persisted for later comparison + const endVersion = initialVersion + initialChanges.length + const { currentChunk } = result + const originalSnapshot = result.currentChunk.getSnapshot() + await originalSnapshot.loadFiles('hollow', blobStore) + originalSnapshot.applyAll(currentChunk.getChanges()) + + // Ensure that the initial state in redis is empty + const initialRedisState = await redisBackend.getState(projectId) + expect(initialRedisState.headVersion).to.be.null + expect(initialRedisState.headSnapshot).to.be.null + expect(initialRedisState.changes).to.be.an('array').that.is.empty + + // Prepare a change to edit the existing file + const editFileOp = new EditFileOperation( + 'hello.tex', + new TextOperation() + .insert('Hello') + .retain(testFiles.HELLO_TXT_UTF8_LENGTH) + ) + const editFileChange = new Change([editFileOp], new Date(), []) + const changesToQueue = [editFileChange] + + // Queue the changes to edit the existing file + const status = await queueChanges(projectId, changesToQueue, endVersion) + expect(status).to.equal('ok') + + // Verify that we now have some state in redis + const redisState = await redisBackend.getState(projectId) + expect(redisState).to.not.be.null + + // Compute the expected snapshot after applying the changes + const expectedSnapshot = originalSnapshot.clone() + await expectedSnapshot.loadFiles('hollow', blobStore) + expectedSnapshot.applyAll(changesToQueue) + + // Confirm that state in redis matches the expected snapshot and changes queue + const expectedVersionInRedis = endVersion + changesToQueue.length + expect(redisState.headVersion).to.equal(expectedVersionInRedis) + expect(redisState.headSnapshot).to.deep.equal(expectedSnapshot.toRaw()) + expect(redisState.changes).to.deep.equal(changesToQueue.map(c => c.toRaw())) + }) + + it('queues changes when redis has a snapshot with existing changes', async function () { + const projectId = fixtures.docs.uninitializedProject.id + + // Initialise the project in redis using the "Hello World" test file + await chunkStore.initializeProject(projectId) + const blobStore = new BlobStore(projectId) + await blobStore.putFile(testFiles.path('hello.txt')) + const initialChangeOp = new AddFileOperation( + 'existing.tex', + File.fromHash(testFiles.HELLO_TXT_HASH) + ) + const initialChange = new Change([initialChangeOp], new Date(), []) + const initialChangesToQueue = [initialChange] + const versionBeforeInitialQueue = 0 + + // Queue the initial changes + const status = await queueChanges( + projectId, + initialChangesToQueue, + versionBeforeInitialQueue + ) + // Confirm that the initial changes were queued successfully + expect(status).to.equal('ok') + const versionAfterInitialQueue = + versionBeforeInitialQueue + initialChangesToQueue.length + + // Compute the snapshot after the initial changes for later use + const initialSnapshot = new Snapshot() + await initialSnapshot.loadFiles('hollow', blobStore) + for (const change of initialChangesToQueue) { + const hollowChange = change.clone() + await hollowChange.loadFiles('hollow', blobStore) + hollowChange.applyTo(initialSnapshot, { strict: true }) + } + + // Now prepare some subsequent changes for the queue + await blobStore.putFile(testFiles.path('graph.png')) + const addFileOp = new AddFileOperation( + 'graph.png', + File.fromHash(testFiles.GRAPH_PNG_HASH) + ) + const addFileChange = new Change([addFileOp], new Date(), []) + const editFileOp = new EditFileOperation( + 'existing.tex', + new TextOperation() + .insert('Hello') + .retain(testFiles.HELLO_TXT_UTF8_LENGTH) + ) + const editFileChange = new Change([editFileOp], new Date(), []) + + const subsequentChangesToQueue = [addFileChange, editFileChange] + const versionBeforeSubsequentQueue = versionAfterInitialQueue + + // Queue the subsequent changes + const subsequentStatus = await queueChanges( + projectId, + subsequentChangesToQueue, + versionBeforeSubsequentQueue + ) + expect(subsequentStatus).to.equal('ok') + + // Compute the expected snapshot after applying all changes + const expectedSnapshot = initialSnapshot.clone() + await expectedSnapshot.loadFiles('hollow', blobStore) + for (const change of subsequentChangesToQueue) { + const hollowChange = change.clone() + await hollowChange.loadFiles('hollow', blobStore) + hollowChange.applyTo(expectedSnapshot, { strict: true }) + } + + // Confirm that state in redis matches the expected snapshot and changes queue + const finalRedisState = await redisBackend.getState(projectId) + expect(finalRedisState).to.not.be.null + const expectedFinalVersion = + versionBeforeSubsequentQueue + subsequentChangesToQueue.length + expect(finalRedisState.headVersion).to.equal(expectedFinalVersion) + expect(finalRedisState.headSnapshot).to.deep.equal(expectedSnapshot.toRaw()) + const allQueuedChangesRaw = initialChangesToQueue + .concat(subsequentChangesToQueue) + .map(c => c.toRaw()) + expect(finalRedisState.changes).to.deep.equal(allQueuedChangesRaw) + }) + + it('skips queuing changes when there is no snapshot and the onlyIfExists flag is set', async function () { + // Start with an empty chunk store for the project + const projectId = fixtures.docs.uninitializedProject.id + await chunkStore.initializeProject(projectId) + + // Ensure that the initial state in redis is empty + const initialRedisState = await redisBackend.getState(projectId) + expect(initialRedisState.headVersion).to.be.null + expect(initialRedisState.headSnapshot).to.be.null + expect(initialRedisState.changes).to.be.an('array').that.is.empty + + // Add a test file to the blob store + const blobStore = new BlobStore(projectId) + await blobStore.putFile(testFiles.path('hello.txt')) + + // Prepare an initial change to add a single file to an empty project + const change = new Change( + [ + new AddFileOperation( + 'test.tex', + File.fromHash(testFiles.HELLO_TXT_HASH) + ), + ], + new Date(), + [] + ) + const changesToQueue = [change] + const endVersion = 0 + + // Queue the changes to add the test file + const status = await queueChanges(projectId, changesToQueue, endVersion, { + onlyIfExists: true, + }) + expect(status).to.equal('ignore') + + // Verify that the state in redis has not changed + const redisState = await redisBackend.getState(projectId) + expect(redisState).to.deep.equal(initialRedisState) + }) + + it('creates an initial hollow snapshot when redis has no snapshot (falls back to chunkStore with an empty chunk)', async function () { + // Start with an empty chunk store for the project + const projectId = fixtures.docs.uninitializedProject.id + await chunkStore.initializeProject(projectId) + const blobStore = new BlobStore(projectId) + await blobStore.putFile(testFiles.path('hello.txt')) + + // Prepare an initial change to add a single file to an empty project + const change = new Change( + [ + new AddFileOperation( + 'test.tex', + File.fromHash(testFiles.HELLO_TXT_HASH) + ), + ], + new Date(), + [] + ) + const changesToQueue = [change] + const endVersion = 0 + + // Queue the changes to add the test file + const status = await queueChanges(projectId, changesToQueue, endVersion) + expect(status).to.equal('ok') + + // Verify that we now have some state in redis + const redisState = await redisBackend.getState(projectId) + expect(redisState).to.not.be.null + expect(redisState.headSnapshot.files['test.tex']).to.deep.equal({ + stringLength: testFiles.HELLO_TXT_UTF8_LENGTH, + }) + }) + + it('throws ConflictingEndVersion if endVersion does not match current version (from chunkStore)', async function () { + const projectId = fixtures.docs.uninitializedProject.id + // Initialise an empty project in the chunk store + await chunkStore.initializeProject(projectId) + + // Ensure that the initial state in redis is empty + const initialRedisState = await redisBackend.getState(projectId) + expect(initialRedisState.headVersion).to.be.null + + // Prepare a change to add a file + const change = new Change( + [new AddFileOperation('test.tex', File.fromString(''))], + new Date(), + [] + ) + const changesToQueue = [change] + const incorrectEndVersion = 1 + + // Attempt to queue the changes with an incorrect endVersion (1 instead of 0) + await expect(queueChanges(projectId, changesToQueue, incorrectEndVersion)) + .to.be.rejectedWith(Chunk.ConflictingEndVersion) + .and.eventually.satisfies(err => { + expect(err.info).to.have.property( + 'clientEndVersion', + incorrectEndVersion + ) + expect(err.info).to.have.property('latestEndVersion', 0) + return true + }) + + // Verify that the state in redis has not changed + const redisStateAfterError = await redisBackend.getState(projectId) + expect(redisStateAfterError).to.deep.equal(initialRedisState) + }) + + it('throws ConflictingEndVersion if endVersion does not match current version (from redis snapshot)', async function () { + const projectId = fixtures.docs.uninitializedProject.id + + // Initialise the project in the redis with a test file + await chunkStore.initializeProject(projectId) + const initialChange = new Change( + [new AddFileOperation('initial.tex', File.fromString('content'))], + new Date(), + [] + ) + const initialChangesToQueue = [initialChange] + const versionBeforeInitialQueue = 0 + + // Queue the initial changes + await queueChanges( + projectId, + initialChangesToQueue, + versionBeforeInitialQueue + ) + const versionInRedisAfterSetup = + versionBeforeInitialQueue + initialChangesToQueue.length + + // Confirm that the initial changes were queued successfully + const initialRedisState = await redisBackend.getState(projectId) + expect(initialRedisState).to.not.be.null + expect(initialRedisState.headVersion).to.equal(versionInRedisAfterSetup) + + // Now prepare a subsequent change for the queue + const subsequentChange = new Change( + [new AddFileOperation('another.tex', File.fromString(''))], + new Date(), + [] + ) + const subsequentChangesToQueue = [subsequentChange] + const incorrectEndVersion = 0 + + // Attempt to queue the changes with an incorrect endVersion (0 instead of 1) + await expect( + queueChanges(projectId, subsequentChangesToQueue, incorrectEndVersion) + ) + .to.be.rejectedWith(Chunk.ConflictingEndVersion) + .and.eventually.satisfies(err => { + expect(err.info).to.have.property( + 'clientEndVersion', + incorrectEndVersion + ) + expect(err.info).to.have.property( + 'latestEndVersion', + versionInRedisAfterSetup + ) + return true + }) + + // Verify that the state in redis has not changed + const redisStateAfterError = await redisBackend.getState(projectId) + expect(redisStateAfterError).to.not.be.null + expect(redisStateAfterError).to.deep.equal(initialRedisState) + }) +})