[document-updater] migrate HistoryManager to async/await (#28789)

shouldFlushHistoryOps has a default value for 'threshold', which keeps
the exports simpler and still lets the unit tests override it.

GitOrigin-RevId: 1c6d4a2778052b5af40e2e338589a230ac2f4646
This commit is contained in:
Jakob Ackermann
2025-10-08 15:36:34 +02:00
committed by Copybot
parent b0b9733a42
commit d648c96603
5 changed files with 169 additions and 236 deletions

View File

@@ -1,145 +1,118 @@
const async = require('async')
const logger = require('@overleaf/logger')
const { promisifyAll } = require('@overleaf/promise-utils')
const request = require('request')
const { promiseMapWithLimit } = require('@overleaf/promise-utils')
const Settings = require('@overleaf/settings')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const metrics = require('./Metrics')
const { fetchNothing } = require('@overleaf/fetch-utils')
const OError = require('@overleaf/o-error')
const HistoryManager = {
// flush changes in the background
flushProjectChangesAsync(projectId) {
HistoryManager.flushProjectChanges(
projectId,
{ background: true },
function () {}
// Default flush threshold: shouldFlushHistoryOps falls back to this value
// when the caller does not pass an explicit `threshold`.
const FLUSH_PROJECT_EVERY_N_OPS = 500
// Concurrency limit handed to promiseMapWithLimit when resyncing doc contents
// in resyncProjectHistory.
const MAX_PARALLEL_REQUESTS = 4
/**
 * Flush project changes in the background (fire-and-forget).
 *
 * Starts a flush with the `background` option set and returns immediately
 * without waiting for it. A failed flush is logged instead of being
 * propagated, so this function never rejects or throws for request failures.
 *
 * @param {string} projectId
 * @returns {void}
 */
function flushProjectChangesAsync(projectId) {
  const logFailure = err => {
    logger.error({ projectId, err }, 'failed to flush in background')
  }
  flushProjectChanges(projectId, { background: true }).catch(logFailure)
}
/**
 * Flush project changes and wait for the request to finish (for when we
 * need to know the queue is flushed).
 *
 * Sends a POST to the project-history `/project/:projectId/flush` endpoint.
 *
 * @param {string} projectId
 * @param {Object} options
 * @param {boolean} [options.skip_history_flush] - when truthy, no request is
 *   sent and no metric is recorded
 * @param {boolean} [options.background] - when truthy, `background=true` is
 *   added to the query string
 * @returns {Promise<void>}
 * @throws the request error, tagged with the project id via OError.tag
 */
async function flushProjectChanges(projectId, options) {
  if (options.skip_history_flush) {
    logger.debug({ projectId }, 'skipping flush of project history')
    return
  }

  metrics.inc('history-flush', 1, { status: 'project-history' })

  const baseUrl = Settings.apis.project_history.url
  const url = new URL(`${baseUrl}/project/${projectId}/flush`)
  // pass on the background flush option if present
  if (options.background) url.searchParams.set('background', 'true')
  logger.debug({ projectId, url }, 'flushing doc in project history api')

  const requestOptions = { method: 'POST' }
  try {
    await fetchNothing(url, requestOptions)
  } catch (err) {
    const info = { projectId }
    throw OError.tag(err, 'project history api request failed', info)
  }
}
/**
 * Trigger a background project-history flush when the ops that were just
 * recorded pushed the queue past a flush boundary.
 *
 * Does nothing when `ops` is null/undefined or empty.
 *
 * @param {string} projectId
 * @param {Array} [ops] - the ops that were just pushed; only the count is used
 * @param {number} projectOpsLength - queue length after pushing `ops`
 * @returns {void}
 */
function recordAndFlushHistoryOps(projectId, ops, projectOpsLength) {
  // a missing ops list is treated like an empty one: nothing was recorded
  if (ops == null || ops.length === 0) {
    return
  }
  if (!shouldFlushHistoryOps(projectId, projectOpsLength, ops.length)) {
    return
  }
  // Do this in the background since it uses HTTP and so may be too
  // slow to wait for when processing a doc update.
  logger.debug({ projectOpsLength, projectId }, 'flushing project history api')
  flushProjectChangesAsync(projectId)
}
// flush changes and callback (for when we need to know the queue is flushed)
flushProjectChanges(projectId, options, callback) {
if (callback == null) {
callback = function () {}
}
if (options.skip_history_flush) {
logger.debug({ projectId }, 'skipping flush of project history')
return callback()
}
metrics.inc('history-flush', 1, { status: 'project-history' })
const url = `${Settings.apis.project_history.url}/project/${projectId}/flush`
const qs = {}
if (options.background) {
qs.background = true
} // pass on the background flush option if present
logger.debug({ projectId, url, qs }, 'flushing doc in project history api')
request.post({ url, qs }, function (error, res, body) {
if (error) {
logger.error({ error, projectId }, 'project history api request failed')
callback(error)
} else if (res.statusCode < 200 && res.statusCode >= 300) {
logger.error(
{ projectId },
`project history api returned a failure status code: ${res.statusCode}`
)
callback(error)
} else {
callback()
}
})
},
/**
 * Decide whether the project history queue should be flushed after a push.
 *
 * Projects listed in `Settings.shortHistoryQueues` always flush. Otherwise
 * the queue is split into blocks of `threshold` ops (0..threshold-1,
 * threshold..2*threshold-1, ...) and a flush is due when the push moved the
 * queue length from one block into another, i.e. it crossed a multiple of
 * `threshold`. (Most of the time the queue only reaches the first multiple
 * and flushing puts it back to 0.)
 *
 * @param {string} projectId
 * @param {number} length - queue length after the push
 * @param {number} opsLength - number of ops that were just pushed
 * @param {number} [threshold=FLUSH_PROJECT_EVERY_N_OPS] - block size
 * @returns {boolean}
 */
function shouldFlushHistoryOps(
  projectId,
  length,
  opsLength,
  threshold = FLUSH_PROJECT_EVERY_N_OPS
) {
  const hasShortQueue = Settings.shortHistoryQueues.includes(projectId)
  if (hasShortQueue) return true
  // don't flush unless we know the length
  if (!length) return false

  const blockOf = queueLength => Math.floor(queueLength / threshold)
  const blockBeforePush = blockOf(length - opsLength)
  const blockAfterPush = blockOf(length)
  return blockAfterPush !== blockBeforePush
}
FLUSH_DOC_EVERY_N_OPS: 100,
FLUSH_PROJECT_EVERY_N_OPS: 500,
recordAndFlushHistoryOps(projectId, ops, projectOpsLength) {
if (ops == null) {
ops = []
}
if (ops.length === 0) {
return
}
// record updates for project history
if (
HistoryManager.shouldFlushHistoryOps(
projectId,
projectOpsLength,
ops.length,
HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
)
) {
// Do this in the background since it uses HTTP and so may be too
// slow to wait for when processing a doc update.
logger.debug(
{ projectOpsLength, projectId },
'flushing project history api'
)
HistoryManager.flushProjectChangesAsync(projectId)
}
},
shouldFlushHistoryOps(projectId, length, opsLength, threshold) {
if (Settings.shortHistoryQueues.includes(projectId)) return true
if (!length) {
return false
} // don't flush unless we know the length
// We want to flush every 100 ops, i.e. 100, 200, 300, etc
// Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these
// ops. If we've changed, then we've gone over a multiple of 100 and should flush.
// (Most of the time, we will only hit 100 and then flushing will put us back to 0)
const previousLength = length - opsLength
const prevBlock = Math.floor(previousLength / threshold)
const newBlock = Math.floor(length / threshold)
return newBlock !== prevBlock
},
MAX_PARALLEL_REQUESTS: 4,
resyncProjectHistory(
/**
 * Queue a resync of the project structure and then, unless
 * `opts.resyncProjectStructureOnly` is set, resync the contents of every doc
 * with at most MAX_PARALLEL_REQUESTS resyncs in flight.
 *
 * @param {string} projectId
 * @param {string} projectHistoryId
 * @param {Array<{doc: string, path: string}>} docs
 * @param {Array} files
 * @param {Object} opts - forwarded to both the structure and the doc resync
 * @param {boolean} [opts.resyncProjectStructureOnly] - skip the doc resyncs
 * @returns {Promise<void>} resolves once all resyncs have completed
 * @throws when queueing the structure resync or any doc resync fails
 */
async function resyncProjectHistory(
  projectId,
  projectHistoryId,
  docs,
  files,
  opts
) {
  await ProjectHistoryRedisManager.promises.queueResyncProjectStructure(
    projectId,
    projectHistoryId,
    docs,
    files,
    opts
  )
  if (opts.resyncProjectStructureOnly) return

  // NOTE(review): required lazily inside the function, presumably to avoid a
  // circular dependency with DocumentManager -- confirm before hoisting.
  const DocumentManager = require('./DocumentManager')

  // The mapper must return the promise: otherwise promiseMapWithLimit sees
  // every item resolve immediately, the limit is not enforced, this function
  // returns before the resyncs finish, and failures go unhandled.
  await promiseMapWithLimit(MAX_PARALLEL_REQUESTS, docs, doc =>
    DocumentManager.promises.resyncDocContentsWithLock(
      projectId,
      doc.doc,
      doc.path,
      opts
    )
  )
}
module.exports = HistoryManager
module.exports.promises = promisifyAll(HistoryManager, {
without: [
'flushProjectChangesAsync',
'recordAndFlushHistoryOps',
'shouldFlushHistoryOps',
],
})
// Public interface of the module. The synchronous helpers (and the default
// flush threshold) are exported directly; the async functions are exposed
// under `promises`.
module.exports = {
FLUSH_PROJECT_EVERY_N_OPS,
flushProjectChangesAsync,
recordAndFlushHistoryOps,
shouldFlushHistoryOps,
// async functions: each returns a promise
promises: {
flushProjectChanges,
resyncProjectHistory,
},
}

View File

@@ -228,8 +228,7 @@ async function updateProjectWithLocks(
HistoryManager.shouldFlushHistoryOps(
projectId,
projectOpsLength,
updates.length,
HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
updates.length
)
) {
HistoryManager.flushProjectChangesAsync(projectId)

View File

@@ -19,9 +19,11 @@ async function sendProjectUpdateAndWait(projectId, docId, update, version) {
}
describe("Applying updates to a project's structure", function () {
before(function () {
before(function (done) {
this.user_id = 'user-id-123'
this.version = 1234
DocUpdaterApp.ensureRunning(done)
})
describe('renaming a file', function () {

View File

@@ -33,7 +33,7 @@ SandboxedModule.configure({
'mongodb-legacy': require('mongodb-legacy'), // for ObjectId comparisons
'overleaf-editor-core': require('overleaf-editor-core'), // does not play nice with sandbox
},
globals: { Buffer, JSON, Math, console, process },
globals: { Buffer, JSON, Math, console, process, URL },
sourceTransformers: {
removeNodePrefix: function (source) {
return source.replace(/require\(['"]node:/g, "require('")

View File

@@ -13,6 +13,9 @@ describe('HistoryManager', function () {
this.HistoryManager = SandboxedModule.require(modulePath, {
requires: {
request: (this.request = {}),
'@overleaf/fetch-utils': (this.fetchUtils = {
fetchNothing: sinon.stub().resolves(),
}),
'@overleaf/settings': (this.Settings = {
shortHistoryQueues: [],
apis: {
@@ -21,74 +24,69 @@ describe('HistoryManager', function () {
},
},
}),
'./DocumentManager': (this.DocumentManager = {}),
'./DocumentManager': (this.DocumentManager = {
promises: {
resyncDocContentsWithLock: sinon.stub().resolves(),
},
}),
'./RedisManager': (this.RedisManager = {}),
'./ProjectHistoryRedisManager': (this.ProjectHistoryRedisManager = {}),
'./ProjectHistoryRedisManager': (this.ProjectHistoryRedisManager = {
promises: {
queueResyncProjectStructure: sinon.stub().resolves(),
},
}),
'./Metrics': (this.metrics = { inc: sinon.stub() }),
},
})
this.project_id = 'mock-project-id'
this.callback = sinon.stub()
})
describe('flushProjectChangesAsync', function () {
beforeEach(function () {
this.request.post = sinon
.stub()
.callsArgWith(1, null, { statusCode: 204 })
this.HistoryManager.flushProjectChangesAsync(this.project_id)
})
it('should send a request to the project history api', function () {
this.request.post
.calledWith({
url: `${this.Settings.apis.project_history.url}/project/${this.project_id}/flush`,
qs: { background: true },
})
.should.equal(true)
this.fetchUtils.fetchNothing.should.have.been.calledWith(
new URL(
`${this.Settings.apis.project_history.url}/project/${this.project_id}/flush?background=true`
)
)
})
})
describe('flushProjectChanges', function () {
describe('in the normal case', function () {
beforeEach(function (done) {
this.request.post = sinon
.stub()
.callsArgWith(1, null, { statusCode: 204 })
this.HistoryManager.flushProjectChanges(
beforeEach(async function () {
await this.HistoryManager.promises.flushProjectChanges(
this.project_id,
{
background: true,
},
done
}
)
})
it('should send a request to the project history api', function () {
this.request.post
.calledWith({
url: `${this.Settings.apis.project_history.url}/project/${this.project_id}/flush`,
qs: { background: true },
})
.should.equal(true)
this.fetchUtils.fetchNothing.should.have.been.calledWith(
new URL(
`${this.Settings.apis.project_history.url}/project/${this.project_id}/flush?background=true`
)
)
})
})
describe('with the skip_history_flush option', function () {
beforeEach(function (done) {
this.request.post = sinon.stub()
this.HistoryManager.flushProjectChanges(
beforeEach(async function () {
await this.HistoryManager.promises.flushProjectChanges(
this.project_id,
{
skip_history_flush: true,
},
done
}
)
})
it('should not send a request to the project history api', function () {
this.request.post.called.should.equal(false)
this.fetchUtils.fetchNothing.should.not.have.been.called
})
})
})
@@ -96,7 +94,7 @@ describe('HistoryManager', function () {
describe('recordAndFlushHistoryOps', function () {
beforeEach(function () {
this.ops = ['mock-ops']
this.project_ops_length = 10
this.project_ops_length = 500
this.HistoryManager.flushProjectChangesAsync = sinon.stub()
})
@@ -111,17 +109,12 @@ describe('HistoryManager', function () {
})
it('should not flush project changes', function () {
this.HistoryManager.flushProjectChangesAsync.called.should.equal(false)
this.fetchUtils.fetchNothing.should.not.have.been.called
})
})
describe('with enough ops to flush project changes', function () {
beforeEach(function () {
this.HistoryManager.shouldFlushHistoryOps = sinon.stub()
this.HistoryManager.shouldFlushHistoryOps
.withArgs(this.project_id, this.project_ops_length)
.returns(true)
this.HistoryManager.recordAndFlushHistoryOps(
this.project_id,
this.ops,
@@ -130,29 +123,12 @@ describe('HistoryManager', function () {
})
it('should flush project changes', function () {
this.HistoryManager.flushProjectChangesAsync
.calledWith(this.project_id)
.should.equal(true)
})
})
describe('with enough ops to flush doc changes', function () {
beforeEach(function () {
this.HistoryManager.shouldFlushHistoryOps = sinon.stub()
this.HistoryManager.shouldFlushHistoryOps
.withArgs(this.project_id, this.project_ops_length)
.returns(false)
this.HistoryManager.recordAndFlushHistoryOps(
this.project_id,
this.ops,
this.project_ops_length
this.fetchUtils.fetchNothing.should.have.been.calledWith(
new URL(
`${this.Settings.apis.project_history.url}/project/${this.project_id}/flush?background=true`
)
)
})
it('should not flush project changes', function () {
this.HistoryManager.flushProjectChangesAsync.called.should.equal(false)
})
})
describe('shouldFlushHistoryOps', function () {
@@ -228,78 +204,61 @@ describe('HistoryManager', function () {
url: `www.filestore.test/${this.project_id}/mock-file-id`,
},
]
this.ProjectHistoryRedisManager.queueResyncProjectStructure = sinon
.stub()
.yields()
this.DocumentManager.resyncDocContentsWithLock = sinon.stub().yields()
})
describe('full sync', function () {
beforeEach(function () {
this.HistoryManager.resyncProjectHistory(
beforeEach(async function () {
await this.HistoryManager.promises.resyncProjectHistory(
this.project_id,
this.projectHistoryId,
this.docs,
this.files,
{},
this.callback
{}
)
})
it('should queue a project structure reync', function () {
this.ProjectHistoryRedisManager.queueResyncProjectStructure
.calledWith(
this.project_id,
this.projectHistoryId,
this.docs,
this.files
)
.should.equal(true)
this.ProjectHistoryRedisManager.promises.queueResyncProjectStructure.should.have.been.calledWith(
this.project_id,
this.projectHistoryId,
this.docs,
this.files
)
})
it('should queue doc content reyncs', function () {
this.DocumentManager.resyncDocContentsWithLock
.calledWith(this.project_id, this.docs[0].doc, this.docs[0].path)
.should.equal(true)
})
it('should call the callback', function () {
this.callback.called.should.equal(true)
this.DocumentManager.promises.resyncDocContentsWithLock.should.have.been.calledWith(
this.project_id,
this.docs[0].doc,
this.docs[0].path
)
})
})
describe('resyncProjectStructureOnly=true', function () {
beforeEach(function () {
this.HistoryManager.resyncProjectHistory(
beforeEach(async function () {
await this.HistoryManager.promises.resyncProjectHistory(
this.project_id,
this.projectHistoryId,
this.docs,
this.files,
{ resyncProjectStructureOnly: true },
this.callback
{ resyncProjectStructureOnly: true }
)
})
it('should queue a project structure reync', function () {
this.ProjectHistoryRedisManager.queueResyncProjectStructure
.calledWith(
this.project_id,
this.projectHistoryId,
this.docs,
this.files,
{ resyncProjectStructureOnly: true }
)
.should.equal(true)
})
it('should not queue doc content reyncs', function () {
this.DocumentManager.resyncDocContentsWithLock.called.should.equal(
false
this.ProjectHistoryRedisManager.promises.queueResyncProjectStructure.should.have.been.calledWith(
this.project_id,
this.projectHistoryId,
this.docs,
this.files,
{ resyncProjectStructureOnly: true }
)
})
it('should call the callback', function () {
this.callback.called.should.equal(true)
it('should not queue doc content reyncs', function () {
this.DocumentManager.promises.resyncDocContentsWithLock.should.not.have
.been.called
})
})
})