Merge pull request #26505 from overleaf/em-persist-buffer-batch

Persist Redis buffer in batches

GitOrigin-RevId: 1de22807dae3554b3274ec103783b0868b1439d9
This commit is contained in:
Eric Mc Sween
2025-06-27 07:58:05 -04:00
committed by Copybot
parent 04a56c1954
commit 08d89a4451
4 changed files with 269 additions and 133 deletions

View File

@@ -395,6 +395,7 @@ rclient.defineCommand('get_non_persisted_changes', {
local persistedVersionKey = KEYS[2]
local changesKey = KEYS[3]
local baseVersion = tonumber(ARGV[1])
local maxChanges = tonumber(ARGV[2])
-- Check if head version exists
local headVersion = tonumber(redis.call('GET', headVersionKey))
@@ -415,9 +416,20 @@ rclient.defineCommand('get_non_persisted_changes', {
return {'ok', {}}
else
local numChanges = headVersion - baseVersion
local changes = redis.call('LRANGE', changesKey, -numChanges, -1)
if #changes < numChanges then
local endIndex, expectedChanges
if maxChanges > 0 and maxChanges < numChanges then
-- return only the first maxChanges changes; the end index is inclusive
endIndex = -numChanges + maxChanges - 1
expectedChanges = maxChanges
else
endIndex = -1
expectedChanges = numChanges
end
local changes = redis.call('LRANGE', changesKey, -numChanges, endIndex)
if #changes < expectedChanges then
-- We didn't get as many changes as we expected
return {'out_of_bounds'}
end
@@ -433,6 +445,9 @@ rclient.defineCommand('get_non_persisted_changes', {
* @param {string} projectId - The unique identifier of the project.
* @param {number} baseVersion - The version on top of which the changes should
* be applied.
* @param {object} [opts]
* @param {number} [opts.maxChanges] - The maximum number of changes to return.
* Defaults to 0, meaning no limit.
* @returns {Promise<Change[]>} Changes that can be applied on top of
* baseVersion. An empty array means that the project doesn't have
* changes to persist. A null value means that the non-persisted
@@ -440,14 +455,15 @@ rclient.defineCommand('get_non_persisted_changes', {
*
* @throws {Error} If Redis operations fail.
*/
async function getNonPersistedChanges(projectId, baseVersion) {
async function getNonPersistedChanges(projectId, baseVersion, opts = {}) {
let result
try {
result = await rclient.get_non_persisted_changes(
keySchema.headVersion({ projectId }),
keySchema.persistedVersion({ projectId }),
keySchema.changes({ projectId }),
baseVersion.toString()
baseVersion.toString(),
opts.maxChanges ?? 0
)
} catch (err) {
metrics.inc('chunk_store.redis.get_non_persisted_changes', 1, {

View File

@@ -12,6 +12,8 @@ const persistChanges = require('./persist_changes')
const resyncProject = require('./resync_project')
const redisBackend = require('./chunk_store/redis')
const PERSIST_BATCH_SIZE = 50
/**
* Persist the changes from Redis buffer to the main storage
*
@@ -42,16 +44,147 @@ async function persistBuffer(projectId, limits) {
endVersion = 0 // No chunks found, start from version 0
logger.debug({ projectId }, 'no existing chunks found in main storage')
}
const originalEndVersion = endVersion
logger.debug({ projectId, endVersion }, 'got latest persisted chunk')
// 2. Get non-persisted changes from Redis
const changesToPersist = await redisBackend.getNonPersistedChanges(
projectId,
endVersion
)
// Process changes in batches
let numberOfChangesPersisted = 0
let currentChunk = null
let resyncNeeded = false
let resyncChangesWerePersisted = false
while (true) {
// 2. Get non-persisted changes from Redis
const changesToPersist = await redisBackend.getNonPersistedChanges(
projectId,
endVersion,
{ maxChanges: PERSIST_BATCH_SIZE }
)
if (changesToPersist.length === 0) {
if (changesToPersist.length === 0) {
break
}
logger.debug(
{
projectId,
endVersion,
count: changesToPersist.length,
},
'found changes in Redis to persist'
)
// 4. Load file blobs for these Redis changes. Errors will propagate.
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const blobHashes = new Set()
for (const change of changesToPersist) {
change.findBlobHashes(blobHashes)
}
if (blobHashes.size > 0) {
await batchBlobStore.preload(Array.from(blobHashes))
}
for (const change of changesToPersist) {
await change.loadFiles('lazy', blobStore)
}
// 5. Run the persistChanges() algorithm. Errors will propagate.
logger.debug(
{
projectId,
endVersion,
changeCount: changesToPersist.length,
},
'calling persistChanges'
)
const persistResult = await persistChanges(
projectId,
changesToPersist,
limits,
endVersion
)
if (!persistResult || !persistResult.currentChunk) {
metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' })
throw new OError(
'persistChanges did not produce a new chunk for non-empty changes',
{
projectId,
endVersion,
changeCount: changesToPersist.length,
}
)
}
currentChunk = persistResult.currentChunk
const newEndVersion = currentChunk.getEndVersion()
if (newEndVersion <= endVersion) {
metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' })
throw new OError(
'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
{
projectId,
newEndVersion,
endVersion,
changeCount: changesToPersist.length,
}
)
}
logger.debug(
{
projectId,
oldVersion: endVersion,
newVersion: newEndVersion,
},
'successfully persisted changes from Redis to main storage'
)
// 6. Set the persisted version in Redis. Errors will propagate.
const status = await redisBackend.setPersistedVersion(
projectId,
newEndVersion
)
if (status !== 'ok') {
metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' })
throw new OError('failed to update persisted version in Redis', {
projectId,
newEndVersion,
status,
})
}
logger.debug(
{ projectId, newEndVersion },
'updated persisted version in Redis'
)
numberOfChangesPersisted += persistResult.numberOfChangesPersisted
endVersion = newEndVersion
// Check if a resync might be needed
if (persistResult.resyncNeeded) {
resyncNeeded = true
}
if (
changesToPersist.some(
change => change.getOrigin()?.getKind() === 'history-resync'
)
) {
resyncChangesWerePersisted = true
}
if (persistResult.numberOfChangesPersisted < PERSIST_BATCH_SIZE) {
// We reached the end of available changes
break
}
}
if (numberOfChangesPersisted === 0) {
logger.debug(
{ projectId, endVersion },
'no new changes in Redis buffer to persist'
@@ -61,124 +194,16 @@ async function persistBuffer(projectId, limits) {
// to match the current endVersion. This shouldn't be needed
// unless a worker failed to update the persisted version.
await redisBackend.setPersistedVersion(projectId, endVersion)
const { chunk } = await chunkStore.loadByChunkRecord(
projectId,
latestChunkMetadata
} else {
logger.debug(
{ projectId, finalPersistedVersion: endVersion },
'persistBuffer operation completed successfully'
)
// Return the result in the same format as persistChanges
// so that the caller can handle it uniformly.
return {
numberOfChangesPersisted: changesToPersist.length,
originalEndVersion: endVersion,
currentChunk: chunk,
}
metrics.inc('persist_buffer', 1, { status: 'persisted' })
}
logger.debug(
{
projectId,
endVersion,
count: changesToPersist.length,
},
'found changes in Redis to persist'
)
// 4. Load file blobs for these Redis changes. Errors will propagate.
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const blobHashes = new Set()
for (const change of changesToPersist) {
change.findBlobHashes(blobHashes)
}
if (blobHashes.size > 0) {
await batchBlobStore.preload(Array.from(blobHashes))
}
for (const change of changesToPersist) {
await change.loadFiles('lazy', blobStore)
}
// 5. Run the persistChanges() algorithm. Errors will propagate.
logger.debug(
{
projectId,
endVersion,
changeCount: changesToPersist.length,
},
'calling persistChanges'
)
const persistResult = await persistChanges(
projectId,
changesToPersist,
limits,
endVersion
)
if (!persistResult || !persistResult.currentChunk) {
metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' })
throw new OError(
'persistChanges did not produce a new chunk for non-empty changes',
{
projectId,
endVersion,
changeCount: changesToPersist.length,
}
)
}
const newPersistedChunk = persistResult.currentChunk
const newEndVersion = newPersistedChunk.getEndVersion()
if (newEndVersion <= endVersion) {
metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' })
throw new OError(
'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
{
projectId,
newEndVersion,
endVersion,
changeCount: changesToPersist.length,
}
)
}
logger.debug(
{
projectId,
oldVersion: endVersion,
newVersion: newEndVersion,
},
'successfully persisted changes from Redis to main storage'
)
// 6. Set the persisted version in Redis. Errors will propagate.
const status = await redisBackend.setPersistedVersion(
projectId,
newEndVersion
)
if (status !== 'ok') {
metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' })
throw new OError('failed to update persisted version in Redis', {
projectId,
newEndVersion,
status,
})
}
logger.debug(
{ projectId, newEndVersion },
'updated persisted version in Redis'
)
// 7. Resync the project if content hash validation failed
if (limits.autoResync && persistResult.resyncNeeded) {
if (
changesToPersist.some(
change => change.getOrigin()?.getKind() === 'history-resync'
)
) {
if (limits.autoResync && resyncNeeded) {
if (resyncChangesWerePersisted) {
// To avoid an infinite loop, do not resync if the current batch of
// changes contains a history resync.
logger.warn(
@@ -193,14 +218,20 @@ async function persistBuffer(projectId, limits) {
}
}
logger.debug(
{ projectId, finalPersistedVersion: newEndVersion },
'persistBuffer operation completed successfully'
)
if (currentChunk == null) {
const { chunk } = await chunkStore.loadByChunkRecord(
projectId,
latestChunkMetadata
)
currentChunk = chunk
}
metrics.inc('persist_buffer', 1, { status: 'persisted' })
return persistResult
return {
numberOfChangesPersisted,
originalEndVersion,
currentChunk,
resyncNeeded,
}
}
module.exports = persistBuffer

View File

@@ -541,7 +541,7 @@ describe('chunk buffer Redis backend', function () {
expect(nonPersistedChanges).to.deep.equal(changes)
})
it('should return part of the changes if requested', async function () {
it('should return part of the changes following a given base version if requested', async function () {
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
projectId,
3
@@ -549,6 +549,24 @@ describe('chunk buffer Redis backend', function () {
expect(nonPersistedChanges).to.deep.equal(changes.slice(1))
})
it('should limit the number of changes returned if requested', async function () {
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
projectId,
2,
{ maxChanges: 2 }
)
expect(nonPersistedChanges).to.deep.equal(changes.slice(0, 2))
})
it('should return all changes if limit is not reached', async function () {
const nonPersistedChanges = await redisBackend.getNonPersistedChanges(
projectId,
3,
{ maxChanges: 10 }
)
expect(nonPersistedChanges).to.deep.equal(changes.slice(1))
})
it('should error if the base version requested is too low', async function () {
await expect(
redisBackend.getNonPersistedChanges(projectId, 0)

View File

@@ -12,6 +12,7 @@ import {
} from 'overleaf-editor-core'
import persistBuffer from '../../../../storage/lib/persist_buffer.js'
import chunkStore from '../../../../storage/lib/chunk_store/index.js'
import { BlobStore } from '../../../../storage/lib/blob_store/index.js'
import redisBackend from '../../../../storage/lib/chunk_store/redis.js'
import persistChanges from '../../../../storage/lib/persist_changes.js'
import cleanup from './support/cleanup.js'
@@ -22,6 +23,7 @@ describe('persistBuffer', function () {
let projectId
const initialVersion = 0
let limitsToPersistImmediately
let blobStore
before(function () {
const farFuture = new Date()
@@ -39,6 +41,7 @@ describe('persistBuffer', function () {
beforeEach(async function () {
projectId = fixtures.docs.uninitializedProject.id
await chunkStore.initializeProject(projectId)
blobStore = new BlobStore(projectId)
})
describe('with an empty initial chunk (new project)', function () {
@@ -340,6 +343,7 @@ describe('persistBuffer', function () {
numberOfChangesPersisted: 0,
originalEndVersion: persistedChunkEndVersion,
currentChunk,
resyncNeeded: false,
})
const chunksAfter = await chunkStore.getProjectChunks(projectId)
@@ -389,6 +393,7 @@ describe('persistBuffer', function () {
numberOfChangesPersisted: 0,
originalEndVersion: persistedChunkEndVersion,
currentChunk,
resyncNeeded: false,
})
const chunksAfter = await chunkStore.getProjectChunks(projectId)
@@ -516,4 +521,70 @@ describe('persistBuffer', function () {
expect(nonPersisted).to.deep.equal([change3])
})
})
describe('with lots of changes to persist', function () {
it('should persist all changes', async function () {
const changes = []
// Create an initial file with some content
const blob = await blobStore.putString('')
changes.push(
new Change(
[new AddFileOperation('main.tex', File.createLazyFromBlobs(blob))],
new Date(),
[]
)
)
for (let i = 0; i < 500; i++) {
const op = new TextOperation().retain(i).insert('x')
changes.push(
new Change([new EditFileOperation('main.tex', op)], new Date())
)
}
const now = Date.now()
await redisBackend.queueChanges(
projectId,
new Snapshot(), // dummy snapshot
0, // startVersion for queued changes
changes,
{
persistTime: now + redisBackend.MAX_PERSIST_DELAY_MS,
expireTime: now + redisBackend.PROJECT_TTL_MS,
}
)
const expectedEndVersion = 501
const persistResult = await persistBuffer(
projectId,
limitsToPersistImmediately
)
expect(persistResult.numberOfChangesPersisted).to.equal(
expectedEndVersion
)
expect(persistResult.originalEndVersion).to.equal(0)
expect(persistResult.resyncNeeded).to.be.false
// Check the latest persisted chunk
const latestChunk = await chunkStore.loadLatest(projectId, {
persistedOnly: true,
})
expect(latestChunk).to.exist
expect(latestChunk.getEndVersion()).to.equal(expectedEndVersion)
// Check that chunk returned by persistBuffer matches the latest chunk
expect(persistResult.currentChunk).to.deep.equal(latestChunk)
// Check persisted version in Redis
const state = await redisBackend.getState(projectId)
expect(state.persistedVersion).to.equal(expectedEndVersion)
// Check non-persisted changes in Redis
const nonPersisted = await redisBackend.getNonPersistedChanges(
projectId,
expectedEndVersion
)
expect(nonPersisted).to.deep.equal([])
})
})
})