From aa367bcd1da2d85d285149847adc2fbb7b037a64 Mon Sep 17 00:00:00 2001 From: Eric Mc Sween <5454374+emcsween@users.noreply.github.com> Date: Wed, 16 Apr 2025 09:37:53 -0400 Subject: [PATCH] Merge pull request #24897 from overleaf/em-chunks-concurrency Concurrency handling for history chunks with Mongo backend GitOrigin-RevId: 30abe11237c80e7803c8934a20a57a7223afa85a --- services/history-v1/storage/index.js | 3 + .../storage/lib/chunk_store/index.js | 47 +-- .../storage/lib/chunk_store/mongo.js | 223 +++++++++---- .../storage/lib/chunk_store/postgres.js | 17 +- .../js/storage/chunk_buffer.test.js | 306 +++++++++--------- .../storage/chunk_store_mongo_backend.test.js | 87 ++++- services/history-v1/test/setup.js | 2 +- 7 files changed, 440 insertions(+), 245 deletions(-) diff --git a/services/history-v1/storage/index.js b/services/history-v1/storage/index.js index 009a867148..5fe283a34c 100644 --- a/services/history-v1/storage/index.js +++ b/services/history-v1/storage/index.js @@ -20,3 +20,6 @@ exports.loadGlobalBlobs = loadGlobalBlobs const { InvalidChangeError } = require('./lib/errors') exports.InvalidChangeError = InvalidChangeError + +const { ChunkVersionConflictError } = require('./lib/chunk_store/errors') +exports.ChunkVersionConflictError = ChunkVersionConflictError diff --git a/services/history-v1/storage/lib/chunk_store/index.js b/services/history-v1/storage/lib/chunk_store/index.js index c1fbb9d607..f75c017552 100644 --- a/services/history-v1/storage/lib/chunk_store/index.js +++ b/services/history-v1/storage/lib/chunk_store/index.js @@ -1,3 +1,5 @@ +// @ts-check + 'use strict' /** @@ -156,7 +158,6 @@ async function loadAtTimestamp(projectId, timestamp) { * @param {string} projectId * @param {Chunk} chunk * @param {Date} [earliestChangeTimestamp] - * @return {Promise.} for the chunkId of the inserted chunk */ async function create(projectId, chunk, earliestChangeTimestamp) { assert.projectId(projectId, 'bad projectId') @@ -164,13 +165,18 @@ async function 
create(projectId, chunk, earliestChangeTimestamp) { assert.maybe.date(earliestChangeTimestamp, 'bad timestamp') const backend = getBackend(projectId) + const chunkStart = chunk.getStartVersion() const chunkId = await uploadChunk(projectId, chunk) - await backend.confirmCreate( - projectId, - chunk, - chunkId, - earliestChangeTimestamp - ) + + const opts = {} + if (chunkStart > 0) { + opts.oldChunkId = await getChunkIdForVersion(projectId, chunkStart - 1) + } + if (earliestChangeTimestamp != null) { + opts.earliestChangeTimestamp = earliestChangeTimestamp + } + + await backend.confirmCreate(projectId, chunk, chunkId, opts) } /** @@ -220,13 +226,12 @@ async function update( const oldChunkId = await getChunkIdForVersion(projectId, oldEndVersion) const newChunkId = await uploadChunk(projectId, newChunk) - await backend.confirmUpdate( - projectId, - oldChunkId, - newChunk, - newChunkId, - earliestChangeTimestamp - ) + const opts = {} + if (earliestChangeTimestamp != null) { + opts.earliestChangeTimestamp = earliestChangeTimestamp + } + + await backend.confirmUpdate(projectId, oldChunkId, newChunk, newChunkId, opts) } /** @@ -234,7 +239,7 @@ async function update( * * @param {string} projectId * @param {number} version - * @return {Promise.<number>} + * @return {Promise.<string>} */ async function getChunkIdForVersion(projectId, version) { const backend = getBackend(projectId) @@ -343,10 +348,14 @@ async function deleteProjectChunks(projectId) { * Delete a given number of old chunks from both the database * and from object storage. 
* - * @param {number} count - number of chunks to delete - * @param {number} minAgeSecs - how many seconds ago must chunks have been - * deleted - * @return {Promise} + * @param {object} options + * @param {number} [options.batchSize] - number of chunks to delete in each + * batch + * @param {number} [options.maxBatches] - maximum number of batches to process + * @param {number} [options.minAgeSecs] - minimum age of chunks to delete + * @param {number} [options.timeout] - maximum time to spend deleting chunks + * + * @return {Promise} number of chunks deleted */ async function deleteOldChunks(options = {}) { const batchSize = options.batchSize ?? DEFAULT_DELETE_BATCH_SIZE diff --git a/services/history-v1/storage/lib/chunk_store/mongo.js b/services/history-v1/storage/lib/chunk_store/mongo.js index bb93679fec..a34b7194af 100644 --- a/services/history-v1/storage/lib/chunk_store/mongo.js +++ b/services/history-v1/storage/lib/chunk_store/mongo.js @@ -1,4 +1,6 @@ -const { ObjectId, ReadPreference } = require('mongodb') +// @ts-check + +const { ObjectId, ReadPreference, MongoError } = require('mongodb') const { Chunk } = require('overleaf-editor-core') const OError = require('@overleaf/o-error') const assert = require('../assert') @@ -7,6 +9,10 @@ const { ChunkVersionConflictError } = require('./errors') const DUPLICATE_KEY_ERROR_CODE = 11000 +/** + * @import { ClientSession } from 'mongodb' + */ + /** * Get the latest chunk's metadata from the database * @param {string} projectId @@ -18,7 +24,10 @@ async function getLatestChunk(projectId, opts = {}) { const { readOnly = false } = opts const record = await mongodb.chunks.findOne( - { projectId: new ObjectId(projectId), state: 'active' }, + { + projectId: new ObjectId(projectId), + state: { $in: ['active', 'closed'] }, + }, { sort: { startVersion: -1 }, readPreference: readOnly @@ -42,7 +51,7 @@ async function getChunkForVersion(projectId, version) { const record = await mongodb.chunks.findOne( { projectId: new 
ObjectId(projectId), - state: 'active', + state: { $in: ['active', 'closed'] }, startVersion: { $lte: version }, endVersion: { $gte: version }, }, @@ -94,7 +103,7 @@ async function getChunkForTimestamp(projectId, timestamp) { const record = await mongodb.chunks.findOne( { projectId: new ObjectId(projectId), - state: 'active', + state: { $in: ['active', 'closed'] }, endTimestamp: { $gte: timestamp }, }, // We use the index on the startVersion for sorting records. This assumes @@ -126,7 +135,7 @@ async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) { const record = await mongodb.chunks.findOne( { projectId: new ObjectId(projectId), - state: 'active', + state: { $in: ['active', 'closed'] }, $or: [ { endTimestamp: { @@ -155,7 +164,10 @@ async function getProjectChunkIds(projectId) { assert.mongoId(projectId, 'bad projectId') const cursor = mongodb.chunks.find( - { projectId: new ObjectId(projectId), state: 'active' }, + { + projectId: new ObjectId(projectId), + state: { $in: ['active', 'closed'] }, + }, { projection: { _id: 1 } } ) return await cursor.map(record => record._id).toArray() @@ -169,7 +181,10 @@ async function getProjectChunks(projectId) { const cursor = mongodb.chunks .find( - { projectId: new ObjectId(projectId), state: 'active' }, + { + projectId: new ObjectId(projectId), + state: { $in: ['active', 'closed'] }, + }, { projection: { state: 0 } } ) .sort({ startVersion: 1 }) @@ -198,48 +213,35 @@ async function insertPendingChunk(projectId, chunk) { /** * Record that a new chunk was created. 
+ * + * @param {string} projectId + * @param {Chunk} chunk + * @param {string} chunkId + * @param {object} opts + * @param {Date} [opts.earliestChangeTimestamp] + * @param {string} [opts.oldChunkId] */ -async function confirmCreate( - projectId, - chunk, - chunkId, - earliestChangeTimestamp, - mongoOpts = {} -) { +async function confirmCreate(projectId, chunk, chunkId, opts = {}) { assert.mongoId(projectId, 'bad projectId') - assert.instance(chunk, Chunk, 'bad chunk') - assert.mongoId(chunkId, 'bad chunkId') + assert.instance(chunk, Chunk, 'bad newChunk') + assert.mongoId(chunkId, 'bad newChunkId') - let result - try { - result = await mongodb.chunks.updateOne( - { - _id: new ObjectId(chunkId), - projectId: new ObjectId(projectId), - state: 'pending', - }, - { $set: { state: 'active', updatedAt: new Date() } }, - mongoOpts - ) - } catch (err) { - if (err.code === DUPLICATE_KEY_ERROR_CODE) { - throw new ChunkVersionConflictError('chunk start version is not unique', { + await mongodb.client.withSession(async session => { + await session.withTransaction(async () => { + if (opts.oldChunkId != null) { + await closeChunk(projectId, opts.oldChunkId, { session }) + } + + await activateChunk(projectId, chunkId, { session }) + + await updateProjectRecord( projectId, - chunkId, - }) - } else { - throw err - } - } - if (result.matchedCount === 0) { - throw new OError('pending chunk not found', { projectId, chunkId }) - } - await updateProjectRecord( - projectId, - chunk, - earliestChangeTimestamp, - mongoOpts - ) + chunk, + opts.earliestChangeTimestamp, + { session } + ) + }) + }) } /** @@ -276,41 +278,145 @@ async function updateProjectRecord( /** * Record that a chunk was replaced by a new one. 
+ * + * @param {string} projectId + * @param {string} oldChunkId + * @param {Chunk} newChunk + * @param {string} newChunkId + * @param {object} [opts] + * @param {Date} [opts.earliestChangeTimestamp] */ async function confirmUpdate( projectId, oldChunkId, newChunk, newChunkId, - earliestChangeTimestamp + opts = {} ) { assert.mongoId(projectId, 'bad projectId') assert.mongoId(oldChunkId, 'bad oldChunkId') assert.instance(newChunk, Chunk, 'bad newChunk') assert.mongoId(newChunkId, 'bad newChunkId') - const session = mongodb.client.startSession() - try { + await mongodb.client.withSession(async session => { await session.withTransaction(async () => { - await deleteChunk(projectId, oldChunkId, { session }) - await confirmCreate( + await deleteActiveChunk(projectId, oldChunkId, { session }) + + await activateChunk(projectId, newChunkId, { session }) + + await updateProjectRecord( projectId, newChunk, - newChunkId, - earliestChangeTimestamp, + opts.earliestChangeTimestamp, { session } ) }) - } finally { - await session.endSession() + }) +} + +/** + * Activate a pending chunk + * + * @param {string} projectId + * @param {string} chunkId + * @param {object} [opts] + * @param {ClientSession} [opts.session] + */ +async function activateChunk(projectId, chunkId, opts = {}) { + assert.mongoId(projectId, 'bad projectId') + assert.mongoId(chunkId, 'bad chunkId') + + let result + try { + result = await mongodb.chunks.updateOne( + { + _id: new ObjectId(chunkId), + projectId: new ObjectId(projectId), + state: 'pending', + }, + { $set: { state: 'active', updatedAt: new Date() } }, + opts + ) + } catch (err) { + if (err instanceof MongoError && err.code === DUPLICATE_KEY_ERROR_CODE) { + throw new ChunkVersionConflictError('chunk start version is not unique', { + projectId, + chunkId, + }) + } else { + throw err + } + } + if (result.matchedCount === 0) { + throw new OError('pending chunk not found', { projectId, chunkId }) + } +} + +/** + * Close a chunk + * + * A closed chunk is one 
that can't be extended anymore. + * + * @param {string} projectId + * @param {string} chunkId + * @param {object} [opts] + * @param {ClientSession} [opts.session] + */ +async function closeChunk(projectId, chunkId, opts = {}) { + const result = await mongodb.chunks.updateOne( + { + _id: new ObjectId(chunkId), + projectId: new ObjectId(projectId), + state: 'active', + }, + { $set: { state: 'closed' } }, + opts + ) + + if (result.matchedCount === 0) { + throw new ChunkVersionConflictError('unable to close chunk', { + projectId, + chunkId, + }) + } +} + +/** + * Delete an active chunk + * + * This is used to delete chunks that are in the process of being extended. It + * will refuse to delete chunks that are already closed and can therefore not be + * extended. + * + * @param {string} projectId + * @param {string} chunkId + * @param {object} [opts] + * @param {ClientSession} [opts.session] + */ +async function deleteActiveChunk(projectId, chunkId, opts = {}) { + const updateResult = await mongodb.chunks.updateOne( + { + _id: new ObjectId(chunkId), + projectId: new ObjectId(projectId), + state: 'active', + }, + { $set: { state: 'deleted', updatedAt: new Date() } }, + opts + ) + + if (updateResult.matchedCount === 0) { + throw new ChunkVersionConflictError('unable to delete active chunk', { + projectId, + chunkId, + }) } } /** * Delete a chunk. 
* - * @param {number} projectId - * @param {number} chunkId + * @param {string} projectId + * @param {string} chunkId * @return {Promise} */ async function deleteChunk(projectId, chunkId, mongoOpts = {}) { @@ -331,7 +437,10 @@ async function deleteProjectChunks(projectId) { assert.mongoId(projectId, 'bad projectId') await mongodb.chunks.updateMany( - { projectId: new ObjectId(projectId), state: 'active' }, + { + projectId: new ObjectId(projectId), + state: { $in: ['active', 'closed'] }, + }, { $set: { state: 'deleted', updatedAt: new Date() } } ) } diff --git a/services/history-v1/storage/lib/chunk_store/postgres.js b/services/history-v1/storage/lib/chunk_store/postgres.js index b89b18d87a..0ddde27234 100644 --- a/services/history-v1/storage/lib/chunk_store/postgres.js +++ b/services/history-v1/storage/lib/chunk_store/postgres.js @@ -193,12 +193,7 @@ async function insertPendingChunk(projectId, chunk) { /** * Record that a new chunk was created. */ -async function confirmCreate( - projectId, - chunk, - chunkId, - earliestChangeTimestamp -) { +async function confirmCreate(projectId, chunk, chunkId, opts = {}) { assert.postgresId(projectId, 'bad projectId') projectId = parseInt(projectId, 10) @@ -207,7 +202,7 @@ async function confirmCreate( _deletePendingChunk(tx, projectId, chunkId), _insertChunk(tx, projectId, chunk, chunkId), ]) - await updateProjectRecord(projectId, chunk, earliestChangeTimestamp) + await updateProjectRecord(projectId, chunk, opts.earliestChangeTimestamp) }) } @@ -219,7 +214,7 @@ async function confirmUpdate( oldChunkId, newChunk, newChunkId, - earliestChangeTimestamp + opts = {} ) { assert.postgresId(projectId, 'bad projectId') projectId = parseInt(projectId, 10) @@ -230,7 +225,7 @@ async function confirmUpdate( _deletePendingChunk(tx, projectId, newChunkId), _insertChunk(tx, projectId, newChunk, newChunkId), ]) - await updateProjectRecord(projectId, newChunk, earliestChangeTimestamp) + await updateProjectRecord(projectId, newChunk, 
opts.earliestChangeTimestamp) }) } @@ -267,10 +262,6 @@ async function _insertChunk(tx, projectId, chunk, chunkId) { /** * Delete a chunk. - * - * @param {number} projectId - * @param {number} chunkId - * @return {Promise} */ async function deleteChunk(projectId, chunkId) { assert.postgresId(projectId, 'bad projectId') diff --git a/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js b/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js index 377a8da0dc..bfd979d0ff 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_buffer.test.js @@ -34,177 +34,177 @@ describe('chunk buffer', function () { beforeEach(async function () { // Initialize project in chunk store await chunkStore.initializeProject(projectId) - - // Create a sample chunk with some content - const snapshot = new Snapshot() - const changes = [ - new Change( - [new AddFileOperation('test.tex', File.fromString('Hello World'))], - new Date(), - [] - ), - ] - const history = new History(snapshot, changes) - const chunk = new Chunk(history, 1) // startVersion 1 - - // Store the chunk directly in the chunk store using create method - // which internally calls uploadChunk - await chunkStore.create(projectId, chunk) - - // Clear any existing cache - await redisBackend.clearCache(projectId) }) - it('should load from chunk store and update cache on first access (cache miss)', async function () { - // First access should load from chunk store and populate cache - const firstResult = await chunkBuffer.loadLatest(projectId) + describe('with an existing chunk', function () { + beforeEach(async function () { + // Create a sample chunk with some content + const snapshot = new Snapshot() + const changes = [ + new Change( + [new AddFileOperation('test.tex', File.fromString('Hello World'))], + new Date(), + [] + ), + ] + const history = new History(snapshot, changes) + const chunk = new Chunk(history, 1) // 
startVersion 1 - // Verify the chunk is correct - expect(firstResult).to.not.be.null - expect(firstResult.getStartVersion()).to.equal(1) - expect(firstResult.getEndVersion()).to.equal(2) + // Store the chunk directly in the chunk store using create method + // which internally calls uploadChunk + await chunkStore.create(projectId, chunk) - // Verify that we got a cache miss metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true + // Clear any existing cache + await redisBackend.clearCache(projectId) + }) - // Reset the metrics spy - metrics.inc.resetHistory() + it('should load from chunk store and update cache on first access (cache miss)', async function () { + // First access should load from chunk store and populate cache + const firstResult = await chunkBuffer.loadLatest(projectId) - // Second access should hit the cache - const secondResult = await chunkBuffer.loadLatest(projectId) + // Verify the chunk is correct + expect(firstResult).to.not.be.null + expect(firstResult.getStartVersion()).to.equal(1) + expect(firstResult.getEndVersion()).to.equal(2) - // Verify we got the same chunk - expect(secondResult).to.not.be.null - expect(secondResult.getStartVersion()).to.equal(1) - expect(secondResult.getEndVersion()).to.equal(2) + // Verify that we got a cache miss metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true - // Verify that we got a cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true + // Reset the metrics spy + metrics.inc.resetHistory() - // Verify both chunks are equivalent - expect(secondResult.getStartVersion()).to.equal( - firstResult.getStartVersion() - ) - expect(secondResult.getEndVersion()).to.equal(firstResult.getEndVersion()) + // Second access should hit the cache + const secondResult = await chunkBuffer.loadLatest(projectId) + + // Verify we 
got the same chunk + expect(secondResult).to.not.be.null + expect(secondResult.getStartVersion()).to.equal(1) + expect(secondResult.getEndVersion()).to.equal(2) + + // Verify that we got a cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + + // Verify both chunks are equivalent + expect(secondResult.getStartVersion()).to.equal( + firstResult.getStartVersion() + ) + expect(secondResult.getEndVersion()).to.equal( + firstResult.getEndVersion() + ) + }) + + it('should refresh the cache when chunk changes in the store', async function () { + // First access to load into cache + const firstResult = await chunkBuffer.loadLatest(projectId) + expect(firstResult.getStartVersion()).to.equal(1) + + // Reset metrics spy + metrics.inc.resetHistory() + + // Create a new chunk with different content + const newSnapshot = new Snapshot() + const newChanges = [ + new Change( + [ + new AddFileOperation( + 'updated.tex', + File.fromString('Updated content') + ), + ], + new Date(), + [] + ), + ] + const newHistory = new History(newSnapshot, newChanges) + const newChunk = new Chunk(newHistory, 2) // Different start version + + // Store the new chunk directly in the chunk store + await chunkStore.create(projectId, newChunk) + + // Access again - should detect the change and refresh cache + const secondResult = await chunkBuffer.loadLatest(projectId) + + // Verify we got the updated chunk + expect(secondResult.getStartVersion()).to.equal(2) + expect(secondResult.getEndVersion()).to.equal(3) + + // Verify that we got a cache miss metric (since the cached chunk was invalidated) + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true + }) + + it('should continue using cache when chunk in store has not changed', async function () { + // First access to load into cache + await chunkBuffer.loadLatest(projectId) + + // Reset metrics spy + metrics.inc.resetHistory() 
+ + // Access again without changing the underlying chunk + const result = await chunkBuffer.loadLatest(projectId) + + // Verify we got the same chunk + expect(result.getStartVersion()).to.equal(1) + expect(result.getEndVersion()).to.equal(2) + + // Verify that we got a cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + }) }) - it('should refresh the cache when chunk changes in the store', async function () { - // First access to load into cache - const firstResult = await chunkBuffer.loadLatest(projectId) - expect(firstResult.getStartVersion()).to.equal(1) + describe('with an empty project', function () { + it('should handle a case with empty chunks (no changes)', async function () { + // Clear the cache + await redisBackend.clearCache(projectId) - // Reset metrics spy - metrics.inc.resetHistory() + // Load the initial empty chunk via buffer + const result = await chunkBuffer.loadLatest(projectId) - // Create a new chunk with different content - const newSnapshot = new Snapshot() - const newChanges = [ - new Change( - [ - new AddFileOperation( - 'updated.tex', - File.fromString('Updated content') - ), - ], - new Date(), - [] - ), - ] - const newHistory = new History(newSnapshot, newChanges) - const newChunk = new Chunk(newHistory, 5) // Different start version + // Verify we got the empty chunk + expect(result.getStartVersion()).to.equal(0) + expect(result.getEndVersion()).to.equal(0) // Start equals end for empty chunks + expect(result.history.changes.length).to.equal(0) - // Store the new chunk directly in the chunk store - await chunkStore.create(projectId, newChunk) + // Verify cache miss metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-miss', + }) + ).to.be.true - // Access again - should detect the change and refresh cache - const secondResult = await chunkBuffer.loadLatest(projectId) + // Reset metrics + metrics.inc.resetHistory() - // 
Verify we got the updated chunk - expect(secondResult.getStartVersion()).to.equal(5) - expect(secondResult.getEndVersion()).to.equal(6) + // Second access should hit the cache + const secondResult = await chunkBuffer.loadLatest(projectId) - // Verify that we got a cache miss metric (since the cached chunk was invalidated) - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - }) + // Verify we got the same empty chunk + expect(secondResult.getStartVersion()).to.equal(0) + expect(secondResult.getEndVersion()).to.equal(0) + expect(secondResult.history.changes.length).to.equal(0) - it('should continue using cache when chunk in store has not changed', async function () { - // First access to load into cache - await chunkBuffer.loadLatest(projectId) - - // Reset metrics spy - metrics.inc.resetHistory() - - // Access again without changing the underlying chunk - const result = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same chunk - expect(result.getStartVersion()).to.equal(1) - expect(result.getEndVersion()).to.equal(2) - - // Verify that we got a cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true - }) - - it('should handle a case with empty chunks (no changes)', async function () { - // Replace with an empty chunk - const emptySnapshot = new Snapshot() - const emptyHistory = new History(emptySnapshot, []) - const emptyChunk = new Chunk(emptyHistory, 10) - - // Store the empty chunk - await chunkStore.create(projectId, emptyChunk) - - // Clear the cache - await redisBackend.clearCache(projectId) - - // Load the chunk via buffer - const result = await chunkBuffer.loadLatest(projectId) - - // Verify we got the empty chunk - expect(result.getStartVersion()).to.equal(10) - expect(result.getEndVersion()).to.equal(10) // Start equals end for empty chunks - expect(result.history.changes.length).to.equal(0) - - // Verify 
cache miss metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-miss', - }) - ).to.be.true - - // Reset metrics - metrics.inc.resetHistory() - - // Second access should hit the cache - const secondResult = await chunkBuffer.loadLatest(projectId) - - // Verify we got the same empty chunk - expect(secondResult.getStartVersion()).to.equal(10) - expect(secondResult.getEndVersion()).to.equal(10) - expect(secondResult.history.changes.length).to.equal(0) - - // Verify cache hit metric - expect( - metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { - status: 'cache-hit', - }) - ).to.be.true + // Verify cache hit metric + expect( + metrics.inc.calledWith('chunk_buffer.loadLatest', 1, { + status: 'cache-hit', + }) + ).to.be.true + }) }) }) }) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_mongo_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_mongo_backend.test.js index 61d80810f1..98cdd2db4d 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_mongo_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_mongo_backend.test.js @@ -1,8 +1,16 @@ const { expect } = require('chai') const { ObjectId } = require('mongodb') -const { Chunk, Snapshot, History } = require('overleaf-editor-core') +const { + Chunk, + Snapshot, + History, + Change, + AddFileOperation, + File, +} = require('overleaf-editor-core') const cleanup = require('./support/cleanup') const backend = require('../../../../storage/lib/chunk_store/mongo') +const { ChunkVersionConflictError } = require('../../../../storage') describe('chunk store Mongo backend', function () { beforeEach(cleanup.everything) @@ -42,11 +50,86 @@ describe('chunk store Mongo backend', function () { expect(oldChunks).to.deep.equal([]) }) }) + + describe('concurrency handling', function () { + it('prevents chunks from being created with the same start version', async function () { + const projectId = new 
ObjectId().toString() + const chunks = [makeChunk([], 10), makeChunk([], 10)] + + const chunkIds = [] + for (const chunk of chunks) { + const chunkId = await backend.insertPendingChunk(projectId, chunk) + chunkIds.push(chunkId) + } + + await backend.confirmCreate(projectId, chunks[0], chunkIds[0]) + await expect( + backend.confirmCreate(projectId, chunks[1], chunkIds[1]) + ).to.be.rejectedWith(ChunkVersionConflictError) + }) + + describe('conflicts between chunk extension and chunk creation', function () { + let projectId, + baseChunkId, + updatedChunkId, + newChunkId, + updatedChunk, + newChunk + + beforeEach(async function () { + projectId = new ObjectId().toString() + const baseChunk = makeChunk([], 0) + baseChunkId = await backend.insertPendingChunk(projectId, baseChunk) + await backend.confirmCreate(projectId, baseChunk, baseChunkId) + + const change = new Change( + [new AddFileOperation('main.tex', File.fromString('hello'))], + new Date() + ) + + updatedChunk = makeChunk([change], 0) + updatedChunkId = await backend.insertPendingChunk( + projectId, + updatedChunk + ) + newChunk = makeChunk([change], 1) + newChunkId = await backend.insertPendingChunk(projectId, newChunk) + }) + + it('prevents creation after extension', async function () { + await backend.confirmUpdate( + projectId, + baseChunkId, + updatedChunk, + updatedChunkId + ) + await expect( + backend.confirmCreate(projectId, newChunk, newChunkId, { + oldChunkId: baseChunkId, + }) + ).to.be.rejectedWith(ChunkVersionConflictError) + }) + + it('prevents extension after creation', async function () { + await backend.confirmCreate(projectId, newChunk, newChunkId, { + oldChunkId: baseChunkId, + }) + await expect( + backend.confirmUpdate( + projectId, + baseChunkId, + updatedChunk, + updatedChunkId + ) + ).to.be.rejectedWith(ChunkVersionConflictError) + }) + }) + }) }) function makeChunk(changes, versionNumber) { const snapshot = Snapshot.fromRaw({ files: {} }) - const history = new History(snapshot, []) + 
const history = new History(snapshot, changes) const chunk = new Chunk(history, versionNumber) return chunk } diff --git a/services/history-v1/test/setup.js b/services/history-v1/test/setup.js index 5b4947affc..60974173de 100644 --- a/services/history-v1/test/setup.js +++ b/services/history-v1/test/setup.js @@ -21,7 +21,7 @@ async function setupMongoDatabase() { { key: { projectId: 1, startVersion: 1 }, name: 'projectId_1_startVersion_1', - partialFilterExpression: { state: 'active' }, + partialFilterExpression: { state: { $in: ['active', 'closed'] } }, unique: true, }, {