diff --git a/services/document-updater/app/js/HistoryManager.js b/services/document-updater/app/js/HistoryManager.js index 3963431925..d9a8459525 100644 --- a/services/document-updater/app/js/HistoryManager.js +++ b/services/document-updater/app/js/HistoryManager.js @@ -62,6 +62,7 @@ const HistoryManager = { // record updates for project history if ( HistoryManager.shouldFlushHistoryOps( + projectId, projectOpsLength, ops.length, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS @@ -77,7 +78,8 @@ const HistoryManager = { } }, - shouldFlushHistoryOps(length, opsLength, threshold) { + shouldFlushHistoryOps(projectId, length, opsLength, threshold) { + if (Settings.shortHistoryQueues.includes(projectId)) return true if (!length) { return false } // don't flush unless we know the length diff --git a/services/document-updater/app/js/ProjectManager.js b/services/document-updater/app/js/ProjectManager.js index 781ed0e168..cdd4c11482 100644 --- a/services/document-updater/app/js/ProjectManager.js +++ b/services/document-updater/app/js/ProjectManager.js @@ -317,6 +317,7 @@ function updateProjectWithLocks( } if ( HistoryManager.shouldFlushHistoryOps( + projectId, projectOpsLength, updates.length, HistoryManager.FLUSH_PROJECT_EVERY_N_OPS diff --git a/services/document-updater/config/settings.defaults.js b/services/document-updater/config/settings.defaults.js index 0cd29d325b..9ed59de6c4 100755 --- a/services/document-updater/config/settings.defaults.js +++ b/services/document-updater/config/settings.defaults.js @@ -184,4 +184,8 @@ module.exports = { smoothingOffset: process.env.SMOOTHING_OFFSET || 1000, // milliseconds gracefulShutdownDelayInMs: parseInt(process.env.GRACEFUL_SHUTDOWN_DELAY_SECONDS ?? '10', 10) * 1000, + + shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '') + .split(',') + .filter(s => !!s), } diff --git a/services/document-updater/test/unit/js/HistoryManager/HistoryManagerTests.js b/services/document-updater/test/unit/js/HistoryManager/HistoryManagerTests.js index 2fd019d4c2..2a5fb29b6d 100644 --- a/services/document-updater/test/unit/js/HistoryManager/HistoryManagerTests.js +++ b/services/document-updater/test/unit/js/HistoryManager/HistoryManagerTests.js @@ -14,6 +14,7 @@ describe('HistoryManager', function () { requires: { request: (this.request = {}), '@overleaf/settings': (this.Settings = { + shortHistoryQueues: [], apis: { project_history: { url: 'http://project_history.example.com', @@ -118,7 +119,7 @@ describe('HistoryManager', function () { beforeEach(function () { this.HistoryManager.shouldFlushHistoryOps = sinon.stub() this.HistoryManager.shouldFlushHistoryOps - .withArgs(this.project_ops_length) + .withArgs(this.project_id, this.project_ops_length) .returns(true) this.HistoryManager.recordAndFlushHistoryOps( @@ -139,7 +140,7 @@ describe('HistoryManager', function () { beforeEach(function () { this.HistoryManager.shouldFlushHistoryOps = sinon.stub() this.HistoryManager.shouldFlushHistoryOps - .withArgs(this.project_ops_length) + .withArgs(this.project_id, this.project_ops_length) .returns(false) this.HistoryManager.recordAndFlushHistoryOps( @@ -157,6 +158,7 @@ describe('HistoryManager', function () { describe('shouldFlushHistoryOps', function () { it('should return false if the number of ops is not known', function () { this.HistoryManager.shouldFlushHistoryOps( + this.project_id, null, ['a', 'b', 'c'].length, 1 @@ -168,6 +170,7 @@ describe('HistoryManager', function () { // Previously we were on 11 ops // We didn't pass over a multiple of 5 this.HistoryManager.shouldFlushHistoryOps( + this.project_id, 14, ['a', 'b', 'c'].length, 5 @@ -178,6 +181,7 @@ describe('HistoryManager', function () { // Previously we were on 12 ops // We've reached a new multiple of 5 this.HistoryManager.shouldFlushHistoryOps( + this.project_id, 15, ['a', 'b', 'c'].length, 5 @@ -189,11 +193,22 @@ describe('HistoryManager', function () { // Previously we were on 16 ops // We didn't pass over a multiple of 5 this.HistoryManager.shouldFlushHistoryOps( + this.project_id, 17, ['a', 'b', 'c'].length, 5 ).should.equal(true) }) + + it('should return true if the project has a short queue', function () { + this.Settings.shortHistoryQueues = [this.project_id] + this.HistoryManager.shouldFlushHistoryOps( + this.project_id, + 14, + ['a', 'b', 'c'].length, + 5 + ).should.equal(true) + }) }) }) diff --git a/services/project-history/app/js/FlushManager.js b/services/project-history/app/js/FlushManager.js index 6df3b20a87..455a4f56f7 100644 --- a/services/project-history/app/js/FlushManager.js +++ b/services/project-history/app/js/FlushManager.js @@ -11,6 +11,7 @@ import async from 'async' import logger from '@overleaf/logger' import OError from '@overleaf/o-error' import metrics from '@overleaf/metrics' +import Settings from '@overleaf/settings' import _ from 'lodash' import * as RedisManager from './RedisManager.js' import * as UpdatesProcessor from './UpdatesProcessor.js' @@ -37,6 +38,13 @@ export function flushIfOld(projectId, cutoffTime, callback) { ) metrics.inc('flush-old-updates', 1, { status: 'flushed' }) return UpdatesProcessor.processUpdatesForProject(projectId, callback) + } else if (Settings.shortHistoryQueues.includes(projectId)) { + logger.debug( + { projectId, firstOpTimestamp, cutoffTime }, + 'flushing project with short queue' + ) + metrics.inc('flush-old-updates', 1, { status: 'short-queue' }) + return UpdatesProcessor.processUpdatesForProject(projectId, callback) } else { metrics.inc('flush-old-updates', 1, { status: 'skipped' }) return callback() diff --git a/services/project-history/config/settings.defaults.cjs b/services/project-history/config/settings.defaults.cjs index 9e5a39868a..d259d070b9 100644 --- a/services/project-history/config/settings.defaults.cjs +++ b/services/project-history/config/settings.defaults.cjs @@ -106,4 +106,8 @@ module.exports = { }, maxFileSizeInBytes: 100 * 1024 * 1024, // 100 megabytes + + shortHistoryQueues: (process.env.SHORT_HISTORY_QUEUES || '') + .split(',') + .filter(s => !!s), } diff --git a/services/project-history/scripts/flush_old.js b/services/project-history/scripts/flush_old.js index 6dc140196e..7ac13b757a 100644 --- a/services/project-history/scripts/flush_old.js +++ b/services/project-history/scripts/flush_old.js @@ -124,11 +124,14 @@ async function main() { .map((projectId, idx) => { return { projectId, timestamp: timestamps[idx] } }) - .filter(({ timestamp }) => { + .filter(({ projectId, timestamp }) => { if (!timestamp) { nullCount++ + return true // Unknown age } - return timestamp ? olderThan(maxAge, timestamp) : true + if (olderThan(maxAge, timestamp)) return true // Older than threshold + if (Settings.shortHistoryQueues.includes(projectId)) return true // Short queue + return false // Do not flush }) collectedProjects.push(...newProjects) } diff --git a/services/project-history/test/acceptance/js/FlushManagerTests.js b/services/project-history/test/acceptance/js/FlushManagerTests.js index d11346d9a3..8d4432d3ef 100644 --- a/services/project-history/test/acceptance/js/FlushManagerTests.js +++ b/services/project-history/test/acceptance/js/FlushManagerTests.js @@ -6,6 +6,7 @@ import assert from 'node:assert' import mongodb from 'mongodb-legacy' import * as ProjectHistoryClient from './helpers/ProjectHistoryClient.js' import * as ProjectHistoryApp from './helpers/ProjectHistoryApp.js' +import Settings from '@overleaf/settings' const { ObjectId } = mongodb const MockHistoryStore = () => nock('http://127.0.0.1:3100') @@ -127,7 +128,7 @@ describe('Flushing old queues', function () { 'made calls to history service to store updates in the background' ) done() - }, 100) + }, 1_000) } ) }) @@ -183,6 +184,88 @@ describe('Flushing old queues', function () { }) }) + describe('when the update is newer than the cutoff and project has short queue', function () { + beforeEach(function () { + Settings.shortHistoryQueues.push(this.projectId) + }) + afterEach(function () { + Settings.shortHistoryQueues.length = 0 + }) + beforeEach(function (done) { + this.flushCall = MockHistoryStore() + .put( + `/api/projects/${historyId}/blobs/0a207c060e61f3b88eaee0a8cd0696f46fb155eb` + ) + .reply(201) + .post(`/api/projects/${historyId}/legacy_changes?end_version=0`) + .reply(200) + const update = { + pathname: '/main.tex', + docLines: 'a\nb', + doc: this.docId, + meta: { user_id: this.user_id, ts: new Date() }, + } + async.series( + [ + cb => + ProjectHistoryClient.pushRawUpdate(this.projectId, update, cb), + cb => + ProjectHistoryClient.setFirstOpTimestamp( + this.projectId, + Date.now() - 60 * 1000, + cb + ), + ], + done + ) + }) + + it('flushes the project history queue', function (done) { + request.post( + { + url: `http://127.0.0.1:3054/flush/old?maxAge=${3 * 3600}`, + }, + (error, res, body) => { + if (error) { + return done(error) + } + expect(res.statusCode).to.equal(200) + assert( + this.flushCall.isDone(), + 'made calls to history service to store updates' + ) + done() + } + ) + }) + + it('flushes the project history queue in the background when requested', function (done) { + request.post( + { + url: `http://127.0.0.1:3054/flush/old?maxAge=${3 * 3600}&background=1`, + }, + (error, res, body) => { + if (error) { + return done(error) + } + expect(res.statusCode).to.equal(200) + expect(body).to.equal('{"message":"running flush in background"}') + assert( + !this.flushCall.isDone(), + 'did not make calls to history service to store updates in the foreground' + ) + setTimeout(() => { + assert( + this.flushCall.isDone(), + 'made calls to history service to store updates in the background' + ) + done() + }, 1_000) + } + ) + }) + }) + describe('when the update does not have a timestamp', function () { beforeEach(function (done) { this.flushCall = MockHistoryStore() diff --git a/services/project-history/test/acceptance/js/helpers/ProjectHistoryApp.js b/services/project-history/test/acceptance/js/helpers/ProjectHistoryApp.js index ae453b74f9..6a81221840 100644 --- a/services/project-history/test/acceptance/js/helpers/ProjectHistoryApp.js +++ b/services/project-history/test/acceptance/js/helpers/ProjectHistoryApp.js @@ -9,6 +9,7 @@ * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ import { app } from '../../../../app/js/server.js' +import { mongoClient } from '../../../../app/js/mongodb.js' let running = false let initing = false @@ -29,13 +30,16 @@ export function ensureRunning(callback) { if (error != null) { throw error } - running = true - return (() => { - const result = [] - for (callback of Array.from(callbacks)) { - result.push(callback()) + + // Wait for mongo + mongoClient.connect(error => { + if (error != null) { + throw error } - return result - })() + running = true + for (callback of Array.from(callbacks)) { + callback() + } + }) }) }