From a61afe06b302abc14f3d0fda8f1293594a9a6df7 Mon Sep 17 00:00:00 2001 From: Eric Mc Sween <5454374+emcsween@users.noreply.github.com> Date: Thu, 8 May 2025 11:27:17 -0400 Subject: [PATCH] Merge pull request #25306 from overleaf/em-redis-buffer-read-operations Add changes from Redis when reading chunks from the chunk store GitOrigin-RevId: c0ebf0669b91eb2efc5d1091d025e81efdff9fe4 --- .../api/controllers/project_import.js | 4 +- .../storage/lib/backupGenerator.mjs | 3 +- .../storage/lib/chunk_store/errors.js | 2 + .../storage/lib/chunk_store/index.js | 82 +++- .../storage/lib/chunk_store/redis.js | 87 +++-- .../history-v1/storage/lib/persist_changes.js | 12 +- .../storage/scripts/recover_doc_versions.js | 2 +- services/history-v1/storage/scripts/show.mjs | 25 +- .../storage/tasks/fix_duplicate_versions.js | 2 +- .../acceptance/js/api/backupVerifier.test.mjs | 2 +- .../acceptance/js/storage/chunk_store.test.js | 127 +++++- .../storage/chunk_store_redis_backend.test.js | 363 ++++++++++-------- 12 files changed, 487 insertions(+), 224 deletions(-) diff --git a/services/history-v1/api/controllers/project_import.js b/services/history-v1/api/controllers/project_import.js index 9d47fe06a9..edffb19a25 100644 --- a/services/history-v1/api/controllers/project_import.js +++ b/services/history-v1/api/controllers/project_import.js @@ -95,7 +95,9 @@ async function importChanges(req, res, next) { } async function buildResultSnapshot(resultChunk) { - const chunk = resultChunk || (await chunkStore.loadLatest(projectId)) + const chunk = + resultChunk || + (await chunkStore.loadLatest(projectId, { persistedOnly: true })) const snapshot = chunk.getSnapshot() snapshot.applyAll(chunk.getChanges()) const rawSnapshot = await snapshot.store(hashCheckBlobStore) diff --git a/services/history-v1/storage/lib/backupGenerator.mjs b/services/history-v1/storage/lib/backupGenerator.mjs index 4c18929d54..d8f1b0e99a 100644 --- a/services/history-v1/storage/lib/backupGenerator.mjs +++ b/services/history-v1/storage/lib/backupGenerator.mjs @@ -31,7 +31,8 @@ async function lookBehindForSeenBlobs( // so we find the set of backed up blobs from the previous chunk const previousChunk = await chunkStore.loadAtVersion( projectId, - lastBackedUpVersion + lastBackedUpVersion, + { persistedOnly: true } ) const previousChunkHistory = previousChunk.getHistory() previousChunkHistory.findBlobHashes(seenBlobs) diff --git a/services/history-v1/storage/lib/chunk_store/errors.js b/services/history-v1/storage/lib/chunk_store/errors.js index fc37dbe2a1..75b830f9a0 100644 --- a/services/history-v1/storage/lib/chunk_store/errors.js +++ b/services/history-v1/storage/lib/chunk_store/errors.js @@ -4,10 +4,12 @@ class ChunkVersionConflictError extends OError {} class BaseVersionConflictError extends OError {} class JobNotFoundError extends OError {} class JobNotReadyError extends OError {} +class VersionOutOfBoundsError extends OError {} module.exports = { ChunkVersionConflictError, BaseVersionConflictError, JobNotFoundError, JobNotReadyError, + VersionOutOfBoundsError, } diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index f75c017552..89efaf87ca 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -32,7 +32,15 @@ const { BlobStore } = require('../blob_store') const { historyStore } = require('../history_store') const mongoBackend = require('./mongo') const postgresBackend = require('./postgres') -const { ChunkVersionConflictError } = require('./errors') +const redisBackend = require('./redis') +const { + ChunkVersionConflictError, + VersionOutOfBoundsError, +} = require('./errors') + +/** + * @import { Change } from 'overleaf-editor-core' + */ const DEFAULT_DELETE_BATCH_SIZE = parseInt(config.get('maxDeleteKeys'), 10) const DEFAULT_DELETE_TIMEOUT_SECS = 3000 // 50 minutes @@ -103,12 +111,23 @@ async function loadLatestRaw(projectId, opts) { * Load the latest Chunk stored for a project, including blob metadata. * * @param {string} projectId - * @return {Promise.} + * @param {object} [opts] + * @param {boolean} [opts.persistedOnly] - only include persisted changes + * @return {Promise} */ -async function loadLatest(projectId) { +async function loadLatest(projectId, opts = {}) { const chunkRecord = await loadLatestRaw(projectId) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) + + if (!opts.persistedOnly) { + const nonPersistedChanges = await getChunkExtension( + projectId, + chunkRecord.endVersion + ) + history.pushChanges(nonPersistedChanges) + } + const blobStore = new BlobStore(projectId) const batchBlobStore = new BatchBlobStore(blobStore) await lazyLoadHistoryFiles(history, batchBlobStore) @@ -117,8 +136,13 @@ async function loadLatest(projectId) { /** * Load the the chunk that contains the given version, including blob metadata. + * + * @param {string} projectId + * @param {number} version + * @param {object} [opts] + * @param {boolean} [opts.persistedOnly] - only include persisted changes */ -async function loadAtVersion(projectId, version) { +async function loadAtVersion(projectId, version, opts = {}) { assert.projectId(projectId, 'bad projectId') assert.integer(version, 'bad version') @@ -129,6 +153,15 @@ async function loadAtVersion(projectId, version) { const chunkRecord = await backend.getChunkForVersion(projectId, version) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) + + if (!opts.persistedOnly) { + const nonPersistedChanges = await getChunkExtension( + projectId, + chunkRecord.endVersion + ) + history.pushChanges(nonPersistedChanges) + } + await lazyLoadHistoryFiles(history, batchBlobStore) return new Chunk(history, chunkRecord.endVersion - history.countChanges()) } @@ -136,8 +169,13 @@ async function loadAtVersion(projectId, version) { /** * Load the chunk that contains the version that was current at the given * timestamp, including blob metadata. + * + * @param {string} projectId + * @param {Date} timestamp + * @param {object} [opts] + * @param {boolean} [opts.persistedOnly] - only include persisted changes */ -async function loadAtTimestamp(projectId, timestamp) { +async function loadAtTimestamp(projectId, timestamp, opts = {}) { assert.projectId(projectId, 'bad projectId') assert.date(timestamp, 'bad timestamp') @@ -148,6 +186,15 @@ async function loadAtTimestamp(projectId, timestamp) { const chunkRecord = await backend.getChunkForTimestamp(projectId, timestamp) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) + + if (!opts.persistedOnly) { + const nonPersistedChanges = await getChunkExtension( + projectId, + chunkRecord.endVersion + ) + history.pushChanges(nonPersistedChanges) + } + await lazyLoadHistoryFiles(history, batchBlobStore) return new Chunk(history, chunkRecord.endVersion - history.countChanges()) } @@ -418,6 +465,31 @@ function getBackend(projectId) { } } +/** + * Gets non-persisted changes that could extend a chunk + * + * @param {string} projectId + * @param {number} chunkEndVersion - end version of the chunk to extend + * + * @return {Promise} + */ +async function getChunkExtension(projectId, chunkEndVersion) { + try { + const changes = await redisBackend.getNonPersistedChanges( + projectId, + chunkEndVersion + ) + return changes + } catch (err) { + if (err instanceof VersionOutOfBoundsError) { + // If we can't extend the chunk, simply return an empty list + return [] + } else { + throw err + } + } +} + class AlreadyInitialized extends OError { constructor(projectId) { super('Project is already initialized', { projectId }) diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index 58ee0d15a7..6858127a0c 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -2,13 +2,14 @@ const metrics = require('@overleaf/metrics') const OError = require('@overleaf/o-error') -const redis = require('../redis') -const rclient = redis.rclientHistory // const { Change, Snapshot } = require('overleaf-editor-core') +const redis = require('../redis') +const rclient = redis.rclientHistory const { BaseVersionConflictError, JobNotFoundError, JobNotReadyError, + VersionOutOfBoundsError, } = require('./errors') const MAX_PERSISTED_CHANGES = 100 // Maximum number of persisted changes to keep in the buffer for clients that need to catch up. @@ -393,34 +394,36 @@ rclient.defineCommand('get_non_persisted_changes', { local headVersionKey = KEYS[1] local persistedVersionKey = KEYS[2] local changesKey = KEYS[3] + local baseVersion = tonumber(ARGV[1]) -- Check if head version exists local headVersion = tonumber(redis.call('GET', headVersionKey)) if not headVersion then - return {} + return {'not_found'} end -- Check if persisted version exists local persistedVersion = tonumber(redis.call('GET', persistedVersionKey)) - - local startIndex if not persistedVersion then - -- None of the changes in Redis have been persisted - startIndex = 0 - elseif persistedVersion > headVersion then - -- This should never happen - return redis.error_reply('HEAD_VERSION_BEHIND_PERSISTED_VERSION') - elseif persistedVersion == headVersion then - return {} - else - -- startIndex is negative and counts from the end of the list of changes - startIndex = persistedVersion - headVersion + local changesCount = tonumber(redis.call('LLEN', changesKey)) + persistedVersion = headVersion - changesCount end - -- Get changes using LRANGE - local changes = redis.call('LRANGE', changesKey, startIndex, -1) + if baseVersion < persistedVersion or baseVersion > headVersion then + return {'out_of_bounds'} + elseif baseVersion == headVersion then + return {'ok', {}} + else + local numChanges = headVersion - baseVersion + local changes = redis.call('LRANGE', changesKey, -numChanges, -1) - return changes + if #changes < numChanges then + -- We didn't get as many changes as we expected + return {'out_of_bounds'} + end + + return {'ok', changes} + end `, }) @@ -428,32 +431,52 @@ rclient.defineCommand('get_non_persisted_changes', { * Retrieves non-persisted changes for a project from Redis. * * @param {string} projectId - The unique identifier of the project. - * @returns {Promise} A Promise that resolves to an array of non-persisted Change objects. + * @param {number} baseVersion - The version on top of which the changes should + * be applied. + * @returns {Promise} Changes that can be applied on top of + * baseVersion. An empty array means that the project doesn't have + * changes to persist. A null value means that the non-persisted + * changes can't be applied to the given base version. + * * @throws {Error} If Redis operations fail. */ -async function getNonPersistedChanges(projectId) { +async function getNonPersistedChanges(projectId, baseVersion) { + let result try { - const keys = [ + result = await rclient.get_non_persisted_changes( keySchema.headVersion({ projectId }), keySchema.persistedVersion({ projectId }), keySchema.changes({ projectId }), - ] - - const result = await rclient.get_non_persisted_changes(keys) - - // Parse the changes - const changes = result?.map(json => Change.fromRaw(JSON.parse(json))) ?? [] - - metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { - status: 'success', - }) - return changes + baseVersion.toString() + ) } catch (err) { metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { status: 'error', }) throw err } + + const status = result[0] + metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, { + status, + }) + + if (status === 'ok') { + return result[1].map(json => Change.fromRaw(JSON.parse(json))) + } else if (status === 'not_found') { + return [] + } else if (status === 'out_of_bounds') { + throw new VersionOutOfBoundsError( + "Non-persisted changes can't be applied to base version", + { projectId, baseVersion } + ) + } else { + throw new OError('unknown status for get_non_persisted_changes', { + projectId, + baseVersion, + status, + }) + } } rclient.defineCommand('set_persisted_version', { diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js index 20080d20a6..09d41382d4 100644 --- a/services/history-v1/storage/lib/persist_changes.js +++ b/services/history-v1/storage/lib/persist_changes.js @@ -185,7 +185,9 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { } async function loadLatestChunk() { - const latestChunk = await chunkStore.loadLatest(projectId) + const latestChunk = await chunkStore.loadLatest(projectId, { + persistedOnly: true, + }) currentChunk = latestChunk originalEndVersion = latestChunk.getEndVersion() @@ -217,8 +219,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { } async function fakePersistRedisChanges() { - const nonPersistedChanges = - await redisBackend.getNonPersistedChanges(projectId) + const baseVersion = currentChunk.getEndVersion() + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + baseVersion + ) if ( serializeChanges(nonPersistedChanges) === @@ -232,7 +237,6 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { }) } - const baseVersion = currentChunk.getEndVersion() const persistedVersion = baseVersion + nonPersistedChanges.length await redisBackend.setPersistedVersion(projectId, persistedVersion) } diff --git a/services/history-v1/storage/scripts/recover_doc_versions.js b/services/history-v1/storage/scripts/recover_doc_versions.js index f121c60afd..650fb20324 100644 --- a/services/history-v1/storage/scripts/recover_doc_versions.js +++ b/services/history-v1/storage/scripts/recover_doc_versions.js @@ -279,7 +279,7 @@ async function processProject(project, summary) { async function getHistoryDocVersions(project) { const historyId = project.overleaf.history.id - const chunk = await chunkStore.loadLatest(historyId) + const chunk = await chunkStore.loadLatest(historyId, { persistedOnly: true }) if (chunk == null) { return [] } diff --git a/services/history-v1/storage/scripts/show.mjs b/services/history-v1/storage/scripts/show.mjs index b4ae1664e3..51697dc38f 100644 --- a/services/history-v1/storage/scripts/show.mjs +++ b/services/history-v1/storage/scripts/show.mjs @@ -48,7 +48,16 @@ async function listChunks(historyId) { async function fetchChunkLocal(historyId, version) { const chunkRecord = await getChunkMetadataForVersion(historyId, version) const chunk = await loadAtVersion(historyId, version) - return { key: version, chunk, metadata: chunkRecord, source: 'local storage' } + const persistedChunk = await loadAtVersion(historyId, version, { + persistedOnly: true, + }) + return { + key: version, + chunk, + persistedChunk, + metadata: chunkRecord, + source: 'local storage', + } } async function fetchChunkRemote(historyId, version) { @@ -73,7 +82,7 @@ async function fetchChunkRemote(historyId, version) { } async function displayChunk(historyId, version, options) { - const { key, chunk, metadata, source } = await (options.remote + const { key, chunk, persistedChunk, metadata, source } = await (options.remote ? fetchChunkRemote(historyId, version) : fetchChunkLocal(historyId, version)) console.log('Source:', source) @@ -81,6 +90,18 @@ async function displayChunk(historyId, version, options) { console.log('Key', key) // console.log('Number of changes', chunk.getChanges().length) console.log(JSON.stringify(chunk)) + if ( + persistedChunk && + persistedChunk.getChanges().length !== chunk.getChanges().length + ) { + console.warn( + 'Warning: Local chunk and persisted chunk have different number of changes:', + chunk.getChanges().length, + 'local (including buffer) vs', + persistedChunk.getChanges().length, + 'persisted' + ) + } } async function fetchBlobRemote(historyId, blobHash) { diff --git a/services/history-v1/storage/tasks/fix_duplicate_versions.js b/services/history-v1/storage/tasks/fix_duplicate_versions.js index a7db4b2765..ae9dcb4965 100755 --- a/services/history-v1/storage/tasks/fix_duplicate_versions.js +++ b/services/history-v1/storage/tasks/fix_duplicate_versions.js @@ -34,7 +34,7 @@ async function main() { async function processProject(projectId, save) { console.log(`Project ${projectId}:`) - const chunk = await chunkStore.loadLatest(projectId) + const chunk = await chunkStore.loadLatest(projectId, { persistedOnly: true }) let numChanges = 0 numChanges += removeDuplicateProjectVersions(chunk) numChanges += removeDuplicateDocVersions(chunk) diff --git a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs index 2b4001a9f0..8c351a2652 100644 --- a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs +++ b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs @@ -149,7 +149,7 @@ async function addFileInNewChunk( historyId, { creationDate = new Date() } ) { - const chunk = await chunkStore.loadLatest(historyId) + const chunk = await chunkStore.loadLatest(historyId, { persistedOnly: true }) const operation = Operation.addFile( `${historyId}.txt`, File.fromString(fileContents) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js index 50341fdcb5..60710b2407 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js @@ -18,7 +18,8 @@ const { EditFileOperation, TextOperation, } = require('overleaf-editor-core') -const { chunkStore, historyStore } = require('../../../../storage') +const { chunkStore, historyStore, BlobStore } = require('../../../../storage') +const redisBackend = require('../../../../storage/lib/chunk_store/redis') describe('chunkStore', function () { beforeEach(cleanup.everything) @@ -42,6 +43,7 @@ describe('chunkStore', function () { describe(scenario.description, function () { let projectId let projectRecord + let blobStore beforeEach(async function () { projectId = await scenario.createProject() @@ -49,6 +51,7 @@ describe('chunkStore', function () { projectRecord = await projects.insertOne({ overleaf: { history: { id: scenario.idMapping(projectId) } }, }) + blobStore = new BlobStore(projectId) }) it('loads empty latest chunk for a new project', async function () { @@ -62,10 +65,11 @@ describe('chunkStore', function () { const pendingChangeTimestamp = new Date('2014-01-01T00:00:00') const lastChangeTimestamp = new Date('2015-01-01T00:00:00') beforeEach(async function () { + const blob = await blobStore.putString('abc') const chunk = makeChunk( [ makeChange( - Operation.addFile('main.tex', File.fromString('abc')), + Operation.addFile('main.tex', File.createLazyFromBlobs(blob)), lastChangeTimestamp ), ], @@ -98,8 +102,11 @@ describe('chunkStore', function () { beforeEach(async function () { const chunk = await chunkStore.loadLatest(projectId) const oldEndVersion = chunk.getEndVersion() + const blob = await blobStore.putString('') const changes = [ - makeChange(Operation.addFile(testPathname, File.fromString(''))), + makeChange( + Operation.addFile(testPathname, File.createLazyFromBlobs(blob)) + ), makeChange(Operation.editFile(testPathname, testTextOperation)), ] lastChangeTimestamp = changes[1].getTimestamp() @@ -181,14 +188,15 @@ describe('chunkStore', function () { let firstChunk, secondChunk, thirdChunk beforeEach(async function () { + const blob = await blobStore.putString('') firstChunk = makeChunk( [ makeChange( - Operation.addFile('foo.tex', File.fromString('')), + Operation.addFile('foo.tex', File.createLazyFromBlobs(blob)), new Date(firstChunkTimestamp - 5000) ), makeChange( - Operation.addFile('bar.tex', File.fromString('')), + Operation.addFile('bar.tex', File.createLazyFromBlobs(blob)), firstChunkTimestamp ), ], @@ -205,11 +213,11 @@ describe('chunkStore', function () { secondChunk = makeChunk( [ makeChange( - Operation.addFile('baz.tex', File.fromString('')), + Operation.addFile('baz.tex', File.createLazyFromBlobs(blob)), new Date(secondChunkTimestamp - 5000) ), makeChange( - Operation.addFile('qux.tex', File.fromString('')), + Operation.addFile('qux.tex', File.createLazyFromBlobs(blob)), secondChunkTimestamp ), ], @@ -221,7 +229,7 @@ describe('chunkStore', function () { thirdChunk = makeChunk( [ makeChange( - Operation.addFile('quux.tex', File.fromString('')), + Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)), thirdChunkTimestamp ), ], @@ -317,11 +325,15 @@ describe('chunkStore', function () { let newChunk beforeEach(async function () { + const blob = await blobStore.putString('') newChunk = makeChunk( [ ...thirdChunk.getChanges(), makeChange( - Operation.addFile('onemore.tex', File.fromString('')), + Operation.addFile( + 'onemore.tex', + File.createLazyFromBlobs(blob) + ), thirdChunkTimestamp ), ], @@ -368,6 +380,79 @@ describe('chunkStore', function () { }) }) + describe('with changes queued in the Redis buffer', function () { + let queuedChanges + + beforeEach(async function () { + const snapshot = thirdChunk.getSnapshot() + snapshot.applyAll(thirdChunk.getChanges()) + const blob = await blobStore.putString('zzz') + queuedChanges = [ + makeChange( + Operation.addFile( + 'in-redis.tex', + File.createLazyFromBlobs(blob) + ), + new Date() + ), + ] + await redisBackend.queueChanges( + projectId, + snapshot, + thirdChunk.getEndVersion(), + queuedChanges + ) + }) + + it('includes the queued changes when getting the latest chunk', async function () { + const chunk = await chunkStore.loadLatest(projectId) + const expectedChanges = thirdChunk + .getChanges() + .concat(queuedChanges) + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + }) + + it('includes the queued changes when getting the latest chunk by timestamp', async function () { + const chunk = await chunkStore.loadAtTimestamp( + projectId, + thirdChunkTimestamp + ) + const expectedChanges = thirdChunk + .getChanges() + .concat(queuedChanges) + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + }) + + it("doesn't include the queued changes when getting another chunk by timestamp", async function () { + const chunk = await chunkStore.loadAtTimestamp( + projectId, + secondChunkTimestamp + ) + const expectedChanges = secondChunk.getChanges() + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + }) + + it('includes the queued changes when getting the latest chunk by version', async function () { + const chunk = await chunkStore.loadAtVersion( + projectId, + thirdChunk.getEndVersion() + ) + const expectedChanges = thirdChunk + .getChanges() + .concat(queuedChanges) + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + }) + + it("doesn't include the queued changes when getting another chunk by version", async function () { + const chunk = await chunkStore.loadAtVersion( + projectId, + secondChunk.getEndVersion() + ) + const expectedChanges = secondChunk.getChanges() + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + }) + }) + describe('when iterating the chunks with getProjectChunksFromVersion', function () { // The first chunk has startVersion:0 and endVersion:2 for (let startVersion = 0; startVersion <= 2; startVersion++) { @@ -470,8 +555,11 @@ describe('chunkStore', function () { let chunk = await chunkStore.loadLatest(projectId) expect(chunk.getEndVersion()).to.equal(oldEndVersion) + const blob = await blobStore.putString('') const changes = [ - makeChange(Operation.addFile(testPathname, File.fromString(''))), + makeChange( + Operation.addFile(testPathname, File.createLazyFromBlobs(blob)) + ), makeChange(Operation.editFile(testPathname, testTextOperation)), ] chunk.pushChanges(changes) @@ -487,9 +575,12 @@ describe('chunkStore', function () { describe('version checks', function () { beforeEach(async function () { // Create a chunk with start version 0, end version 3 + const blob = await blobStore.putString('abc') const chunk = makeChunk( [ - makeChange(Operation.addFile('main.tex', File.fromString('abc'))), + makeChange( + Operation.addFile('main.tex', File.createLazyFromBlobs(blob)) + ), makeChange( Operation.editFile( 'main.tex', @@ -509,8 +600,13 @@ describe('chunkStore', function () { }) it('refuses to create a chunk with the same start version', async function () { + const blob = await blobStore.putString('abc') const chunk = makeChunk( - [makeChange(Operation.addFile('main.tex', File.fromString('abc')))], + [ + makeChange( + Operation.addFile('main.tex', File.createLazyFromBlobs(blob)) + ), + ], 0 ) await expect(chunkStore.create(projectId, chunk)).to.be.rejectedWith( @@ -519,8 +615,13 @@ describe('chunkStore', function () { }) it("allows creating chunks that don't have version conflicts", async function () { + const blob = await blobStore.putString('abc') const chunk = makeChunk( - [makeChange(Operation.addFile('main.tex', File.fromString('abc')))], + [ + makeChange( + Operation.addFile('main.tex', File.createLazyFromBlobs(blob)) + ), + ], 3 ) await chunkStore.create(projectId, chunk) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index 91b7e3a5f4..5e226ff9e2 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -12,6 +12,7 @@ const redisBackend = require('../../../../storage/lib/chunk_store/redis') const { JobNotReadyError, JobNotFoundError, + VersionOutOfBoundsError, } = require('../../../../storage/lib/chunk_store/errors') const redis = require('../../../../storage/lib/redis') const rclient = redis.rclientHistory @@ -509,189 +510,191 @@ describe('chunk buffer Redis backend', function () { }) describe('getNonPersistedChanges', function () { - it('should return empty array when project does not exist', async function () { - const changes = await redisBackend.getNonPersistedChanges(projectId) - expect(changes).to.be.an('array').that.is.empty + describe('project not loaded', function () { + it('should return empty array', async function () { + const changes = await redisBackend.getNonPersistedChanges(projectId, 0) + expect(changes).to.be.an('array').that.is.empty + }) + + it('should handle any base version', async function () { + const changes = await redisBackend.getNonPersistedChanges(projectId, 2) + expect(changes).to.be.an('array').that.is.empty + }) }) - it('should return all changes when persisted version is not set', async function () { - const changes = [makeChange(), makeChange(), makeChange()] - queueChanges(projectId, changes) + describe('project never persisted', function () { + let changes - const nonPersistedChanges = - await redisBackend.getNonPersistedChanges(projectId) - expect(nonPersistedChanges.map(change => change.toRaw())).to.deep.equal( - changes.map(change => change.toRaw()) - ) + beforeEach(async function () { + changes = await setupState(projectId, { + headVersion: 5, + persistedVersion: null, + changes: 3, + }) + }) + + it('should return all changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 2 + ) + expect(nonPersistedChanges).to.deep.equal(changes) + }) + + it('should return part of the changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 3 + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(1)) + }) + + it('should error if the base version requested is too low', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 0) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) + + it('should return an empty array if the base version is the head version', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 5 + ) + expect(nonPersistedChanges).to.deep.equal([]) + }) + + it('should error if the base version requested is too high', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 6) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) }) - it('should return empty array when persisted version equals head version', async function () { - // Set both head and persisted versions to be equal - const version = 5 - await rclient.set( - keySchema.headVersion({ projectId }), - version.toString() - ) - await rclient.set( - keySchema.persistedVersion({ projectId }), - version.toString() - ) + describe('fully persisted changes', function () { + beforeEach(async function () { + await setupState(projectId, { + headVersion: 5, + persistedVersion: 5, + changes: 3, + }) + }) - const changes = await redisBackend.getNonPersistedChanges(projectId) - expect(changes).to.be.an('array').that.is.empty + it('should return an empty array when asked for the head version', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 5 + ) + expect(nonPersistedChanges).to.deep.equal([]) + }) + + it('should throw an error when asked for an older version', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 4) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) + + it('should throw an error when asked for a newer version', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 6) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) }) - it('should return all non-persisted changes', async function () { - // Set head version to 5 and persisted version to 2 - const headVersion = 5 - const persistedVersion = 2 - await rclient.set( - keySchema.headVersion({ projectId }), - headVersion.toString() - ) - await rclient.set( - keySchema.persistedVersion({ projectId }), - persistedVersion.toString() - ) + describe('partially persisted project', async function () { + let changes - // Create changes for versions 3, 4, 5 - const timestamp = new Date() - const change1 = new Change([], timestamp) // Version 3 - const change2 = new Change([], timestamp) // Version 4 - const change3 = new Change([], timestamp) // Version 5 + beforeEach(async function () { + changes = await setupState(projectId, { + headVersion: 10, + persistedVersion: 7, + changes: 6, + }) + }) - // Push changes to Redis - await rclient.rpush( - keySchema.changes({ projectId }), - JSON.stringify(change1.toRaw()), - JSON.stringify(change2.toRaw()), - JSON.stringify(change3.toRaw()) - ) + it('should return all non-persisted changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 7 + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(3)) + }) - // Get non-persisted changes - const nonPersistedChanges = - await redisBackend.getNonPersistedChanges(projectId) + it('should return part of the changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 8 + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(4)) + }) - // Should return changes for versions 3, 4, 5 - expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3) - expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change1.toRaw()) - expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change2.toRaw()) - expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change3.toRaw()) + it('should error if the base version requested is too low', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 5) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) + + it('should return an empty array if the base version is the head version', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 10 + ) + expect(nonPersistedChanges).to.deep.equal([]) + }) + + it('should error if the base version requested is too high', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 12) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) }) - it('should return a subset of changes when some are persisted', async function () { - // Set head version to 5 and persisted version to 3 - // This means versions 4 and 5 are not persisted - const headVersion = 5 - const persistedVersion = 3 - await rclient.set( - keySchema.headVersion({ projectId }), - headVersion.toString() - ) - await rclient.set( - keySchema.persistedVersion({ projectId }), - persistedVersion.toString() - ) + // This case should never happen, but we'll handle it anyway + describe('persisted version before start of changes list', async function () { + let changes - // Create changes for versions 1, 2, 3, 4, 5 - const timestamp = new Date() - const change1 = new Change([], timestamp) // Version 1 - const change2 = new Change([], timestamp) // Version 2 - const change3 = new Change([], timestamp) // Version 3 - const change4 = new Change([], timestamp) // Version 4 - const change5 = new Change([], timestamp) // Version 5 + beforeEach(async function () { + changes = await setupState(projectId, { + headVersion: 5, + persistedVersion: 1, + changes: 3, + }) + }) - // Push changes to Redis - await rclient.rpush( - keySchema.changes({ projectId }), - JSON.stringify(change1.toRaw()), - JSON.stringify(change2.toRaw()), - JSON.stringify(change3.toRaw()), - JSON.stringify(change4.toRaw()), - JSON.stringify(change5.toRaw()) - ) + it('should return all non-persisted changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 2 + ) + expect(nonPersistedChanges).to.deep.equal(changes) + }) - // Get non-persisted changes - const nonPersistedChanges = - await redisBackend.getNonPersistedChanges(projectId) + it('should return part of the changes if requested', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 3 + ) + expect(nonPersistedChanges).to.deep.equal(changes.slice(1)) + }) - // Should return only changes for versions 4 and 5 - expect(nonPersistedChanges).to.be.an('array').with.lengthOf(2) - expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change4.toRaw()) - expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change5.toRaw()) - }) + it('should error if the base version requested is too low', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 1) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) - it('should throw an error when persisted version is higher than head version', async function () { - // This is an unusual case that should not happen in practice - // The system should throw an error to indicate this abnormal state - const headVersion = 3 - const persistedVersion = 5 - await rclient.set( - keySchema.headVersion({ projectId }), - headVersion.toString() - ) - await rclient.set( - keySchema.persistedVersion({ projectId }), - persistedVersion.toString() - ) + it('should return an empty array if the base version is the head version', async function () { + const nonPersistedChanges = await redisBackend.getNonPersistedChanges( + projectId, + 5 + ) + expect(nonPersistedChanges).to.deep.equal([]) + }) - // Create changes - const timestamp = new Date() - const change1 = new Change([], timestamp) - const change2 = new Change([], timestamp) - const change3 = new Change([], timestamp) - - // Push changes to Redis - await rclient.rpush( - keySchema.changes({ projectId }), - JSON.stringify(change1.toRaw()), - JSON.stringify(change2.toRaw()), - JSON.stringify(change3.toRaw()) - ) - - // Use chai-as-promised for cleaner async error assertion - await expect( - redisBackend.getNonPersistedChanges(projectId) - ).to.be.rejectedWith(/HEAD_VERSION_BEHIND_PERSISTED_VERSION/) - }) - - it('should handle case where persisted version is before start of changes list', async function () { - // Setup: head version is 5, persisted version is 1 - // But changes list only starts from version 3 - const headVersion = 5 - const persistedVersion = 1 - await rclient.set( - keySchema.headVersion({ projectId }), - headVersion.toString() - ) - await rclient.set( - keySchema.persistedVersion({ projectId }), - persistedVersion.toString() - ) - - // Create changes for versions 3, 4, 5 only - const timestamp = new Date() - const change3 = new Change([], timestamp) // Version 3 - const change4 = new Change([], timestamp) // Version 4 - const change5 = new Change([], timestamp) // Version 5 - - // Push changes to Redis - await rclient.rpush( - keySchema.changes({ projectId }), - JSON.stringify(change3.toRaw()), - JSON.stringify(change4.toRaw()), - JSON.stringify(change5.toRaw()) - ) - - // Get non-persisted changes - const nonPersistedChanges = - await redisBackend.getNonPersistedChanges(projectId) - - // Should return all changes since the persisted version is before the start of the list - expect(nonPersistedChanges).to.be.an('array').with.lengthOf(3) - expect(nonPersistedChanges[0].toRaw()).to.deep.equal(change3.toRaw()) - expect(nonPersistedChanges[1].toRaw()).to.deep.equal(change4.toRaw()) - expect(nonPersistedChanges[2].toRaw()).to.deep.equal(change5.toRaw()) + it('should error if the base version requested is too high', async function () { + await expect( + redisBackend.getNonPersistedChanges(projectId, 6) + ).to.be.rejectedWith(VersionOutOfBoundsError) + }) }) }) @@ -1137,3 +1140,37 @@ function makeChange() { const timestamp = new Date() return new Change([], timestamp) } + +/** + * Setup Redis buffer state for tests + * + * @param {string} projectId + * @param {object} params + * @param {number} params.headVersion + * @param {number | null} params.persistedVersion + * @param {number} params.changes - number of changes to create + * @return {Promise} dummy changes that have been created + */ +async function setupState(projectId, params) { + await rclient.set(keySchema.headVersion({ projectId }), params.headVersion) + if (params.persistedVersion) { + await rclient.set( + keySchema.persistedVersion({ projectId }), + params.persistedVersion + ) + } + + const changes = [] + for (let i = 1; i <= params.changes; i++) { + const change = new Change( + [new AddFileOperation(`file${i}.tex`, File.createHollow(i, i))], + new Date() + ) + changes.push(change) + } + await rclient.rpush( + keySchema.changes({ projectId }), + changes.map(change => JSON.stringify(change.toRaw())) + ) + return changes +}