From d4dc0db23c3d455a309aa8177a8eebcd5005ce09 Mon Sep 17 00:00:00 2001 From: Eric Mc Sween <5454374+emcsween@users.noreply.github.com> Date: Thu, 17 Apr 2025 10:03:07 -0400 Subject: [PATCH] Merge pull request #24968 from overleaf/em-chunks-concurrency-postgres Handle concurrency during chunk extension in the Postgres backend GitOrigin-RevId: fd706b73deacf141cbd478d3ed47f298e6c6db72 --- services/history-v1/storage/lib/assert.js | 14 +- .../storage/lib/chunk_store/postgres.js | 205 +++++++++++++++--- .../history-v1/storage/lib/history_store.js | 20 +- services/history-v1/storage/lib/knex.js | 4 +- .../test/acceptance/js/storage/assert.test.js | 10 +- .../chunk_store_postgres_backend.test.js | 64 +++++- .../acceptance/js/storage/fixtures/chunks.js | 2 +- .../test/acceptance/js/storage/tasks.test.js | 10 +- 8 files changed, 266 insertions(+), 63 deletions(-) diff --git a/services/history-v1/storage/lib/assert.js b/services/history-v1/storage/lib/assert.js index 676b1e51ac..91f24da7e0 100644 --- a/services/history-v1/storage/lib/assert.js +++ b/services/history-v1/storage/lib/assert.js @@ -9,7 +9,7 @@ const assert = check.assert const MONGO_ID_REGEXP = /^[0-9a-f]{24}$/ const POSTGRES_ID_REGEXP = /^[1-9][0-9]{0,9}$/ -const PROJECT_ID_REGEXP = /^([0-9a-f]{24}|[1-9][0-9]{0,9})$/ +const MONGO_OR_POSTGRES_ID_REGEXP = /^([0-9a-f]{24}|[1-9][0-9]{0,9})$/ function transaction(transaction, message) { assert.function(transaction, message) @@ -29,20 +29,20 @@ function blobHash(arg, message) { */ function projectId(arg, message) { try { - assert.match(arg, PROJECT_ID_REGEXP, message) + assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message) } catch (error) { throw OError.tag(error, message, { arg }) } } /** - * A chunk id is either a number (for projects stored in Postgres) or a 24 - * character string (for projects stored in Mongo) + * A chunk id is a string that contains either an integer (for projects stored in Postgres) or 24 + * hex digits (for projects stored in Mongo) */ function chunkId(arg, message) { - const valid = check.integer(arg) || check.match(arg, MONGO_ID_REGEXP) - if (!valid) { - const error = new TypeError(message) + try { + assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message) + } catch (error) { throw OError.tag(error, message, { arg }) } } diff --git a/services/history-v1/storage/lib/chunk_store/postgres.js b/services/history-v1/storage/lib/chunk_store/postgres.js index 0ddde27234..0c33c0fd82 100644 --- a/services/history-v1/storage/lib/chunk_store/postgres.js +++ b/services/history-v1/storage/lib/chunk_store/postgres.js @@ -1,3 +1,5 @@ +// @ts-check + const { Chunk } = require('overleaf-editor-core') const assert = require('../assert') const knex = require('../knex') @@ -7,6 +9,10 @@ const { updateProjectRecord } = require('./mongo') const DUPLICATE_KEY_ERROR_CODE = '23505' +/** + * @import { Knex } from 'knex' + */ + /** * Get the latest chunk's metadata from the database * @param {string} projectId @@ -15,11 +21,10 @@ const DUPLICATE_KEY_ERROR_CODE = '23505' */ async function getLatestChunk(projectId, opts = {}) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) const { readOnly = false } = opts const record = await (readOnly ? knexReadOnly : knex)('chunks') - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .orderBy('end_version', 'desc') .first() if (record == null) { @@ -30,13 +35,15 @@ async function getLatestChunk(projectId, opts = {}) { /** * Get the metadata for the chunk that contains the given version. + * + * @param {string} projectId + * @param {number} version */ async function getChunkForVersion(projectId, version) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) const record = await knex('chunks') - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .where('end_version', '>=', version) .orderBy('end_version') .first() @@ -48,20 +55,23 @@ async function getChunkForVersion(projectId, version) { /** * Get the metadata for the chunk that contains the given version. + * + * @param {string} projectId + * @param {Date} timestamp */ async function getFirstChunkBeforeTimestamp(projectId, timestamp) { assert.date(timestamp, 'bad timestamp') const recordActive = await getChunkForVersion(projectId, 0) + // projectId must be valid if getChunkForVersion did not throw - projectId = parseInt(projectId, 10) if (recordActive && recordActive.endTimestamp <= timestamp) { return recordActive } // fallback to deleted chunk const recordDeleted = await knex('old_chunks') - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .where('start_version', '=', 0) .where('end_timestamp', '<=', timestamp) .orderBy('end_version', 'desc') @@ -75,14 +85,16 @@ async function getFirstChunkBeforeTimestamp(projectId, timestamp) { /** * Get the metadata for the chunk that contains the version that was current at * the given timestamp. + * + * @param {string} projectId + * @param {Date} timestamp */ async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) { assert.date(timestamp, 'bad timestamp') assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) const query = knex('chunks') - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .where(function () { this.where('end_timestamp', '<=', timestamp).orWhere( 'end_timestamp', @@ -102,10 +114,12 @@ async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) { /** * Get the metadata for the chunk that contains the version that was current at * the given timestamp. + * + * @param {string} projectId + * @param {Date} timestamp */ async function getChunkForTimestamp(projectId, timestamp) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) // This query will find the latest chunk after the timestamp (query orders // in reverse chronological order), OR the latest chunk @@ -118,11 +132,11 @@ async function getChunkForTimestamp(projectId, timestamp) { 'WHERE doc_id = ? ' + 'ORDER BY end_version desc LIMIT 1' + ')', - [timestamp, projectId] + [timestamp, parseInt(projectId, 10)] ) const record = await knex('chunks') - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .where(whereAfterEndTimestampOrLatestChunk) .orderBy('end_version') .first() @@ -137,7 +151,7 @@ async function getChunkForTimestamp(projectId, timestamp) { */ function chunkFromRecord(record) { return { - id: record.id, + id: record.id.toString(), startVersion: record.start_version, endVersion: record.end_version, endTimestamp: record.end_timestamp, @@ -146,35 +160,41 @@ function chunkFromRecord(record) { /** * Get all of a project's chunk ids + * + * @param {string} projectId */ async function getProjectChunkIds(projectId) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) - const records = await knex('chunks').select('id').where('doc_id', projectId) + const records = await knex('chunks') + .select('id') + .where('doc_id', parseInt(projectId, 10)) return records.map(record => record.id) } /** * Get all of a projects chunks directly + * + * @param {string} projectId */ async function getProjectChunks(projectId) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) const records = await knex('chunks') .select() - .where('doc_id', projectId) + .where('doc_id', parseInt(projectId, 10)) .orderBy('end_version') return records.map(chunkFromRecord) } /** * Insert a pending chunk before sending it to object storage. + * + * @param {string} projectId + * @param {Chunk} chunk */ async function insertPendingChunk(projectId, chunk) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) const result = await knex.first( knex.raw("nextval('chunks_id_seq'::regclass)::integer as chunkid") @@ -182,32 +202,52 @@ async function insertPendingChunk(projectId, chunk) { const chunkId = result.chunkid await knex('pending_chunks').insert({ id: chunkId, - doc_id: projectId, + doc_id: parseInt(projectId, 10), end_version: chunk.getEndVersion(), start_version: chunk.getStartVersion(), end_timestamp: chunk.getEndTimestamp(), }) - return chunkId + return chunkId.toString() } /** * Record that a new chunk was created. + * + * @param {string} projectId + * @param {Chunk} chunk + * @param {string} chunkId + * @param {object} opts + * @param {Date} [opts.earliestChangeTimestamp] + * @param {string} [opts.oldChunkId] */ async function confirmCreate(projectId, chunk, chunkId, opts = {}) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) await knex.transaction(async tx => { + if (opts.oldChunkId != null) { + await _assertChunkIsNotClosed(tx, projectId, opts.oldChunkId) + await _closeChunk(tx, projectId, opts.oldChunkId) + } await Promise.all([ _deletePendingChunk(tx, projectId, chunkId), _insertChunk(tx, projectId, chunk, chunkId), ]) - await updateProjectRecord(projectId, chunk, opts.earliestChangeTimestamp) + await updateProjectRecord( + // The history id in Mongo is an integer for Postgres projects + parseInt(projectId, 10), + chunk, + opts.earliestChangeTimestamp + ) }) } /** * Record that a chunk was replaced by a new one. + * + * @param {string} projectId + * @param {string} oldChunkId + * @param {Chunk} newChunk + * @param {string} newChunkId */ async function confirmUpdate( projectId, @@ -217,40 +257,64 @@ async function confirmUpdate( opts = {} ) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) await knex.transaction(async tx => { + await _assertChunkIsNotClosed(tx, projectId, oldChunkId) await _deleteChunks(tx, { doc_id: projectId, id: oldChunkId }) await Promise.all([ _deletePendingChunk(tx, projectId, newChunkId), _insertChunk(tx, projectId, newChunk, newChunkId), ]) - await updateProjectRecord(projectId, newChunk, opts.earliestChangeTimestamp) + await updateProjectRecord( + // The history id in Mongo is an integer for Postgres projects + parseInt(projectId, 10), + newChunk, + opts.earliestChangeTimestamp + ) }) } +/** + * Delete a pending chunk + * + * @param {Knex} tx + * @param {string} projectId + * @param {string} chunkId + */ async function _deletePendingChunk(tx, projectId, chunkId) { await tx('pending_chunks') .where({ - doc_id: projectId, - id: chunkId, + doc_id: parseInt(projectId, 10), + id: parseInt(chunkId, 10), }) .del() } +/** + * Adds an active chunk + * + * @param {Knex} tx + * @param {string} projectId + * @param {Chunk} chunk + * @param {string} chunkId + */ async function _insertChunk(tx, projectId, chunk, chunkId) { const startVersion = chunk.getStartVersion() const endVersion = chunk.getEndVersion() try { await tx('chunks').insert({ - id: chunkId, - doc_id: projectId, + id: parseInt(chunkId, 10), + doc_id: parseInt(projectId, 10), start_version: startVersion, end_version: endVersion, end_timestamp: chunk.getEndTimestamp(), }) } catch (err) { - if (err.code === DUPLICATE_KEY_ERROR_CODE) { + if ( + err instanceof Error && + 'code' in err && + err.code === DUPLICATE_KEY_ERROR_CODE + ) { throw new ChunkVersionConflictError( 'chunk start or end version is not unique', { projectId, chunkId, startVersion, endVersion } @@ -260,31 +324,92 @@ async function _insertChunk(tx, projectId, chunk, chunkId) { } } +/** + * Check that a chunk is not closed + * + * This is used to synchronize chunk creations and extensions. + * + * @param {Knex} tx + * @param {string} projectId + * @param {string} chunkId + */ +async function _assertChunkIsNotClosed(tx, projectId, chunkId) { + const record = await tx('chunks') + .forUpdate() + .select('closed') + .where('doc_id', parseInt(projectId, 10)) + .where('id', parseInt(chunkId, 10)) + .first() + if (!record) { + throw new ChunkVersionConflictError('unable to close chunk: not found', { + projectId, + chunkId, + }) + } + if (record.closed) { + throw new ChunkVersionConflictError( + 'unable to close chunk: already closed', + { + projectId, + chunkId, + } + ) + } +} + +/** + * Close a chunk + * + * A closed chunk can no longer be extended. + * + * @param {Knex} tx + * @param {string} projectId + * @param {string} chunkId + */ +async function _closeChunk(tx, projectId, chunkId) { + await tx('chunks') + .update({ closed: true }) + .where('doc_id', parseInt(projectId, 10)) + .where('id', parseInt(chunkId, 10)) +} + /** * Delete a chunk. + * + * @param {string} projectId + * @param {string} chunkId */ async function deleteChunk(projectId, chunkId) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) assert.integer(chunkId, 'bad chunkId') - await _deleteChunks(knex, { doc_id: projectId, id: chunkId }) + await _deleteChunks(knex, { + doc_id: parseInt(projectId, 10), + id: parseInt(chunkId, 10), + }) } /** * Delete all of a project's chunks + * + * @param {string} projectId */ async function deleteProjectChunks(projectId) { assert.postgresId(projectId, 'bad projectId') - projectId = parseInt(projectId, 10) await knex.transaction(async tx => { - await _deleteChunks(knex, { doc_id: projectId }) + await _deleteChunks(knex, { doc_id: parseInt(projectId, 10) }) }) } +/** + * Delete many chunks + * + * @param {Knex} tx + * @param {any} whereClause + */ async function _deleteChunks(tx, whereClause) { - const rows = await tx('chunks').returning('*').where(whereClause).del() + const rows = await tx('chunks').where(whereClause).del().returning('*') if (rows.length === 0) { return } @@ -302,6 +427,9 @@ async function _deleteChunks(tx, whereClause) { /** * Get a batch of old chunks for deletion + * + * @param {number} count + * @param {number} minAgeSecs */ async function getOldChunksBatch(count, minAgeSecs) { const maxDeletedAt = new Date(Date.now() - minAgeSecs * 1000) @@ -312,15 +440,22 @@ async function getOldChunksBatch(count, minAgeSecs) { .limit(count) return records.map(oldChunk => ({ projectId: oldChunk.doc_id.toString(), - chunkId: oldChunk.chunk_id, + chunkId: oldChunk.chunk_id.toString(), })) } /** * Delete a batch of old chunks from the database + * + * @param {string[]} chunkIds */ async function deleteOldChunks(chunkIds) { - await knex('old_chunks').whereIn('chunk_id', chunkIds).del() + await knex('old_chunks') + .whereIn( + 'chunk_id', + chunkIds.map(id => parseInt(id, 10)) + ) + .del() } /** diff --git a/services/history-v1/storage/lib/history_store.js b/services/history-v1/storage/lib/history_store.js index d16820d74c..e51bdc25c5 100644 --- a/services/history-v1/storage/lib/history_store.js +++ b/services/history-v1/storage/lib/history_store.js @@ -25,8 +25,8 @@ const gunzip = promisify(zlib.gunzip) class LoadError extends OError { /** - * @param {number|string} projectId - * @param {number|string} chunkId + * @param {string} projectId + * @param {string} chunkId * @param {any} cause */ constructor(projectId, chunkId, cause) { @@ -42,8 +42,8 @@ class LoadError extends OError { class StoreError extends OError { /** - * @param {number|string} projectId - * @param {number|string} chunkId + * @param {string} projectId + * @param {string} chunkId * @param {any} cause */ constructor(projectId, chunkId, cause) { @@ -58,8 +58,8 @@ class StoreError extends OError { } /** - * @param {number|string} projectId - * @param {number|string} chunkId + * @param {string} projectId + * @param {string} chunkId * @return {string} */ function getKey(projectId, chunkId) { @@ -89,8 +89,8 @@ class HistoryStore { /** * Load the raw object for a History. * - * @param {number|string} projectId - * @param {number|string} chunkId + * @param {string} projectId + * @param {string} chunkId * @return {Promise} */ async loadRaw(projectId, chunkId) { @@ -144,8 +144,8 @@ class HistoryStore { /** * Compress and store a {@link History}. * - * @param {number|string} projectId - * @param {number|string} chunkId + * @param {string} projectId + * @param {string} chunkId * @param {import('overleaf-editor-core/lib/types').RawHistory} rawHistory */ async storeRaw(projectId, chunkId, rawHistory) { diff --git a/services/history-v1/storage/lib/knex.js b/services/history-v1/storage/lib/knex.js index 5cdc85e2ab..7000fe034c 100644 --- a/services/history-v1/storage/lib/knex.js +++ b/services/history-v1/storage/lib/knex.js @@ -1,6 +1,8 @@ +// @ts-check + 'use strict' const env = process.env.NODE_ENV || 'development' const knexfile = require('../../knexfile') -module.exports = require('knex')(knexfile[env]) +module.exports = require('knex').default(knexfile[env]) diff --git a/services/history-v1/test/acceptance/js/storage/assert.test.js b/services/history-v1/test/acceptance/js/storage/assert.test.js index 29a38f7765..6ba30e2562 100644 --- a/services/history-v1/test/acceptance/js/storage/assert.test.js +++ b/services/history-v1/test/acceptance/js/storage/assert.test.js @@ -92,9 +92,9 @@ describe('assert', function () { ).to.not.throw() }) - it('should not throw for valid integer chunk ids', function () { + it('should not throw for valid postgres chunk ids', function () { expect(() => - assert.chunkId(123456789, 'should be a chunk id') + assert.chunkId('123456789', 'should be a chunk id') ).to.not.throw() }) @@ -109,14 +109,14 @@ describe('assert', function () { } }) - it('should throw for string integer chunk ids', function () { + it('should throw for integer chunk ids', function () { try { - assert.chunkId('12345', 'should be a chunk id') + assert.chunkId(12345, 'should be a chunk id') expect.fail() } catch (error) { expect(error).to.be.instanceOf(TypeError) expect(error.message).to.equal('should be a chunk id') - expect(OError.getFullInfo(error)).to.deep.equal({ arg: '12345' }) + expect(OError.getFullInfo(error)).to.deep.equal({ arg: 12345 }) } }) }) diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_postgres_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_postgres_backend.test.js index 9b7ca4d763..cd1d705bdc 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_postgres_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_postgres_backend.test.js @@ -1,7 +1,15 @@ const { expect } = require('chai') const { ObjectId } = require('mongodb') -const { Chunk, Snapshot, History } = require('overleaf-editor-core') +const { + Chunk, + Snapshot, + History, + Change, + AddFileOperation, + File, +} = require('overleaf-editor-core') const cleanup = require('./support/cleanup') +const { ChunkVersionConflictError } = require('../../../../storage') const backend = require('../../../../storage/lib/chunk_store/postgres') describe('chunk store Postgres backend', function () { @@ -38,6 +46,60 @@ describe('chunk store Postgres backend', function () { backend.deleteProjectChunks(invalidProjectId) ).to.be.rejectedWith('bad projectId') }) + + describe('conflicts between chunk extension and chunk creation', function () { + let projectId, + baseChunkId, + updatedChunkId, + newChunkId, + updatedChunk, + newChunk + + beforeEach(async function () { + projectId = '1234' + const baseChunk = makeChunk([], 0) + baseChunkId = await backend.insertPendingChunk(projectId, baseChunk) + await backend.confirmCreate(projectId, baseChunk, baseChunkId) + + const change = new Change( + [new AddFileOperation('main.tex', File.fromString('hello'))], + new Date() + ) + + updatedChunk = makeChunk([change], 0) + updatedChunkId = await backend.insertPendingChunk(projectId, updatedChunk) + newChunk = makeChunk([change], 1) + newChunkId = await backend.insertPendingChunk(projectId, newChunk) + }) + + it('prevents creation after extension', async function () { + await backend.confirmUpdate( + projectId, + baseChunkId, + updatedChunk, + updatedChunkId + ) + await expect( + backend.confirmCreate(projectId, newChunk, newChunkId, { + oldChunkId: baseChunkId, + }) + ).to.be.rejectedWith(ChunkVersionConflictError) + }) + + it('prevents extension after creation', async function () { + await backend.confirmCreate(projectId, newChunk, newChunkId, { + oldChunkId: baseChunkId, + }) + await expect( + backend.confirmUpdate( + projectId, + baseChunkId, + updatedChunk, + updatedChunkId + ) + ).to.be.rejectedWith(ChunkVersionConflictError) + }) + }) }) function makeChunk(changes, versionNumber) { diff --git a/services/history-v1/test/acceptance/js/storage/fixtures/chunks.js b/services/history-v1/test/acceptance/js/storage/fixtures/chunks.js index 8f67c17d71..0fb50e49e9 100644 --- a/services/history-v1/test/acceptance/js/storage/fixtures/chunks.js +++ b/services/history-v1/test/acceptance/js/storage/fixtures/chunks.js @@ -15,7 +15,7 @@ exports.chunks = { exports.histories = { chunkOne: { projectId: DocFixtures.initializedProject.id, - chunkId: 1000000, + chunkId: '1000000', json: { snapshot: { files: {} }, changes: [] }, }, } diff --git a/services/history-v1/test/acceptance/js/storage/tasks.test.js b/services/history-v1/test/acceptance/js/storage/tasks.test.js index 04f9cd12c3..e43bdac79f 100644 --- a/services/history-v1/test/acceptance/js/storage/tasks.test.js +++ b/services/history-v1/test/acceptance/js/storage/tasks.test.js @@ -76,9 +76,13 @@ describe('tasks', function () { await mongodb.chunks.insertMany(mongoChunks) await Promise.all([ ...postgresChunks.map(chunk => - historyStore.storeRaw(postgresProjectId.toString(), chunk.chunk_id, { - history: 'raw history', - }) + historyStore.storeRaw( + postgresProjectId.toString(), + chunk.chunk_id.toString(), + { + history: 'raw history', + } + ) ), ...mongoChunks.map(chunk => historyStore.storeRaw(mongoProjectId.toString(), chunk._id.toString(), {