diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index 12c0924707..b026a073d4 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -213,16 +213,29 @@ async function create(projectId, chunk, earliestChangeTimestamp) { const backend = getBackend(projectId) const chunkStart = chunk.getStartVersion() - const chunkId = await uploadChunk(projectId, chunk) const opts = {} if (chunkStart > 0) { - opts.oldChunkId = await getChunkIdForVersion(projectId, chunkStart - 1) + const oldChunk = await backend.getChunkForVersion(projectId, chunkStart - 1) + + if (oldChunk.endVersion !== chunkStart) { + throw new ChunkVersionConflictError( + 'unexpected end version on chunk to be updated', + { + projectId, + expectedVersion: chunkStart, + actualVersion: oldChunk.endVersion, + } + ) + } + + opts.oldChunkId = oldChunk.id } if (earliestChangeTimestamp != null) { opts.earliestChangeTimestamp = earliestChangeTimestamp } + const chunkId = await uploadChunk(projectId, chunk) await backend.confirmCreate(projectId, chunk, chunkId, opts) } @@ -253,24 +266,44 @@ async function uploadChunk(projectId, chunk) { * chunk. * * @param {string} projectId - * @param {number} oldEndVersion * @param {Chunk} newChunk * @param {Date} [earliestChangeTimestamp] * @return {Promise} */ -async function update( - projectId, - oldEndVersion, - newChunk, - earliestChangeTimestamp -) { +async function update(projectId, newChunk, earliestChangeTimestamp) { assert.projectId(projectId, 'bad projectId') - assert.integer(oldEndVersion, 'bad oldEndVersion') assert.instance(newChunk, Chunk, 'bad newChunk') assert.maybe.date(earliestChangeTimestamp, 'bad timestamp') const backend = getBackend(projectId) - const oldChunkId = await getChunkIdForVersion(projectId, oldEndVersion) + const oldChunk = await backend.getChunkForVersion( + projectId, + newChunk.getStartVersion(), + { preferNewer: true } + ) + + if (oldChunk.startVersion !== newChunk.getStartVersion()) { + throw new ChunkVersionConflictError( + 'unexpected start version on chunk to be updated', + { + projectId, + expectedVersion: newChunk.getStartVersion(), + actualVersion: oldChunk.startVersion, + } + ) + } + + if (oldChunk.endVersion > newChunk.getEndVersion()) { + throw new ChunkVersionConflictError( + 'chunk update would decrease chunk version', + { + projectId, + currentVersion: oldChunk.endVersion, + newVersion: newChunk.getEndVersion(), + } + ) + } + const newChunkId = await uploadChunk(projectId, newChunk) const opts = {} @@ -278,7 +311,13 @@ async function update( opts.earliestChangeTimestamp = earliestChangeTimestamp } - await backend.confirmUpdate(projectId, oldChunkId, newChunk, newChunkId, opts) + await backend.confirmUpdate( + projectId, + oldChunk.id, + newChunk, + newChunkId, + opts + ) } /** diff --git a/services/history-v1/storage/lib/chunk_store/mongo.js b/services/history-v1/storage/lib/chunk_store/mongo.js index a34b7194af..d84e7a8327 100644 --- a/services/history-v1/storage/lib/chunk_store/mongo.js +++ b/services/history-v1/storage/lib/chunk_store/mongo.js @@ -43,8 +43,14 @@ async function getLatestChunk(projectId, opts = {}) { /** * Get the metadata for the chunk that contains the given version. + * + * @param {string} projectId + * @param {number} version + * @param {object} [opts] + * @param {boolean} [opts.preferNewer] - If the version is at the boundary of + * two chunks, return the newer chunk. */ -async function getChunkForVersion(projectId, version) { +async function getChunkForVersion(projectId, version, opts = {}) { assert.mongoId(projectId, 'bad projectId') assert.integer(version, 'bad version') @@ -55,7 +61,7 @@ async function getChunkForVersion(projectId, version) { startVersion: { $lte: version }, endVersion: { $gte: version }, }, - { sort: { startVersion: 1 } } + { sort: { startVersion: opts.preferNewer ? -1 : 1 } } ) if (record == null) { throw new Chunk.VersionNotFoundError(projectId, version) diff --git a/services/history-v1/storage/lib/chunk_store/postgres.js b/services/history-v1/storage/lib/chunk_store/postgres.js index 0c33c0fd82..bfb5c6954a 100644 --- a/services/history-v1/storage/lib/chunk_store/postgres.js +++ b/services/history-v1/storage/lib/chunk_store/postgres.js @@ -38,14 +38,18 @@ async function getLatestChunk(projectId, opts = {}) { * * @param {string} projectId * @param {number} version + * @param {object} [opts] + * @param {boolean} [opts.preferNewer] - If the version is at the boundary of + * two chunks, return the newer chunk. */ -async function getChunkForVersion(projectId, version) { +async function getChunkForVersion(projectId, version, opts = {}) { assert.postgresId(projectId, 'bad projectId') const record = await knex('chunks') .where('doc_id', parseInt(projectId, 10)) + .where('start_version', '<=', version) .where('end_version', '>=', version) - .orderBy('end_version') + .orderBy('end_version', opts.preferNewer ? 'desc' : 'asc') .first() if (!record) { throw new Chunk.VersionNotFoundError(projectId, version) diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index 6858127a0c..0ae7cee2e5 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -495,6 +495,12 @@ rclient.defineCommand('set_persisted_version', { return 'not_found' end + -- Get current persisted version + local persistedVersion = tonumber(redis.call('GET', persistedVersionKey)) + if persistedVersion and persistedVersion > newPersistedVersion then + return 'too_low' + end + -- Set the persisted version redis.call('SET', persistedVersionKey, newPersistedVersion) diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js index 09d41382d4..5b80285eb0 100644 --- a/services/history-v1/storage/lib/persist_changes.js +++ b/services/history-v1/storage/lib/persist_changes.js @@ -250,12 +250,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { checkElapsedTime(timer) - await chunkStore.update( - projectId, - originalEndVersion, - currentChunk, - earliestChangeTimestamp - ) + await chunkStore.update(projectId, currentChunk, earliestChangeTimestamp) } async function createNewChunksAsNeeded() { diff --git a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs index 44e1a2652b..0a1fa528ab 100644 --- a/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs +++ b/services/history-v1/test/acceptance/js/api/backupVerifier.test.mjs @@ -156,7 +156,7 @@ async function addFileInNewChunk( ) const changes = [new Change([operation], creationDate, [])] chunk.pushChanges(changes) - await chunkStore.update(historyId, 0, chunk) + await chunkStore.update(historyId, chunk) } /** diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js index 48c142817d..da70467934 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js @@ -6,6 +6,9 @@ const { expect } = require('chai') const sinon = require('sinon') const { ObjectId } = require('mongodb') const { projects } = require('../../../../storage/lib/mongodb') +const { + ChunkVersionConflictError, +} = require('../../../../storage/lib/chunk_store/errors') const { Chunk, @@ -66,17 +69,33 @@ describe('chunkStore', function () { const lastChangeTimestamp = new Date('2015-01-01T00:00:00') beforeEach(async function () { const blob = await blobStore.putString('abc') - const chunk = makeChunk( + const firstChunk = makeChunk( [ makeChange( Operation.addFile('main.tex', File.createLazyFromBlobs(blob)), lastChangeTimestamp ), ], + 0 + ) + await chunkStore.update(projectId, firstChunk, pendingChangeTimestamp) + + const secondChunk = makeChunk( + [ + makeChange( + Operation.addFile('other.tex', File.createLazyFromBlobs(blob)), + lastChangeTimestamp + ), + ], 1 ) - await chunkStore.create(projectId, chunk, pendingChangeTimestamp) + await chunkStore.create( + projectId, + secondChunk, + pendingChangeTimestamp + ) }) + it('creates a chunk and inserts the pending change timestamp', async function () { const project = await projects.findOne({ _id: new ObjectId(projectRecord.insertedId), @@ -101,7 +120,6 @@ describe('chunkStore', function () { beforeEach(async function () { const chunk = await chunkStore.loadLatest(projectId) - const oldEndVersion = chunk.getEndVersion() const blob = await blobStore.putString('') const changes = [ makeChange( @@ -111,12 +129,7 @@ describe('chunkStore', function () { ] lastChangeTimestamp = changes[1].getTimestamp() chunk.pushChanges(changes) - await chunkStore.update( - projectId, - oldEndVersion, - chunk, - pendingChangeTimestamp - ) + await chunkStore.update(projectId, chunk, pendingChangeTimestamp) }) it('records the correct metadata in db readOnly=false', async function () { @@ -204,12 +217,7 @@ describe('chunkStore', function () { ], 0 ) - await chunkStore.update( - projectId, - 0, - firstChunk, - pendingChangeTimestamp - ) + await chunkStore.update(projectId, firstChunk, pendingChangeTimestamp) firstChunk = await chunkStore.loadLatest(projectId) secondChunk = makeChunk( @@ -234,6 +242,10 @@ describe('chunkStore', function () { Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)), thirdChunkTimestamp ), + makeChange( + Operation.addFile('barbar.tex', File.createLazyFromBlobs(blob)), + thirdChunkTimestamp + ), ], 4 ) @@ -308,7 +320,7 @@ describe('chunkStore', function () { const project = await projects.findOne({ _id: new ObjectId(projectRecord.insertedId), }) - expect(project.overleaf.history.currentEndVersion).to.equal(5) + expect(project.overleaf.history.currentEndVersion).to.equal(6) expect(project.overleaf.history.currentEndTimestamp).to.deep.equal( thirdChunkTimestamp ) @@ -323,6 +335,80 @@ describe('chunkStore', function () { ) }) + describe('chunk update', function () { + it('rejects a chunk that removes changes', async function () { + const newChunk = makeChunk([thirdChunk.getChanges()[0]], 4) + await expect( + chunkStore.update(projectId, newChunk) + ).to.be.rejectedWith(ChunkVersionConflictError) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw()) + }) + + it('accepts the same chunk', async function () { + await chunkStore.update(projectId, thirdChunk) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw()) + }) + + it('accepts a larger chunk', async function () { + const blob = await blobStore.putString('foobar') + const newChunk = makeChunk( + [ + ...thirdChunk.getChanges(), + makeChange( + Operation.addFile( + 'onemore.tex', + File.createLazyFromBlobs(blob) + ), + thirdChunkTimestamp + ), + ], + 4 + ) + await chunkStore.update(projectId, newChunk) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(newChunk.toRaw()) + }) + }) + + describe('chunk create', function () { + let change + + beforeEach(async function () { + const blob = await blobStore.putString('foobar') + change = makeChange( + Operation.addFile('onemore.tex', File.createLazyFromBlobs(blob)), + thirdChunkTimestamp + ) + }) + + it('rejects a base version that is too low', async function () { + const newChunk = makeChunk([change], 5) + await expect( + chunkStore.create(projectId, newChunk) + ).to.be.rejectedWith(ChunkVersionConflictError) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw()) + }) + + it('rejects a base version that is too high', async function () { + const newChunk = makeChunk([change], 7) + await expect( + chunkStore.create(projectId, newChunk) + ).to.be.rejectedWith(ChunkVersionConflictError) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw()) + }) + + it('accepts the right base version', async function () { + const newChunk = makeChunk([change], 6) + await chunkStore.create(projectId, newChunk) + const latestChunk = await chunkStore.loadLatest(projectId) + expect(latestChunk.toRaw()).to.deep.equal(newChunk.toRaw()) + }) + }) + describe('after updating the last chunk', function () { let newChunk @@ -341,12 +427,12 @@ describe('chunkStore', function () { ], 4 ) - await chunkStore.update(projectId, 5, newChunk) + await chunkStore.update(projectId, newChunk) newChunk = await chunkStore.loadLatest(projectId) }) it('replaces the latest chunk', function () { - expect(newChunk.getChanges()).to.have.length(2) + expect(newChunk.getChanges()).to.have.length(3) }) it('returns the right chunk when querying by version', async function () { @@ -366,7 +452,7 @@ describe('chunkStore', function () { const project = await projects.findOne({ _id: new ObjectId(projectRecord.insertedId), }) - expect(project.overleaf.history.currentEndVersion).to.equal(6) + expect(project.overleaf.history.currentEndVersion).to.equal(7) expect(project.overleaf.history.currentEndTimestamp).to.deep.equal( thirdChunkTimestamp ) @@ -529,7 +615,7 @@ describe('chunkStore', function () { const chunkRecords = [] for await (const chunk of chunkStore.getProjectChunksFromVersion( projectId, - 6 + 7 )) { chunkRecords.push(chunk) } @@ -566,9 +652,9 @@ describe('chunkStore', function () { ] chunk.pushChanges(changes) - await expect( - chunkStore.update(projectId, oldEndVersion, chunk) - ).to.be.rejectedWith('S3 Error') + await expect(chunkStore.update(projectId, chunk)).to.be.rejectedWith( + 'S3 Error' + ) chunk = await chunkStore.loadLatest(projectId) expect(chunk.getEndVersion()).to.equal(oldEndVersion) }) @@ -598,7 +684,7 @@ describe('chunkStore', function () { ], 0 ) - await chunkStore.update(projectId, 0, chunk) + await chunkStore.update(projectId, chunk) }) it('refuses to create a chunk with the same start version', async function () { diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index 5e226ff9e2..2b13343fc4 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -704,28 +704,42 @@ describe('chunk buffer Redis backend', function () { expect(result).to.equal('not_found') }) - it('should set the persisted version', async function () { - // Set head version - const headVersion = 5 - await rclient.set( - keySchema.headVersion({ projectId }), - headVersion.toString() - ) + describe('when the persisted version is not set', function () { + beforeEach(async function () { + await setupState(projectId, { + headVersion: 5, + persistedVersion: null, + changes: 5, + }) + }) - // Set persisted version - const persistedVersion = 3 - const result = await redisBackend.setPersistedVersion( - projectId, - persistedVersion - ) + it('should set the persisted version', async function () { + await redisBackend.setPersistedVersion(projectId, 3) + const state = await redisBackend.getState(projectId) + expect(state.persistedVersion).to.equal(3) + }) + }) - expect(result).to.equal('ok') + describe('when the persisted version is set', function () { + beforeEach(async function () { + await setupState(projectId, { + headVersion: 5, + persistedVersion: 3, + changes: 5, + }) + }) - // Verify the persisted version was set - const persistedVersionRedis = await rclient.get( - keySchema.persistedVersion({ projectId }) - ) - expect(parseInt(persistedVersionRedis, 10)).to.equal(persistedVersion) + it('should set the persisted version', async function () { + await redisBackend.setPersistedVersion(projectId, 5) + const state = await redisBackend.getState(projectId) + expect(state.persistedVersion).to.equal(5) + }) + + it('should not decrease the persisted version', async function () { + await redisBackend.setPersistedVersion(projectId, 2) + const state = await redisBackend.getState(projectId) + expect(state.persistedVersion).to.equal(3) + }) }) it('should trim the changes list to keep only MAX_PERSISTED_CHANGES beyond persisted version', async function () {