Merge pull request #25993 from overleaf/bg-history-refactor-persist-buffer-limits

refactor persist buffer to add limits

GitOrigin-RevId: 4a40a7a8812acf5bb7f98bfd7b94d81ebe19fc57
This commit is contained in:
Brian Gough
2025-06-03 11:51:07 +01:00
committed by Copybot
parent 2088d8e30a
commit 4c697a5659
3 changed files with 115 additions and 14 deletions

View File

@@ -23,21 +23,13 @@ const redisBackend = require('./chunk_store/redis')
* 6. Set the new persisted version (endVersion of the latest persisted chunk) in Redis.
*
* @param {string} projectId
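* @param {Object} limits - Persistence limits supplied by the caller (e.g. `maxChanges`, `minChangeTimestamp`, `maxChangeTimestamp`).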
* @throws {Error | OError} If a critical error occurs during persistence.
*/
async function persistBuffer(projectId) {
async function persistBuffer(projectId, limits) {
assert.projectId(projectId)
logger.debug({ projectId }, 'starting persistBuffer operation')
// Set limits to force us to persist all of the changes.
const farFuture = new Date()
farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000)
const limits = {
maxChanges: 0,
minChangeTimestamp: farFuture,
maxChangeTimestamp: farFuture,
}
// 1. Get the latest chunk's endVersion from GCS/main store
let endVersion
const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId)

View File

@@ -18,7 +18,15 @@ logger.initialize('persist-redis-chunks')
async function persistProjectAction(projectId) {
const job = await claimPersistJob(projectId)
await persistBuffer(projectId)
// Set limits to force us to persist all of the changes.
const farFuture = new Date()
farFuture.setTime(farFuture.getTime() + 7 * 24 * 3600 * 1000)
const limits = {
maxChanges: 0,
minChangeTimestamp: farFuture,
maxChangeTimestamp: farFuture,
}
await persistBuffer(projectId, limits)
if (job && job.close) {
await job.close()
}

View File

@@ -92,7 +92,7 @@ describe('persistBuffer', function () {
await redisBackend.setPersistedVersion(projectId, initialVersion)
// Persist the changes from Redis to the chunk store
await persistBuffer(projectId)
await persistBuffer(projectId, limitsToPersistImmediately)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk).to.exist
@@ -196,7 +196,7 @@ describe('persistBuffer', function () {
persistedChunkEndVersion
)
await persistBuffer(projectId)
await persistBuffer(projectId, limitsToPersistImmediately)
const latestChunk = await chunkStore.loadLatest(projectId)
expect(latestChunk).to.exist
@@ -287,7 +287,8 @@ describe('persistBuffer', function () {
const chunksBefore = await chunkStore.getProjectChunks(projectId)
await persistBuffer(projectId)
// Persist buffer (which should do nothing as there are no new changes)
await persistBuffer(projectId, limitsToPersistImmediately)
const chunksAfter = await chunkStore.getProjectChunks(projectId)
expect(chunksAfter.length).to.equal(chunksBefore.length)
@@ -335,4 +336,104 @@ describe('persistBuffer', function () {
expect(finalPersistedVersionInRedis).to.equal(persistedChunkEndVersion)
})
})
describe('when limits restrict the number of changes to persist', function () {
  /**
   * Scenario: one change is already persisted in the chunk store and three
   * further edits (3h, 2h and 1h old) are waiting in Redis. Running
   * persistBuffer with time-based limits is expected to flush only the two
   * oldest edits and leave the newest one queued.
   */
  it('should persist only a subset of changes and update persistedVersion accordingly', async function () {
    const HOUR_MS = 1000 * 60 * 60
    const FILE_NAME = 'main.tex'
    const now = Date.now()
    const hoursAgo = hours => now - HOUR_MS * hours

    // Seed the chunk store with a file created one day ago.
    const seedText = 'Initial content.'
    const seedChange = new Change(
      [new AddFileOperation(FILE_NAME, File.fromString(seedText))],
      new Date(hoursAgo(24)),
      []
    )
    await persistChanges(
      projectId,
      [seedChange],
      limitsToPersistImmediately,
      initialVersion
    )
    const seededVersion = initialVersion + 1 // one change on top of initialVersion

    // Build three consecutive append edits, oldest first. Each edit retains
    // the text produced by the previous one and appends its own suffix.
    const pendingEdits = [
      { suffix: ' Change 1.', ageInHours: 3 },
      { suffix: ' Change 2.', ageInHours: 2 },
      { suffix: ' Change 3.', ageInHours: 1 },
    ]
    const queuedChanges = []
    let documentText = seedText
    for (const { suffix, ageInHours } of pendingEdits) {
      const appendOp = new TextOperation()
        .retain(documentText.length)
        .insert(suffix)
      queuedChanges.push(
        new Change(
          [new EditFileOperation(FILE_NAME, appendOp)],
          new Date(hoursAgo(ageInHours))
        )
      )
      documentText += suffix
    }

    // Put the edits into the Redis buffer, starting right after the seeded
    // version, and record that version as the last persisted one.
    const queueTimes = {
      persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS,
      expireTime: now + redisBackend.PROJECT_TTL_MS,
    }
    await redisBackend.queueChanges(
      projectId,
      new Snapshot(), // placeholder snapshot
      seededVersion,
      queuedChanges,
      queueTimes
    )
    await redisBackend.setPersistedVersion(projectId, seededVersion)

    // Time-based limits intended to let through the 3h- and 2h-old edits
    // while holding back the 1h-old one. (How the two timestamps are
    // interpreted is decided by the persistence code, which is not part of
    // this test.)
    const restrictiveLimits = {
      minChangeTimestamp: new Date(hoursAgo(1)),
      maxChangeTimestamp: new Date(hoursAgo(2)),
    }

    await persistBuffer(projectId, restrictiveLimits)

    // The persisted chunk must hold the seed change plus the two oldest edits.
    const persistedChunk = await chunkStore.loadLatest(projectId, {
      persistedOnly: true,
    })
    const versionAfterFlush = seededVersion + 2
    expect(persistedChunk).to.exist
    expect(persistedChunk.getChanges().length).to.equal(3)
    expect(persistedChunk.getStartVersion()).to.equal(initialVersion)
    expect(persistedChunk.getEndVersion()).to.equal(versionAfterFlush)

    // Redis must agree on the new persisted version...
    const redisState = await redisBackend.getState(projectId)
    expect(redisState.persistedVersion).to.equal(versionAfterFlush)

    // ...and must still hold exactly the newest edit.
    const remaining = await redisBackend.getNonPersistedChanges(
      projectId,
      versionAfterFlush
    )
    expect(remaining).to.be.an('array').with.lengthOf(1)
    expect(remaining).to.deep.equal(queuedChanges.slice(2))
  })
})
})