Merge pull request #25284 from overleaf/em-queue-changes-verification

Exercise the Redis buffer when persisting changes

GitOrigin-RevId: a649b9808b6472e7c5dd9c8bfa6e3c98fb6ef4d4
This commit is contained in:
Brian Gough
2025-05-07 15:42:51 +01:00
committed by Copybot
parent 9f0f910a83
commit ec91c120b1
3 changed files with 220 additions and 34 deletions

View File

@@ -1,3 +1,5 @@
// @ts-check
const metrics = require('@overleaf/metrics')
const OError = require('@overleaf/o-error')
const redis = require('../redis')
@@ -97,27 +99,36 @@ rclient.defineCommand('queue_changes', {
local head = ARGV[2]
local persistTime = tonumber(ARGV[3])
local expireTime = tonumber(ARGV[4])
-- Changes start from ARGV[5]
local onlyIfExists = ARGV[5]
local changesIndex = 6 -- Changes start here
local headVersion = tonumber(redis.call('GET', headVersionKey))
-- Check if updates should only be queued if the project already exists (used for gradual rollouts)
if not headVersion and onlyIfExists == 'true' then
return 'ignore'
end
-- Check that the supplied baseVersion matches the head version
-- If headVersion is nil, it means the project does not exist yet and will be created.
if headVersion and headVersion ~= baseVersion then
return 'conflict'
end
-- Check if there are any changes to queue
if #ARGV < 5 then
if #ARGV < changesIndex then
return 'no_changes_provided'
end
-- Store the changes
-- RPUSH changesKey change1 change2 ...
redis.call('RPUSH', changesKey, unpack(ARGV, 5, #ARGV))
redis.call('RPUSH', changesKey, unpack(ARGV, changesIndex, #ARGV))
-- Update head snapshot only if changes were successfully pushed
redis.call('SET', headSnapshotKey, head)
-- Update the head version
local numChanges = #ARGV - 4
local numChanges = #ARGV - changesIndex + 1
local newHeadVersion = baseVersion + numChanges
redis.call('SET', headVersionKey, newHeadVersion)
@@ -142,9 +153,14 @@ rclient.defineCommand('queue_changes', {
* @param {Snapshot} headSnapshot - The new head snapshot after applying changes.
* @param {number} baseVersion - The expected current head version.
* @param {Change[]} changes - An array of Change objects to queue.
* @param {number} persistTime - Timestamp (ms since epoch) when the oldest change in the buffer should be persisted.
* @param {number} expireTime - Timestamp (ms since epoch) when the project buffer should expire if inactive.
* @returns {Promise<void>} Resolves on success.
* @param {object} [opts]
* @param {number} [opts.persistTime] - Timestamp (ms since epoch) when the
* oldest change in the buffer should be persisted.
* @param {number} [opts.expireTime] - Timestamp (ms since epoch) when the
* project buffer should expire if inactive.
* @param {boolean} [opts.onlyIfExists] - If true, only queue changes if the
* project already exists in Redis, otherwise ignore.
* @returns {Promise<string>} Resolves on success to either 'ok' or 'ignore'.
* @throws {BaseVersionConflictError} If the baseVersion does not match the current head version in Redis.
* @throws {Error} If changes array is empty or if Redis operations fail.
*/
@@ -153,13 +169,16 @@ async function queueChanges(
headSnapshot,
baseVersion,
changes,
persistTime,
expireTime
opts = {}
) {
if (!changes || changes.length === 0) {
throw new Error('Cannot queue empty changes array')
}
const persistTime = opts.persistTime ?? Date.now() + MAX_PERSIST_DELAY_MS
const expireTime = opts.expireTime ?? Date.now() + PROJECT_TTL_MS
const onlyIfExists = Boolean(opts.onlyIfExists)
try {
const keys = [
keySchema.head({ projectId }),
@@ -174,13 +193,17 @@ async function queueChanges(
JSON.stringify(headSnapshot.toRaw()),
persistTime.toString(),
expireTime.toString(),
onlyIfExists.toString(), // Only queue changes if the snapshot already exists
...changes.map(change => JSON.stringify(change.toRaw())), // Serialize changes
]
const status = await rclient.queue_changes(keys, args)
metrics.inc('chunk_store.redis.queue_changes', 1, { status })
if (status === 'ok') {
return
return status
}
if (status === 'ignore') {
return status // skip changes when project does not exist and onlyIfExists is true
}
if (status === 'conflict') {
throw new BaseVersionConflictError('base version mismatch', {

View File

@@ -4,6 +4,7 @@
const _ = require('lodash')
const logger = require('@overleaf/logger')
const metrics = require('@overleaf/metrics')
const core = require('overleaf-editor-core')
const Chunk = core.Chunk
@@ -14,6 +15,7 @@ const chunkStore = require('./chunk_store')
const { BlobStore } = require('./blob_store')
const { InvalidChangeError } = require('./errors')
const { getContentHash } = require('./content_hash')
const redisBackend = require('./chunk_store/redis')
function countChangeBytes(change) {
// Note: This is not quite accurate, because the raw change may contain raw
@@ -179,7 +181,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
}
}
async function extendLastChunkIfPossible() {
async function loadLatestChunk() {
const latestChunk = await chunkStore.loadLatest(projectId)
currentChunk = latestChunk
@@ -192,9 +194,49 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
}
currentSnapshot = latestChunk.getSnapshot().clone()
const timer = new Timer()
currentSnapshot.applyAll(latestChunk.getChanges())
currentSnapshot.applyAll(currentChunk.getChanges())
}
async function queueChangesInRedis() {
const hollowSnapshot = currentSnapshot.clone()
// We're transforming a lazy snapshot to a hollow snapshot, so loadFiles()
// doesn't really need a blobStore, but its signature still requires it.
const blobStore = new BlobStore(projectId)
await hollowSnapshot.loadFiles('hollow', blobStore)
hollowSnapshot.applyAll(changesToPersist)
const baseVersion = currentChunk.getEndVersion()
await redisBackend.queueChanges(
projectId,
hollowSnapshot,
baseVersion,
changesToPersist,
{ onlyIfExists: true }
)
}
async function fakePersistRedisChanges() {
const nonPersistedChanges =
await redisBackend.getNonPersistedChanges(projectId)
if (
serializeChanges(nonPersistedChanges) ===
serializeChanges(changesToPersist)
) {
metrics.inc('persist_redis_changes_verification', 1, { status: 'match' })
} else {
logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis')
metrics.inc('persist_redis_changes_verification', 1, {
status: 'mismatch',
})
}
const baseVersion = currentChunk.getEndVersion()
const persistedVersion = baseVersion + nonPersistedChanges.length
await redisBackend.setPersistedVersion(projectId, persistedVersion)
}
async function extendLastChunkIfPossible() {
const timer = new Timer()
const changesPushed = await fillChunk(currentChunk, changesToPersist)
if (!changesPushed) {
return
@@ -245,6 +287,13 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
changesToPersist = oldChanges
const numberOfChangesToPersist = oldChanges.length
await loadLatestChunk()
try {
await queueChangesInRedis()
await fakePersistRedisChanges()
} catch (err) {
logger.error({ err }, 'Chunk buffer verification failed')
}
await extendLastChunkIfPossible()
await createNewChunksAsNeeded()
@@ -258,4 +307,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
}
}
/**
* @param {core.Change[]} changes
*/
function serializeChanges(changes) {
return JSON.stringify(changes.map(change => change.toRaw()))
}
module.exports = persistChanges

View File

@@ -86,8 +86,7 @@ describe('chunk buffer Redis backend', function () {
headSnapshot,
baseVersion,
[change],
persistTime,
expireTime
{ persistTime, expireTime }
)
// Get the state to verify the changes
@@ -126,8 +125,7 @@ describe('chunk buffer Redis backend', function () {
headSnapshot,
baseVersion,
[change],
persistTime,
expireTime
{ persistTime, expireTime }
)
// If we get here, the test should fail
expect.fail('Expected BaseVersionConflictError but no error was thrown')
@@ -157,8 +155,7 @@ describe('chunk buffer Redis backend', function () {
headSnapshot,
baseVersion,
[], // Empty changes array
persistTime,
expireTime
{ persistTime, expireTime }
)
// If we get here, the test should fail
expect.fail('Expected Error but no error was thrown')
@@ -191,8 +188,7 @@ describe('chunk buffer Redis backend', function () {
headSnapshot,
baseVersion,
[change1, change2, change3], // Multiple changes
persistTime,
expireTime
{ persistTime, expireTime }
)
// Get the state to verify the changes
@@ -226,8 +222,7 @@ describe('chunk buffer Redis backend', function () {
headSnapshot,
baseVersion,
[change],
laterPersistTime,
expireTime
{ persistTime: laterPersistTime, expireTime }
)
// Get the state to verify the first persist time was set
@@ -241,8 +236,10 @@ describe('chunk buffer Redis backend', function () {
newerHeadSnapshot,
baseVersion + 1, // Updated base version
[change],
earlierPersistTime, // Earlier time should replace the later one
expireTime
{
persistTime: earlierPersistTime, // Earlier time should replace the later one
expireTime,
}
)
// Get the state to verify the persist time was updated to the earlier time
@@ -256,14 +253,127 @@ describe('chunk buffer Redis backend', function () {
evenNewerHeadSnapshot,
baseVersion + 2, // Updated base version
[change],
laterPersistTime, // Later time should not replace the earlier one
expireTime
{
persistTime: laterPersistTime, // Later time should not replace the earlier one
expireTime,
}
)
// Get the state to verify the persist time remains at the earlier time
state = await redisBackend.getState(projectId)
expect(state.persistTime).to.equal(earlierPersistTime) // Should still be the earlier time
})
it('should ignore changes when onlyIfExists is true and project does not exist', async function () {
// Create base version
const baseVersion = 10
// Create a new head snapshot
const headSnapshot = new Snapshot()
// Create changes
const timestamp = new Date()
const change = new Change([], timestamp)
// Set times
const now = Date.now()
const persistTime = now + 30 * 1000
const expireTime = now + 60 * 60 * 1000
// Queue changes with onlyIfExists set to true
const result = await redisBackend.queueChanges(
projectId,
headSnapshot,
baseVersion,
[change],
{ persistTime, expireTime, onlyIfExists: true }
)
// Should return 'ignore' status
expect(result).to.equal('ignore')
// Get the state - should be empty/null
const state = await redisBackend.getState(projectId)
expect(state.headVersion).to.be.null
expect(state.headSnapshot).to.be.null
})
it('should queue changes when onlyIfExists is true and project exists', async function () {
// First create the project
const headSnapshot = new Snapshot()
const baseVersion = 10
const timestamp = new Date()
const change1 = new Change([], timestamp)
// Set times
const now = Date.now()
const persistTime = now + 30 * 1000
const expireTime = now + 60 * 60 * 1000
// Create the project first
await redisBackend.queueChanges(
projectId,
headSnapshot,
baseVersion,
[change1],
{ persistTime, expireTime }
)
// Now create another change with onlyIfExists set to true
const newerSnapshot = new Snapshot()
const change2 = new Change([], timestamp)
// Queue changes with onlyIfExists set to true
const result = await redisBackend.queueChanges(
projectId,
newerSnapshot,
baseVersion + 1, // Version should be 1 after the first change
[change2],
{ persistTime, expireTime, onlyIfExists: true }
)
// Should return 'ok' status
expect(result).to.equal('ok')
// Get the state to verify the changes were applied
const state = await redisBackend.getState(projectId)
expect(state.headVersion).to.equal(baseVersion + 2) // Should be 2 after both changes
expect(state.headSnapshot).to.deep.equal(newerSnapshot.toRaw())
})
it('should queue changes when onlyIfExists is false and project does not exist', async function () {
// Create base version
const baseVersion = 10
// Create a new head snapshot
const headSnapshot = new Snapshot()
// Create changes
const timestamp = new Date()
const change = new Change([], timestamp)
// Set times
const now = Date.now()
const persistTime = now + 30 * 1000
const expireTime = now + 60 * 60 * 1000
// Queue changes with onlyIfExists explicitly set to false
const result = await redisBackend.queueChanges(
projectId,
headSnapshot,
baseVersion,
[change],
{ persistTime, expireTime, onlyIfExists: false }
)
// Should return 'ok' status
expect(result).to.equal('ok')
// Get the state to verify the project was created
const state = await redisBackend.getState(projectId)
expect(state.headVersion).to.equal(baseVersion + 1)
expect(state.headSnapshot).to.deep.equal(headSnapshot.toRaw())
})
})
describe('getChangesSinceVersion', function () {
@@ -1011,18 +1121,15 @@ async function queueChanges(projectId, changes, opts = {}) {
const baseVersion = 0
const headSnapshot = new Snapshot()
// Set times
const now = Date.now()
const persistTime = opts.persistTime ?? now + 30 * 1000 // 30 seconds from now
const expireTime = opts.expireTime ?? now + 60 * 60 * 1000 // 1 hour from now
await redisBackend.queueChanges(
projectId,
headSnapshot,
baseVersion,
changes,
persistTime,
expireTime
{
persistTime: opts.persistTime,
expireTime: opts.expireTime,
}
)
}