mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-31 21:01:33 +02:00
[docstore] optimize op-log size using pipeline update (#31752)
Co-authored-by: Brian Gough <brian.gough@overleaf.com> GitOrigin-RevId: 2850d6b419df360768d8fe172f19e70baf7b4442
This commit is contained in:
@@ -2,6 +2,8 @@ import mongodb from './mongodb.js'
|
||||
import Settings from '@overleaf/settings'
|
||||
import Errors from './Errors.js'
|
||||
import Metrics from '@overleaf/metrics'
|
||||
import logger from '@overleaf/logger'
|
||||
import _ from 'lodash'
|
||||
|
||||
const { db, ObjectId, BSON } = mongodb
|
||||
|
||||
@@ -90,6 +92,21 @@ async function getNonDeletedArchivedProjectDocs(projectId, maxResults) {
|
||||
return docs
|
||||
}
|
||||
|
||||
function convertUpdateToPipeline(update) {
|
||||
const pipeline = []
|
||||
for (const [operation, ops] of Object.entries(update)) {
|
||||
for (const [field, value] of Object.entries(ops)) {
|
||||
if (operation === '$unset') {
|
||||
// $unset uses a different schema in a pipeline
|
||||
pipeline.push({ [operation]: field })
|
||||
} else {
|
||||
pipeline.push({ [operation]: { [field]: value } })
|
||||
}
|
||||
}
|
||||
}
|
||||
return pipeline
|
||||
}
|
||||
|
||||
async function upsertIntoDocCollection(projectId, docId, previousRev, updates) {
|
||||
if (previousRev) {
|
||||
const update = {
|
||||
@@ -97,21 +114,46 @@ async function upsertIntoDocCollection(projectId, docId, previousRev, updates) {
|
||||
$unset: { inS3: true },
|
||||
}
|
||||
if (updates.lines || updates.ranges) {
|
||||
update.$inc = { rev: 1 }
|
||||
update.$set.rev = previousRev + 1
|
||||
}
|
||||
const payloadSize = BSON.calculateObjectSize(update)
|
||||
const pipeline = convertUpdateToPipeline(update)
|
||||
const payloadSize = BSON.calculateObjectSize(pipeline)
|
||||
Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'update' })
|
||||
const result = await db.docs.updateOne(
|
||||
const result = await db.docs.findOneAndUpdate(
|
||||
{
|
||||
_id: new ObjectId(docId),
|
||||
project_id: new ObjectId(projectId),
|
||||
rev: previousRev,
|
||||
},
|
||||
update
|
||||
pipeline,
|
||||
{ returnDocument: 'after' }
|
||||
)
|
||||
if (result.matchedCount !== 1) {
|
||||
if (!result) {
|
||||
throw new Errors.DocRevValueError()
|
||||
}
|
||||
let fallbackUpdate = false
|
||||
if (updates.lines && !_.isEqual(updates.lines, result.lines)) {
|
||||
logger.warn({ projectId, docId }, 'lines are different after pipeline')
|
||||
fallbackUpdate = true
|
||||
}
|
||||
if (updates.ranges && !_.isEqual(updates.ranges, result.ranges)) {
|
||||
logger.warn({ projectId, docId }, 'ranges are different after pipeline')
|
||||
fallbackUpdate = true
|
||||
}
|
||||
if (fallbackUpdate) {
|
||||
update.$set.rev = previousRev + 2
|
||||
const result = await db.docs.updateOne(
|
||||
{
|
||||
_id: new ObjectId(docId),
|
||||
project_id: new ObjectId(projectId),
|
||||
rev: previousRev + 1,
|
||||
},
|
||||
update
|
||||
)
|
||||
if (result.matchedCount !== 1) {
|
||||
throw new Errors.DocRevValueError()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const payloadSize = BSON.calculateObjectSize(updates)
|
||||
Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'insert' })
|
||||
@@ -202,16 +244,42 @@ async function restoreArchivedDoc(projectId, docId, archivedDoc) {
|
||||
inS3: true,
|
||||
},
|
||||
}
|
||||
const payloadSize = BSON.calculateObjectSize(update)
|
||||
const pipeline = convertUpdateToPipeline(update)
|
||||
const payloadSize = BSON.calculateObjectSize(pipeline)
|
||||
Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'restore' })
|
||||
const result = await db.docs.updateOne(query, update)
|
||||
|
||||
if (result.matchedCount === 0) {
|
||||
const result = await db.docs.findOneAndUpdate(query, pipeline, {
|
||||
returnDocument: 'after',
|
||||
})
|
||||
if (!result) {
|
||||
throw new Errors.DocRevValueError('failed to unarchive doc', {
|
||||
docId,
|
||||
rev: archivedDoc.rev,
|
||||
})
|
||||
}
|
||||
let fallbackUpdate = false
|
||||
if (!_.isEqual(update.$set.lines, result.lines)) {
|
||||
logger.warn(
|
||||
{ projectId, docId },
|
||||
'lines are different after pipeline when unarchiving'
|
||||
)
|
||||
fallbackUpdate = true
|
||||
}
|
||||
if (!_.isEqual(update.$set.ranges, result.ranges)) {
|
||||
logger.warn(
|
||||
{ projectId, docId },
|
||||
'ranges are different after pipeline when unarchiving'
|
||||
)
|
||||
fallbackUpdate = true
|
||||
}
|
||||
if (fallbackUpdate) {
|
||||
const result = await db.docs.updateOne(query, update)
|
||||
if (result.matchedCount === 0) {
|
||||
throw new Errors.DocRevValueError('failed to unarchive doc', {
|
||||
docId,
|
||||
rev: archivedDoc.rev,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function getDocRev(docId) {
|
||||
@@ -251,6 +319,7 @@ async function destroyProject(projectId) {
|
||||
}
|
||||
|
||||
export default {
|
||||
convertUpdateToPipeline,
|
||||
findDoc,
|
||||
getProjectsDeletedDocs,
|
||||
getProjectsDocs,
|
||||
|
||||
@@ -15,6 +15,7 @@ describe('MongoManager', () => {
|
||||
beforeEach(async ctx => {
|
||||
ctx.db = {
|
||||
docs: {
|
||||
findOneAndUpdate: sinon.stub().resolves({}),
|
||||
updateOne: sinon.stub().resolves({ matchedCount: 1 }),
|
||||
insertOne: sinon.stub().resolves(),
|
||||
},
|
||||
@@ -47,6 +48,22 @@ describe('MongoManager', () => {
|
||||
ctx.lines = ['Three French hens', 'Two turtle doves']
|
||||
})
|
||||
|
||||
describe('convertUpdateToPipeline', () => {
|
||||
it('should convert the update', ctx => {
|
||||
const update = {
|
||||
$set: { lines: ['foo', 'bar'], ranges: { comments: [] }, rev: 42 },
|
||||
$unset: { inS3: true },
|
||||
}
|
||||
const pipeline = ctx.MongoManager.convertUpdateToPipeline(update)
|
||||
expect(pipeline).to.deep.equal([
|
||||
{ $set: { lines: ['foo', 'bar'] } },
|
||||
{ $set: { ranges: { comments: [] } } },
|
||||
{ $set: { rev: 42 } },
|
||||
{ $unset: 'inS3' },
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
describe('findDoc', () => {
|
||||
beforeEach(async ctx => {
|
||||
ctx.doc = { name: 'mock-doc' }
|
||||
@@ -207,6 +224,9 @@ describe('MongoManager', () => {
|
||||
describe('upsertIntoDocCollection', () => {
|
||||
beforeEach(ctx => {
|
||||
ctx.oldRev = 77
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: ctx.lines,
|
||||
})
|
||||
})
|
||||
|
||||
it('should upsert the document', async ctx => {
|
||||
@@ -217,17 +237,55 @@ describe('MongoManager', () => {
|
||||
{ lines: ctx.lines }
|
||||
)
|
||||
|
||||
const args = ctx.db.docs.findOneAndUpdate.args[0]
|
||||
assert.deepEqual(args[0], {
|
||||
_id: new ObjectId(ctx.docId),
|
||||
project_id: new ObjectId(ctx.projectId),
|
||||
rev: ctx.oldRev,
|
||||
})
|
||||
assert.equal(args[1][0].$set.lines, ctx.lines)
|
||||
assert.equal(args[1][1].$set.rev, ctx.oldRev + 1)
|
||||
})
|
||||
|
||||
it('should fallback on mismatch', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: [],
|
||||
})
|
||||
await ctx.MongoManager.upsertIntoDocCollection(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.oldRev,
|
||||
{ lines: ctx.lines }
|
||||
)
|
||||
|
||||
const args = ctx.db.docs.updateOne.args[0]
|
||||
assert.deepEqual(args[0], {
|
||||
_id: new ObjectId(ctx.docId),
|
||||
project_id: new ObjectId(ctx.projectId),
|
||||
rev: ctx.oldRev,
|
||||
rev: ctx.oldRev + 1,
|
||||
})
|
||||
assert.equal(args[1].$set.lines, ctx.lines)
|
||||
assert.equal(args[1].$inc.rev, 1)
|
||||
assert.equal(args[1].$set.rev, ctx.oldRev + 2)
|
||||
})
|
||||
|
||||
it('should handle update error', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.rejects(ctx.stubbedErr)
|
||||
await expect(
|
||||
ctx.MongoManager.upsertIntoDocCollection(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.rev,
|
||||
{
|
||||
lines: ctx.lines,
|
||||
}
|
||||
)
|
||||
).to.be.rejectedWith(ctx.stubbedErr)
|
||||
})
|
||||
|
||||
it('should handle update error of the fallback', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: [],
|
||||
})
|
||||
ctx.db.docs.updateOne.rejects(ctx.stubbedErr)
|
||||
await expect(
|
||||
ctx.MongoManager.upsertIntoDocCollection(
|
||||
@@ -341,14 +399,50 @@ describe('MongoManager', () => {
|
||||
|
||||
describe('complete doc', () => {
|
||||
beforeEach(async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: ctx.archivedDoc.lines,
|
||||
ranges: ctx.archivedDoc.ranges,
|
||||
})
|
||||
})
|
||||
|
||||
it('updates Mongo', async ctx => {
|
||||
await ctx.MongoManager.restoreArchivedDoc(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.archivedDoc
|
||||
)
|
||||
expect(ctx.db.docs.findOneAndUpdate).to.have.been.calledWith(
|
||||
{
|
||||
_id: new ObjectId(ctx.docId),
|
||||
project_id: new ObjectId(ctx.projectId),
|
||||
rev: ctx.archivedDoc.rev,
|
||||
},
|
||||
[
|
||||
{
|
||||
$set: {
|
||||
lines: ctx.archivedDoc.lines,
|
||||
},
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
ranges: ctx.archivedDoc.ranges,
|
||||
},
|
||||
},
|
||||
{ $unset: 'inS3' },
|
||||
]
|
||||
)
|
||||
})
|
||||
|
||||
it('updates Mongo', ctx => {
|
||||
it('updates Mongo on fallback', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: ['foo'],
|
||||
ranges: ctx.archivedDoc.ranges,
|
||||
})
|
||||
await ctx.MongoManager.restoreArchivedDoc(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.archivedDoc
|
||||
)
|
||||
expect(ctx.db.docs.updateOne).to.have.been.calledWith(
|
||||
{
|
||||
_id: new ObjectId(ctx.docId),
|
||||
@@ -371,35 +465,54 @@ describe('MongoManager', () => {
|
||||
describe('without ranges', () => {
|
||||
beforeEach(async ctx => {
|
||||
delete ctx.archivedDoc.ranges
|
||||
})
|
||||
|
||||
it('sets ranges to an empty object', async ctx => {
|
||||
await ctx.MongoManager.restoreArchivedDoc(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.archivedDoc
|
||||
)
|
||||
})
|
||||
|
||||
it('sets ranges to an empty object', ctx => {
|
||||
expect(ctx.db.docs.updateOne).to.have.been.calledWith(
|
||||
expect(ctx.db.docs.findOneAndUpdate).to.have.been.calledWith(
|
||||
{
|
||||
_id: new ObjectId(ctx.docId),
|
||||
project_id: new ObjectId(ctx.projectId),
|
||||
rev: ctx.archivedDoc.rev,
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
lines: ctx.archivedDoc.lines,
|
||||
ranges: {},
|
||||
[
|
||||
{
|
||||
$set: {
|
||||
lines: ctx.archivedDoc.lines,
|
||||
},
|
||||
},
|
||||
$unset: {
|
||||
inS3: true,
|
||||
{
|
||||
$set: {
|
||||
ranges: {},
|
||||
},
|
||||
},
|
||||
}
|
||||
{ $unset: 'inS3' },
|
||||
]
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe("when the update doesn't succeed", () => {
|
||||
it('throws a DocRevValueError', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves(null)
|
||||
await expect(
|
||||
ctx.MongoManager.restoreArchivedDoc(
|
||||
ctx.projectId,
|
||||
ctx.docId,
|
||||
ctx.archivedDoc
|
||||
)
|
||||
).to.be.rejectedWith(Errors.DocRevValueError)
|
||||
})
|
||||
|
||||
it('throws a DocRevValueError on fallback', async ctx => {
|
||||
ctx.db.docs.findOneAndUpdate.resolves({
|
||||
lines: ['foo'],
|
||||
ranges: ctx.archivedDoc.ranges,
|
||||
})
|
||||
ctx.db.docs.updateOne.resolves({ matchedCount: 0 })
|
||||
await expect(
|
||||
ctx.MongoManager.restoreArchivedDoc(
|
||||
|
||||
Reference in New Issue
Block a user