diff --git a/services/history-v1/storage/lib/chunk_store/redis.js b/services/history-v1/storage/lib/chunk_store/redis.js index 5c62db5387..58ee0d15a7 100644 --- a/services/history-v1/storage/lib/chunk_store/redis.js +++ b/services/history-v1/storage/lib/chunk_store/redis.js @@ -1,3 +1,5 @@ +// @ts-check + const metrics = require('@overleaf/metrics') const OError = require('@overleaf/o-error') const redis = require('../redis') @@ -97,27 +99,36 @@ rclient.defineCommand('queue_changes', { local head = ARGV[2] local persistTime = tonumber(ARGV[3]) local expireTime = tonumber(ARGV[4]) - -- Changes start from ARGV[5] + local onlyIfExists = ARGV[5] + local changesIndex = 6 -- Changes start here local headVersion = tonumber(redis.call('GET', headVersionKey)) + + -- Check if updates should only be queued if the project already exists (used for gradual rollouts) + if not headVersion and onlyIfExists == 'true' then + return 'ignore' + end + + -- Check that the supplied baseVersion matches the head version + -- If headVersion is nil, it means the project does not exist yet and will be created. if headVersion and headVersion ~= baseVersion then return 'conflict' end -- Check if there are any changes to queue - if #ARGV < 5 then + if #ARGV < changesIndex then return 'no_changes_provided' end -- Store the changes -- RPUSH changesKey change1 change2 ... - redis.call('RPUSH', changesKey, unpack(ARGV, 5, #ARGV)) + redis.call('RPUSH', changesKey, unpack(ARGV, changesIndex, #ARGV)) -- Update head snapshot only if changes were successfully pushed redis.call('SET', headSnapshotKey, head) -- Update the head version - local numChanges = #ARGV - 4 + local numChanges = #ARGV - changesIndex + 1 local newHeadVersion = baseVersion + numChanges redis.call('SET', headVersionKey, newHeadVersion) @@ -142,9 +153,14 @@ rclient.defineCommand('queue_changes', { * @param {Snapshot} headSnapshot - The new head snapshot after applying changes. * @param {number} baseVersion - The expected current head version. * @param {Change[]} changes - An array of Change objects to queue. - * @param {number} persistTime - Timestamp (ms since epoch) when the oldest change in the buffer should be persisted. - * @param {number} expireTime - Timestamp (ms since epoch) when the project buffer should expire if inactive. - * @returns {Promise} Resolves on success. + * @param {object} [opts] + * @param {number} [opts.persistTime] - Timestamp (ms since epoch) when the + * oldest change in the buffer should be persisted. + * @param {number} [opts.expireTime] - Timestamp (ms since epoch) when the + * project buffer should expire if inactive. + * @param {boolean} [opts.onlyIfExists] - If true, only queue changes if the + * project already exists in Redis, otherwise ignore. + * @returns {Promise} Resolves on success to either 'ok' or 'ignore'. * @throws {BaseVersionConflictError} If the baseVersion does not match the current head version in Redis. * @throws {Error} If changes array is empty or if Redis operations fail. */ @@ -153,13 +169,16 @@ async function queueChanges( headSnapshot, baseVersion, changes, - persistTime, - expireTime + opts = {} ) { if (!changes || changes.length === 0) { throw new Error('Cannot queue empty changes array') } + const persistTime = opts.persistTime ?? Date.now() + MAX_PERSIST_DELAY_MS + const expireTime = opts.expireTime ?? Date.now() + PROJECT_TTL_MS + const onlyIfExists = Boolean(opts.onlyIfExists) + try { const keys = [ keySchema.head({ projectId }), @@ -174,13 +193,17 @@ async function queueChanges( JSON.stringify(headSnapshot.toRaw()), persistTime.toString(), expireTime.toString(), + onlyIfExists.toString(), // Only queue changes if the snapshot already exists ...changes.map(change => JSON.stringify(change.toRaw())), // Serialize changes ] const status = await rclient.queue_changes(keys, args) metrics.inc('chunk_store.redis.queue_changes', 1, { status }) if (status === 'ok') { - return + return status + } + if (status === 'ignore') { + return status // skip changes when project does not exist and onlyIfExists is true } if (status === 'conflict') { throw new BaseVersionConflictError('base version mismatch', { diff --git a/services/history-v1/storage/lib/persist_changes.js b/services/history-v1/storage/lib/persist_changes.js index 8a848aa214..4d832f4541 100644 --- a/services/history-v1/storage/lib/persist_changes.js +++ b/services/history-v1/storage/lib/persist_changes.js @@ -4,6 +4,7 @@ const _ = require('lodash') const logger = require('@overleaf/logger') +const metrics = require('@overleaf/metrics') const core = require('overleaf-editor-core') const Chunk = core.Chunk @@ -14,6 +15,7 @@ const chunkStore = require('./chunk_store') const { BlobStore } = require('./blob_store') const { InvalidChangeError } = require('./errors') const { getContentHash } = require('./content_hash') +const redisBackend = require('./chunk_store/redis') function countChangeBytes(change) { // Note: This is not quite accurate, because the raw change may contain raw @@ -179,7 +181,7 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { } } - async function extendLastChunkIfPossible() { + async function loadLatestChunk() { const latestChunk = await chunkStore.loadLatest(projectId) currentChunk = latestChunk @@ -192,9 +194,49 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { } currentSnapshot = latestChunk.getSnapshot().clone() - const timer = new Timer() - currentSnapshot.applyAll(latestChunk.getChanges()) + currentSnapshot.applyAll(currentChunk.getChanges()) + } + async function queueChangesInRedis() { + const hollowSnapshot = currentSnapshot.clone() + // We're transforming a lazy snapshot to a hollow snapshot, so loadFiles() + // doesn't really need a blobStore, but its signature still requires it. + const blobStore = new BlobStore(projectId) + await hollowSnapshot.loadFiles('hollow', blobStore) + hollowSnapshot.applyAll(changesToPersist) + const baseVersion = currentChunk.getEndVersion() + await redisBackend.queueChanges( + projectId, + hollowSnapshot, + baseVersion, + changesToPersist, + { onlyIfExists: true } + ) + } + + async function fakePersistRedisChanges() { + const nonPersistedChanges = + await redisBackend.getNonPersistedChanges(projectId) + + if ( + serializeChanges(nonPersistedChanges) === + serializeChanges(changesToPersist) + ) { + metrics.inc('persist_redis_changes_verification', 1, { status: 'match' }) + } else { + logger.warn({ projectId }, 'mismatch of non-persisted changes from Redis') + metrics.inc('persist_redis_changes_verification', 1, { + status: 'mismatch', + }) + } + + const baseVersion = currentChunk.getEndVersion() + const persistedVersion = baseVersion + nonPersistedChanges.length + await redisBackend.setPersistedVersion(projectId, persistedVersion) + } + + async function extendLastChunkIfPossible() { + const timer = new Timer() const changesPushed = await fillChunk(currentChunk, changesToPersist) if (!changesPushed) { return @@ -245,6 +287,13 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { changesToPersist = oldChanges const numberOfChangesToPersist = oldChanges.length + await loadLatestChunk() + try { + await queueChangesInRedis() + await fakePersistRedisChanges() + } catch (err) { + logger.error({ err }, 'Chunk buffer verification failed') + } await extendLastChunkIfPossible() await createNewChunksAsNeeded() @@ -258,4 +307,11 @@ async function persistChanges(projectId, allChanges, limits, clientEndVersion) { } } +/** + * @param {core.Change[]} changes + */ +function serializeChanges(changes) { + return JSON.stringify(changes.map(change => change.toRaw())) +} + module.exports = persistChanges diff --git a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js index 514ae617cf..91b7e3a5f4 100644 --- a/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js +++ b/services/history-v1/test/acceptance/js/storage/chunk_store_redis_backend.test.js @@ -86,8 +86,7 @@ describe('chunk buffer Redis backend', function () { headSnapshot, baseVersion, [change], - persistTime, - expireTime + { persistTime, expireTime } ) // Get the state to verify the changes @@ -126,8 +125,7 @@ describe('chunk buffer Redis backend', function () { headSnapshot, baseVersion, [change], - persistTime, - expireTime + { persistTime, expireTime } ) // If we get here, the test should fail expect.fail('Expected BaseVersionConflictError but no error was thrown') @@ -157,8 +155,7 @@ describe('chunk buffer Redis backend', function () { headSnapshot, baseVersion, [], // Empty changes array - persistTime, - expireTime + { persistTime, expireTime } ) // If we get here, the test should fail expect.fail('Expected Error but no error was thrown') @@ -191,8 +188,7 @@ describe('chunk buffer Redis backend', function () { headSnapshot, baseVersion, [change1, change2, change3], // Multiple changes - persistTime, - expireTime + { persistTime, expireTime } ) // Get the state to verify the changes @@ -226,8 +222,7 @@ describe('chunk buffer Redis backend', function () { headSnapshot, baseVersion, [change], - laterPersistTime, - expireTime + { persistTime: laterPersistTime, expireTime } ) // Get the state to verify the first persist time was set @@ -241,8 +236,10 @@ describe('chunk buffer Redis backend', function () { newerHeadSnapshot, baseVersion + 1, // Updated base version [change], - earlierPersistTime, // Earlier time should replace the later one - expireTime + { + persistTime: earlierPersistTime, // Earlier time should replace the later one + expireTime, + } ) // Get the state to verify the persist time was updated to the earlier time @@ -256,14 +253,127 @@ describe('chunk buffer Redis backend', function () { evenNewerHeadSnapshot, baseVersion + 2, // Updated base version [change], - laterPersistTime, // Later time should not replace the earlier one - expireTime + { + persistTime: laterPersistTime, // Later time should not replace the earlier one + expireTime, + } ) // Get the state to verify the persist time remains at the earlier time state = await redisBackend.getState(projectId) expect(state.persistTime).to.equal(earlierPersistTime) // Should still be the earlier time }) + + it('should ignore changes when onlyIfExists is true and project does not exist', async function () { + // Create base version + const baseVersion = 10 + + // Create a new head snapshot + const headSnapshot = new Snapshot() + + // Create changes + const timestamp = new Date() + const change = new Change([], timestamp) + + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Queue changes with onlyIfExists set to true + const result = await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change], + { persistTime, expireTime, onlyIfExists: true } + ) + + // Should return 'ignore' status + expect(result).to.equal('ignore') + + // Get the state - should be empty/null + const state = await redisBackend.getState(projectId) + expect(state.headVersion).to.be.null + expect(state.headSnapshot).to.be.null + }) + + it('should queue changes when onlyIfExists is true and project exists', async function () { + // First create the project + const headSnapshot = new Snapshot() + const baseVersion = 10 + const timestamp = new Date() + const change1 = new Change([], timestamp) + + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Create the project first + await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change1], + { persistTime, expireTime } + ) + + // Now create another change with onlyIfExists set to true + const newerSnapshot = new Snapshot() + const change2 = new Change([], timestamp) + + // Queue changes with onlyIfExists set to true + const result = await redisBackend.queueChanges( + projectId, + newerSnapshot, + baseVersion + 1, // Version should be 1 after the first change + [change2], + { persistTime, expireTime, onlyIfExists: true } + ) + + // Should return 'ok' status + expect(result).to.equal('ok') + + // Get the state to verify the changes were applied + const state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(baseVersion + 2) // Should be 2 after both changes + expect(state.headSnapshot).to.deep.equal(newerSnapshot.toRaw()) + }) + + it('should queue changes when onlyIfExists is false and project does not exist', async function () { + // Create base version + const baseVersion = 10 + + // Create a new head snapshot + const headSnapshot = new Snapshot() + + // Create changes + const timestamp = new Date() + const change = new Change([], timestamp) + + // Set times + const now = Date.now() + const persistTime = now + 30 * 1000 + const expireTime = now + 60 * 60 * 1000 + + // Queue changes with onlyIfExists explicitly set to false + const result = await redisBackend.queueChanges( + projectId, + headSnapshot, + baseVersion, + [change], + { persistTime, expireTime, onlyIfExists: false } + ) + + // Should return 'ok' status + expect(result).to.equal('ok') + + // Get the state to verify the project was created + const state = await redisBackend.getState(projectId) + expect(state.headVersion).to.equal(baseVersion + 1) + expect(state.headSnapshot).to.deep.equal(headSnapshot.toRaw()) + }) }) describe('getChangesSinceVersion', function () { @@ -1011,18 +1121,15 @@ async function queueChanges(projectId, changes, opts = {}) { const baseVersion = 0 const headSnapshot = new Snapshot() - // Set times - const now = Date.now() - const persistTime = opts.persistTime ?? now + 30 * 1000 // 30 seconds from now - const expireTime = opts.expireTime ?? now + 60 * 60 * 1000 // 1 hour from now - await redisBackend.queueChanges( projectId, headSnapshot, baseVersion, changes, - persistTime, - expireTime + { + persistTime: opts.persistTime, + expireTime: opts.expireTime, + } ) }