Merge pull request #25456 from overleaf/em-concurrency-handling

Add consistency constraints to the chunk store and Redis buffer

GitOrigin-RevId: 6f983ff207a13d204645e343290c94443dc537b0
This commit is contained in:
Eric Mc Sween
2025-05-12 07:58:39 -04:00
committed by Copybot
parent ab723a67bd
commit eb462fad02
8 changed files with 216 additions and 66 deletions
@@ -213,16 +213,29 @@ async function create(projectId, chunk, earliestChangeTimestamp) {
const backend = getBackend(projectId)
const chunkStart = chunk.getStartVersion()
const chunkId = await uploadChunk(projectId, chunk)
const opts = {}
if (chunkStart > 0) {
opts.oldChunkId = await getChunkIdForVersion(projectId, chunkStart - 1)
const oldChunk = await backend.getChunkForVersion(projectId, chunkStart - 1)
if (oldChunk.endVersion !== chunkStart) {
throw new ChunkVersionConflictError(
'unexpected end version on chunk to be updated',
{
projectId,
expectedVersion: chunkStart,
actualVersion: oldChunk.endVersion,
}
)
}
opts.oldChunkId = oldChunk.id
}
if (earliestChangeTimestamp != null) {
opts.earliestChangeTimestamp = earliestChangeTimestamp
}
const chunkId = await uploadChunk(projectId, chunk)
await backend.confirmCreate(projectId, chunk, chunkId, opts)
}
@@ -253,24 +266,44 @@ async function uploadChunk(projectId, chunk) {
* chunk.
*
* @param {string} projectId
* @param {number} oldEndVersion
* @param {Chunk} newChunk
* @param {Date} [earliestChangeTimestamp]
* @return {Promise}
*/
async function update(
projectId,
oldEndVersion,
newChunk,
earliestChangeTimestamp
) {
async function update(projectId, newChunk, earliestChangeTimestamp) {
assert.projectId(projectId, 'bad projectId')
assert.integer(oldEndVersion, 'bad oldEndVersion')
assert.instance(newChunk, Chunk, 'bad newChunk')
assert.maybe.date(earliestChangeTimestamp, 'bad timestamp')
const backend = getBackend(projectId)
const oldChunkId = await getChunkIdForVersion(projectId, oldEndVersion)
const oldChunk = await backend.getChunkForVersion(
projectId,
newChunk.getStartVersion(),
{ preferNewer: true }
)
if (oldChunk.startVersion !== newChunk.getStartVersion()) {
throw new ChunkVersionConflictError(
'unexpected start version on chunk to be updated',
{
projectId,
expectedVersion: newChunk.getStartVersion(),
actualVersion: oldChunk.startVersion,
}
)
}
if (oldChunk.endVersion > newChunk.getEndVersion()) {
throw new ChunkVersionConflictError(
'chunk update would decrease chunk version',
{
projectId,
currentVersion: oldChunk.endVersion,
newVersion: newChunk.getEndVersion(),
}
)
}
const newChunkId = await uploadChunk(projectId, newChunk)
const opts = {}
@@ -278,7 +311,13 @@ async function update(
opts.earliestChangeTimestamp = earliestChangeTimestamp
}
await backend.confirmUpdate(projectId, oldChunkId, newChunk, newChunkId, opts)
await backend.confirmUpdate(
projectId,
oldChunk.id,
newChunk,
newChunkId,
opts
)
}
/**
@@ -43,8 +43,14 @@ async function getLatestChunk(projectId, opts = {}) {
/**
* Get the metadata for the chunk that contains the given version.
*
* @param {string} projectId
* @param {number} version
* @param {object} [opts]
* @param {boolean} [opts.preferNewer] - If the version is at the boundary of
* two chunks, return the newer chunk.
*/
async function getChunkForVersion(projectId, version) {
async function getChunkForVersion(projectId, version, opts = {}) {
assert.mongoId(projectId, 'bad projectId')
assert.integer(version, 'bad version')
@@ -55,7 +61,7 @@ async function getChunkForVersion(projectId, version) {
startVersion: { $lte: version },
endVersion: { $gte: version },
},
{ sort: { startVersion: 1 } }
{ sort: { startVersion: opts.preferNewer ? -1 : 1 } }
)
if (record == null) {
throw new Chunk.VersionNotFoundError(projectId, version)
@@ -38,14 +38,18 @@ async function getLatestChunk(projectId, opts = {}) {
*
* @param {string} projectId
* @param {number} version
* @param {object} [opts]
* @param {boolean} [opts.preferNewer] - If the version is at the boundary of
* two chunks, return the newer chunk.
*/
async function getChunkForVersion(projectId, version) {
async function getChunkForVersion(projectId, version, opts = {}) {
assert.postgresId(projectId, 'bad projectId')
const record = await knex('chunks')
.where('doc_id', parseInt(projectId, 10))
.where('start_version', '<=', version)
.where('end_version', '>=', version)
.orderBy('end_version')
.orderBy('end_version', opts.preferNewer ? 'desc' : 'asc')
.first()
if (!record) {
throw new Chunk.VersionNotFoundError(projectId, version)
@@ -495,6 +495,12 @@ rclient.defineCommand('set_persisted_version', {
return 'not_found'
end
-- Get current persisted version
local persistedVersion = tonumber(redis.call('GET', persistedVersionKey))
if persistedVersion and persistedVersion > newPersistedVersion then
return 'too_low'
end
-- Set the persisted version
redis.call('SET', persistedVersionKey, newPersistedVersion)
@@ -250,12 +250,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
checkElapsedTime(timer)
await chunkStore.update(
projectId,
originalEndVersion,
currentChunk,
earliestChangeTimestamp
)
await chunkStore.update(projectId, currentChunk, earliestChangeTimestamp)
}
async function createNewChunksAsNeeded() {
@@ -156,7 +156,7 @@ async function addFileInNewChunk(
)
const changes = [new Change([operation], creationDate, [])]
chunk.pushChanges(changes)
await chunkStore.update(historyId, 0, chunk)
await chunkStore.update(historyId, chunk)
}
/**
@@ -6,6 +6,9 @@ const { expect } = require('chai')
const sinon = require('sinon')
const { ObjectId } = require('mongodb')
const { projects } = require('../../../../storage/lib/mongodb')
const {
ChunkVersionConflictError,
} = require('../../../../storage/lib/chunk_store/errors')
const {
Chunk,
@@ -66,17 +69,33 @@ describe('chunkStore', function () {
const lastChangeTimestamp = new Date('2015-01-01T00:00:00')
beforeEach(async function () {
const blob = await blobStore.putString('abc')
const chunk = makeChunk(
const firstChunk = makeChunk(
[
makeChange(
Operation.addFile('main.tex', File.createLazyFromBlobs(blob)),
lastChangeTimestamp
),
],
0
)
await chunkStore.update(projectId, firstChunk, pendingChangeTimestamp)
const secondChunk = makeChunk(
[
makeChange(
Operation.addFile('other.tex', File.createLazyFromBlobs(blob)),
lastChangeTimestamp
),
],
1
)
await chunkStore.create(projectId, chunk, pendingChangeTimestamp)
await chunkStore.create(
projectId,
secondChunk,
pendingChangeTimestamp
)
})
it('creates a chunk and inserts the pending change timestamp', async function () {
const project = await projects.findOne({
_id: new ObjectId(projectRecord.insertedId),
@@ -101,7 +120,6 @@ describe('chunkStore', function () {
beforeEach(async function () {
const chunk = await chunkStore.loadLatest(projectId)
const oldEndVersion = chunk.getEndVersion()
const blob = await blobStore.putString('')
const changes = [
makeChange(
@@ -111,12 +129,7 @@ describe('chunkStore', function () {
]
lastChangeTimestamp = changes[1].getTimestamp()
chunk.pushChanges(changes)
await chunkStore.update(
projectId,
oldEndVersion,
chunk,
pendingChangeTimestamp
)
await chunkStore.update(projectId, chunk, pendingChangeTimestamp)
})
it('records the correct metadata in db readOnly=false', async function () {
@@ -204,12 +217,7 @@ describe('chunkStore', function () {
],
0
)
await chunkStore.update(
projectId,
0,
firstChunk,
pendingChangeTimestamp
)
await chunkStore.update(projectId, firstChunk, pendingChangeTimestamp)
firstChunk = await chunkStore.loadLatest(projectId)
secondChunk = makeChunk(
@@ -234,6 +242,10 @@ describe('chunkStore', function () {
Operation.addFile('quux.tex', File.createLazyFromBlobs(blob)),
thirdChunkTimestamp
),
makeChange(
Operation.addFile('barbar.tex', File.createLazyFromBlobs(blob)),
thirdChunkTimestamp
),
],
4
)
@@ -308,7 +320,7 @@ describe('chunkStore', function () {
const project = await projects.findOne({
_id: new ObjectId(projectRecord.insertedId),
})
expect(project.overleaf.history.currentEndVersion).to.equal(5)
expect(project.overleaf.history.currentEndVersion).to.equal(6)
expect(project.overleaf.history.currentEndTimestamp).to.deep.equal(
thirdChunkTimestamp
)
@@ -323,6 +335,80 @@ describe('chunkStore', function () {
)
})
describe('chunk update', function () {
it('rejects a chunk that removes changes', async function () {
const newChunk = makeChunk([thirdChunk.getChanges()[0]], 4)
await expect(
chunkStore.update(projectId, newChunk)
).to.be.rejectedWith(ChunkVersionConflictError)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw())
})
it('accepts the same chunk', async function () {
await chunkStore.update(projectId, thirdChunk)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw())
})
it('accepts a larger chunk', async function () {
const blob = await blobStore.putString('foobar')
const newChunk = makeChunk(
[
...thirdChunk.getChanges(),
makeChange(
Operation.addFile(
'onemore.tex',
File.createLazyFromBlobs(blob)
),
thirdChunkTimestamp
),
],
4
)
await chunkStore.update(projectId, newChunk)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(newChunk.toRaw())
})
})
describe('chunk create', function () {
let change
beforeEach(async function () {
const blob = await blobStore.putString('foobar')
change = makeChange(
Operation.addFile('onemore.tex', File.createLazyFromBlobs(blob)),
thirdChunkTimestamp
)
})
it('rejects a base version that is too low', async function () {
const newChunk = makeChunk([change], 5)
await expect(
chunkStore.create(projectId, newChunk)
).to.be.rejectedWith(ChunkVersionConflictError)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw())
})
it('rejects a base version that is too high', async function () {
const newChunk = makeChunk([change], 7)
await expect(
chunkStore.create(projectId, newChunk)
).to.be.rejectedWith(ChunkVersionConflictError)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(thirdChunk.toRaw())
})
it('accepts the right base version', async function () {
const newChunk = makeChunk([change], 6)
await chunkStore.create(projectId, newChunk)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk.toRaw()).to.deep.equal(newChunk.toRaw())
})
})
describe('after updating the last chunk', function () {
let newChunk
@@ -341,12 +427,12 @@ describe('chunkStore', function () {
],
4
)
await chunkStore.update(projectId, 5, newChunk)
await chunkStore.update(projectId, newChunk)
newChunk = await chunkStore.loadLatest(projectId)
})
it('replaces the latest chunk', function () {
expect(newChunk.getChanges()).to.have.length(2)
expect(newChunk.getChanges()).to.have.length(3)
})
it('returns the right chunk when querying by version', async function () {
@@ -366,7 +452,7 @@ describe('chunkStore', function () {
const project = await projects.findOne({
_id: new ObjectId(projectRecord.insertedId),
})
expect(project.overleaf.history.currentEndVersion).to.equal(6)
expect(project.overleaf.history.currentEndVersion).to.equal(7)
expect(project.overleaf.history.currentEndTimestamp).to.deep.equal(
thirdChunkTimestamp
)
@@ -529,7 +615,7 @@ describe('chunkStore', function () {
const chunkRecords = []
for await (const chunk of chunkStore.getProjectChunksFromVersion(
projectId,
6
7
)) {
chunkRecords.push(chunk)
}
@@ -566,9 +652,9 @@ describe('chunkStore', function () {
]
chunk.pushChanges(changes)
await expect(
chunkStore.update(projectId, oldEndVersion, chunk)
).to.be.rejectedWith('S3 Error')
await expect(chunkStore.update(projectId, chunk)).to.be.rejectedWith(
'S3 Error'
)
chunk = await chunkStore.loadLatest(projectId)
expect(chunk.getEndVersion()).to.equal(oldEndVersion)
})
@@ -598,7 +684,7 @@ describe('chunkStore', function () {
],
0
)
await chunkStore.update(projectId, 0, chunk)
await chunkStore.update(projectId, chunk)
})
it('refuses to create a chunk with the same start version', async function () {
@@ -704,28 +704,42 @@ describe('chunk buffer Redis backend', function () {
expect(result).to.equal('not_found')
})
it('should set the persisted version', async function () {
// Set head version
const headVersion = 5
await rclient.set(
keySchema.headVersion({ projectId }),
headVersion.toString()
)
describe('when the persisted version is not set', function () {
beforeEach(async function () {
await setupState(projectId, {
headVersion: 5,
persistedVersion: null,
changes: 5,
})
})
// Set persisted version
const persistedVersion = 3
const result = await redisBackend.setPersistedVersion(
projectId,
persistedVersion
)
it('should set the persisted version', async function () {
await redisBackend.setPersistedVersion(projectId, 3)
const state = await redisBackend.getState(projectId)
expect(state.persistedVersion).to.equal(3)
})
})
expect(result).to.equal('ok')
describe('when the persisted version is set', function () {
beforeEach(async function () {
await setupState(projectId, {
headVersion: 5,
persistedVersion: 3,
changes: 5,
})
})
// Verify the persisted version was set
const persistedVersionRedis = await rclient.get(
keySchema.persistedVersion({ projectId })
)
expect(parseInt(persistedVersionRedis, 10)).to.equal(persistedVersion)
it('should set the persisted version', async function () {
await redisBackend.setPersistedVersion(projectId, 5)
const state = await redisBackend.getState(projectId)
expect(state.persistedVersion).to.equal(5)
})
it('should not decrease the persisted version', async function () {
await redisBackend.setPersistedVersion(projectId, 2)
const state = await redisBackend.getState(projectId)
expect(state.persistedVersion).to.equal(3)
})
})
it('should trim the changes list to keep only MAX_PERSISTED_CHANGES beyond persisted version', async function () {