[history-ot] flush history for projects with short queues ASAP (#25776)

* [document-updater] flush history for projects with short queues ASAP

* [k8s] document-updater: enable short history queue for history-ot demo

* [project-history] flush history for projects with short queues ASAP

* [project-history] wait for mongo before running acceptance tests

* [k8s] project-history: enable short history queue for history-ot demo

* [project-history] change wait-for-mongo step in tests

Co-authored-by: Eric Mc Sween <eric.mcsween@overleaf.com>

---------

Co-authored-by: Eric Mc Sween <eric.mcsween@overleaf.com>
GitOrigin-RevId: 3e989c409e8e9887655b35f2659ce0829e61b357
This commit is contained in:
Jakob Ackermann
2025-05-22 12:29:53 +01:00
committed by Copybot
parent 385523bcdc
commit 6785c84ea3
9 changed files with 137 additions and 13 deletions
@@ -62,6 +62,7 @@ const HistoryManager = {
// record updates for project history
if (
HistoryManager.shouldFlushHistoryOps(
projectId,
projectOpsLength,
ops.length,
HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
@@ -77,7 +78,8 @@ const HistoryManager = {
}
},
shouldFlushHistoryOps(length, opsLength, threshold) {
shouldFlushHistoryOps(projectId, length, opsLength, threshold) {
if (Settings.shortHistoryQueues.includes(projectId)) return true
if (!length) {
return false
} // don't flush unless we know the length
@@ -317,6 +317,7 @@ function updateProjectWithLocks(
}
if (
HistoryManager.shouldFlushHistoryOps(
projectId,
projectOpsLength,
updates.length,
HistoryManager.FLUSH_PROJECT_EVERY_N_OPS
@@ -184,4 +184,8 @@ module.exports = {
smoothingOffset: process.env.SMOOTHING_OFFSET || 1000, // milliseconds
gracefulShutdownDelayInMs:
parseInt(process.env.GRACEFUL_SHUTDOWN_DELAY_SECONDS ?? '10', 10) * 1000,
shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '')
.split(',')
.filter(s => !!s),
}
@@ -14,6 +14,7 @@ describe('HistoryManager', function () {
requires: {
request: (this.request = {}),
'@overleaf/settings': (this.Settings = {
shortHistoryQueues: [],
apis: {
project_history: {
url: 'http://project_history.example.com',
@@ -118,7 +119,7 @@ describe('HistoryManager', function () {
beforeEach(function () {
this.HistoryManager.shouldFlushHistoryOps = sinon.stub()
this.HistoryManager.shouldFlushHistoryOps
.withArgs(this.project_ops_length)
.withArgs(this.project_id, this.project_ops_length)
.returns(true)
this.HistoryManager.recordAndFlushHistoryOps(
@@ -139,7 +140,7 @@ describe('HistoryManager', function () {
beforeEach(function () {
this.HistoryManager.shouldFlushHistoryOps = sinon.stub()
this.HistoryManager.shouldFlushHistoryOps
.withArgs(this.project_ops_length)
.withArgs(this.project_id, this.project_ops_length)
.returns(false)
this.HistoryManager.recordAndFlushHistoryOps(
@@ -157,6 +158,7 @@ describe('HistoryManager', function () {
describe('shouldFlushHistoryOps', function () {
it('should return false if the number of ops is not known', function () {
this.HistoryManager.shouldFlushHistoryOps(
this.project_id,
null,
['a', 'b', 'c'].length,
1
@@ -168,6 +170,7 @@ describe('HistoryManager', function () {
// Previously we were on 11 ops
// We didn't pass over a multiple of 5
this.HistoryManager.shouldFlushHistoryOps(
this.project_id,
14,
['a', 'b', 'c'].length,
5
@@ -178,6 +181,7 @@ describe('HistoryManager', function () {
// Previously we were on 12 ops
// We've reached a new multiple of 5
this.HistoryManager.shouldFlushHistoryOps(
this.project_id,
15,
['a', 'b', 'c'].length,
5
@@ -189,11 +193,22 @@ describe('HistoryManager', function () {
// Previously we were on 16 ops
// We didn't pass over a multiple of 5
this.HistoryManager.shouldFlushHistoryOps(
this.project_id,
17,
['a', 'b', 'c'].length,
5
).should.equal(true)
})
it('should return true if the project has a short queue', function () {
this.Settings.shortHistoryQueues = [this.project_id]
this.HistoryManager.shouldFlushHistoryOps(
this.project_id,
14,
['a', 'b', 'c'].length,
5
).should.equal(true)
})
})
})
@@ -11,6 +11,7 @@ import async from 'async'
import logger from '@overleaf/logger'
import OError from '@overleaf/o-error'
import metrics from '@overleaf/metrics'
import Settings from '@overleaf/settings'
import _ from 'lodash'
import * as RedisManager from './RedisManager.js'
import * as UpdatesProcessor from './UpdatesProcessor.js'
@@ -37,6 +38,13 @@ export function flushIfOld(projectId, cutoffTime, callback) {
)
metrics.inc('flush-old-updates', 1, { status: 'flushed' })
return UpdatesProcessor.processUpdatesForProject(projectId, callback)
} else if (Settings.shortHistoryQueues.includes(projectId)) {
logger.debug(
{ projectId, firstOpTimestamp, cutoffTime },
'flushing project with short queue'
)
metrics.inc('flush-old-updates', 1, { status: 'short-queue' })
return UpdatesProcessor.processUpdatesForProject(projectId, callback)
} else {
metrics.inc('flush-old-updates', 1, { status: 'skipped' })
return callback()
@@ -106,4 +106,8 @@ module.exports = {
},
maxFileSizeInBytes: 100 * 1024 * 1024, // 100 megabytes
shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '')
.split(',')
.filter(s => !!s),
}
@@ -124,11 +124,14 @@ async function main() {
.map((projectId, idx) => {
return { projectId, timestamp: timestamps[idx] }
})
.filter(({ timestamp }) => {
.filter(({ projectId, timestamp }) => {
if (!timestamp) {
nullCount++
return true // Unknown age
}
return timestamp ? olderThan(maxAge, timestamp) : true
if (olderThan(maxAge, timestamp)) return true // Older than threshold
if (Settings.shortHistoryQueues.includes(projectId)) return true // Short queue
return false // Do not flush
})
collectedProjects.push(...newProjects)
}
@@ -6,6 +6,7 @@ import assert from 'node:assert'
import mongodb from 'mongodb-legacy'
import * as ProjectHistoryClient from './helpers/ProjectHistoryClient.js'
import * as ProjectHistoryApp from './helpers/ProjectHistoryApp.js'
import Settings from '@overleaf/settings'
const { ObjectId } = mongodb
const MockHistoryStore = () => nock('http://127.0.0.1:3100')
@@ -127,7 +128,7 @@ describe('Flushing old queues', function () {
'made calls to history service to store updates in the background'
)
done()
}, 100)
}, 1_000)
}
)
})
@@ -183,6 +184,88 @@ describe('Flushing old queues', function () {
})
})
describe('when the update is newer than the cutoff and project has short queue', function () {
beforeEach(function () {
Settings.shortHistoryQueues.push(this.projectId)
})
afterEach(function () {
Settings.shortHistoryQueues.length = 0
})
beforeEach(function (done) {
this.flushCall = MockHistoryStore()
.put(
`/api/projects/${historyId}/blobs/0a207c060e61f3b88eaee0a8cd0696f46fb155eb`
)
.reply(201)
.post(`/api/projects/${historyId}/legacy_changes?end_version=0`)
.reply(200)
const update = {
pathname: '/main.tex',
docLines: 'a\nb',
doc: this.docId,
meta: { user_id: this.user_id, ts: new Date() },
}
async.series(
[
cb =>
ProjectHistoryClient.pushRawUpdate(this.projectId, update, cb),
cb =>
ProjectHistoryClient.setFirstOpTimestamp(
this.projectId,
Date.now() - 60 * 1000,
cb
),
],
done
)
})
it('flushes the project history queue', function (done) {
request.post(
{
url: `http://127.0.0.1:3054/flush/old?maxAge=${3 * 3600}`,
},
(error, res, body) => {
if (error) {
return done(error)
}
expect(res.statusCode).to.equal(200)
assert(
this.flushCall.isDone(),
'made calls to history service to store updates'
)
done()
}
)
})
it('flushes the project history queue in the background when requested', function (done) {
request.post(
{
url: `http://127.0.0.1:3054/flush/old?maxAge=${3 * 3600}&background=1`,
},
(error, res, body) => {
if (error) {
return done(error)
}
expect(res.statusCode).to.equal(200)
expect(body).to.equal('{"message":"running flush in background"}')
assert(
!this.flushCall.isDone(),
'did not make calls to history service to store updates in the foreground'
)
setTimeout(() => {
assert(
this.flushCall.isDone(),
'made calls to history service to store updates in the background'
)
done()
}, 1_000)
}
)
})
})
describe('when the update does not have a timestamp', function () {
beforeEach(function (done) {
this.flushCall = MockHistoryStore()
@@ -9,6 +9,7 @@
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
import { app } from '../../../../app/js/server.js'
import { mongoClient } from '../../../../app/js/mongodb.js'
let running = false
let initing = false
@@ -29,13 +30,16 @@ export function ensureRunning(callback) {
if (error != null) {
throw error
}
running = true
return (() => {
const result = []
for (callback of Array.from(callbacks)) {
result.push(callback())
// Wait for mongo
mongoClient.connect(error => {
if (error != null) {
throw error
}
return result
})()
running = true
for (callback of Array.from(callbacks)) {
callback()
}
})
})
}