diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index 59bfd81e39..a2317dd859 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -395,6 +395,7 @@ rclient.defineCommand('get_non_persisted_changes', { local persistedVersionKey = KEYS[2] local changesKey = KEYS[3] local baseVersion = tonumber(ARGV[1]) + local maxChanges = tonumber(ARGV[2]) -- Check if head version exists local headVersion = tonumber(redis.call('GET', headVersionKey)) @@ -415,9 +416,20 @@ rclient.defineCommand('get_non_persisted_changes', { return {'ok', {}} else local numChanges = headVersion - baseVersion - local changes = redis.call('LRANGE', changesKey, -numChanges, -1) - if #changes < numChanges then + local endIndex, expectedChanges + if maxChanges > 0 and maxChanges < numChanges then + -- return only the first maxChanges changes; the end index is inclusive + endIndex = -numChanges + maxChanges - 1 + expectedChanges = maxChanges + else + endIndex = -1 + expectedChanges = numChanges + end + + local changes = redis.call('LRANGE', changesKey, -numChanges, endIndex) + + if #changes < expectedChanges then -- We didn't get as many changes as we expected return {'out_of_bounds'} end @@ -433,6 +445,9 @@ rclient.defineCommand('get_non_persisted_changes', { * @param {string} projectId - The unique identifier of the project. * @param {number} baseVersion - The version on top of which the changes should * be applied. + * @param {object} [opts] + * @param {number} [opts.maxChanges] - The maximum number of changes to return. + * Defaults to 0, meaning no limit. * @returns {Promise} Changes that can be applied on top of * baseVersion. An empty array means that the project doesn't have * changes to persist. A null value means that the non-persisted @@ -440,14 +455,15 @@ rclient.defineCommand('get_non_persisted_changes', { * * @throws {Error} If Redis operations fail. */ -async function getNonPersistedChanges(projectId, baseVersion) { +async function getNonPersistedChanges(projectId, baseVersion, opts = {}) { let result try { result = await rclient.get_non_persisted_changes( keySchema.headVersion({ projectId }), keySchema.persistedVersion({ projectId }), keySchema.changes({ projectId }), - baseVersion.toString() + baseVersion.toString(), + opts.maxChanges ?? 0 ) } catch (err) { metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { diff --git a/services/history-v1/storage/lib/persist_buffer.js b/services/history-v1/storage/lib/persist_buffer.js index d562388f87..68b71e148f 100644 --- a/services/history-v1/storage/lib/persist_buffer.js +++ b/services/history-v1/storage/lib/persist_buffer.js @@ -12,6 +12,8 @@ const persistChanges = require('./persist_changes') const resyncProject = require('./resync_project') const redisBackend = require('./chunk_store/redis') +const PERSIST_BATCH_SIZE = 50 + /** * Persist the changes from Redis buffer to the main storage * @@ -42,16 +44,147 @@ async function persistBuffer(projectId, limits) { endVersion = 0 // No chunks found, start from version 0 logger.debug({ projectId }, 'no existing chunks found in main storage') } + const originalEndVersion = endVersion logger.debug({ projectId, endVersion }, 'got latest persisted chunk') - // 2. Get non-persisted changes from Redis - const changesToPersist = await redisBackend.getNonPersistedChanges( - projectId, - endVersion - ) + // Process changes in batches + let numberOfChangesPersisted = 0 + let currentChunk = null + let resyncNeeded = false + let resyncChangesWerePersisted = false + while (true) { + // 2. Get non-persisted changes from Redis + const changesToPersist = await redisBackend.getNonPersistedChanges( + projectId, + endVersion, + { maxChanges: PERSIST_BATCH_SIZE } + ) - if (changesToPersist.length === 0) { + if (changesToPersist.length === 0) { + break + } + + logger.debug( + { + projectId, + endVersion, + count: changesToPersist.length, + }, + 'found changes in Redis to persist' + ) + + // 4. Load file blobs for these Redis changes. Errors will propagate. + const blobStore = new BlobStore(projectId) + const batchBlobStore = new BatchBlobStore(blobStore) + + const blobHashes = new Set() + for (const change of changesToPersist) { + change.findBlobHashes(blobHashes) + } + if (blobHashes.size > 0) { + await batchBlobStore.preload(Array.from(blobHashes)) + } + for (const change of changesToPersist) { + await change.loadFiles('lazy', blobStore) + } + + // 5. Run the persistChanges() algorithm. Errors will propagate. + logger.debug( + { + projectId, + endVersion, + changeCount: changesToPersist.length, + }, + 'calling persistChanges' + ) + + const persistResult = await persistChanges( + projectId, + changesToPersist, + limits, + endVersion + ) + + if (!persistResult || !persistResult.currentChunk) { + metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' }) + throw new OError( + 'persistChanges did not produce a new chunk for non-empty changes', + { + projectId, + endVersion, + changeCount: changesToPersist.length, + } + ) + } + + currentChunk = persistResult.currentChunk + const newEndVersion = currentChunk.getEndVersion() + + if (newEndVersion <= endVersion) { + metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' }) + throw new OError( + 'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes', + { + projectId, + newEndVersion, + endVersion, + changeCount: changesToPersist.length, + } + ) + } + + logger.debug( + { + projectId, + oldVersion: endVersion, + newVersion: newEndVersion, + }, + 'successfully persisted changes from Redis to main storage' + ) + + // 6. Set the persisted version in Redis. Errors will propagate. + const status = await redisBackend.setPersistedVersion( + projectId, + newEndVersion + ) + + if (status !== 'ok') { + metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' }) + throw new OError('failed to update persisted version in Redis', { + projectId, + newEndVersion, + status, + }) + } + + logger.debug( + { projectId, newEndVersion }, + 'updated persisted version in Redis' + ) + numberOfChangesPersisted += persistResult.numberOfChangesPersisted + endVersion = newEndVersion + + // Check if a resync might be needed + if (persistResult.resyncNeeded) { + resyncNeeded = true + } + + if ( + changesToPersist.some( + change => change.getOrigin()?.getKind() === 'history-resync' + ) + ) { + resyncChangesWerePersisted = true + } + + if (persistResult.numberOfChangesPersisted < PERSIST_BATCH_SIZE) { + // We reached the end of available changes + break + } + } + + if (numberOfChangesPersisted === 0) { logger.debug( { projectId, endVersion }, 'no new changes in Redis buffer to persist' @@ -61,124 +194,16 @@ async function persistBuffer(projectId, limits) { // to match the current endVersion. This shouldn't be needed // unless a worker failed to update the persisted version. await redisBackend.setPersistedVersion(projectId, endVersion) - const { chunk } = await chunkStore.loadByChunkRecord( - projectId, - latestChunkMetadata + } else { + logger.debug( + { projectId, finalPersistedVersion: endVersion }, + 'persistBuffer operation completed successfully' ) - // Return the result in the same format as persistChanges - // so that the caller can handle it uniformly. - return { - numberOfChangesPersisted: changesToPersist.length, - originalEndVersion: endVersion, - currentChunk: chunk, - } + metrics.inc('persist_buffer', 1, { status: 'persisted' }) } - logger.debug( - { - projectId, - endVersion, - count: changesToPersist.length, - }, - 'found changes in Redis to persist' - ) - - // 4. Load file blobs for these Redis changes. Errors will propagate. - const blobStore = new BlobStore(projectId) - const batchBlobStore = new BatchBlobStore(blobStore) - - const blobHashes = new Set() - for (const change of changesToPersist) { - change.findBlobHashes(blobHashes) - } - if (blobHashes.size > 0) { - await batchBlobStore.preload(Array.from(blobHashes)) - } - for (const change of changesToPersist) { - await change.loadFiles('lazy', blobStore) - } - - // 5. Run the persistChanges() algorithm. Errors will propagate. - logger.debug( - { - projectId, - endVersion, - changeCount: changesToPersist.length, - }, - 'calling persistChanges' - ) - - const persistResult = await persistChanges( - projectId, - changesToPersist, - limits, - endVersion - ) - - if (!persistResult || !persistResult.currentChunk) { - metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' }) - throw new OError( - 'persistChanges did not produce a new chunk for non-empty changes', - { - projectId, - endVersion, - changeCount: changesToPersist.length, - } - ) - } - - const newPersistedChunk = persistResult.currentChunk - const newEndVersion = newPersistedChunk.getEndVersion() - - if (newEndVersion <= endVersion) { - metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' }) - throw new OError( - 'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes', - { - projectId, - newEndVersion, - endVersion, - changeCount: changesToPersist.length, - } - ) - } - - logger.debug( - { - projectId, - oldVersion: endVersion, - newVersion: newEndVersion, - }, - 'successfully persisted changes from Redis to main storage' - ) - - // 6. Set the persisted version in Redis. Errors will propagate. - const status = await redisBackend.setPersistedVersion( - projectId, - newEndVersion - ) - - if (status !== 'ok') { - metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' }) - throw new OError('failed to update persisted version in Redis', { - projectId, - newEndVersion, - status, - }) - } - - logger.debug( - { projectId, newEndVersion }, - 'updated persisted version in Redis' - ) - - // 7. Resync the project if content hash validation failed - if (limits.autoResync && persistResult.resyncNeeded) { - if ( - changesToPersist.some( - change => change.getOrigin()?.getKind() === 'history-resync' - ) - ) { + if (limits.autoResync && resyncNeeded) { + if (resyncChangesWerePersisted) { // To avoid an infinite loop, do not resync if the current batch of // changes contains a history resync. logger.warn( @@ -193,14 +218,20 @@ async function persistBuffer(projectId, limits) { } } - logger.debug( - { projectId, finalPersistedVersion: newEndVersion }, - 'persistBuffer operation completed successfully' - ) + if (currentChunk == null) { + const { chunk } = await chunkStore.loadByChunkRecord( + projectId, + latestChunkMetadata + ) + currentChunk = chunk + } - metrics.inc('persist_buffer', 1, { status: 'persisted' }) - - return persistResult + return { + numberOfChangesPersisted, + originalEndVersion, + currentChunk, + resyncNeeded, + } } module.exports = persistBuffer diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index d34cd701d0..8abe7299a0 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -541,7 +541,7 @@ describe('chunk buffer Redis backend', function () { expect(nonPersistedChanges).to.deep.equal(changes) }) - it('should return part of the changes if requested', async function () { + it('should return part of the changes following a given base version if requested', async function () { const nonPersistedChanges = await redisBackend.getNonPersistedChanges( projectId, 3 @@ -549,6 +549,24 @@ describe('chunk buffer Redis backend', function () { expect(nonPersistedChanges).to.deep.equal(changes.slice(1)) }) + it('should limit the number of changes returned if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 2, + { maxChanges: 2 } + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(0, 2)) + }) + + it('should return all changes if limit is not reached', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 3, + { maxChanges: 10 } + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(1)) + }) + it('should error if the base version requested is too low', async function () { await expect( redisBackend.getNonPersistedChanges(projectId, 0) diff --git a/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs index 138a70e626..64eb4efcb1 100644 --- a/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs +++ b/services/history-v1/test/acceptance/js/storage/persist_buffer.test.mjs @@ -12,6 +12,7 @@ import { } from 'overleaf-editor-core' import persistBuffer from '../../../../storage/lib/persist_buffer.js' import chunkStore from '../../../../storage/lib/chunk_store/index.js' +import { BlobStore } from '../../../../storage/lib/blob_store/index.js' import redisBackend from '../../../../storage/lib/chunk_store/redis.js' import persistChanges from '../../../../storage/lib/persist_changes.js' import cleanup from './support/cleanup.js' @@ -22,6 +23,7 @@ describe('persistBuffer', function () { let projectId const initialVersion = 0 let limitsToPersistImmediately + let blobStore before(function () { const farFuture = new Date() @@ -39,6 +41,7 @@ describe('persistBuffer', function () { beforeEach(async function () { projectId = fixtures.docs.uninitializedProject.id await chunkStore.initializeProject(projectId) + blobStore = new BlobStore(projectId) }) describe('with an empty initial chunk (new project)', function () { @@ -340,6 +343,7 @@ describe('persistBuffer', function () { numberOfChangesPersisted: 0, originalEndVersion: persistedChunkEndVersion, currentChunk, + resyncNeeded: false, }) const chunksAfter = await chunkStore.getProjectChunks(projectId) @@ -389,6 +393,7 @@ describe('persistBuffer', function () { numberOfChangesPersisted: 0, originalEndVersion: persistedChunkEndVersion, currentChunk, + resyncNeeded: false, }) const chunksAfter = await chunkStore.getProjectChunks(projectId) @@ -516,4 +521,70 @@ describe('persistBuffer', function () { expect(nonPersisted).to.deep.equal([change3]) }) }) + + describe('with lots of changes to persist', function () { + it('should persist all changes', async function () { + const changes = [] + // Create an initial file with some content + const blob = await blobStore.putString('') + changes.push( + new Change( + [new AddFileOperation('main.tex', File.createLazyFromBlobs(blob))], + new Date(), + [] + ) + ) + + for (let i = 0; i < 500; i++) { + const op = new TextOperation().retain(i).insert('x') + changes.push( + new Change([new EditFileOperation('main.tex', op)], new Date()) + ) + } + + const now = Date.now() + await redisBackend.queueChanges( + projectId, + new Snapshot(), // dummy snapshot + 0, // startVersion for queued changes + changes, + { + persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS, + expireTime: now + redisBackend.PROJECT_TTL_MS, + } + ) + + const expectedEndVersion = 501 + const persistResult = await persistBuffer( + projectId, + limitsToPersistImmediately + ) + expect(persistResult.numberOfChangesPersisted).to.equal( + expectedEndVersion + ) + expect(persistResult.originalEndVersion).to.equal(0) + expect(persistResult.resyncNeeded).to.be.false + + // Check the latest persisted chunk + const latestChunk = await chunkStore.loadLatest(projectId, { + persistedOnly: true, + }) + expect(latestChunk).to.exist + expect(latestChunk.getEndVersion()).to.equal(expectedEndVersion) + + // Check that chunk returned by persistBuffer matches the latest chunk + expect(persistResult.currentChunk).to.deep.equal(latestChunk) + + // Check persisted version in Redis + const state = await redisBackend.getState(projectId) + expect(state.persistedVersion).to.equal(expectedEndVersion) + + // Check non-persisted changes in Redis + const nonPersisted = await redisBackend.getNonPersistedChanges( + projectId, + expectedEndVersion + ) + expect(nonPersisted).to.deep.equal([]) + }) + }) })