diff --git a/services/history-v1/api/controllers/project_import.js b/services/history-v1/api/controllers/project_import.js
index 72df912a88..dee45efce8 100644
--- a/services/history-v1/api/controllers/project_import.js
+++ b/services/history-v1/api/controllers/project_import.js
@@ -110,7 +110,9 @@ async function importChanges(req, res, next) {
 
   let result
   try {
-    result = await persistChanges(projectId, changes, limits, endVersion)
+    result = await persistChanges(projectId, changes, limits, endVersion, {
+      queueChangesInRedis: true,
+    })
   } catch (err) {
     if (
       err instanceof Chunk.ConflictingEndVersion ||
diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js
index 2aa492f46e..a9d8e2fc03 100644
--- a/services/history-v1/storage/index.js
+++ b/services/history-v1/storage/index.js
@@ -8,6 +8,7 @@ exports.mongodb = require('./lib/mongodb')
 exports.redis = require('./lib/redis')
 exports.persistChanges = require('./lib/persist_changes')
 exports.persistor = require('./lib/persistor')
+exports.persistBuffer = require('./lib/persist_buffer').persistBuffer
 exports.ProjectArchive = require('./lib/project_archive')
 exports.streams = require('./lib/streams')
 exports.temp = require('./lib/temp')
diff --git a/services/history-v1/storage/lib/persist_buffer.js b/services/history-v1/storage/lib/persist_buffer.js
new file mode 100644
index 0000000000..0dfeb9a38c
--- /dev/null
+++ b/services/history-v1/storage/lib/persist_buffer.js
@@ -0,0 +1,176 @@
+// @ts-check
+'use strict'
+
+const logger = require('@overleaf/logger')
+const OError = require('@overleaf/o-error')
+const assert = require('./assert')
+const chunkStore = require('./chunk_store')
+const { BlobStore } = require('./blob_store')
+const BatchBlobStore = require('./batch_blob_store')
+const persistChanges = require('./persist_changes')
+const redisBackend = require('./chunk_store/redis')
+
+/**
+ * Persist the changes from Redis buffer to the main storage
+ *
+ * Algorithm Outline:
+ * 1. Get the latest chunk's endVersion from the database
+ * 2. Get non-persisted changes from Redis that are after this endVersion.
+ * 3. If no such changes, exit.
+ * 4. Load file blobs for these Redis changes.
+ * 5. Run the persistChanges() algorithm to store these changes into a new chunk(s) in GCS.
+ *    - This must not decrease the endVersion. If changes were processed, it must advance.
+ * 6. Set the new persisted version (endVersion of the latest persisted chunk) in Redis.
+ *
+ * @param {string} projectId
+ * @returns {Promise<void>}
+ * @throws {Error | OError} If a critical error occurs during persistence.
+ */
+async function persistBuffer(projectId) {
+  assert.projectId(projectId)
+  logger.debug({ projectId }, 'starting persistBuffer operation')
+
+  // Set limits to force us to persist all of the changes.
+  const farFuture = new Date()
+  farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000)
+  const limits = {
+    maxChanges: 0,
+    minChangeTimestamp: farFuture,
+    maxChangeTimestamp: farFuture,
+  }
+
+  // 1. Get the latest chunk's endVersion from GCS/main store
+  let endVersion
+  const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId)
+
+  if (latestChunkMetadata) {
+    endVersion = latestChunkMetadata.endVersion
+  } else {
+    endVersion = 0 // No chunks found, start from version 0
+    logger.debug({ projectId }, 'no existing chunks found in main storage')
+  }
+
+  logger.debug({ projectId, endVersion }, 'got latest persisted chunk')
+
+  // 2. Get non-persisted changes from Redis
+  const changesToPersist = await redisBackend.getNonPersistedChanges(
+    projectId,
+    endVersion
+  )
+
+  if (changesToPersist.length === 0) {
+    logger.debug(
+      { projectId, endVersion },
+      'no new changes in Redis buffer to persist'
+    )
+    // No changes to persist, update the persisted version in Redis
+    // to match the current endVersion. This shouldn't be needed
+    // unless a worker failed to update the persisted version.
+    await redisBackend.setPersistedVersion(projectId, endVersion)
+    return
+  }
+
+  logger.debug(
+    {
+      projectId,
+      endVersion,
+      count: changesToPersist.length,
+    },
+    'found changes in Redis to persist'
+  )
+
+  // 4. Load file blobs for these Redis changes. Errors will propagate.
+  const blobStore = new BlobStore(projectId)
+  const batchBlobStore = new BatchBlobStore(blobStore)
+
+  const blobHashes = new Set()
+  for (const change of changesToPersist) {
+    change.findBlobHashes(blobHashes)
+  }
+  if (blobHashes.size > 0) {
+    await batchBlobStore.preload(Array.from(blobHashes))
+  }
+  // Resolve files through batchBlobStore so the preload above is actually
+  // used; loading via the bare blobStore would bypass the preloaded batch.
+  for (const change of changesToPersist) {
+    await change.loadFiles('lazy', batchBlobStore)
+  }
+
+  // 5. Run the persistChanges() algorithm. Errors will propagate.
+  logger.debug(
+    {
+      projectId,
+      endVersion,
+      changeCount: changesToPersist.length,
+    },
+    'calling persistChanges'
+  )
+
+  const persistResult = await persistChanges(
+    projectId,
+    changesToPersist,
+    limits,
+    endVersion
+  )
+
+  if (!persistResult || !persistResult.currentChunk) {
+    throw new OError(
+      'persistChanges did not produce a new chunk for non-empty changes',
+      {
+        projectId,
+        endVersion,
+        changeCount: changesToPersist.length,
+      }
+    )
+  }
+
+  const newPersistedChunk = persistResult.currentChunk
+  const newEndVersion = newPersistedChunk.getEndVersion()
+
+  if (newEndVersion <= endVersion) {
+    throw new OError(
+      'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
+      {
+        projectId,
+        newEndVersion,
+        endVersion,
+        changeCount: changesToPersist.length,
+      }
+    )
+  }
+
+  logger.debug(
+    {
+      projectId,
+      oldVersion: endVersion,
+      newVersion: newEndVersion,
+    },
+    'successfully persisted changes from Redis to main storage'
+  )
+
+  // 6. Set the persisted version in Redis. Errors will propagate.
+  const status = await redisBackend.setPersistedVersion(
+    projectId,
+    newEndVersion
+  )
+
+  if (status !== 'ok') {
+    throw new OError('failed to update persisted version in Redis', {
+      projectId,
+      newEndVersion,
+      status,
+    })
+  }
+
+  logger.debug(
+    { projectId, newEndVersion },
+    'updated persisted version in Redis'
+  )
+
+  logger.debug(
+    { projectId, finalPersistedVersion: newEndVersion },
+    'persistBuffer operation completed successfully'
+  )
+}
+
+module.exports = { persistBuffer }
diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js
index 5b80285eb0..95ffdc67d2 100644
--- a/services/history-v1/storage/lib/persist_changes.js
+++ b/services/history-v1/storage/lib/persist_changes.js
@@ -57,9 +57,18 @@ Timer.prototype.elapsed = function () {
  * @param {core.Change[]} allChanges
  * @param {Object} limits
  * @param {number} clientEndVersion
+ * @param {Object} [options]
+ * @param {Boolean} [options.queueChangesInRedis]
+ *   If true, queue the changes in Redis for testing purposes.
* @return {Promise.} */ -async function persistChanges(projectId, allChanges, limits, clientEndVersion) { +async function persistChanges( + projectId, + allChanges, + limits, + clientEndVersion, + options = {} +) { assert.projectId(projectId) assert.array(allChanges) assert.maybe.object(limits) @@ -289,11 +298,13 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { const numberOfChangesToPersist = oldChanges.length await loadLatestChunk() - try { - await queueChangesInRedis() - await fakePersistRedisChanges() - } catch (err) { - logger.error({ err }, 'Chunk buffer verification failed') + if (options.queueChangesInRedis) { + try { + await queueChangesInRedis() + await fakePersistRedisChanges() + } catch (err) { + logger.error({ err }, 'Chunk buffer verification failed') + } } await extendLastChunkIfPossible() await createNewChunksAsNeeded() diff --git a/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs new file mode 100644 index 0000000000..64772c4b70 --- /dev/null +++ b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs @@ -0,0 +1,338 @@ +'use strict' + +import fs from 'node:fs' +import { expect } from 'chai' +import { + Change, + Snapshot, + File, + TextOperation, + AddFileOperation, + EditFileOperation, // Added EditFileOperation +} from 'overleaf-editor-core' +import { persistBuffer } from '../../../../storage/lib/persist_buffer.js' +import chunkStore from '../../../../storage/lib/chunk_store/index.js' +import redisBackend from '../../../../storage/lib/chunk_store/redis.js' +import persistChanges from '../../../../storage/lib/persist_changes.js' +import cleanup from './support/cleanup.js' +import fixtures from './support/fixtures.js' +import testFiles from './support/test_files.js' + +describe('persistBuffer', function () { + let projectId + const initialVersion = 0 + let limitsToPersistImmediately + + before(function 
() { + const farFuture = new Date() + farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000) + limitsToPersistImmediately = { + minChangeTimestamp: farFuture, + maxChangeTimestamp: farFuture, + maxChunkChanges: 10, + } + }) + + beforeEach(cleanup.everything) + beforeEach(fixtures.create) + + beforeEach(async function () { + projectId = fixtures.docs.uninitializedProject.id + await chunkStore.initializeProject(projectId) + }) + + describe('with an empty initial chunk (new project)', function () { + it('should persist changes from Redis to a new chunk', async function () { + // create an initial snapshot and add the empty file `main.tex` + const HELLO_TXT = fs.readFileSync(testFiles.path('hello.txt')).toString() + + const createFile = new Change( + [new AddFileOperation('main.tex', File.fromString(HELLO_TXT))], + new Date(), + [] + ) + + await persistChanges( + projectId, + [createFile], + limitsToPersistImmediately, + 0 + ) + // Now queue some changes in Redis + const op1 = new TextOperation().insert('Hello').retain(HELLO_TXT.length) + const change1 = new Change( + [new EditFileOperation('main.tex', op1)], + new Date() + ) + + const op2 = new TextOperation() + .retain('Hello'.length) + .insert(' World') + .retain(HELLO_TXT.length) + const change2 = new Change( + [new EditFileOperation('main.tex', op2)], + new Date() + ) + + const changesToQueue = [change1, change2] + + const finalHeadVersion = initialVersion + 1 + changesToQueue.length + + const now = Date.now() + await redisBackend.queueChanges( + projectId, + new Snapshot(), // dummy snapshot + 1, + changesToQueue, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + await redisBackend.setPersistedVersion(projectId, initialVersion) + + // Persist the changes from Redis to the chunk store + await persistBuffer(projectId) + + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk).to.exist + 
expect(latestChunk.getStartVersion()).to.equal(initialVersion) + expect(latestChunk.getEndVersion()).to.equal(finalHeadVersion) + expect(latestChunk.getChanges().length).to.equal( + changesToQueue.length + 1 + ) + + const chunkSnapshot = latestChunk.getSnapshot() + expect(Object.keys(chunkSnapshot.getFileMap()).length).to.equal(1) + + const persistedVersionInRedis = (await redisBackend.getState(projectId)) + .persistedVersion + expect(persistedVersionInRedis).to.equal(finalHeadVersion) + + const nonPersisted = await redisBackend.getNonPersistedChanges( + projectId, + finalHeadVersion + ) + expect(nonPersisted).to.be.an('array').that.is.empty + }) + }) + + describe('with an existing chunk and new changes in Redis', function () { + it('should persist new changes from Redis, appending to existing history', async function () { + const initialContent = 'Initial document content.\n' + + const addInitialFileChange = new Change( + [new AddFileOperation('main.tex', File.fromString(initialContent))], + new Date(), + [] + ) + + await persistChanges( + projectId, + [addInitialFileChange], + limitsToPersistImmediately, + initialVersion + ) + const versionAfterInitialSetup = initialVersion + 1 // Now version is 1 + + const opForChunk1 = new TextOperation() + .retain(initialContent.length) + .insert(' First addition.') + const changesForChunk1 = [ + new Change( + [new EditFileOperation('main.tex', opForChunk1)], + new Date(), + [] + ), + ] + + await persistChanges( + projectId, + changesForChunk1, + limitsToPersistImmediately, // Original limits for this step + versionAfterInitialSetup // Correct clientEndVersion + ) + // Update persistedChunkEndVersion: 1 (from setup) + 1 (from changesForChunk1) = 2 + const persistedChunkEndVersion = + versionAfterInitialSetup + changesForChunk1.length + const contentAfterChunk1 = initialContent + ' First addition.' 
+ + const opVersion2 = new TextOperation() + .retain(contentAfterChunk1.length) + .insert(' Second addition.') + const changeVersion2 = new Change( + [new EditFileOperation('main.tex', opVersion2)], + new Date(), + [] + ) + + const contentAfterChange2 = contentAfterChunk1 + ' Second addition.' + const opVersion3 = new TextOperation() + .retain(contentAfterChange2.length) + .insert(' Third addition.') + const changeVersion3 = new Change( + [new EditFileOperation('main.tex', opVersion3)], + new Date(), + [] + ) + + const redisChangesToPush = [changeVersion2, changeVersion3] + const finalHeadVersionAfterRedisPush = + persistedChunkEndVersion + redisChangesToPush.length + const now = Date.now() + + await redisBackend.queueChanges( + projectId, + new Snapshot(), // Use new Snapshot() like in the first test + persistedChunkEndVersion, + redisChangesToPush, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + await redisBackend.setPersistedVersion( + projectId, + persistedChunkEndVersion + ) + + await persistBuffer(projectId) + + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk).to.exist + expect(latestChunk.getStartVersion()).to.equal(0) + expect(latestChunk.getEndVersion()).to.equal( + finalHeadVersionAfterRedisPush + ) + expect(latestChunk.getChanges().length).to.equal( + persistedChunkEndVersion + redisChangesToPush.length + ) + + const persistedVersionInRedisAfter = ( + await redisBackend.getState(projectId) + ).persistedVersion + expect(persistedVersionInRedisAfter).to.equal( + finalHeadVersionAfterRedisPush + ) + + const nonPersisted = await redisBackend.getNonPersistedChanges( + projectId, + finalHeadVersionAfterRedisPush + ) + expect(nonPersisted).to.be.an('array').that.is.empty + }) + }) + + describe('when Redis has no new changes', function () { + let persistedChunkEndVersion + let changesForChunk1 + + beforeEach(async function () { + const initialContent = 
'Content.' + + const addInitialFileChange = new Change( + [new AddFileOperation('main.tex', File.fromString(initialContent))], + new Date(), + [] + ) + + // Replace chunkStore.create with persistChanges + // clientEndVersion is initialVersion (0). This advances version to 1. + await persistChanges( + projectId, + [addInitialFileChange], + limitsToPersistImmediately, + initialVersion + ) + const versionAfterInitialSetup = initialVersion + 1 // Now version is 1 + + const opForChunk1 = new TextOperation() + .retain(initialContent.length) + .insert(' More.') + changesForChunk1 = [ + new Change( + [new EditFileOperation('main.tex', opForChunk1)], + new Date(), + [] + ), + ] + // Corrected persistChanges call: clientEndVersion is versionAfterInitialSetup (1) + await persistChanges( + projectId, + changesForChunk1, + limitsToPersistImmediately, // Original limits for this step + versionAfterInitialSetup // Correct clientEndVersion + ) + // Update persistedChunkEndVersion: 1 (from setup) + 1 (from changesForChunk1) = 2 + persistedChunkEndVersion = + versionAfterInitialSetup + changesForChunk1.length + }) + + it('should leave the persisted version and stored chunks unchanged', async function () { + const now = Date.now() + await redisBackend.queueChanges( + projectId, + new Snapshot(), + persistedChunkEndVersion - 1, + changesForChunk1, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + await redisBackend.setPersistedVersion( + projectId, + persistedChunkEndVersion + ) + + const chunksBefore = await chunkStore.getProjectChunks(projectId) + + await persistBuffer(projectId) + + const chunksAfter = await chunkStore.getProjectChunks(projectId) + expect(chunksAfter.length).to.equal(chunksBefore.length) + expect(chunksAfter).to.deep.equal(chunksBefore) + + const finalPersistedVersionInRedis = ( + await redisBackend.getState(projectId) + ).persistedVersion + 
expect(finalPersistedVersionInRedis).to.equal(persistedChunkEndVersion) + }) + + it('should update the persisted version if it is behind the chunk store end version', async function () { + const now = Date.now() + + await redisBackend.queueChanges( + projectId, + new Snapshot(), + persistedChunkEndVersion - 1, + changesForChunk1, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + // Force the persisted version in Redis to lag behind the chunk store, + // simulating the situation where a worker has persisted changes to the + // chunk store but failed to update the version in redis. + await redisBackend.setPersistedVersion( + projectId, + persistedChunkEndVersion - 1 + ) + + const chunksBefore = await chunkStore.getProjectChunks(projectId) + + // Persist buffer (which should do nothing as there are no new changes) + await persistBuffer(projectId, limitsToPersistImmediately) + + const chunksAfter = await chunkStore.getProjectChunks(projectId) + expect(chunksAfter.length).to.equal(chunksBefore.length) + expect(chunksAfter).to.deep.equal(chunksBefore) + + const finalPersistedVersionInRedis = ( + await redisBackend.getState(projectId) + ).persistedVersion + expect(finalPersistedVersionInRedis).to.equal(persistedChunkEndVersion) + }) + }) +})