Merge pull request #28401 from overleaf/em-promisify-document-updater-handler

Promisify DocumentUpdaterHandler

GitOrigin-RevId: 8793f30ef51f5cd5886d0f74773f4b952761c2d4
This commit is contained in:
Eric Mc Sween
2025-09-12 09:43:54 -04:00
committed by Copybot
parent b87812d102
commit ee094a21eb
2 changed files with 793 additions and 1270 deletions

View File

@@ -1,271 +1,210 @@
const request = require('request').defaults({ timeout: 30 * 1000 })
const OError = require('@overleaf/o-error')
const settings = require('@overleaf/settings')
const {
fetchJson,
fetchNothing,
fetchString,
RequestFailedError,
} = require('@overleaf/fetch-utils')
const _ = require('lodash')
const async = require('async')
const logger = require('@overleaf/logger')
const metrics = require('@overleaf/metrics')
const { promisify, callbackify } = require('util')
const { promisifyMultiResult } = require('@overleaf/promise-utils')
const { callbackifyAll } = require('@overleaf/promise-utils')
const ProjectGetter = require('../Project/ProjectGetter')
const Modules = require('../../infrastructure/Modules')
function getProjectLastUpdatedAt(projectId, callback) {
_makeRequest(
{
path: `/project/${projectId}/last_updated_at`,
method: 'GET',
json: true,
},
projectId,
'project.redis.last_updated_at',
(err, body) => {
if (err || !body?.lastUpdatedAt) return callback(err, null)
callback(null, new Date(body.lastUpdatedAt))
}
const REQUEST_TIMEOUT_MS = 30 * 1000
const RESYNC_TIMEOUT_MS = 6 * 60 * 1000
const BASE_URL = settings.apis.documentupdater.url
async function getProjectLastUpdatedAt(projectId) {
const body = await fetchJson(
`${BASE_URL}/project/${projectId}/last_updated_at`,
{ signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS) }
)
return body.lastUpdatedAt != null ? new Date(body.lastUpdatedAt) : null
}
/**
* @param {string} projectId
*/
function flushProjectToMongo(projectId, callback) {
_makeRequest(
{
path: `/project/${projectId}/flush`,
method: 'POST',
},
projectId,
'flushing.mongo.project',
callback
)
}
function flushMultipleProjectsToMongo(projectIds, callback) {
const jobs = projectIds.map(projectId => callback => {
flushProjectToMongo(projectId, callback)
async function flushProjectToMongo(projectId) {
await fetchNothing(`${BASE_URL}/project/${projectId}/flush`, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
async.series(jobs, callback)
}
async function flushMultipleProjectsToMongo(projectIds) {
for (const projectId of projectIds) {
await flushProjectToMongo(projectId)
}
}
/**
* @param {string} projectId
*/
function flushProjectToMongoAndDelete(projectId, callback) {
_makeRequest(
{
path: `/project/${projectId}`,
method: 'DELETE',
},
projectId,
'flushing.mongo.project',
callback
)
async function flushProjectToMongoAndDelete(projectId) {
await fetchNothing(`${BASE_URL}/project/${projectId}`, {
method: 'DELETE',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
}
/**
* @param {string} projectId
* @param {string} docId
* @param {Callback} callback
*/
function flushDocToMongo(projectId, docId, callback) {
_makeRequest(
{
path: `/project/${projectId}/doc/${docId}/flush`,
method: 'POST',
},
projectId,
'flushing.mongo.doc',
callback
)
async function flushDocToMongo(projectId, docId) {
await fetchNothing(`${BASE_URL}/project/${projectId}/doc/${docId}/flush`, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
}
function deleteDoc(projectId, docId, ignoreFlushErrors, callback) {
if (typeof ignoreFlushErrors === 'function') {
callback = ignoreFlushErrors
ignoreFlushErrors = false
}
let path = `/project/${projectId}/doc/${docId}`
async function deleteDoc(projectId, docId, ignoreFlushErrors = false) {
const url = new URL(`${BASE_URL}/project/${projectId}/doc/${docId}`)
if (ignoreFlushErrors) {
path += '?ignore_flush_errors=true'
url.searchParams.set('ignore_flush_errors', 'true')
}
const method = 'DELETE'
_makeRequest(
{
path,
method,
},
projectId,
'delete.mongo.doc',
callback
)
await fetchNothing(url, {
method: 'DELETE',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
}
function getComment(projectId, docId, commentId, callback) {
_makeRequest(
{
path: `/project/${projectId}/doc/${docId}/comment/${commentId}`,
json: true,
},
projectId,
'get-comment',
function (error, comment) {
if (error) {
return callback(error)
}
callback(null, comment)
}
async function getComment(projectId, docId, commentId) {
const comment = await fetchJson(
`${BASE_URL}/project/${projectId}/doc/${docId}/comment/${commentId}`,
{ signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS) }
)
return comment
}
function getDocument(projectId, docId, fromVersion, callback) {
_makeRequest(
{
path: `/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}`,
json: true,
},
projectId,
'get-document',
function (error, doc) {
if (error) {
return callback(error)
}
callback(null, doc.lines, doc.version, doc.ranges, doc.ops)
}
)
async function getDocument(projectId, docId, fromVersion) {
const url = new URL(`${BASE_URL}/project/${projectId}/doc/${docId}`)
url.searchParams.set('fromVersion', fromVersion)
const doc = await fetchJson(url, {
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
return {
lines: doc.lines,
version: doc.version,
ranges: doc.ranges,
ops: doc.ops,
}
}
/**
* Get a document with its history ranges
* @param {string} projectId
* @param {string} docId
* @param {Callback} callback
*/
function getDocumentWithHistoryRanges(projectId, docId, callback) {
_makeRequest(
{
path: `/project/${projectId}/doc/${docId}?historyRanges=true`,
json: true,
},
projectId,
'get-document-with-history-ranges',
function (error, doc) {
if (error) {
return callback(error)
}
callback(null, doc)
}
async function getDocumentWithHistoryRanges(projectId, docId) {
const doc = await fetchJson(
`${BASE_URL}/project/${projectId}/doc/${docId}?historyRanges=true`,
{ signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS) }
)
return doc
}
function setDocument(projectId, docId, userId, docLines, source, callback) {
_makeRequest(
async function setDocument(projectId, docId, userId, docLines, source) {
const maybeJson = await fetchString(
`${BASE_URL}/project/${projectId}/doc/${docId}`,
{
path: `/project/${projectId}/doc/${docId}`,
method: 'POST',
json: {
lines: docLines,
source,
user_id: userId,
},
},
projectId,
'set-document',
callback
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
// The set document endpoint sometimes returns json and sometimes just an
// empty response
try {
const { rev, modified } = JSON.parse(maybeJson)
return { rev, modified }
} catch (err) {
return undefined
}
}
function appendToDocument(projectId, docId, userId, lines, source, callback) {
_makeRequest(
async function appendToDocument(projectId, docId, userId, lines, source) {
const maybeJson = await fetchString(
`${BASE_URL}/project/${projectId}/doc/${docId}/append`,
{
path: `/project/${projectId}/doc/${docId}/append`,
method: 'POST',
json: {
lines,
source,
user_id: userId,
},
},
projectId,
'append-to-document',
callback
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
// The append to document endpoint sometimes returns json and sometimes just an
// empty response
try {
const { rev, modified } = JSON.parse(maybeJson)
return { rev, modified }
} catch (err) {
return undefined
}
}
function getProjectDocsIfMatch(projectId, projectStateHash, callback) {
async function getProjectDocsIfMatch(projectId, projectStateHash) {
// If the project state hasn't changed, we can get all the latest
// docs from redis via the docupdater. Otherwise we will need to
// fall back to getting them from mongo.
const timer = new metrics.Timer('get-project-docs')
const url = `${settings.apis.documentupdater.url}/project/${projectId}/get_and_flush_if_old?state=${projectStateHash}`
request.post(url, function (error, res, body) {
timer.done()
if (error) {
OError.tag(error, 'error getting project docs from doc updater', {
url,
projectId,
})
return callback(error)
}
if (res.statusCode === 409) {
const url = new URL(`${BASE_URL}/project/${projectId}/get_and_flush_if_old`)
url.searchParams.set('state', projectStateHash)
let docs
try {
docs = await fetchJson(url, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
} catch (err) {
if (err instanceof RequestFailedError && err.response.status === 409) {
// HTTP response code "409 Conflict"
// Docupdater has checked the projectStateHash and found that
// it has changed. This means that the docs currently in redis
// aren't the only change to the project and the full set of
// docs/files should be retreived from docstore/filestore
// instead.
callback()
} else if (res.statusCode >= 200 && res.statusCode < 300) {
let docs
try {
docs = JSON.parse(body)
} catch (error1) {
return callback(OError.tag(error1))
}
callback(null, docs)
return undefined
} else {
callback(
new OError(
`doc updater returned a non-success status code: ${res.statusCode}`,
{
projectId,
url,
}
)
)
throw err
}
})
}
return docs
}
function clearProjectState(projectId, callback) {
_makeRequest(
{
path: `/project/${projectId}/clearState`,
method: 'POST',
},
projectId,
'clear-project-state',
callback
)
async function clearProjectState(projectId) {
await fetchNothing(`${BASE_URL}/project/${projectId}/clearState`, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
}
/**
* @param {string} projectId
* @param {string} docId
* @param {string[]} changeIds
* @param {Callback} callback
*/
async function acceptChanges(projectId, docId, changeIds) {
await _makeRequestAsync(
await fetchNothing(
`${BASE_URL}/project/${projectId}/doc/${docId}/change/accept`,
{
path: `/project/${projectId}/doc/${docId}/change/accept`,
json: { change_ids: changeIds },
method: 'POST',
},
projectId,
'accept-changes'
json: { change_ids: changeIds },
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
await Modules.promises.hooks.fire('changesAccepted', projectId, docId)
}
@@ -273,18 +212,33 @@ async function acceptChanges(projectId, docId, changeIds) {
* @param {string} projectId
* @param {string} docId
* @param {string[]} changeIds
* @param {Callback} callback
*/
function rejectChanges(projectId, docId, changeIds, userId, callback) {
_makeRequest(
async function rejectChanges(projectId, docId, changeIds, userId) {
const { rejectedChangeIds } = await fetchJson(
`${BASE_URL}/project/${projectId}/doc/${docId}/change/reject`,
{
path: `/project/${projectId}/doc/${docId}/change/reject`,
method: 'POST',
json: { change_ids: changeIds, user_id: userId },
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
return { rejectedChangeIds }
}
/**
* @param {string} projectId
* @param {string} docId
* @param {string} threadId
* @param {string} userId
*/
async function resolveThread(projectId, docId, threadId, userId) {
await fetchNothing(
`${BASE_URL}/project/${projectId}/doc/${docId}/comment/${threadId}/resolve`,
{
method: 'POST',
},
projectId,
'reject-changes',
callback
json: { user_id: userId },
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
}
@@ -293,67 +247,35 @@ function rejectChanges(projectId, docId, changeIds, userId, callback) {
* @param {string} docId
* @param {string} threadId
* @param {string} userId
* @param {Callback} callback
*/
function resolveThread(projectId, docId, threadId, userId, callback) {
_makeRequest(
async function reopenThread(projectId, docId, threadId, userId) {
await fetchNothing(
`${BASE_URL}/project/${projectId}/doc/${docId}/comment/${threadId}/reopen`,
{
path: `/project/${projectId}/doc/${docId}/comment/${threadId}/resolve`,
method: 'POST',
json: {
user_id: userId,
},
},
projectId,
'resolve-thread',
callback
json: { user_id: userId },
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
}
/**
* @param {string} projectId
* @param {string} docId
* @param {string} threadId
* @param {string} userId
* @param {Callback} callback
*/
function reopenThread(projectId, docId, threadId, userId, callback) {
_makeRequest(
async function deleteThread(projectId, docId, threadId, userId) {
await fetchNothing(
`${BASE_URL}/project/${projectId}/doc/${docId}/comment/${threadId}`,
{
path: `/project/${projectId}/doc/${docId}/comment/${threadId}/reopen`,
method: 'POST',
json: {
user_id: userId,
},
},
projectId,
'reopen-thread',
callback
)
}
function deleteThread(projectId, docId, threadId, userId, callback) {
_makeRequest(
{
path: `/project/${projectId}/doc/${docId}/comment/${threadId}`,
method: 'DELETE',
json: {
user_id: userId,
},
},
projectId,
'delete-thread',
callback
json: { user_id: userId },
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
}
)
}
function resyncProjectHistory(
async function resyncProjectHistory(
projectId,
projectHistoryId,
docs,
files,
opts,
callback
opts
) {
docs = docs.map(doc => ({
doc: doc.doc._id,
@@ -362,9 +284,7 @@ function resyncProjectHistory(
// Files without a hash likely do not have a blob. Abort.
for (const { file } of files) {
if (!file.hash) {
return callback(
new OError('found file with missing hash', { projectId, file })
)
throw new OError('found file with missing hash', { projectId, file })
}
}
files = files.map(file => ({
@@ -382,192 +302,119 @@ function resyncProjectHistory(
if (opts.resyncProjectStructureOnly) {
body.resyncProjectStructureOnly = opts.resyncProjectStructureOnly
}
_makeRequest(
{
path: `/project/${projectId}/history/resync`,
json: body,
method: 'POST',
timeout: 6 * 60 * 1000, // allow 6 minutes for resync
},
projectId,
'resync-project-history',
callback
)
await fetchNothing(`${BASE_URL}/project/${projectId}/history/resync`, {
json: body,
method: 'POST',
signal: AbortSignal.timeout(RESYNC_TIMEOUT_MS), // allow 6 minutes for resync
})
}
/**
* Block a project from being loaded in docupdater
*
* @param {string} projectId
* @param {Callback} callback
*/
function blockProject(projectId, callback) {
_makeRequest(
{ path: `/project/${projectId}/block`, method: 'POST', json: true },
projectId,
'block-project',
(err, body) => {
if (err) {
return callback(err)
}
callback(null, body.blocked)
}
)
async function blockProject(projectId) {
const body = await fetchJson(`${BASE_URL}/project/${projectId}/block`, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
return body.blocked
}
/**
* Unblock a previously blocked project
*
* @param {string} projectId
* @param {Callback} callback
*/
function unblockProject(projectId, callback) {
_makeRequest(
{ path: `/project/${projectId}/unblock`, method: 'POST', json: true },
projectId,
'unblock-project',
(err, body) => {
if (err) {
return callback(err)
}
callback(null, body.wasBlocked)
}
)
async function unblockProject(projectId) {
const body = await fetchJson(`${BASE_URL}/project/${projectId}/unblock`, {
method: 'POST',
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
return body.wasBlocked
}
function updateProjectStructure(
async function updateProjectStructure(
projectId,
projectHistoryId,
userId,
changes,
source,
callback
source
) {
if (
settings.apis.project_history == null ||
!settings.apis.project_history.sendProjectStructureOps
) {
return callback()
return
}
ProjectGetter.getProjectWithoutLock(
const project = await ProjectGetter.promises.getProjectWithoutLock(
projectId,
{ overleaf: true },
(err, project) => {
if (err) {
return callback(err)
}
const historyRangesSupport = _.get(
project,
'overleaf.history.rangesSupportEnabled',
false
)
const {
deletes: docDeletes,
adds: docAdds,
renames: docRenames,
} = _getUpdates(
'doc',
changes.oldDocs,
changes.newDocs,
historyRangesSupport
)
for (const newEntity of changes.newFiles || []) {
if (!newEntity.file.hash) {
// Files without a hash likely do not have a blob. Abort.
return callback(
new OError('found file with missing hash', { newEntity })
)
}
}
const {
deletes: fileDeletes,
adds: fileAdds,
renames: fileRenames,
} = _getUpdates(
'file',
changes.oldFiles,
changes.newFiles,
historyRangesSupport
)
const updates = [].concat(
docDeletes,
fileDeletes,
docAdds,
fileAdds,
docRenames,
fileRenames
)
const projectVersion =
changes && changes.newProject && changes.newProject.version
if (updates.length < 1) {
return callback()
}
if (projectVersion == null) {
logger.warn(
{ projectId, changes, projectVersion },
'did not receive project version in changes'
)
return callback(new Error('did not receive project version in changes'))
}
_makeRequest(
{
path: `/project/${projectId}`,
json: {
updates,
userId,
version: projectVersion,
projectHistoryId,
source,
},
method: 'POST',
},
projectId,
'update-project-structure',
callback
)
}
{ overleaf: true }
)
}
const historyRangesSupport = _.get(
project,
'overleaf.history.rangesSupportEnabled',
false
)
const {
deletes: docDeletes,
adds: docAdds,
renames: docRenames,
} = _getUpdates('doc', changes.oldDocs, changes.newDocs, historyRangesSupport)
for (const newEntity of changes.newFiles || []) {
if (!newEntity.file.hash) {
// Files without a hash likely do not have a blob. Abort.
throw new OError('found file with missing hash', { newEntity })
}
}
const {
deletes: fileDeletes,
adds: fileAdds,
renames: fileRenames,
} = _getUpdates(
'file',
changes.oldFiles,
changes.newFiles,
historyRangesSupport
)
const updates = [].concat(
docDeletes,
fileDeletes,
docAdds,
fileAdds,
docRenames,
fileRenames
)
const projectVersion =
changes && changes.newProject && changes.newProject.version
function _makeRequest(options, projectId, metricsKey, callback) {
const timer = new metrics.Timer(metricsKey)
request(
{
url: `${settings.apis.documentupdater.url}${options.path}`,
json: options.json,
method: options.method || 'GET',
timeout: options.timeout || 30 * 1000,
if (updates.length < 1) {
return
}
if (projectVersion == null) {
logger.warn(
{ projectId, changes, projectVersion },
'did not receive project version in changes'
)
throw new Error('did not receive project version in changes')
}
await fetchNothing(`${BASE_URL}/project/${projectId}`, {
method: 'POST',
json: {
updates,
userId,
version: projectVersion,
projectHistoryId,
source,
},
function (error, res, body) {
timer.done()
if (error) {
logger.warn(
{ error, projectId },
'error making request to document updater'
)
callback(error)
} else if (res.statusCode >= 200 && res.statusCode < 300) {
callback(null, body)
} else {
error = new Error(
`document updater returned a failure status code: ${res.statusCode}`
)
logger.warn(
{ error, projectId },
`document updater returned failure status code: ${res.statusCode}`
)
callback(error)
}
}
)
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
})
}
const _makeRequestAsync = promisify(_makeRequest)
function _getUpdates(
entityType,
oldEntities,
@@ -662,7 +509,7 @@ function buildFileMetadataForHistory(file) {
return metadata
}
module.exports = {
const DocumentUpdaterHandler = {
flushProjectToMongo,
flushMultipleProjectsToMongo,
flushProjectToMongoAndDelete,
@@ -675,7 +522,7 @@ module.exports = {
appendToDocument,
getProjectDocsIfMatch,
clearProjectState,
acceptChanges: callbackify(acceptChanges),
acceptChanges,
rejectChanges,
resolveThread,
reopenThread,
@@ -685,33 +532,13 @@ module.exports = {
unblockProject,
updateProjectStructure,
getDocumentWithHistoryRanges,
promises: {
flushProjectToMongo: promisify(flushProjectToMongo),
flushMultipleProjectsToMongo: promisify(flushMultipleProjectsToMongo),
flushProjectToMongoAndDelete: promisify(flushProjectToMongoAndDelete),
flushDocToMongo: promisify(flushDocToMongo),
deleteDoc: promisify(deleteDoc),
getComment: promisify(getComment),
getDocument: promisifyMultiResult(getDocument, [
'lines',
'version',
'ranges',
'ops',
]),
setDocument: promisify(setDocument),
getProjectDocsIfMatch: promisify(getProjectDocsIfMatch),
getProjectLastUpdatedAt: promisify(getProjectLastUpdatedAt),
clearProjectState: promisify(clearProjectState),
acceptChanges,
rejectChanges: promisify(rejectChanges),
resolveThread: promisify(resolveThread),
reopenThread: promisify(reopenThread),
deleteThread: promisify(deleteThread),
resyncProjectHistory: promisify(resyncProjectHistory),
blockProject: promisify(blockProject),
unblockProject: promisify(unblockProject),
updateProjectStructure: promisify(updateProjectStructure),
appendToDocument: promisify(appendToDocument),
getDocumentWithHistoryRanges: promisify(getDocumentWithHistoryRanges),
},
}
module.exports = {
...callbackifyAll(DocumentUpdaterHandler, {
multiResult: {
getDocument: ['lines', 'version', 'ranges', 'ops'],
},
}),
promises: DocumentUpdaterHandler,
}