diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index cbdf3d2dcf..286a8d8764 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -151,20 +151,44 @@ async function loadAtVersion(projectId, version, opts = {}) { const backend = getBackend(projectId) const blobStore = new BlobStore(projectId) const batchBlobStore = new BatchBlobStore(blobStore) + const latestChunkMetadata = await getLatestChunkMetadata(projectId) - const chunkRecord = await backend.getChunkForVersion(projectId, version, { - preferNewer: opts.preferNewer, - }) + // When loading a chunk for a version there are three cases to consider: + // 1. If `persistedOnly` is true, we always use the requested version + // to fetch the chunk. + // 2. If `persistedOnly` is false and the requested version is in the + // persisted chunk version range, we use the requested version. + // 3. If `persistedOnly` is false and the requested version is ahead of + // the persisted chunk versions, we fetch the latest chunk and see if + // the non-persisted changes include the requested version. + const targetChunkVersion = opts.persistedOnly + ? version + : Math.min(latestChunkMetadata.endVersion, version) + + const chunkRecord = await backend.getChunkForVersion( + projectId, + targetChunkVersion, + { + preferNewer: opts.preferNewer, + } + ) const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id) const history = History.fromRaw(rawHistory) const startVersion = chunkRecord.endVersion - history.countChanges() if (!opts.persistedOnly) { + // Try to extend the chunk with any non-persisted changes that + // follow the chunk's end version. const nonPersistedChanges = await getChunkExtension( projectId, chunkRecord.endVersion ) history.pushChanges(nonPersistedChanges) + + // Check that the changes do actually contain the requested version + if (version > chunkRecord.endVersion + nonPersistedChanges.length) { + throw new Chunk.VersionNotFoundError(projectId, version) + } } await lazyLoadHistoryFiles(history, batchBlobStore) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js index c6c33404d6..8b06b8e412 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store.test.js @@ -470,6 +470,8 @@ describe('chunkStore', function () { describe('with changes queued in the Redis buffer', function () { let queuedChanges + const firstQueuedChangeTimestamp = new Date('2017-01-01T00:01:00') + const lastQueuedChangeTimestamp = new Date('2017-01-01T00:02:00') beforeEach(async function () { const snapshot = thirdChunk.getSnapshot() @@ -481,7 +483,15 @@ describe('chunkStore', function () { 'in-redis.tex', File.createLazyFromBlobs(blob) ), - new Date() + firstQueuedChangeTimestamp + ), + makeChange( + // Add a second change to make the buffer more interesting + Operation.editFile( + 'in-redis.tex', + TextOperation.fromJSON({ textOperation: ['hello'] }) + ), + lastQueuedChangeTimestamp ), ] await redisBackend.queueChanges( @@ -504,6 +514,9 @@ describe('chunkStore', function () { expect(chunk.getEndVersion()).to.equal( thirdChunk.getEndVersion() + queuedChanges.length ) + expect(chunk.getEndTimestamp()).to.deep.equal( + lastQueuedChangeTimestamp + ) }) it('includes the queued changes when getting the latest chunk by timestamp', async function () { @@ -534,6 +547,7 @@ describe('chunkStore', function () { secondChunk.getStartVersion() ) expect(chunk.getEndVersion()).to.equal(secondChunk.getEndVersion()) + expect(chunk.getEndTimestamp()).to.deep.equal(secondChunkTimestamp) }) it('includes the queued changes when getting the latest chunk by version', async function () { @@ -551,6 +565,9 @@ describe('chunkStore', function () { expect(chunk.getEndVersion()).to.equal( thirdChunk.getEndVersion() + queuedChanges.length ) + expect(chunk.getEndTimestamp()).to.deep.equal( + lastQueuedChangeTimestamp + ) }) it("doesn't include the queued changes when getting another chunk by version", async function () { @@ -564,6 +581,43 @@ describe('chunkStore', function () { secondChunk.getStartVersion() ) expect(chunk.getEndVersion()).to.equal(secondChunk.getEndVersion()) + expect(chunk.getEndTimestamp()).to.deep.equal(secondChunkTimestamp) + }) + + it('loads a version that is only in the Redis buffer', async function () { + const versionInRedis = thirdChunk.getEndVersion() + 1 // the first change in Redis + const chunk = await chunkStore.loadAtVersion( + projectId, + versionInRedis + ) + // The chunk should contain changes from the thirdChunk and the queuedChanges + const expectedChanges = thirdChunk + .getChanges() + .concat(queuedChanges) + expect(chunk.getChanges()).to.deep.equal(expectedChanges) + expect(chunk.getStartVersion()).to.equal( + thirdChunk.getStartVersion() + ) + expect(chunk.getEndVersion()).to.equal( + thirdChunk.getEndVersion() + queuedChanges.length + ) + expect(chunk.getEndTimestamp()).to.deep.equal( + lastQueuedChangeTimestamp + ) + }) + + it('throws an error when loading a version beyond the Redis buffer', async function () { + const versionBeyondRedis = + thirdChunk.getEndVersion() + queuedChanges.length + 1 + await expect( + chunkStore.loadAtVersion(projectId, versionBeyondRedis) + ) + .to.be.rejectedWith(chunkStore.VersionOutOfBoundsError) + .and.eventually.satisfy(err => { + expect(err.info).to.have.property('projectId', projectId) + expect(err.info).to.have.property('version', versionBeyondRedis) + return true + }) }) })