Files
overleaf-cep/services/docstore/app/js/MongoManager.js
Jakob Ackermann 75a12dda17 [web] resync_projects: use the secondaries for all reads (#33684)
* [docstore] add useSecondary flag to projectHasRanges

The rev-check for unarchiving always consults with the primary.

Two extra changes:
- Add a projection argument to peekDoc so that projectHasRanges can skip
   downloading the doc lines.
- Add one retry to peekDoc to reduce chances of surfacing a rev-check
   violation.

* [web] resync_projects: use the secondaries for all reads

* [web] add default value for useSecondary

* [docstore] add default value for useSecondary

* [k8s] docstore: set MONGO_HAS_SECONDARIES=true

GitOrigin-RevId: f15ec4fdc1cabe74c1eab87bec85f28d6f7a587d
2026-05-14 08:06:26 +00:00

292 lines
7.3 KiB
JavaScript

import mongodb from './mongodb.js'
import Settings from '@overleaf/settings'
import Errors from './Errors.js'
import Metrics from '@overleaf/metrics'
// Collection handles (`db`), the ObjectId constructor and the BSON helpers are
// all taken from the local mongodb module.
const { db, ObjectId, BSON } = mongodb
// Lifetime (in milliseconds) of the archiving lock written by
// getDocForArchiving; read from the service settings.
const ARCHIVING_LOCK_DURATION_MS = Settings.archivingLockDurationMs
/**
 * Build the read-preference portion of a query options object.
 *
 * @param {boolean} useSecondary - truthy to request the secondary read
 *   preference exposed by the local mongodb module
 * @returns {Object} an object to spread into the query options; empty when
 *   no read preference should be set
 */
function readPreference(useSecondary) {
  return useSecondary
    ? { readPreference: mongodb.READ_PREFERENCE_SECONDARY }
    : {}
}
/**
 * Look up a single doc by its id and the id of the project it belongs to.
 *
 * When the caller asked for `version` in the projection and the stored doc
 * has no truthy version, `version` is set to 0 on the returned object.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @param {Object} [projection] - MongoDB projection; may be omitted
 * @param {boolean} [useSecondary=false] - read from a secondary when truthy
 * @returns {Promise<Object|null>} the doc, or null when nothing matched
 */
async function findDoc(projectId, docId, projection, useSecondary = false) {
  const doc = await db.docs.findOne(
    {
      _id: new ObjectId(docId.toString()),
      project_id: new ObjectId(projectId.toString()),
    },
    { projection, ...readPreference(useSecondary) }
  )
  // `projection` is optional: guard the property access so that a lookup
  // without a projection does not throw once a doc has been found.
  if (doc && projection?.version && !doc.version) {
    doc.version = 0
  }
  return doc
}
/**
 * List the deleted docs of a project, sorted by `deletedAt` descending.
 *
 * The number of results is capped by `Settings.max_deleted_docs`.
 *
 * @param {string|ObjectId} projectId
 * @param {Object} projection - MongoDB projection applied to each doc
 * @returns {Promise<Array<Object>>}
 */
async function getProjectsDeletedDocs(projectId, projection) {
  const filter = {
    project_id: new ObjectId(projectId.toString()),
    deleted: true,
  }
  const findOptions = {
    projection,
    sort: { deletedAt: -1 },
    limit: Settings.max_deleted_docs,
  }
  return await db.docs.find(filter, findOptions).toArray()
}
/**
 * List the docs of a project.
 *
 * @param {string|ObjectId} projectId
 * @param {Object} options
 * @param {boolean} [options.include_deleted] - when falsy, docs flagged as
 *   deleted are filtered out
 * @param {boolean} [options.useSecondary] - read from a secondary when truthy
 * @param {number} [options.limit] - maximum number of docs; only applied when
 *   truthy
 * @param {Object} projection - MongoDB projection applied to each doc
 * @returns {Promise<Array<Object>>}
 */
async function getProjectsDocs(projectId, options, projection) {
  const projectObjectId = new ObjectId(projectId.toString())
  const { include_deleted: includeDeleted, useSecondary, limit } = options
  const filter = {
    project_id: projectObjectId,
    ...(includeDeleted ? {} : { deleted: { $ne: true } }),
  }
  const findOptions = {
    projection,
    ...readPreference(useSecondary),
    ...(limit ? { limit } : {}),
  }
  return await db.docs.find(filter, findOptions).toArray()
}
/**
 * List the ids of a project's docs that are flagged `inS3`.
 *
 * @param {string|ObjectId} projectId
 * @param {number} maxResults - passed to the query as `limit`
 * @returns {Promise<Array<{_id: ObjectId}>>} docs projected down to `_id`
 */
async function getArchivedProjectDocs(projectId, maxResults) {
  const cursor = db.docs.find(
    { project_id: new ObjectId(projectId.toString()), inS3: true },
    { projection: { _id: 1 }, limit: maxResults }
  )
  return await cursor.toArray()
}
/**
 * List the ids of a project's docs that are not flagged `inS3`.
 *
 * @param {string|ObjectId} projectId
 * @returns {Promise<Array<ObjectId>>} the bare `_id` values, not wrapped docs
 */
async function getNonArchivedProjectDocIds(projectId) {
  const filter = {
    project_id: new ObjectId(projectId),
    inS3: { $ne: true },
  }
  // Unwrap the ids on the cursor so that the resulting array holds plain ids.
  const idCursor = db.docs
    .find(filter, { projection: { _id: 1 } })
    .map(({ _id }) => _id)
  return await idCursor.toArray()
}
/**
 * List the ids of a project's docs that are flagged `inS3` and are not
 * flagged as deleted.
 *
 * @param {string|ObjectId} projectId
 * @param {number} maxResults - passed to the query as `limit`
 * @returns {Promise<Array<{_id: ObjectId}>>} docs projected down to `_id`
 */
async function getNonDeletedArchivedProjectDocs(projectId, maxResults) {
  const cursor = db.docs.find(
    {
      project_id: new ObjectId(projectId.toString()),
      deleted: { $ne: true },
      inS3: true,
    },
    { projection: { _id: 1 }, limit: maxResults }
  )
  return await cursor.toArray()
}
/**
 * Translate a classic update document into an equivalent list of pipeline
 * stages, one stage per (operator, field) pair, in iteration order.
 *
 * - `$unset` stages take the bare field name, since `$unset` uses a different
 *   schema in a pipeline.
 * - Every other operator gets its value wrapped in `$literal`, so that
 *   strings such as '$foo' are not evaluated as expressions.
 *
 * @param {Object<string, Object>} update - e.g. `{ $set: {...}, $unset: {...} }`
 * @returns {Array<Object>} the pipeline stages
 */
function convertUpdateToPipeline(update) {
  return Object.entries(update).flatMap(([operation, ops]) =>
    Object.entries(ops).map(([field, value]) =>
      operation === '$unset'
        ? { [operation]: field }
        : { [operation]: { [field]: { $literal: value } } }
    )
  )
}
/**
 * Update an existing doc, or insert a new one when no previous rev is given.
 *
 * With a truthy `previousRev`, the doc matching `docId`, `projectId` and
 * `previousRev` is updated through a pipeline: `updates` are set, `inS3` is
 * unset, and `rev` is bumped to `previousRev + 1` when `lines` or `ranges` are
 * part of the update. Otherwise a new doc is inserted with `rev: 1` followed
 * by the fields of `updates`.
 *
 * The caller's `updates` object is never modified.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @param {number} [previousRev] - rev the stored doc is expected to have;
 *   falsy selects the insert path
 * @param {Object} updates - fields to write
 * @returns {Promise<void>}
 * @throws {Errors.DocRevValueError} when the update matched no doc, or when
 *   the insert failed with duplicate key error code 11000
 */
async function upsertIntoDocCollection(projectId, docId, previousRev, updates) {
  if (previousRev) {
    const update = {
      // Shallow copy: `rev` is added below and must not leak into the
      // object owned by the caller.
      $set: { ...updates },
      $unset: { inS3: true },
    }
    if (updates.lines || updates.ranges) {
      update.$set.rev = previousRev + 1
    }
    const pipeline = convertUpdateToPipeline(update)
    const payloadSize = BSON.calculateObjectSize(pipeline)
    Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'update' })
    const result = await db.docs.updateOne(
      {
        _id: new ObjectId(docId),
        project_id: new ObjectId(projectId),
        rev: previousRev,
      },
      pipeline
    )
    // Nothing matched: the doc is gone or its rev differs from previousRev.
    if (result.matchedCount !== 1) {
      throw new Errors.DocRevValueError()
    }
  } else {
    const payloadSize = BSON.calculateObjectSize(updates)
    Metrics.count('mongo_docs_write', payloadSize, 1, { method: 'insert' })
    try {
      await db.docs.insertOne({
        _id: new ObjectId(docId),
        project_id: new ObjectId(projectId),
        rev: 1,
        ...updates,
      })
    } catch (err) {
      if (err.code === 11000) {
        // duplicate doc _id
        throw new Errors.DocRevValueError()
      } else {
        throw err
      }
    }
  }
}
/**
 * Set the given fields on a doc, leaving everything else untouched.
 *
 * The BSON size of `meta` is reported to the `mongo_docs_write` metric before
 * the write is issued.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @param {Object} meta - fields to `$set` on the doc
 * @returns {Promise<void>}
 */
async function patchDoc(projectId, docId, meta) {
  Metrics.count('mongo_docs_write', BSON.calculateObjectSize(meta), 1, {
    method: 'patch',
  })
  const filter = {
    _id: new ObjectId(docId),
    project_id: new ObjectId(projectId),
  }
  await db.docs.updateOne(filter, { $set: meta })
}
/**
 * Fetch a doc and take the archiving lock on it in a single operation.
 *
 * The doc only matches when it is not flagged `inS3` and its `archivingUntil`
 * is either null or in the past; on a match `archivingUntil` is moved
 * ARCHIVING_LOCK_DURATION_MS into the future.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @returns {Promise<Object|null>} the doc (`lines`, `ranges`, `rev`), or null
 *   if the doc is not found, if it's already archived or if the lock can't be
 *   acquired
 */
async function getDocForArchiving(projectId, docId) {
  const lockExpiry = new Date(Date.now() + ARCHIVING_LOCK_DURATION_MS)
  const filter = {
    _id: new ObjectId(docId),
    project_id: new ObjectId(projectId),
    inS3: { $ne: true },
    // Lock is free: never taken, or already expired.
    $or: [{ archivingUntil: null }, { archivingUntil: { $lt: new Date() } }],
  }
  const lockUpdate = { $set: { archivingUntil: lockExpiry } }
  const { value } = await db.docs.findOneAndUpdate(filter, lockUpdate, {
    projection: { lines: 1, ranges: 1, rev: 1 },
    includeResultMetadata: true,
  })
  return value
}
/**
 * Flag a doc as archived: set `inS3`, drop its `lines` and `ranges` from
 * Mongo and release the archiving lock (`archivingUntil`).
 *
 * The doc is matched on `_id` and `rev` only; `projectId` is accepted but is
 * not part of the filter.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @param {number} rev - rev the doc must still have for the update to apply
 * @returns {Promise<void>}
 */
async function markDocAsArchived(projectId, docId, rev) {
  const filter = { _id: new ObjectId(docId), rev }
  const change = {
    $set: { inS3: true },
    $unset: { lines: 1, ranges: 1, archivingUntil: 1 },
  }
  await db.docs.updateOne(filter, change)
}
/**
 * Write the contents of an archived doc back into Mongo.
 *
 * The stored doc must still carry the archived doc's rev; on a match its
 * `lines` and `ranges` are set (ranges default to `{}`) and `inS3` is unset.
 *
 * @param {string|ObjectId} projectId
 * @param {string|ObjectId} docId
 * @param {{rev: number, lines: Array, ranges: (Object|undefined)}} archivedDoc
 * @returns {Promise<void>}
 * @throws {Errors.DocRevValueError} when no doc matched the id/project/rev
 */
async function restoreArchivedDoc(projectId, docId, archivedDoc) {
  const filter = {
    _id: new ObjectId(docId),
    project_id: new ObjectId(projectId),
    rev: archivedDoc.rev,
  }
  const { lines, ranges } = archivedDoc
  const pipeline = convertUpdateToPipeline({
    $set: { lines, ranges: ranges || {} },
    $unset: { inS3: true },
  })
  Metrics.count('mongo_docs_write', BSON.calculateObjectSize(pipeline), 1, {
    method: 'restore',
  })
  const { matchedCount } = await db.docs.updateOne(filter, pipeline)
  if (matchedCount === 1) return
  throw new Errors.DocRevValueError('failed to unarchive doc', {
    docId,
    rev: archivedDoc.rev,
  })
}
/**
 * Read the current rev of a doc, looked up by doc id alone.
 *
 * @param {string|ObjectId} docId
 * @returns {Promise<*>} the stored `rev`; when no doc is found, the falsy
 *   lookup result itself is returned
 */
async function getDocRev(docId) {
  const filter = { _id: new ObjectId(docId.toString()) }
  const doc = await db.docs.findOne(filter, { projection: { rev: 1 } })
  if (!doc) {
    // Hand back the lookup result as-is rather than normalising it.
    return doc
  }
  return doc.rev
}
/**
 * Helper method to support optimistic locking.
 *
 * Re-reads the rev of an existing doc and compares it with the rev on the
 * given doc object. Resolves without a value when both agree.
 *
 * @param {{_id: (string|ObjectId), rev: number}} doc
 * @returns {Promise<void>}
 * @throws {Errors.DocRevValueError} when either rev is NaN under the coercing
 *   global `isNaN`
 * @throws {Errors.DocModifiedError} when the rev has changed
 */
async function checkRevUnchanged(doc) {
  const currentRev = await getDocRev(doc._id)
  const { _id: docId, rev: expectedRev } = doc
  const info = { doc_id: docId, rev: expectedRev, currentRev }
  // The coercing global isNaN is used on purpose: an undefined rev counts as
  // NaN here, while a null rev coerces to 0 and falls through to the
  // comparison below.
  if (isNaN(currentRev) || isNaN(expectedRev)) {
    throw new Errors.DocRevValueError('doc rev is NaN', info)
  }
  if (expectedRev !== currentRev) {
    throw new Errors.DocModifiedError('doc rev has changed', info)
  }
}
/**
 * Delete every doc that belongs to the given project.
 *
 * @param {string|ObjectId} projectId
 * @returns {Promise<void>}
 */
async function destroyProject(projectId) {
  const filter = { project_id: new ObjectId(projectId) }
  await db.docs.deleteMany(filter)
}
// Public interface of this module. getDocRev and readPreference are not
// listed here and stay internal to the file.
export default {
convertUpdateToPipeline,
findDoc,
getProjectsDeletedDocs,
getProjectsDocs,
getArchivedProjectDocs,
getNonArchivedProjectDocIds,
getNonDeletedArchivedProjectDocs,
upsertIntoDocCollection,
restoreArchivedDoc,
patchDoc,
getDocForArchiving,
markDocAsArchived,
checkRevUnchanged,
destroyProject,
}