mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-25 02:00:10 +02:00
Merge pull request #24897 from overleaf/em-chunks-concurrency
Concurrency handling for history chunks with Mongo backend GitOrigin-RevId: 30abe11237c80e7803c8934a20a57a7223afa85a
This commit is contained in:
@@ -20,3 +20,6 @@ exports.loadGlobalBlobs = loadGlobalBlobs
|
||||
|
||||
const { InvalidChangeError } = require('./lib/errors')
|
||||
exports.InvalidChangeError = InvalidChangeError
|
||||
|
||||
const { ChunkVersionConflictError } = require('./lib/chunk_store/errors')
|
||||
exports.ChunkVersionConflictError = ChunkVersionConflictError
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
// @ts-check
|
||||
|
||||
'use strict'
|
||||
|
||||
/**
|
||||
@@ -156,7 +158,6 @@ async function loadAtTimestamp(projectId, timestamp) {
|
||||
* @param {string} projectId
|
||||
* @param {Chunk} chunk
|
||||
* @param {Date} [earliestChangeTimestamp]
|
||||
* @return {Promise.<number>} for the chunkId of the inserted chunk
|
||||
*/
|
||||
async function create(projectId, chunk, earliestChangeTimestamp) {
|
||||
assert.projectId(projectId, 'bad projectId')
|
||||
@@ -164,13 +165,18 @@ async function create(projectId, chunk, earliestChangeTimestamp) {
|
||||
assert.maybe.date(earliestChangeTimestamp, 'bad timestamp')
|
||||
|
||||
const backend = getBackend(projectId)
|
||||
const chunkStart = chunk.getStartVersion()
|
||||
const chunkId = await uploadChunk(projectId, chunk)
|
||||
await backend.confirmCreate(
|
||||
projectId,
|
||||
chunk,
|
||||
chunkId,
|
||||
earliestChangeTimestamp
|
||||
)
|
||||
|
||||
const opts = {}
|
||||
if (chunkStart > 0) {
|
||||
opts.oldChunkId = await getChunkIdForVersion(projectId, chunkStart - 1)
|
||||
}
|
||||
if (earliestChangeTimestamp != null) {
|
||||
opts.earliestChangeTimestamp = earliestChangeTimestamp
|
||||
}
|
||||
|
||||
await backend.confirmCreate(projectId, chunk, chunkId, opts)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -220,13 +226,12 @@ async function update(
|
||||
const oldChunkId = await getChunkIdForVersion(projectId, oldEndVersion)
|
||||
const newChunkId = await uploadChunk(projectId, newChunk)
|
||||
|
||||
await backend.confirmUpdate(
|
||||
projectId,
|
||||
oldChunkId,
|
||||
newChunk,
|
||||
newChunkId,
|
||||
earliestChangeTimestamp
|
||||
)
|
||||
const opts = {}
|
||||
if (earliestChangeTimestamp != null) {
|
||||
opts.earliestChangeTimestamp = earliestChangeTimestamp
|
||||
}
|
||||
|
||||
await backend.confirmUpdate(projectId, oldChunkId, newChunk, newChunkId, opts)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -234,7 +239,7 @@ async function update(
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {number} version
|
||||
* @return {Promise.<number>}
|
||||
* @return {Promise.<string>}
|
||||
*/
|
||||
async function getChunkIdForVersion(projectId, version) {
|
||||
const backend = getBackend(projectId)
|
||||
@@ -343,10 +348,14 @@ async function deleteProjectChunks(projectId) {
|
||||
* Delete a given number of old chunks from both the database
|
||||
* and from object storage.
|
||||
*
|
||||
* @param {number} count - number of chunks to delete
|
||||
* @param {number} minAgeSecs - how many seconds ago must chunks have been
|
||||
* deleted
|
||||
* @return {Promise}
|
||||
* @param {object} options
|
||||
* @param {number} [options.batchSize] - number of chunks to delete in each
|
||||
* batch
|
||||
* @param {number} [options.maxBatches] - maximum number of batches to process
|
||||
* @param {number} [options.minAgeSecs] - minimum age of chunks to delete
|
||||
* @param {number} [options.timeout] - maximum time to spend deleting chunks
|
||||
*
|
||||
* @return {Promise<number>} number of chunks deleted
|
||||
*/
|
||||
async function deleteOldChunks(options = {}) {
|
||||
const batchSize = options.batchSize ?? DEFAULT_DELETE_BATCH_SIZE
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
const { ObjectId, ReadPreference } = require('mongodb')
|
||||
// @ts-check
|
||||
|
||||
const { ObjectId, ReadPreference, MongoError } = require('mongodb')
|
||||
const { Chunk } = require('overleaf-editor-core')
|
||||
const OError = require('@overleaf/o-error')
|
||||
const assert = require('../assert')
|
||||
@@ -7,6 +9,10 @@ const { ChunkVersionConflictError } = require('./errors')
|
||||
|
||||
const DUPLICATE_KEY_ERROR_CODE = 11000
|
||||
|
||||
/**
|
||||
* @import { ClientSession } from 'mongodb'
|
||||
*/
|
||||
|
||||
/**
|
||||
* Get the latest chunk's metadata from the database
|
||||
* @param {string} projectId
|
||||
@@ -18,7 +24,10 @@ async function getLatestChunk(projectId, opts = {}) {
|
||||
const { readOnly = false } = opts
|
||||
|
||||
const record = await mongodb.chunks.findOne(
|
||||
{ projectId: new ObjectId(projectId), state: 'active' },
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: { $in: ['active', 'closed'] },
|
||||
},
|
||||
{
|
||||
sort: { startVersion: -1 },
|
||||
readPreference: readOnly
|
||||
@@ -42,7 +51,7 @@ async function getChunkForVersion(projectId, version) {
|
||||
const record = await mongodb.chunks.findOne(
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'active',
|
||||
state: { $in: ['active', 'closed'] },
|
||||
startVersion: { $lte: version },
|
||||
endVersion: { $gte: version },
|
||||
},
|
||||
@@ -94,7 +103,7 @@ async function getChunkForTimestamp(projectId, timestamp) {
|
||||
const record = await mongodb.chunks.findOne(
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'active',
|
||||
state: { $in: ['active', 'closed'] },
|
||||
endTimestamp: { $gte: timestamp },
|
||||
},
|
||||
// We use the index on the startVersion for sorting records. This assumes
|
||||
@@ -126,7 +135,7 @@ async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
|
||||
const record = await mongodb.chunks.findOne(
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'active',
|
||||
state: { $in: ['active', 'closed'] },
|
||||
$or: [
|
||||
{
|
||||
endTimestamp: {
|
||||
@@ -155,7 +164,10 @@ async function getProjectChunkIds(projectId) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
|
||||
const cursor = mongodb.chunks.find(
|
||||
{ projectId: new ObjectId(projectId), state: 'active' },
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: { $in: ['active', 'closed'] },
|
||||
},
|
||||
{ projection: { _id: 1 } }
|
||||
)
|
||||
return await cursor.map(record => record._id).toArray()
|
||||
@@ -169,7 +181,10 @@ async function getProjectChunks(projectId) {
|
||||
|
||||
const cursor = mongodb.chunks
|
||||
.find(
|
||||
{ projectId: new ObjectId(projectId), state: 'active' },
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: { $in: ['active', 'closed'] },
|
||||
},
|
||||
{ projection: { state: 0 } }
|
||||
)
|
||||
.sort({ startVersion: 1 })
|
||||
@@ -198,48 +213,35 @@ async function insertPendingChunk(projectId, chunk) {
|
||||
|
||||
/**
|
||||
* Record that a new chunk was created.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {Chunk} chunk
|
||||
* @param {string} chunkId
|
||||
* @param {object} opts
|
||||
* @param {Date} [opts.earliestChangeTimestamp]
|
||||
* @param {string} [opts.oldChunkId]
|
||||
*/
|
||||
async function confirmCreate(
|
||||
projectId,
|
||||
chunk,
|
||||
chunkId,
|
||||
earliestChangeTimestamp,
|
||||
mongoOpts = {}
|
||||
) {
|
||||
async function confirmCreate(projectId, chunk, chunkId, opts = {}) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
assert.instance(chunk, Chunk, 'bad chunk')
|
||||
assert.mongoId(chunkId, 'bad chunkId')
|
||||
assert.instance(chunk, Chunk, 'bad newChunk')
|
||||
assert.mongoId(chunkId, 'bad newChunkId')
|
||||
|
||||
let result
|
||||
try {
|
||||
result = await mongodb.chunks.updateOne(
|
||||
{
|
||||
_id: new ObjectId(chunkId),
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'pending',
|
||||
},
|
||||
{ $set: { state: 'active', updatedAt: new Date() } },
|
||||
mongoOpts
|
||||
)
|
||||
} catch (err) {
|
||||
if (err.code === DUPLICATE_KEY_ERROR_CODE) {
|
||||
throw new ChunkVersionConflictError('chunk start version is not unique', {
|
||||
await mongodb.client.withSession(async session => {
|
||||
await session.withTransaction(async () => {
|
||||
if (opts.oldChunkId != null) {
|
||||
await closeChunk(projectId, opts.oldChunkId, { session })
|
||||
}
|
||||
|
||||
await activateChunk(projectId, chunkId, { session })
|
||||
|
||||
await updateProjectRecord(
|
||||
projectId,
|
||||
chunkId,
|
||||
})
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
if (result.matchedCount === 0) {
|
||||
throw new OError('pending chunk not found', { projectId, chunkId })
|
||||
}
|
||||
await updateProjectRecord(
|
||||
projectId,
|
||||
chunk,
|
||||
earliestChangeTimestamp,
|
||||
mongoOpts
|
||||
)
|
||||
chunk,
|
||||
opts.earliestChangeTimestamp,
|
||||
{ session }
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -276,41 +278,145 @@ async function updateProjectRecord(
|
||||
|
||||
/**
|
||||
* Record that a chunk was replaced by a new one.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {string} oldChunkId
|
||||
* @param {Chunk} newChunk
|
||||
* @param {string} newChunkId
|
||||
* @param {object} [opts]
|
||||
* @param {Date} [opts.earliestChangeTimestamp]
|
||||
*/
|
||||
async function confirmUpdate(
|
||||
projectId,
|
||||
oldChunkId,
|
||||
newChunk,
|
||||
newChunkId,
|
||||
earliestChangeTimestamp
|
||||
opts = {}
|
||||
) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
assert.mongoId(oldChunkId, 'bad oldChunkId')
|
||||
assert.instance(newChunk, Chunk, 'bad newChunk')
|
||||
assert.mongoId(newChunkId, 'bad newChunkId')
|
||||
|
||||
const session = mongodb.client.startSession()
|
||||
try {
|
||||
await mongodb.client.withSession(async session => {
|
||||
await session.withTransaction(async () => {
|
||||
await deleteChunk(projectId, oldChunkId, { session })
|
||||
await confirmCreate(
|
||||
await deleteActiveChunk(projectId, oldChunkId, { session })
|
||||
|
||||
await activateChunk(projectId, newChunkId, { session })
|
||||
|
||||
await updateProjectRecord(
|
||||
projectId,
|
||||
newChunk,
|
||||
newChunkId,
|
||||
earliestChangeTimestamp,
|
||||
opts.earliestChangeTimestamp,
|
||||
{ session }
|
||||
)
|
||||
})
|
||||
} finally {
|
||||
await session.endSession()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Activate a pending chunk
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {string} chunkId
|
||||
* @param {object} [opts]
|
||||
* @param {ClientSession} [opts.session]
|
||||
*/
|
||||
async function activateChunk(projectId, chunkId, opts = {}) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
assert.mongoId(chunkId, 'bad chunkId')
|
||||
|
||||
let result
|
||||
try {
|
||||
result = await mongodb.chunks.updateOne(
|
||||
{
|
||||
_id: new ObjectId(chunkId),
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'pending',
|
||||
},
|
||||
{ $set: { state: 'active', updatedAt: new Date() } },
|
||||
opts
|
||||
)
|
||||
} catch (err) {
|
||||
if (err instanceof MongoError && err.code === DUPLICATE_KEY_ERROR_CODE) {
|
||||
throw new ChunkVersionConflictError('chunk start version is not unique', {
|
||||
projectId,
|
||||
chunkId,
|
||||
})
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
if (result.matchedCount === 0) {
|
||||
throw new OError('pending chunk not found', { projectId, chunkId })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a chunk
|
||||
*
|
||||
* A closed chunk is one that can't be extended anymore.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {string} chunkId
|
||||
* @param {object} [opts]
|
||||
* @param {ClientSession} [opts.session]
|
||||
*/
|
||||
async function closeChunk(projectId, chunkId, opts = {}) {
|
||||
const result = await mongodb.chunks.updateOne(
|
||||
{
|
||||
_id: new ObjectId(chunkId),
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'active',
|
||||
},
|
||||
{ $set: { state: 'closed' } },
|
||||
opts
|
||||
)
|
||||
|
||||
if (result.matchedCount === 0) {
|
||||
throw new ChunkVersionConflictError('unable to close chunk', {
|
||||
projectId,
|
||||
chunkId,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete an active chunk
|
||||
*
|
||||
* This is used to delete chunks that are in the process of being extended. It
|
||||
* will refuse to delete chunks that are already closed and can therefore not be
|
||||
* extended.
|
||||
*
|
||||
* @param {string} projectId
|
||||
* @param {string} chunkId
|
||||
* @param {object} [opts]
|
||||
* @param {ClientSession} [opts.session]
|
||||
*/
|
||||
async function deleteActiveChunk(projectId, chunkId, opts = {}) {
|
||||
const updateResult = await mongodb.chunks.updateOne(
|
||||
{
|
||||
_id: new ObjectId(chunkId),
|
||||
projectId: new ObjectId(projectId),
|
||||
state: 'active',
|
||||
},
|
||||
{ $set: { state: 'deleted', updatedAt: new Date() } },
|
||||
opts
|
||||
)
|
||||
|
||||
if (updateResult.matchedCount === 0) {
|
||||
throw new ChunkVersionConflictError('unable to delete active chunk', {
|
||||
projectId,
|
||||
chunkId,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a chunk.
|
||||
*
|
||||
* @param {number} projectId
|
||||
* @param {number} chunkId
|
||||
* @param {string} projectId
|
||||
* @param {string} chunkId
|
||||
* @return {Promise}
|
||||
*/
|
||||
async function deleteChunk(projectId, chunkId, mongoOpts = {}) {
|
||||
@@ -331,7 +437,10 @@ async function deleteProjectChunks(projectId) {
|
||||
assert.mongoId(projectId, 'bad projectId')
|
||||
|
||||
await mongodb.chunks.updateMany(
|
||||
{ projectId: new ObjectId(projectId), state: 'active' },
|
||||
{
|
||||
projectId: new ObjectId(projectId),
|
||||
state: { $in: ['active', 'closed'] },
|
||||
},
|
||||
{ $set: { state: 'deleted', updatedAt: new Date() } }
|
||||
)
|
||||
}
|
||||
|
||||
@@ -193,12 +193,7 @@ async function insertPendingChunk(projectId, chunk) {
|
||||
/**
|
||||
* Record that a new chunk was created.
|
||||
*/
|
||||
async function confirmCreate(
|
||||
projectId,
|
||||
chunk,
|
||||
chunkId,
|
||||
earliestChangeTimestamp
|
||||
) {
|
||||
async function confirmCreate(projectId, chunk, chunkId, opts = {}) {
|
||||
assert.postgresId(projectId, 'bad projectId')
|
||||
projectId = parseInt(projectId, 10)
|
||||
|
||||
@@ -207,7 +202,7 @@ async function confirmCreate(
|
||||
_deletePendingChunk(tx, projectId, chunkId),
|
||||
_insertChunk(tx, projectId, chunk, chunkId),
|
||||
])
|
||||
await updateProjectRecord(projectId, chunk, earliestChangeTimestamp)
|
||||
await updateProjectRecord(projectId, chunk, opts.earliestChangeTimestamp)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -219,7 +214,7 @@ async function confirmUpdate(
|
||||
oldChunkId,
|
||||
newChunk,
|
||||
newChunkId,
|
||||
earliestChangeTimestamp
|
||||
opts = {}
|
||||
) {
|
||||
assert.postgresId(projectId, 'bad projectId')
|
||||
projectId = parseInt(projectId, 10)
|
||||
@@ -230,7 +225,7 @@ async function confirmUpdate(
|
||||
_deletePendingChunk(tx, projectId, newChunkId),
|
||||
_insertChunk(tx, projectId, newChunk, newChunkId),
|
||||
])
|
||||
await updateProjectRecord(projectId, newChunk, earliestChangeTimestamp)
|
||||
await updateProjectRecord(projectId, newChunk, opts.earliestChangeTimestamp)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -267,10 +262,6 @@ async function _insertChunk(tx, projectId, chunk, chunkId) {
|
||||
|
||||
/**
|
||||
* Delete a chunk.
|
||||
*
|
||||
* @param {number} projectId
|
||||
* @param {number} chunkId
|
||||
* @return {Promise}
|
||||
*/
|
||||
async function deleteChunk(projectId, chunkId) {
|
||||
assert.postgresId(projectId, 'bad projectId')
|
||||
|
||||
@@ -34,177 +34,177 @@ describe('chunk buffer', function () {
|
||||
beforeEach(async function () {
|
||||
// Initialize project in chunk store
|
||||
await chunkStore.initializeProject(projectId)
|
||||
|
||||
// Create a sample chunk with some content
|
||||
const snapshot = new Snapshot()
|
||||
const changes = [
|
||||
new Change(
|
||||
[new AddFileOperation('test.tex', File.fromString('Hello World'))],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const history = new History(snapshot, changes)
|
||||
const chunk = new Chunk(history, 1) // startVersion 1
|
||||
|
||||
// Store the chunk directly in the chunk store using create method
|
||||
// which internally calls uploadChunk
|
||||
await chunkStore.create(projectId, chunk)
|
||||
|
||||
// Clear any existing cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
})
|
||||
|
||||
it('should load from chunk store and update cache on first access (cache miss)', async function () {
|
||||
// First access should load from chunk store and populate cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
describe('with an existing chunk', function () {
|
||||
beforeEach(async function () {
|
||||
// Create a sample chunk with some content
|
||||
const snapshot = new Snapshot()
|
||||
const changes = [
|
||||
new Change(
|
||||
[new AddFileOperation('test.tex', File.fromString('Hello World'))],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const history = new History(snapshot, changes)
|
||||
const chunk = new Chunk(history, 1) // startVersion 1
|
||||
|
||||
// Verify the chunk is correct
|
||||
expect(firstResult).to.not.be.null
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
expect(firstResult.getEndVersion()).to.equal(2)
|
||||
// Store the chunk directly in the chunk store using create method
|
||||
// which internally calls uploadChunk
|
||||
await chunkStore.create(projectId, chunk)
|
||||
|
||||
// Verify that we got a cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
// Clear any existing cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
})
|
||||
|
||||
// Reset the metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
it('should load from chunk store and update cache on first access (cache miss)', async function () {
|
||||
// First access should load from chunk store and populate cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
// Verify the chunk is correct
|
||||
expect(firstResult).to.not.be.null
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
expect(firstResult.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(secondResult).to.not.be.null
|
||||
expect(secondResult.getStartVersion()).to.equal(1)
|
||||
expect(secondResult.getEndVersion()).to.equal(2)
|
||||
// Verify that we got a cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
// Reset the metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Verify both chunks are equivalent
|
||||
expect(secondResult.getStartVersion()).to.equal(
|
||||
firstResult.getStartVersion()
|
||||
)
|
||||
expect(secondResult.getEndVersion()).to.equal(firstResult.getEndVersion())
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(secondResult).to.not.be.null
|
||||
expect(secondResult.getStartVersion()).to.equal(1)
|
||||
expect(secondResult.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Verify both chunks are equivalent
|
||||
expect(secondResult.getStartVersion()).to.equal(
|
||||
firstResult.getStartVersion()
|
||||
)
|
||||
expect(secondResult.getEndVersion()).to.equal(
|
||||
firstResult.getEndVersion()
|
||||
)
|
||||
})
|
||||
|
||||
it('should refresh the cache when chunk changes in the store', async function () {
|
||||
// First access to load into cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Create a new chunk with different content
|
||||
const newSnapshot = new Snapshot()
|
||||
const newChanges = [
|
||||
new Change(
|
||||
[
|
||||
new AddFileOperation(
|
||||
'updated.tex',
|
||||
File.fromString('Updated content')
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const newHistory = new History(newSnapshot, newChanges)
|
||||
const newChunk = new Chunk(newHistory, 2) // Different start version
|
||||
|
||||
// Store the new chunk directly in the chunk store
|
||||
await chunkStore.create(projectId, newChunk)
|
||||
|
||||
// Access again - should detect the change and refresh cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the updated chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(2)
|
||||
expect(secondResult.getEndVersion()).to.equal(3)
|
||||
|
||||
// Verify that we got a cache miss metric (since the cached chunk was invalidated)
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
|
||||
it('should continue using cache when chunk in store has not changed', async function () {
|
||||
// First access to load into cache
|
||||
await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Access again without changing the underlying chunk
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(result.getStartVersion()).to.equal(1)
|
||||
expect(result.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
})
|
||||
|
||||
it('should refresh the cache when chunk changes in the store', async function () {
|
||||
// First access to load into cache
|
||||
const firstResult = await chunkBuffer.loadLatest(projectId)
|
||||
expect(firstResult.getStartVersion()).to.equal(1)
|
||||
describe('with an empty project', function () {
|
||||
it('should handle a case with empty chunks (no changes)', async function () {
|
||||
// Clear the cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
// Load the initial empty chunk via buffer
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Create a new chunk with different content
|
||||
const newSnapshot = new Snapshot()
|
||||
const newChanges = [
|
||||
new Change(
|
||||
[
|
||||
new AddFileOperation(
|
||||
'updated.tex',
|
||||
File.fromString('Updated content')
|
||||
),
|
||||
],
|
||||
new Date(),
|
||||
[]
|
||||
),
|
||||
]
|
||||
const newHistory = new History(newSnapshot, newChanges)
|
||||
const newChunk = new Chunk(newHistory, 5) // Different start version
|
||||
// Verify we got the empty chunk
|
||||
expect(result.getStartVersion()).to.equal(0)
|
||||
expect(result.getEndVersion()).to.equal(0) // Start equals end for empty chunks
|
||||
expect(result.history.changes.length).to.equal(0)
|
||||
|
||||
// Store the new chunk directly in the chunk store
|
||||
await chunkStore.create(projectId, newChunk)
|
||||
// Verify cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Access again - should detect the change and refresh cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
// Reset metrics
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Verify we got the updated chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(5)
|
||||
expect(secondResult.getEndVersion()).to.equal(6)
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify that we got a cache miss metric (since the cached chunk was invalidated)
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
// Verify we got the same empty chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(0)
|
||||
expect(secondResult.getEndVersion()).to.equal(0)
|
||||
expect(secondResult.history.changes.length).to.equal(0)
|
||||
|
||||
it('should continue using cache when chunk in store has not changed', async function () {
|
||||
// First access to load into cache
|
||||
await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Reset metrics spy
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Access again without changing the underlying chunk
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same chunk
|
||||
expect(result.getStartVersion()).to.equal(1)
|
||||
expect(result.getEndVersion()).to.equal(2)
|
||||
|
||||
// Verify that we got a cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
|
||||
it('should handle a case with empty chunks (no changes)', async function () {
|
||||
// Replace with an empty chunk
|
||||
const emptySnapshot = new Snapshot()
|
||||
const emptyHistory = new History(emptySnapshot, [])
|
||||
const emptyChunk = new Chunk(emptyHistory, 10)
|
||||
|
||||
// Store the empty chunk
|
||||
await chunkStore.create(projectId, emptyChunk)
|
||||
|
||||
// Clear the cache
|
||||
await redisBackend.clearCache(projectId)
|
||||
|
||||
// Load the chunk via buffer
|
||||
const result = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the empty chunk
|
||||
expect(result.getStartVersion()).to.equal(10)
|
||||
expect(result.getEndVersion()).to.equal(10) // Start equals end for empty chunks
|
||||
expect(result.history.changes.length).to.equal(0)
|
||||
|
||||
// Verify cache miss metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-miss',
|
||||
})
|
||||
).to.be.true
|
||||
|
||||
// Reset metrics
|
||||
metrics.inc.resetHistory()
|
||||
|
||||
// Second access should hit the cache
|
||||
const secondResult = await chunkBuffer.loadLatest(projectId)
|
||||
|
||||
// Verify we got the same empty chunk
|
||||
expect(secondResult.getStartVersion()).to.equal(10)
|
||||
expect(secondResult.getEndVersion()).to.equal(10)
|
||||
expect(secondResult.history.changes.length).to.equal(0)
|
||||
|
||||
// Verify cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
// Verify cache hit metric
|
||||
expect(
|
||||
metrics.inc.calledWith('chunk_buffer.loadLatest', 1, {
|
||||
status: 'cache-hit',
|
||||
})
|
||||
).to.be.true
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
const { expect } = require('chai')
|
||||
const { ObjectId } = require('mongodb')
|
||||
const { Chunk, Snapshot, History } = require('overleaf-editor-core')
|
||||
const {
|
||||
Chunk,
|
||||
Snapshot,
|
||||
History,
|
||||
Change,
|
||||
AddFileOperation,
|
||||
File,
|
||||
} = require('overleaf-editor-core')
|
||||
const cleanup = require('./support/cleanup')
|
||||
const backend = require('../../../../storage/lib/chunk_store/mongo')
|
||||
const { ChunkVersionConflictError } = require('../../../../storage')
|
||||
|
||||
describe('chunk store Mongo backend', function () {
|
||||
beforeEach(cleanup.everything)
|
||||
@@ -42,11 +50,86 @@ describe('chunk store Mongo backend', function () {
|
||||
expect(oldChunks).to.deep.equal([])
|
||||
})
|
||||
})
|
||||
|
||||
describe('concurrency handling', function () {
|
||||
it('prevents chunks from being created with the same start version', async function () {
|
||||
const projectId = new ObjectId().toString()
|
||||
const chunks = [makeChunk([], 10), makeChunk([], 10)]
|
||||
|
||||
const chunkIds = []
|
||||
for (const chunk of chunks) {
|
||||
const chunkId = await backend.insertPendingChunk(projectId, chunk)
|
||||
chunkIds.push(chunkId)
|
||||
}
|
||||
|
||||
await backend.confirmCreate(projectId, chunks[0], chunkIds[0])
|
||||
await expect(
|
||||
backend.confirmCreate(projectId, chunks[1], chunkIds[1])
|
||||
).to.be.rejectedWith(ChunkVersionConflictError)
|
||||
})
|
||||
|
||||
describe('conflicts between chunk extension and chunk creation', function () {
|
||||
let projectId,
|
||||
baseChunkId,
|
||||
updatedChunkId,
|
||||
newChunkId,
|
||||
updatedChunk,
|
||||
newChunk
|
||||
|
||||
beforeEach(async function () {
|
||||
projectId = new ObjectId().toString()
|
||||
const baseChunk = makeChunk([], 0)
|
||||
baseChunkId = await backend.insertPendingChunk(projectId, baseChunk)
|
||||
await backend.confirmCreate(projectId, baseChunk, baseChunkId)
|
||||
|
||||
const change = new Change(
|
||||
[new AddFileOperation('main.tex', File.fromString('hello'))],
|
||||
new Date()
|
||||
)
|
||||
|
||||
updatedChunk = makeChunk([change], 0)
|
||||
updatedChunkId = await backend.insertPendingChunk(
|
||||
projectId,
|
||||
updatedChunk
|
||||
)
|
||||
newChunk = makeChunk([change], 1)
|
||||
newChunkId = await backend.insertPendingChunk(projectId, newChunk)
|
||||
})
|
||||
|
||||
it('prevents creation after extension', async function () {
|
||||
await backend.confirmUpdate(
|
||||
projectId,
|
||||
baseChunkId,
|
||||
updatedChunk,
|
||||
updatedChunkId
|
||||
)
|
||||
await expect(
|
||||
backend.confirmCreate(projectId, newChunk, newChunkId, {
|
||||
oldChunkId: baseChunkId,
|
||||
})
|
||||
).to.be.rejectedWith(ChunkVersionConflictError)
|
||||
})
|
||||
|
||||
it('prevents extension after creation', async function () {
|
||||
await backend.confirmCreate(projectId, newChunk, newChunkId, {
|
||||
oldChunkId: baseChunkId,
|
||||
})
|
||||
await expect(
|
||||
backend.confirmUpdate(
|
||||
projectId,
|
||||
baseChunkId,
|
||||
updatedChunk,
|
||||
updatedChunkId
|
||||
)
|
||||
).to.be.rejectedWith(ChunkVersionConflictError)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
function makeChunk(changes, versionNumber) {
|
||||
const snapshot = Snapshot.fromRaw({ files: {} })
|
||||
const history = new History(snapshot, [])
|
||||
const history = new History(snapshot, changes)
|
||||
const chunk = new Chunk(history, versionNumber)
|
||||
return chunk
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ async function setupMongoDatabase() {
|
||||
{
|
||||
key: { projectId: 1, startVersion: 1 },
|
||||
name: 'projectId_1_startVersion_1',
|
||||
partialFilterExpression: { state: 'active' },
|
||||
partialFilterExpression: { state: { $in: ['active', 'closed'] } },
|
||||
unique: true,
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user