diff --git a/services/project-history/app/js/RedisManager.js b/services/project-history/app/js/RedisManager.js index 2f3b55263e..ac3197daf2 100644 --- a/services/project-history/app/js/RedisManager.js +++ b/services/project-history/app/js/RedisManager.js @@ -6,20 +6,33 @@ import redis from '@overleaf/redis-wrapper' import metrics from '@overleaf/metrics' import OError from '@overleaf/o-error' -// maximum size taken from the redis queue, to prevent project history -// consuming unbounded amounts of memory -let RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024 +/** + * Maximum size taken from the redis queue, to prevent project history + * consuming unbounded amounts of memory + */ +export const RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024 -// maximum length of ops (insertion and deletions) to process in a single -// iteration -let MAX_UPDATE_OP_LENGTH = 1024 +/** + * Batch size when reading updates from Redis + */ +export const RAW_UPDATES_BATCH_SIZE = 50 -// warn if we exceed this raw update size, the final compressed updates we send -// could be smaller than this +/** + * Maximum length of ops (insertion and deletions) to process in a single + * iteration + */ +export const MAX_UPDATE_OP_LENGTH = 1024 + +/** + * Warn if we exceed this raw update size, the final compressed updates we + * send could be smaller than this + */ const WARN_RAW_UPDATE_SIZE = 1024 * 1024 -// maximum number of new docs to process in a single iteration -let MAX_NEW_DOC_CONTENT_COUNT = 32 +/** + * Maximum number of new docs to process in a single iteration + */ +export const MAX_NEW_DOC_CONTENT_COUNT = 32 const CACHE_TTL_IN_SECONDS = 3600 @@ -32,10 +45,54 @@ async function countUnprocessedUpdates(projectId) { return updates } -async function getOldestDocUpdates(projectId, batchSize) { +async function* getRawUpdates(projectId) { const key = Keys.projectHistoryOps({ project_id: projectId }) - const updates = await rclient.lrange(key, 0, batchSize - 1) - return updates + let start = 0 + while (true) { + const stop = start + RAW_UPDATES_BATCH_SIZE - 1 + const updates = await rclient.lrange(key, start, stop) + for (const update of updates) { + yield update + } + if (updates.length < RAW_UPDATES_BATCH_SIZE) { + return + } + start += RAW_UPDATES_BATCH_SIZE + } +} + +async function getRawUpdatesBatch(projectId, batchSize) { + const rawUpdates = [] + let totalRawUpdatesSize = 0 + let hasMore = false + for await (const rawUpdate of getRawUpdates(projectId)) { + totalRawUpdatesSize += rawUpdate.length + if ( + rawUpdates.length > 0 && + totalRawUpdatesSize > RAW_UPDATE_SIZE_THRESHOLD + ) { + hasMore = true + break + } + rawUpdates.push(rawUpdate) + if (rawUpdates.length >= batchSize) { + hasMore = true + break + } + } + metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1) + if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) { + const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length) + logger.warn( + { + projectId, + totalRawUpdatesSize, + rawUpdateSizes, + }, + 'large raw update size' + ) + } + return { rawUpdates, hasMore } } export function parseDocUpdates(jsonUpdates) { @@ -44,112 +101,53 @@ export function parseDocUpdates(jsonUpdates) { async function getUpdatesInBatches(projectId, batchSize, runner) { let moreBatches = true - while (moreBatches) { - let rawUpdates = await getOldestDocUpdates(projectId, batchSize) - - moreBatches = rawUpdates.length === batchSize - - if (rawUpdates.length === 0) { - return + const redisBatch = await getRawUpdatesBatch(projectId, batchSize) + if (redisBatch.rawUpdates.length === 0) { + break } + moreBatches = redisBatch.hasMore - // don't process any more batches if we are single stepping - if (batchSize === 1) { - moreBatches = false - } - - // consume the updates up to a maximum total number of bytes - // ensuring that at least one update will be processed (we may - // exceed RAW_UPDATE_SIZE_THRESHOLD is the first update is bigger - // than that). - let totalRawUpdatesSize = 0 - const updatesToProcess = [] - for (const rawUpdate of rawUpdates) { - const nextTotalSize = totalRawUpdatesSize + rawUpdate.length - if ( - updatesToProcess.length > 0 && - nextTotalSize > RAW_UPDATE_SIZE_THRESHOLD - ) { - // stop consuming updates if we have at least one and the - // next update would exceed the size threshold - break - } else { - updatesToProcess.push(rawUpdate) - totalRawUpdatesSize += rawUpdate.length - } - } - - // if we hit the size limit above, only process the updates up to that point - if (updatesToProcess.length < rawUpdates.length) { - moreBatches = true // process remaining raw updates in the next iteration - rawUpdates = updatesToProcess - } - - metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1) - if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) { - const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length) - logger.warn( - { projectId, totalRawUpdatesSize, rawUpdateSizes }, - 'large raw update size' - ) - } - - let updates - try { - updates = parseDocUpdates(rawUpdates) - } catch (error) { - throw OError.tag(error, 'failed to parse updates', { - projectId, - updates, - }) - } - - // consume the updates up to a maximum number of ops (insertions and deletions) + const rawUpdates = [] + const updates = [] let totalOpLength = 0 - let updatesToProcessCount = 0 let totalDocContentCount = 0 - for (const parsedUpdate of updates) { - if (parsedUpdate.resyncDocContent) { - totalDocContentCount++ - } - if (totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT) { - break - } - const nextTotalOpLength = totalOpLength + (parsedUpdate?.op?.length || 1) - if ( - updatesToProcessCount > 0 && - nextTotalOpLength > MAX_UPDATE_OP_LENGTH - ) { - break - } else { - totalOpLength = nextTotalOpLength - updatesToProcessCount++ - } - } - - // if we hit the op limit above, only process the updates up to that point - if (updatesToProcessCount < updates.length) { - logger.debug( - { + for (const rawUpdate of redisBatch.rawUpdates) { + let update + try { + update = JSON.parse(rawUpdate) + } catch (error) { + throw OError.tag(error, 'failed to parse update', { projectId, - updatesToProcessCount, - updates_count: updates.length, - totalOpLength, - }, - 'restricting number of ops to be processed' - ) - moreBatches = true - // there is a 1:1 mapping between rawUpdates and updates - // which we need to preserve here to ensure we only - // delete the updates that are actually processed - rawUpdates = rawUpdates.slice(0, updatesToProcessCount) - updates = updates.slice(0, updatesToProcessCount) + update, + }) + } + + totalOpLength += update?.op?.length || 1 + if (update.resyncDocContent) { + totalDocContentCount += 1 + } + + if ( + updates.length > 0 && + (totalOpLength > MAX_UPDATE_OP_LENGTH || + totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT) + ) { + moreBatches = true + break + } + + rawUpdates.push(rawUpdate) + updates.push(update) } - logger.debug({ projectId }, 'retrieved raw updates from redis') await runner(updates) await deleteAppliedDocUpdates(projectId, rawUpdates) + + if (batchSize === 1) { + // Special case for single stepping, don't process more batches + break + } } } @@ -324,23 +322,10 @@ async function clearCachedHistoryId(projectId) { await rclient.del(key) } -// for tests -export function setMaxUpdateOpLength(value) { - MAX_UPDATE_OP_LENGTH = value -} - -export function setRawUpdateSizeThreshold(value) { - RAW_UPDATE_SIZE_THRESHOLD = value -} - -export function setMaxNewDocContentCount(value) { - MAX_NEW_DOC_CONTENT_COUNT = value -} - // EXPORTS const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates) -const getOldestDocUpdatesCb = callbackify(getOldestDocUpdates) +const getRawUpdatesBatchCb = callbackify(getRawUpdatesBatch) const deleteAppliedDocUpdatesCb = callbackify(deleteAppliedDocUpdates) const destroyDocUpdatesQueueCb = callbackify(destroyDocUpdatesQueue) const getProjectIdsWithHistoryOpsCb = callbackify(getProjectIdsWithHistoryOps) @@ -378,7 +363,7 @@ const getUpdatesInBatchesCb = function ( export { countUnprocessedUpdatesCb as countUnprocessedUpdates, - getOldestDocUpdatesCb as getOldestDocUpdates, + getRawUpdatesBatchCb as getRawUpdatesBatch, deleteAppliedDocUpdatesCb as deleteAppliedDocUpdates, destroyDocUpdatesQueueCb as destroyDocUpdatesQueue, getUpdatesInBatchesCb as getUpdatesInBatches, @@ -396,7 +381,7 @@ export { export const promises = { countUnprocessedUpdates, - getOldestDocUpdates, + getRawUpdatesBatch, deleteAppliedDocUpdates, destroyDocUpdatesQueue, getUpdatesInBatches, diff --git a/services/project-history/app/js/UpdatesProcessor.js b/services/project-history/app/js/UpdatesProcessor.js index 6bd2fea21d..06c71de69d 100644 --- a/services/project-history/app/js/UpdatesProcessor.js +++ b/services/project-history/app/js/UpdatesProcessor.js @@ -29,38 +29,34 @@ export const REDIS_READ_BATCH_SIZE = 500 export const _mocks = {} export function getRawUpdates(projectId, batchSize, callback) { - RedisManager.getOldestDocUpdates( - projectId, - batchSize, - (error, rawUpdates) => { + RedisManager.getRawUpdatesBatch(projectId, batchSize, (error, batch) => { + if (error != null) { + return callback(OError.tag(error)) + } + + let updates + try { + updates = RedisManager.parseDocUpdates(batch.rawUpdates) + } catch (error) { + return callback(OError.tag(error)) + } + + _getHistoryId(projectId, updates, (error, historyId) => { if (error != null) { return callback(OError.tag(error)) } - - let updates - try { - updates = RedisManager.parseDocUpdates(rawUpdates) - } catch (error) { - return callback(OError.tag(error)) - } - - _getHistoryId(projectId, updates, (error, historyId) => { - if (error != null) { - return callback(OError.tag(error)) - } - HistoryStoreManager.getMostRecentChunk( - projectId, - historyId, - (error, chunk) => { - if (error != null) { - return callback(OError.tag(error)) - } - callback(null, { project_id: projectId, chunk, updates }) + HistoryStoreManager.getMostRecentChunk( + projectId, + historyId, + (error, chunk) => { + if (error != null) { + return callback(OError.tag(error)) } - ) - }) - } - ) + callback(null, { project_id: projectId, chunk, updates }) + } + ) + }) + }) } // Process all updates for a project, only check project-level information once diff --git a/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js b/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js index 7652f7935a..53f9378021 100644 --- a/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js +++ b/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js @@ -6,15 +6,7 @@ const MODULE_PATH = '../../../../app/js/RedisManager.js' describe('RedisManager', function () { beforeEach(async function () { - this.rclient = { - auth: sinon.stub(), - exec: sinon.stub().resolves(), - lrange: sinon.stub(), - lrem: sinon.stub(), - srem: sinon.stub(), - del: sinon.stub(), - } - this.rclient.multi = sinon.stub().returns(this.rclient) + this.rclient = new FakeRedis() this.RedisWrapper = { createClient: sinon.stub().returns(this.rclient), } @@ -44,10 +36,10 @@ describe('RedisManager', function () { '@overleaf/metrics': this.Metrics, }) - this.project_id = 'project-id-123' + this.projectId = 'project-id-123' this.batchSize = 100 - this.historyOpsKey = `Project:HistoryOps:{${this.project_id}}` - this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.project_id}}` + this.historyOpsKey = `Project:HistoryOps:{${this.projectId}}` + this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.projectId}}` this.updates = [ { v: 42, op: ['a', 'b', 'c', 'd'] }, @@ -60,32 +52,51 @@ describe('RedisManager', function () { ) }) - describe('getOldestDocUpdates', function () { - beforeEach(async function () { - this.rclient.lrange.resolves(this.rawUpdates) - this.batchSize = 3 - this.result = await this.RedisManager.promises.getOldestDocUpdates( - this.project_id, - this.batchSize + describe('getRawUpdatesBatch', function () { + it('gets a small number of updates in one batch', async function () { + const updates = makeUpdates(2) + const rawUpdates = makeRawUpdates(updates) + this.rclient.setList(this.historyOpsKey, rawUpdates) + const result = await this.RedisManager.promises.getRawUpdatesBatch( + this.projectId, + 100 ) + expect(result).to.deep.equal({ rawUpdates, hasMore: false }) }) - it('should read the updates from redis', function () { - this.rclient.lrange - .calledWith(this.historyOpsKey, 0, this.batchSize - 1) - .should.equal(true) + it('gets a larger number of updates in several batches', async function () { + const updates = makeUpdates( + this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2 + 12 + ) + const rawUpdates = makeRawUpdates(updates) + this.rclient.setList(this.historyOpsKey, rawUpdates) + const result = await this.RedisManager.promises.getRawUpdatesBatch( + this.projectId, + 5000 + ) + expect(result).to.deep.equal({ rawUpdates, hasMore: false }) }) - it('should call the callback with the unparsed ops', function () { - this.result.should.equal(this.rawUpdates) + it("doesn't return more than the number of updates requested", async function () { + const updates = makeUpdates(100) + const rawUpdates = makeRawUpdates(updates) + this.rclient.setList(this.historyOpsKey, rawUpdates) + const result = await this.RedisManager.promises.getRawUpdatesBatch( + this.projectId, + 75 + ) + expect(result).to.deep.equal({ + rawUpdates: rawUpdates.slice(0, 75), + hasMore: true, + }) }) }) describe('parseDocUpdates', function () { it('should return the parsed ops', function () { - this.RedisManager.parseDocUpdates(this.rawUpdates).should.deep.equal( - this.updates - ) + const updates = makeUpdates(12) + const rawUpdates = makeRawUpdates(updates) + this.RedisManager.parseDocUpdates(rawUpdates).should.deep.equal(updates) }) }) @@ -96,40 +107,26 @@ describe('RedisManager', function () { describe('single batch smaller than batch size', function () { beforeEach(async function () { - this.rclient.lrange.resolves(this.rawUpdates) - this.batchSize = 3 + this.updates = makeUpdates(2) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - this.batchSize, + this.projectId, + 3, this.runner ) }) - it('requests a single batch of updates', function () { - this.rclient.lrange.should.have.been.calledOnce - this.rclient.lrange.should.have.been.calledWith( - this.historyOpsKey, - 0, - this.batchSize - 1 - ) - }) - it('calls the runner once', function () { this.runner.callCount.should.equal(1) }) it('calls the runner with the updates', function () { - this.runner.calledWith(this.updates).should.equal(true) + this.runner.should.have.been.calledWith(this.updates) }) it('deletes the applied updates', function () { - for (const update of this.rawUpdates) { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - update - ) - } + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) it('deletes the first op timestamp', function () { @@ -141,35 +138,26 @@ describe('RedisManager', function () { describe('single batch at batch size', function () { beforeEach(async function () { - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves([]) + this.updates = makeUpdates(123) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 123, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner once', function () { this.runner.callCount.should.equal(1) }) it('calls the runner with the updates', function () { - this.runner.calledWith(this.updates).should.equal(true) + this.runner.should.have.been.calledWith(this.updates) }) it('deletes the applied updates', function () { - for (const update of this.rawUpdates) { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - update - ) - } + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) it('deletes the first op timestamp', function () { @@ -181,215 +169,160 @@ describe('RedisManager', function () { describe('single batch exceeding size limit on updates', function () { beforeEach(async function () { - // set the threshold below the size of the first update - this.RedisManager.setRawUpdateSizeThreshold( - this.rawUpdates[0].length - 1 - ) - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) - + this.updates = makeUpdates(2, [ + 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD), + ]) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 123, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) it('calls the runner with the first update', function () { - this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) - }) - - it('deletes the first update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[0] - ) + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, 1)) }) it('calls the runner with the second update', function () { - this.runner.should.have.been.calledWith(this.updates.slice(1)) + this.runner + .getCall(1) + .should.have.been.calledWith(this.updates.slice(1)) }) - it('deletes the second set of applied updates', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[1] - ) + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) }) describe('two batches with first update below and second update above the size limit on updates', function () { beforeEach(async function () { - // set the threshold above the size of the first update, but below the total size - this.RedisManager.setRawUpdateSizeThreshold( - this.rawUpdates[0].length + 1 - ) - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) + this.updates = makeUpdates(2, [ + 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD / 2), + ]) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 123, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) it('calls the runner with the first update', function () { - this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true) - }) - - it('deletes the first set applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[0] - ) + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, 1)) }) it('calls the runner with the second update', function () { - this.runner.calledWith(this.updates.slice(1)).should.equal(true) + this.runner + .getCall(1) + .should.have.been.calledWith(this.updates.slice(1)) }) - it('deletes the second applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[1] - ) + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) }) describe('single batch exceeding op count limit on updates', function () { beforeEach(async function () { - // set the threshold below the size of the first update - this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length - 1) - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) - + const ops = Array(this.RedisManager.MAX_UPDATE_OP_LENGTH + 1).fill('op') + this.updates = makeUpdates(2, { op: ops }) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 123, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the first updates', function () { - this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true) - }) - - it('deletes the first applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[0] - ) - }) - - it('calls the runner with the second updates', function () { - this.runner.calledWith(this.updates.slice(1)).should.equal(true) - }) - - it('deletes the second applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[1] - ) - }) - }) - - describe('single batch exceeding doc content count', function () { - beforeEach(async function () { - this.updates = [{ resyncDocContent: 123 }, { resyncDocContent: 456 }] - this.rawUpdates = this.updates.map(update => JSON.stringify(update)) - // set the threshold below the size of the first update - this.RedisManager.setMaxNewDocContentCount(this.updates.length - 1) - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) - - await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, - this.runner - ) - }) - - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) it('calls the runner with the first update', function () { - this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) - }) - - it('deletes the first applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[0] - ) + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, 1)) }) it('calls the runner with the second update', function () { - this.runner.should.have.been.calledWith(this.updates.slice(1)) + this.runner + .getCall(1) + .should.have.been.calledWith(this.updates.slice(1)) }) - it('deletes the second set of applied updates', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[1] + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + }) + }) + + describe('single batch exceeding doc content count', function () { + beforeEach(async function () { + this.updates = makeUpdates( + this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT + 3, + { resyncDocContent: 123 } ) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) + await this.RedisManager.promises.getUpdatesInBatches( + this.projectId, + 123, + this.runner + ) + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the first batch of updates', function () { + this.runner.should.have.been.calledWith( + this.updates.slice(0, this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT) + ) + }) + + it('calls the runner with the second batch of updates', function () { + this.runner.should.have.been.calledWith( + this.updates.slice(this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT) + ) + }) + + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) }) describe('two batches with first update below and second update above the ops length limit on updates', function () { beforeEach(async function () { // set the threshold below the size of the first update - this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length + 1) - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) - + this.updates = makeUpdates(2, { op: ['op1', 'op2'] }) + this.updates[1].op = Array( + this.RedisManager.MAX_UPDATE_OP_LENGTH + 2 + ).fill('op') + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 123, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) @@ -398,79 +331,120 @@ describe('RedisManager', function () { this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) }) - it('deletes the first applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[0] - ) - }) - it('calls the runner with the second update', function () { this.runner.should.have.been.calledWith(this.updates.slice(1)) }) - it('deletes the second applied update', function () { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - this.rawUpdates[1] - ) + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) }) - describe('two batches', function () { + describe('two batches, one partial', function () { beforeEach(async function () { - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).resolves(this.extraRawUpdates) + this.updates = makeUpdates(15) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) await this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + 10, this.runner ) }) - it('requests a second batch of updates', function () { - this.rclient.lrange.should.have.been.calledTwice - }) - it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) it('calls the runner with the updates', function () { - this.runner.should.have.been.calledWith(this.updates) - this.runner.should.have.been.calledWith(this.extraUpdates) + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, 10)) + this.runner + .getCall(1) + .should.have.been.calledWith(this.updates.slice(10)) }) - it('deletes the first set of applied updates', function () { - for (const update of this.rawUpdates) { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - update - ) - } + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + }) + }) + + describe('two full batches', function () { + beforeEach(async function () { + this.updates = makeUpdates(20) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) + await this.RedisManager.promises.getUpdatesInBatches( + this.projectId, + 10, + this.runner + ) }) - it('deletes the second set of applied updates', function () { - for (const update of this.extraRawUpdates) { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - update + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the updates', function () { + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, 10)) + this.runner + .getCall(1) + .should.have.been.calledWith(this.updates.slice(10)) + }) + + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + }) + }) + + describe('three full bathches, bigger than the Redis read batch size', function () { + beforeEach(async function () { + this.batchSize = this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2 + this.updates = makeUpdates(this.batchSize * 3) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) + await this.RedisManager.promises.getUpdatesInBatches( + this.projectId, + this.batchSize, + this.runner + ) + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(3) + }) + + it('calls the runner with the updates', function () { + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, this.batchSize)) + this.runner + .getCall(1) + .should.have.been.calledWith( + this.updates.slice(this.batchSize, this.batchSize * 2) ) - } + this.runner + .getCall(2) + .should.have.been.calledWith(this.updates.slice(this.batchSize * 2)) + }) + + it('deletes the applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) }) }) describe('error when first reading updates', function () { beforeEach(async function () { - this.error = new Error('error') - this.rclient.lrange.rejects(this.error) + this.updates = makeUpdates(10) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) + this.rclient.throwErrorOnLrangeCall(0) await expect( this.RedisManager.promises.getUpdatesInBatches( - this.project_id, + this.projectId, 2, this.runner ) @@ -478,38 +452,105 @@ describe('RedisManager', function () { }) it('does not delete any updates', function () { - expect(this.rclient.lrem).not.to.have.been.called + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal( + this.rawUpdates + ) }) }) describe('error when reading updates for a second batch', function () { beforeEach(async function () { - this.error = new Error('error') - this.rclient.lrange.onCall(0).resolves(this.rawUpdates) - this.rclient.lrange.onCall(1).rejects(this.error) - + this.batchSize = this.RedisManager.RAW_UPDATES_BATCH_SIZE - 1 + this.updates = makeUpdates(this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2) + this.rawUpdates = makeRawUpdates(this.updates) + this.rclient.setList(this.historyOpsKey, this.rawUpdates) + this.rclient.throwErrorOnLrangeCall(1) await expect( this.RedisManager.promises.getUpdatesInBatches( - this.project_id, - 2, + this.projectId, + this.batchSize, this.runner ) ).to.be.rejected }) - it('deletes the first set of applied updates', function () { - for (const update of this.rawUpdates) { - expect(this.rclient.lrem).to.have.been.calledWith( - this.historyOpsKey, - 1, - update - ) - } + it('calls the runner with the first batch of updates', function () { + this.runner.should.have.been.calledOnce + this.runner + .getCall(0) + .should.have.been.calledWith(this.updates.slice(0, this.batchSize)) }) - it('deletes applied updates only once', function () { - expect(this.rclient.lrem.callCount).to.equal(this.rawUpdates.length) + it('deletes only the first batch of applied updates', function () { + expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal( + this.rawUpdates.slice(this.batchSize) + ) }) }) }) }) + +class FakeRedis { + constructor() { + this.data = new Map() + this.del = sinon.stub() + this.lrangeCallCount = -1 + } + + setList(key, list) { + this.data.set(key, list) + } + + getList(key) { + return this.data.get(key) + } + + throwErrorOnLrangeCall(callNum) { + this.lrangeCallThrowingError = callNum + } + + async lrange(key, start, stop) { + this.lrangeCallCount += 1 + if ( + this.lrangeCallThrowingError != null && + this.lrangeCallThrowingError === this.lrangeCallCount + ) { + throw new Error('LRANGE failed!') + } + const list = this.data.get(key) ?? [] + return list.slice(start, stop + 1) + } + + async lrem(key, count, elementToRemove) { + expect(count).to.be.greaterThan(0) + const original = this.data.get(key) ?? [] + const filtered = original.filter(element => { + if (count > 0 && element === elementToRemove) { + count-- + return false + } + return true + }) + this.data.set(key, filtered) + } + + async exec() { + // Nothing to do + } + + multi() { + return this + } +} + +function makeUpdates(updateCount, extraFields = {}) { + const updates = [] + for (let i = 0; i < updateCount; i++) { + updates.push({ v: i, ...extraFields }) + } + return updates +} + +function makeRawUpdates(updates) { + return updates.map(JSON.stringify) +}