Merge pull request #24968 from overleaf/em-chunks-concurrency-postgres

Handle concurrency during chunk extension in the Postgres backend

GitOrigin-RevId: fd706b73deacf141cbd478d3ed47f298e6c6db72
This commit is contained in:
Eric Mc Sween
2025-04-17 10:03:07 -04:00
committed by Copybot
parent 1658910780
commit d4dc0db23c
8 changed files with 266 additions and 63 deletions

View File

@@ -9,7 +9,7 @@ const assert = check.assert
const MONGO_ID_REGEXP = /^[0-9a-f]{24}$/
const POSTGRES_ID_REGEXP = /^[1-9][0-9]{0,9}$/
const PROJECT_ID_REGEXP = /^([0-9a-f]{24}|[1-9][0-9]{0,9})$/
const MONGO_OR_POSTGRES_ID_REGEXP = /^([0-9a-f]{24}|[1-9][0-9]{0,9})$/
function transaction(transaction, message) {
assert.function(transaction, message)
@@ -29,20 +29,20 @@ function blobHash(arg, message) {
*/
function projectId(arg, message) {
try {
assert.match(arg, PROJECT_ID_REGEXP, message)
assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
/**
* A chunk id is either a number (for projects stored in Postgres) or a 24
* character string (for projects stored in Mongo)
* A chunk id is a string that contains either an integer (for projects stored in Postgres) or 24
* hex digits (for projects stored in Mongo)
*/
function chunkId(arg, message) {
const valid = check.integer(arg) || check.match(arg, MONGO_ID_REGEXP)
if (!valid) {
const error = new TypeError(message)
try {
assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}

View File

@@ -1,3 +1,5 @@
// @ts-check
const { Chunk } = require('overleaf-editor-core')
const assert = require('../assert')
const knex = require('../knex')
@@ -7,6 +9,10 @@ const { updateProjectRecord } = require('./mongo')
const DUPLICATE_KEY_ERROR_CODE = '23505'
/**
* @import { Knex } from 'knex'
*/
/**
* Get the latest chunk's metadata from the database
* @param {string} projectId
@@ -15,11 +21,10 @@ const DUPLICATE_KEY_ERROR_CODE = '23505'
*/
async function getLatestChunk(projectId, opts = {}) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const { readOnly = false } = opts
const record = await (readOnly ? knexReadOnly : knex)('chunks')
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.orderBy('end_version', 'desc')
.first()
if (record == null) {
@@ -30,13 +35,15 @@ async function getLatestChunk(projectId, opts = {}) {
/**
* Get the metadata for the chunk that contains the given version.
*
* @param {string} projectId
* @param {number} version
*/
async function getChunkForVersion(projectId, version) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const record = await knex('chunks')
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.where('end_version', '>=', version)
.orderBy('end_version')
.first()
@@ -48,20 +55,23 @@ async function getChunkForVersion(projectId, version) {
/**
* Get the metadata for the chunk that contains the given version.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
const recordActive = await getChunkForVersion(projectId, 0)
// projectId must be valid if getChunkForVersion did not throw
projectId = parseInt(projectId, 10)
if (recordActive && recordActive.endTimestamp <= timestamp) {
return recordActive
}
// fallback to deleted chunk
const recordDeleted = await knex('old_chunks')
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.where('start_version', '=', 0)
.where('end_timestamp', '<=', timestamp)
.orderBy('end_version', 'desc')
@@ -75,14 +85,16 @@ async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const query = knex('chunks')
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.where(function () {
this.where('end_timestamp', '<=', timestamp).orWhere(
'end_timestamp',
@@ -102,10 +114,12 @@ async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getChunkForTimestamp(projectId, timestamp) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
// This query will find the latest chunk after the timestamp (query orders
// in reverse chronological order), OR the latest chunk
@@ -118,11 +132,11 @@ async function getChunkForTimestamp(projectId, timestamp) {
'WHERE doc_id = ? ' +
'ORDER BY end_version desc LIMIT 1' +
')',
[timestamp, projectId]
[timestamp, parseInt(projectId, 10)]
)
const record = await knex('chunks')
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.where(whereAfterEndTimestampOrLatestChunk)
.orderBy('end_version')
.first()
@@ -137,7 +151,7 @@ async function getChunkForTimestamp(projectId, timestamp) {
*/
function chunkFromRecord(record) {
return {
id: record.id,
id: record.id.toString(),
startVersion: record.start_version,
endVersion: record.end_version,
endTimestamp: record.end_timestamp,
@@ -146,35 +160,41 @@ function chunkFromRecord(record) {
/**
* Get all of a project's chunk ids
*
* @param {string} projectId
*/
async function getProjectChunkIds(projectId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const records = await knex('chunks').select('id').where('doc_id', projectId)
const records = await knex('chunks')
.select('id')
.where('doc_id', parseInt(projectId, 10))
return records.map(record => record.id)
}
/**
* Get all of a projects chunks directly
*
* @param {string} projectId
*/
async function getProjectChunks(projectId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const records = await knex('chunks')
.select()
.where('doc_id', projectId)
.where('doc_id', parseInt(projectId, 10))
.orderBy('end_version')
return records.map(chunkFromRecord)
}
/**
* Insert a pending chunk before sending it to object storage.
*
* @param {string} projectId
* @param {Chunk} chunk
*/
async function insertPendingChunk(projectId, chunk) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const result = await knex.first(
knex.raw("nextval('chunks_id_seq'::regclass)::integer as chunkid")
@@ -182,32 +202,52 @@ async function insertPendingChunk(projectId, chunk) {
const chunkId = result.chunkid
await knex('pending_chunks').insert({
id: chunkId,
doc_id: projectId,
doc_id: parseInt(projectId, 10),
end_version: chunk.getEndVersion(),
start_version: chunk.getStartVersion(),
end_timestamp: chunk.getEndTimestamp(),
})
return chunkId
return chunkId.toString()
}
/**
* Record that a new chunk was created.
*
* @param {string} projectId
* @param {Chunk} chunk
* @param {string} chunkId
* @param {object} opts
* @param {Date} [opts.earliestChangeTimestamp]
* @param {string} [opts.oldChunkId]
*/
async function confirmCreate(projectId, chunk, chunkId, opts = {}) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
await knex.transaction(async tx => {
if (opts.oldChunkId != null) {
await _assertChunkIsNotClosed(tx, projectId, opts.oldChunkId)
await _closeChunk(tx, projectId, opts.oldChunkId)
}
await Promise.all([
_deletePendingChunk(tx, projectId, chunkId),
_insertChunk(tx, projectId, chunk, chunkId),
])
await updateProjectRecord(projectId, chunk, opts.earliestChangeTimestamp)
await updateProjectRecord(
// The history id in Mongo is an integer for Postgres projects
parseInt(projectId, 10),
chunk,
opts.earliestChangeTimestamp
)
})
}
/**
* Record that a chunk was replaced by a new one.
*
* @param {string} projectId
* @param {string} oldChunkId
* @param {Chunk} newChunk
* @param {string} newChunkId
*/
async function confirmUpdate(
projectId,
@@ -217,40 +257,64 @@ async function confirmUpdate(
opts = {}
) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
await knex.transaction(async tx => {
await _assertChunkIsNotClosed(tx, projectId, oldChunkId)
await _deleteChunks(tx, { doc_id: projectId, id: oldChunkId })
await Promise.all([
_deletePendingChunk(tx, projectId, newChunkId),
_insertChunk(tx, projectId, newChunk, newChunkId),
])
await updateProjectRecord(projectId, newChunk, opts.earliestChangeTimestamp)
await updateProjectRecord(
// The history id in Mongo is an integer for Postgres projects
parseInt(projectId, 10),
newChunk,
opts.earliestChangeTimestamp
)
})
}
/**
* Delete a pending chunk
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _deletePendingChunk(tx, projectId, chunkId) {
await tx('pending_chunks')
.where({
doc_id: projectId,
id: chunkId,
doc_id: parseInt(projectId, 10),
id: parseInt(chunkId, 10),
})
.del()
}
/**
* Adds an active chunk
*
* @param {Knex} tx
* @param {string} projectId
* @param {Chunk} chunk
* @param {string} chunkId
*/
async function _insertChunk(tx, projectId, chunk, chunkId) {
const startVersion = chunk.getStartVersion()
const endVersion = chunk.getEndVersion()
try {
await tx('chunks').insert({
id: chunkId,
doc_id: projectId,
id: parseInt(chunkId, 10),
doc_id: parseInt(projectId, 10),
start_version: startVersion,
end_version: endVersion,
end_timestamp: chunk.getEndTimestamp(),
})
} catch (err) {
if (err.code === DUPLICATE_KEY_ERROR_CODE) {
if (
err instanceof Error &&
'code' in err &&
err.code === DUPLICATE_KEY_ERROR_CODE
) {
throw new ChunkVersionConflictError(
'chunk start or end version is not unique',
{ projectId, chunkId, startVersion, endVersion }
@@ -260,31 +324,92 @@ async function _insertChunk(tx, projectId, chunk, chunkId) {
}
}
/**
* Check that a chunk is not closed
*
* This is used to synchronize chunk creations and extensions.
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _assertChunkIsNotClosed(tx, projectId, chunkId) {
const record = await tx('chunks')
.forUpdate()
.select('closed')
.where('doc_id', parseInt(projectId, 10))
.where('id', parseInt(chunkId, 10))
.first()
if (!record) {
throw new ChunkVersionConflictError('unable to close chunk: not found', {
projectId,
chunkId,
})
}
if (record.closed) {
throw new ChunkVersionConflictError(
'unable to close chunk: already closed',
{
projectId,
chunkId,
}
)
}
}
/**
* Close a chunk
*
* A closed chunk can no longer be extended.
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _closeChunk(tx, projectId, chunkId) {
await tx('chunks')
.update({ closed: true })
.where('doc_id', parseInt(projectId, 10))
.where('id', parseInt(chunkId, 10))
}
/**
* Delete a chunk.
*
* @param {string} projectId
* @param {string} chunkId
*/
async function deleteChunk(projectId, chunkId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
assert.integer(chunkId, 'bad chunkId')
await _deleteChunks(knex, { doc_id: projectId, id: chunkId })
await _deleteChunks(knex, {
doc_id: parseInt(projectId, 10),
id: parseInt(chunkId, 10),
})
}
/**
* Delete all of a project's chunks
*
* @param {string} projectId
*/
async function deleteProjectChunks(projectId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
await knex.transaction(async tx => {
await _deleteChunks(knex, { doc_id: projectId })
await _deleteChunks(knex, { doc_id: parseInt(projectId, 10) })
})
}
/**
* Delete many chunks
*
* @param {Knex} tx
* @param {any} whereClause
*/
async function _deleteChunks(tx, whereClause) {
const rows = await tx('chunks').returning('*').where(whereClause).del()
const rows = await tx('chunks').where(whereClause).del().returning('*')
if (rows.length === 0) {
return
}
@@ -302,6 +427,9 @@ async function _deleteChunks(tx, whereClause) {
/**
* Get a batch of old chunks for deletion
*
* @param {number} count
* @param {number} minAgeSecs
*/
async function getOldChunksBatch(count, minAgeSecs) {
const maxDeletedAt = new Date(Date.now() - minAgeSecs * 1000)
@@ -312,15 +440,22 @@ async function getOldChunksBatch(count, minAgeSecs) {
.limit(count)
return records.map(oldChunk => ({
projectId: oldChunk.doc_id.toString(),
chunkId: oldChunk.chunk_id,
chunkId: oldChunk.chunk_id.toString(),
}))
}
/**
* Delete a batch of old chunks from the database
*
* @param {string[]} chunkIds
*/
async function deleteOldChunks(chunkIds) {
await knex('old_chunks').whereIn('chunk_id', chunkIds).del()
await knex('old_chunks')
.whereIn(
'chunk_id',
chunkIds.map(id => parseInt(id, 10))
)
.del()
}
/**

View File

@@ -25,8 +25,8 @@ const gunzip = promisify(zlib.gunzip)
class LoadError extends OError {
/**
* @param {number|string} projectId
* @param {number|string} chunkId
* @param {string} projectId
* @param {string} chunkId
* @param {any} cause
*/
constructor(projectId, chunkId, cause) {
@@ -42,8 +42,8 @@ class LoadError extends OError {
class StoreError extends OError {
/**
* @param {number|string} projectId
* @param {number|string} chunkId
* @param {string} projectId
* @param {string} chunkId
* @param {any} cause
*/
constructor(projectId, chunkId, cause) {
@@ -58,8 +58,8 @@ class StoreError extends OError {
}
/**
* @param {number|string} projectId
* @param {number|string} chunkId
* @param {string} projectId
* @param {string} chunkId
* @return {string}
*/
function getKey(projectId, chunkId) {
@@ -89,8 +89,8 @@ class HistoryStore {
/**
* Load the raw object for a History.
*
* @param {number|string} projectId
* @param {number|string} chunkId
* @param {string} projectId
* @param {string} chunkId
* @return {Promise<import('overleaf-editor-core/lib/types').RawHistory>}
*/
async loadRaw(projectId, chunkId) {
@@ -144,8 +144,8 @@ class HistoryStore {
/**
* Compress and store a {@link History}.
*
* @param {number|string} projectId
* @param {number|string} chunkId
* @param {string} projectId
* @param {string} chunkId
* @param {import('overleaf-editor-core/lib/types').RawHistory} rawHistory
*/
async storeRaw(projectId, chunkId, rawHistory) {

View File

@@ -1,6 +1,8 @@
// @ts-check
'use strict'
const env = process.env.NODE_ENV || 'development'
const knexfile = require('../../knexfile')
module.exports = require('knex')(knexfile[env])
module.exports = require('knex').default(knexfile[env])

View File

@@ -92,9 +92,9 @@ describe('assert', function () {
).to.not.throw()
})
it('should not throw for valid integer chunk ids', function () {
it('should not throw for valid postgres chunk ids', function () {
expect(() =>
assert.chunkId(123456789, 'should be a chunk id')
assert.chunkId('123456789', 'should be a chunk id')
).to.not.throw()
})
@@ -109,14 +109,14 @@ describe('assert', function () {
}
})
it('should throw for string integer chunk ids', function () {
it('should throw for integer chunk ids', function () {
try {
assert.chunkId('12345', 'should be a chunk id')
assert.chunkId(12345, 'should be a chunk id')
expect.fail()
} catch (error) {
expect(error).to.be.instanceOf(TypeError)
expect(error.message).to.equal('should be a chunk id')
expect(OError.getFullInfo(error)).to.deep.equal({ arg: '12345' })
expect(OError.getFullInfo(error)).to.deep.equal({ arg: 12345 })
}
})
})

View File

@@ -1,7 +1,15 @@
const { expect } = require('chai')
const { ObjectId } = require('mongodb')
const { Chunk, Snapshot, History } = require('overleaf-editor-core')
const {
Chunk,
Snapshot,
History,
Change,
AddFileOperation,
File,
} = require('overleaf-editor-core')
const cleanup = require('./support/cleanup')
const { ChunkVersionConflictError } = require('../../../../storage')
const backend = require('../../../../storage/lib/chunk_store/postgres')
describe('chunk store Postgres backend', function () {
@@ -38,6 +46,60 @@ describe('chunk store Postgres backend', function () {
backend.deleteProjectChunks(invalidProjectId)
).to.be.rejectedWith('bad projectId')
})
describe('conflicts between chunk extension and chunk creation', function () {
let projectId,
baseChunkId,
updatedChunkId,
newChunkId,
updatedChunk,
newChunk
beforeEach(async function () {
projectId = '1234'
const baseChunk = makeChunk([], 0)
baseChunkId = await backend.insertPendingChunk(projectId, baseChunk)
await backend.confirmCreate(projectId, baseChunk, baseChunkId)
const change = new Change(
[new AddFileOperation('main.tex', File.fromString('hello'))],
new Date()
)
updatedChunk = makeChunk([change], 0)
updatedChunkId = await backend.insertPendingChunk(projectId, updatedChunk)
newChunk = makeChunk([change], 1)
newChunkId = await backend.insertPendingChunk(projectId, newChunk)
})
it('prevents creation after extension', async function () {
await backend.confirmUpdate(
projectId,
baseChunkId,
updatedChunk,
updatedChunkId
)
await expect(
backend.confirmCreate(projectId, newChunk, newChunkId, {
oldChunkId: baseChunkId,
})
).to.be.rejectedWith(ChunkVersionConflictError)
})
it('prevents extension after creation', async function () {
await backend.confirmCreate(projectId, newChunk, newChunkId, {
oldChunkId: baseChunkId,
})
await expect(
backend.confirmUpdate(
projectId,
baseChunkId,
updatedChunk,
updatedChunkId
)
).to.be.rejectedWith(ChunkVersionConflictError)
})
})
})
function makeChunk(changes, versionNumber) {

View File

@@ -15,7 +15,7 @@ exports.chunks = {
exports.histories = {
chunkOne: {
projectId: DocFixtures.initializedProject.id,
chunkId: 1000000,
chunkId: '1000000',
json: { snapshot: { files: {} }, changes: [] },
},
}

View File

@@ -76,9 +76,13 @@ describe('tasks', function () {
await mongodb.chunks.insertMany(mongoChunks)
await Promise.all([
...postgresChunks.map(chunk =>
historyStore.storeRaw(postgresProjectId.toString(), chunk.chunk_id, {
history: 'raw history',
})
historyStore.storeRaw(
postgresProjectId.toString(),
chunk.chunk_id.toString(),
{
history: 'raw history',
}
)
),
...mongoChunks.map(chunk =>
historyStore.storeRaw(mongoProjectId.toString(), chunk._id.toString(), {