From 84ed96522b247fa427ffe1a89d71daa68b34dad9 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 24 Feb 2026 09:44:39 +0100 Subject: [PATCH] [docstore] optimize op-log size using pipeline update (#31752) Co-authored-by: Brian Gough GitOrigin-RevId: 2850d6b419df360768d8fe172f19e70baf7b4442 --- services/docstore/app/js/MongoManager.js | 87 +++++++++-- .../test/unit/js/MongoManager.test.js | 141 ++++++++++++++++-- 2 files changed, 205 insertions(+), 23 deletions(-) diff --git a/services/docstore/app/js/MongoManager.js b/services/docstore/app/js/MongoManager.js index 5e4110dace..0650c5b9e0 100644 --- a/services/docstore/app/js/MongoManager.js +++ b/services/docstore/app/js/MongoManager.js @@ -2,6 +2,8 @@ import mongodb from './mongodb.js' import Settings from '@overleaf/settings' import Errors from './Errors.js' import Metrics from '@overleaf/metrics' +import logger from '@overleaf/logger' +import _ from 'lodash' const { db, ObjectId, BSON } = mongodb @@ -90,6 +92,21 @@ async function getNonDeletedArchivedProjectDocs(projectId, maxResults) { return docs } +function convertUpdateToPipeline(update) { + const pipeline = [] + for (const [operation, ops] of Object.entries(update)) { + for (const [field, value] of Object.entries(ops)) { + if (operation === '$unset') { + // $unset uses a different schema in a pipeline + pipeline.push({ [operation]: field }) + } else { + pipeline.push({ [operation]: { [field]: value } }) + } + } + } + return pipeline +} + async function upsertIntoDocCollection(projectId, docId, previousRev, updates) { if (previousRev) { const update = { @@ -97,21 +114,46 @@ async function upsertIntoDocCollection(projectId, docId, previousRev, updates) { $unset: { inS3: true }, } if (updates.lines || updates.ranges) { - update.$inc = { rev: 1 } + update.$set.rev = previousRev + 1 } - const payloadSize = BSON.calculateObjectSize(update) + const pipeline = convertUpdateToPipeline(update) + const payloadSize = BSON.calculateObjectSize(pipeline) Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'update' }) - const result = await db.docs.updateOne( + const result = await db.docs.findOneAndUpdate( { _id: new ObjectId(docId), project_id: new ObjectId(projectId), rev: previousRev, }, - update + pipeline, + { returnDocument: 'after' } ) - if (result.matchedCount !== 1) { + if (!result) { throw new Errors.DocRevValueError() } + let fallbackUpdate = false + if (updates.lines && !_.isEqual(updates.lines, result.lines)) { + logger.warn({ projectId, docId }, 'lines are different after pipeline') + fallbackUpdate = true + } + if (updates.ranges && !_.isEqual(updates.ranges, result.ranges)) { + logger.warn({ projectId, docId }, 'ranges are different after pipeline') + fallbackUpdate = true + } + if (fallbackUpdate) { + update.$set.rev = previousRev + 2 + const result = await db.docs.updateOne( + { + _id: new ObjectId(docId), + project_id: new ObjectId(projectId), + rev: previousRev + 1, + }, + update + ) + if (result.matchedCount !== 1) { + throw new Errors.DocRevValueError() + } + } } else { const payloadSize = BSON.calculateObjectSize(updates) Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'insert' }) @@ -202,16 +244,42 @@ async function restoreArchivedDoc(projectId, docId, archivedDoc) { inS3: true, }, } - const payloadSize = BSON.calculateObjectSize(update) + const pipeline = convertUpdateToPipeline(update) + const payloadSize = BSON.calculateObjectSize(pipeline) Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'restore' }) - const result = await db.docs.updateOne(query, update) - - if (result.matchedCount === 0) { + const result = await db.docs.findOneAndUpdate(query, pipeline, { + returnDocument: 'after', + }) + if (!result) { throw new Errors.DocRevValueError('failed to unarchive doc', { docId, rev: archivedDoc.rev, }) } + let fallbackUpdate = false + if (!_.isEqual(update.$set.lines, result.lines)) { + logger.warn( + { projectId, docId }, + 'lines are different after pipeline when unarchiving' + ) + fallbackUpdate = true + } + if (!_.isEqual(update.$set.ranges, result.ranges)) { + logger.warn( + { projectId, docId }, + 'ranges are different after pipeline when unarchiving' + ) + fallbackUpdate = true + } + if (fallbackUpdate) { + const result = await db.docs.updateOne(query, update) + if (result.matchedCount === 0) { + throw new Errors.DocRevValueError('failed to unarchive doc', { + docId, + rev: archivedDoc.rev, + }) + } + } } async function getDocRev(docId) { @@ -251,6 +319,7 @@ async function destroyProject(projectId) { } export default { + convertUpdateToPipeline, findDoc, getProjectsDeletedDocs, getProjectsDocs, diff --git a/services/docstore/test/unit/js/MongoManager.test.js b/services/docstore/test/unit/js/MongoManager.test.js index df6bb8a8d3..45dd8ff51f 100644 --- a/services/docstore/test/unit/js/MongoManager.test.js +++ b/services/docstore/test/unit/js/MongoManager.test.js @@ -15,6 +15,7 @@ describe('MongoManager', () => { beforeEach(async ctx => { ctx.db = { docs: { + findOneAndUpdate: sinon.stub().resolves({}), updateOne: sinon.stub().resolves({ matchedCount: 1 }), insertOne: sinon.stub().resolves(), }, @@ -47,6 +48,22 @@ describe('MongoManager', () => { ctx.lines = ['Three French hens', 'Two turtle doves'] }) + describe('convertUpdateToPipeline', () => { + it('should convert the update', ctx => { + const update = { + $set: { lines: ['foo', 'bar'], ranges: { comments: [] }, rev: 42 }, + $unset: { inS3: true }, + } + const pipeline = ctx.MongoManager.convertUpdateToPipeline(update) + expect(pipeline).to.deep.equal([ + { $set: { lines: ['foo', 'bar'] } }, + { $set: { ranges: { comments: [] } } }, + { $set: { rev: 42 } }, + { $unset: 'inS3' }, + ]) + }) + }) + describe('findDoc', () => { beforeEach(async ctx => { ctx.doc = { name: 'mock-doc' } @@ -207,6 +224,9 @@ describe('MongoManager', () => { describe('upsertIntoDocCollection', () => { beforeEach(ctx => { ctx.oldRev = 77 + ctx.db.docs.findOneAndUpdate.resolves({ + lines: ctx.lines, + }) }) it('should upsert the document', async ctx => { @@ -217,17 +237,55 @@ describe('MongoManager', () => { { lines: ctx.lines } ) + const args = ctx.db.docs.findOneAndUpdate.args[0] + assert.deepEqual(args[0], { + _id: new ObjectId(ctx.docId), + project_id: new ObjectId(ctx.projectId), + rev: ctx.oldRev, + }) + assert.equal(args[1][0].$set.lines, ctx.lines) + assert.equal(args[1][1].$set.rev, ctx.oldRev + 1) + }) + + it('should fallback on mismatch', async ctx => { + ctx.db.docs.findOneAndUpdate.resolves({ + lines: [], + }) + await ctx.MongoManager.upsertIntoDocCollection( + ctx.projectId, + ctx.docId, + ctx.oldRev, + { lines: ctx.lines } + ) + const args = ctx.db.docs.updateOne.args[0] assert.deepEqual(args[0], { _id: new ObjectId(ctx.docId), project_id: new ObjectId(ctx.projectId), - rev: ctx.oldRev, + rev: ctx.oldRev + 1, }) assert.equal(args[1].$set.lines, ctx.lines) - assert.equal(args[1].$inc.rev, 1) + assert.equal(args[1].$set.rev, ctx.oldRev + 2) }) it('should handle update error', async ctx => { + ctx.db.docs.findOneAndUpdate.rejects(ctx.stubbedErr) + await expect( + ctx.MongoManager.upsertIntoDocCollection( + ctx.projectId, + ctx.docId, + ctx.rev, + { + lines: ctx.lines, + } + ) + ).to.be.rejectedWith(ctx.stubbedErr) + }) + + it('should handle update error of the fallback', async ctx => { + ctx.db.docs.findOneAndUpdate.resolves({ + lines: [], + }) ctx.db.docs.updateOne.rejects(ctx.stubbedErr) await expect( ctx.MongoManager.upsertIntoDocCollection( @@ -341,14 +399,50 @@ describe('MongoManager', () => { describe('complete doc', () => { beforeEach(async ctx => { + ctx.db.docs.findOneAndUpdate.resolves({ + lines: ctx.archivedDoc.lines, + ranges: ctx.archivedDoc.ranges, + }) + }) + + it('updates Mongo', async ctx => { await ctx.MongoManager.restoreArchivedDoc( ctx.projectId, ctx.docId, ctx.archivedDoc ) + expect(ctx.db.docs.findOneAndUpdate).to.have.been.calledWith( + { + _id: new ObjectId(ctx.docId), + project_id: new ObjectId(ctx.projectId), + rev: ctx.archivedDoc.rev, + }, + [ + { + $set: { + lines: ctx.archivedDoc.lines, + }, + }, + { + $set: { + ranges: ctx.archivedDoc.ranges, + }, + }, + { $unset: 'inS3' }, + ] + ) }) - it('updates Mongo', ctx => { + it('updates Mongo on fallback', async ctx => { + ctx.db.docs.findOneAndUpdate.resolves({ + lines: ['foo'], + ranges: ctx.archivedDoc.ranges, + }) + await ctx.MongoManager.restoreArchivedDoc( + ctx.projectId, + ctx.docId, + ctx.archivedDoc + ) expect(ctx.db.docs.updateOne).to.have.been.calledWith( { _id: new ObjectId(ctx.docId), @@ -371,35 +465,54 @@ describe('MongoManager', () => { describe('without ranges', () => { beforeEach(async ctx => { delete ctx.archivedDoc.ranges + }) + + it('sets ranges to an empty object', async ctx => { await ctx.MongoManager.restoreArchivedDoc( ctx.projectId, ctx.docId, ctx.archivedDoc ) - }) - - it('sets ranges to an empty object', ctx => { - expect(ctx.db.docs.updateOne).to.have.been.calledWith( + expect(ctx.db.docs.findOneAndUpdate).to.have.been.calledWith( { _id: new ObjectId(ctx.docId), project_id: new ObjectId(ctx.projectId), rev: ctx.archivedDoc.rev, }, - { - $set: { - lines: ctx.archivedDoc.lines, - ranges: {}, + [ + { + $set: { + lines: ctx.archivedDoc.lines, + }, }, - $unset: { - inS3: true, + { + $set: { + ranges: {}, + }, }, - } + { $unset: 'inS3' }, + ] ) }) }) describe("when the update doesn't succeed", () => { it('throws a DocRevValueError', async ctx => { + ctx.db.docs.findOneAndUpdate.resolves(null) + await expect( + ctx.MongoManager.restoreArchivedDoc( + ctx.projectId, + ctx.docId, + ctx.archivedDoc + ) + ).to.be.rejectedWith(Errors.DocRevValueError) + }) + + it('throws a DocRevValueError on fallback', async ctx => { + ctx.db.docs.findOneAndUpdate.resolves({ + lines: ['foo'], + ranges: ctx.archivedDoc.ranges, + }) ctx.db.docs.updateOne.resolves({ matchedCount: 0 }) await expect( ctx.MongoManager.restoreArchivedDoc(