From f6bfc14a79bc2d4aabea473f89313e2000932e57 Mon Sep 17 00:00:00 2001 From: Eric Mc Sween Date: Mon, 20 Feb 2023 12:28:37 -0500 Subject: [PATCH] Merge pull request #11913 from overleaf/em-revert-batch-redis-reads Revert batched Redis reads GitOrigin-RevId: 4f71dcb7a7e7ae92046ab7edef0930c0358da945 --- .../project-history/app/js/RedisManager.js | 235 ++++--- .../unit/js/RedisManager/RedisManagerTests.js | 658 +++++++++--------- 2 files changed, 450 insertions(+), 443 deletions(-) diff --git a/services/project-history/app/js/RedisManager.js b/services/project-history/app/js/RedisManager.js index a09d4bbe3e..2f3b55263e 100644 --- a/services/project-history/app/js/RedisManager.js +++ b/services/project-history/app/js/RedisManager.js @@ -6,33 +6,20 @@ import redis from '@overleaf/redis-wrapper' import metrics from '@overleaf/metrics' import OError from '@overleaf/o-error' -/** - * Maximum size taken from the redis queue, to prevent project history - * consuming unbounded amounts of memory - */ -export const RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024 +// maximum size taken from the redis queue, to prevent project history +// consuming unbounded amounts of memory +let RAW_UPDATE_SIZE_THRESHOLD = 4 * 1024 * 1024 -/** - * Batch size when reading updates from Redis - */ -export const RAW_UPDATES_BATCH_SIZE = 50 +// maximum length of ops (insertion and deletions) to process in a single +// iteration +let MAX_UPDATE_OP_LENGTH = 1024 -/** - * Maximum length of ops (insertion and deletions) to process in a single - * iteration - */ -export const MAX_UPDATE_OP_LENGTH = 1024 - -/** - * Warn if we exceed this raw update size, the final compressed updates we - * send could be smaller than this - */ +// warn if we exceed this raw update size, the final compressed updates we send +// could be smaller than this const WARN_RAW_UPDATE_SIZE = 1024 * 1024 -/** - * Maximum number of new docs to process in a single iteration - */ -export const MAX_NEW_DOC_CONTENT_COUNT = 32 +// maximum number of new docs to process in a single iteration +let MAX_NEW_DOC_CONTENT_COUNT = 32 const CACHE_TTL_IN_SECONDS = 3600 @@ -45,31 +32,10 @@ async function countUnprocessedUpdates(projectId) { return updates } -async function* getRawUpdates(projectId) { +async function getOldestDocUpdates(projectId, batchSize) { const key = Keys.projectHistoryOps({ project_id: projectId }) - let start = 0 - while (true) { - const stop = start + RAW_UPDATES_BATCH_SIZE - 1 - const updates = await rclient.lrange(key, start, stop) - for (const update of updates) { - yield update - } - if (updates.length < RAW_UPDATES_BATCH_SIZE) { - return - } - start += RAW_UPDATES_BATCH_SIZE - } -} - -async function getOldestDocUpdates(projectId, maxUpdates) { - const rawUpdates = [] - for await (const rawUpdate of getRawUpdates(projectId)) { - rawUpdates.push(rawUpdate) - if (rawUpdates.length >= maxUpdates) { - break - } - } - return rawUpdates + const updates = await rclient.lrange(key, 0, batchSize - 1) + return updates } export function parseDocUpdates(jsonUpdates) { @@ -77,83 +43,113 @@ export function parseDocUpdates(jsonUpdates) { } async function getUpdatesInBatches(projectId, batchSize, runner) { - let currentBatch = new Batch(projectId, batchSize) - for await (const rawUpdate of getRawUpdates(projectId)) { - let update - try { - update = JSON.parse(rawUpdate) - } catch (error) { - throw OError.tag(error, 'failed to parse updates', { - projectId, - update, - }) + let moreBatches = true + + while (moreBatches) { + let rawUpdates = await getOldestDocUpdates(projectId, batchSize) + + moreBatches = rawUpdates.length === batchSize + + if (rawUpdates.length === 0) { + return } - const fitsInCurrentBatch = currentBatch.add(rawUpdate, update) - if (!fitsInCurrentBatch) { - const nextBatch = new Batch(projectId, batchSize) - nextBatch.add(rawUpdate, update) - await currentBatch.process(runner) - currentBatch = nextBatch + // don't process any more batches if we are single stepping + if (batchSize === 1) { + moreBatches = false } - } - if (!currentBatch.isEmpty()) { - await currentBatch.process(runner) - } -} -class Batch { - constructor(projectId, maxUpdates) { - this.projectId = projectId - this.maxUpdates = maxUpdates - this.rawUpdates = [] - this.updates = [] - this.totalRawUpdatesSize = 0 - this.totalDocContentCount = 0 - this.totalOpLength = 0 - } - - add(rawUpdate, update) { - const rawUpdateSize = rawUpdate.length - const docContentCount = update.resyncDocContent ? 1 : 0 - const opLength = update?.op?.length || 1 - if ( - this.updates.length > 0 && - (this.updates.length >= this.maxUpdates || - this.totalRawUpdatesSize + rawUpdateSize > RAW_UPDATE_SIZE_THRESHOLD || - this.totalDocContentCount + docContentCount > - MAX_NEW_DOC_CONTENT_COUNT || - this.totalOpLength + opLength > MAX_UPDATE_OP_LENGTH) - ) { - return false + // consume the updates up to a maximum total number of bytes + // ensuring that at least one update will be processed (we may + // exceed RAW_UPDATE_SIZE_THRESHOLD is the first update is bigger + // than that). + let totalRawUpdatesSize = 0 + const updatesToProcess = [] + for (const rawUpdate of rawUpdates) { + const nextTotalSize = totalRawUpdatesSize + rawUpdate.length + if ( + updatesToProcess.length > 0 && + nextTotalSize > RAW_UPDATE_SIZE_THRESHOLD + ) { + // stop consuming updates if we have at least one and the + // next update would exceed the size threshold + break + } else { + updatesToProcess.push(rawUpdate) + totalRawUpdatesSize += rawUpdate.length + } } - this.rawUpdates.push(rawUpdate) - this.updates.push(update) - this.totalRawUpdatesSize += rawUpdateSize - this.totalDocContentCount += docContentCount - this.totalOpLength += opLength - return true - } - isEmpty() { - return this.updates.length === 0 - } + // if we hit the size limit above, only process the updates up to that point + if (updatesToProcess.length < rawUpdates.length) { + moreBatches = true // process remaining raw updates in the next iteration + rawUpdates = updatesToProcess + } - async process(runner) { - metrics.timing('redis.incoming.bytes', this.totalRawUpdatesSize, 1) - if (this.totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) { - const rawUpdateSizes = this.rawUpdates.map(rawUpdate => rawUpdate.length) + metrics.timing('redis.incoming.bytes', totalRawUpdatesSize, 1) + if (totalRawUpdatesSize > WARN_RAW_UPDATE_SIZE) { + const rawUpdateSizes = rawUpdates.map(rawUpdate => rawUpdate.length) logger.warn( - { - projectId: this.projectId, - totalRawUpdatesSize: this.totalRawUpdatesSize, - rawUpdateSizes, - }, + { projectId, totalRawUpdatesSize, rawUpdateSizes }, 'large raw update size' ) } - await runner(this.updates) - await deleteAppliedDocUpdates(this.projectId, this.rawUpdates) + + let updates + try { + updates = parseDocUpdates(rawUpdates) + } catch (error) { + throw OError.tag(error, 'failed to parse updates', { + projectId, + updates, + }) + } + + // consume the updates up to a maximum number of ops (insertions and deletions) + let totalOpLength = 0 + let updatesToProcessCount = 0 + let totalDocContentCount = 0 + for (const parsedUpdate of updates) { + if (parsedUpdate.resyncDocContent) { + totalDocContentCount++ + } + if (totalDocContentCount > MAX_NEW_DOC_CONTENT_COUNT) { + break + } + const nextTotalOpLength = totalOpLength + (parsedUpdate?.op?.length || 1) + if ( + updatesToProcessCount > 0 && + nextTotalOpLength > MAX_UPDATE_OP_LENGTH + ) { + break + } else { + totalOpLength = nextTotalOpLength + updatesToProcessCount++ + } + } + + // if we hit the op limit above, only process the updates up to that point + if (updatesToProcessCount < updates.length) { + logger.debug( + { + projectId, + updatesToProcessCount, + updates_count: updates.length, + totalOpLength, + }, + 'restricting number of ops to be processed' + ) + moreBatches = true + // there is a 1:1 mapping between rawUpdates and updates + // which we need to preserve here to ensure we only + // delete the updates that are actually processed + rawUpdates = rawUpdates.slice(0, updatesToProcessCount) + updates = updates.slice(0, updatesToProcessCount) + } + + logger.debug({ projectId }, 'retrieved raw updates from redis') + await runner(updates) + await deleteAppliedDocUpdates(projectId, rawUpdates) } } @@ -328,6 +324,19 @@ async function clearCachedHistoryId(projectId) { await rclient.del(key) } +// for tests +export function setMaxUpdateOpLength(value) { + MAX_UPDATE_OP_LENGTH = value +} + +export function setRawUpdateSizeThreshold(value) { + RAW_UPDATE_SIZE_THRESHOLD = value +} + +export function setMaxNewDocContentCount(value) { + MAX_NEW_DOC_CONTENT_COUNT = value +} + // EXPORTS const countUnprocessedUpdatesCb = callbackify(countUnprocessedUpdates) diff --git a/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js b/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js index afebb63ce2..7652f7935a 100644 --- a/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js +++ b/services/project-history/test/unit/js/RedisManager/RedisManagerTests.js @@ -6,7 +6,15 @@ const MODULE_PATH = '../../../../app/js/RedisManager.js' describe('RedisManager', function () { beforeEach(async function () { - this.rclient = new FakeRedis() + this.rclient = { + auth: sinon.stub(), + exec: sinon.stub().resolves(), + lrange: sinon.stub(), + lrem: sinon.stub(), + srem: sinon.stub(), + del: sinon.stub(), + } + this.rclient.multi = sinon.stub().returns(this.rclient) this.RedisWrapper = { createClient: sinon.stub().returns(this.rclient), } @@ -36,10 +44,10 @@ describe('RedisManager', function () { '@overleaf/metrics': this.Metrics, }) - this.projectId = 'project-id-123' + this.project_id = 'project-id-123' this.batchSize = 100 - this.historyOpsKey = `Project:HistoryOps:{${this.projectId}}` - this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.projectId}}` + this.historyOpsKey = `Project:HistoryOps:{${this.project_id}}` + this.firstOpTimestampKey = `ProjectHistory:FirstOpTimestamp:{${this.project_id}}` this.updates = [ { v: 42, op: ['a', 'b', 'c', 'd'] }, @@ -53,47 +61,31 @@ describe('RedisManager', function () { }) describe('getOldestDocUpdates', function () { - it('gets a small number of updates in one batch', async function () { - const updates = makeUpdates(2) - const rawUpdates = makeRawUpdates(updates) - this.rclient.setList(this.historyOpsKey, rawUpdates) - const result = await this.RedisManager.promises.getOldestDocUpdates( - this.projectId, - 100 + beforeEach(async function () { + this.rclient.lrange.resolves(this.rawUpdates) + this.batchSize = 3 + this.result = await this.RedisManager.promises.getOldestDocUpdates( + this.project_id, + this.batchSize ) - expect(result).to.deep.equal(rawUpdates) }) - it('gets a larger number of updates in several batches', async function () { - const updates = makeUpdates( - this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2 + 12 - ) - const rawUpdates = makeRawUpdates(updates) - this.rclient.setList(this.historyOpsKey, rawUpdates) - const result = await this.RedisManager.promises.getOldestDocUpdates( - this.projectId, - 5000 - ) - expect(result).to.deep.equal(rawUpdates) + it('should read the updates from redis', function () { + this.rclient.lrange + .calledWith(this.historyOpsKey, 0, this.batchSize - 1) + .should.equal(true) }) - it("doesn't return more than the number of updates requested", async function () { - const updates = makeUpdates(100) - const rawUpdates = makeRawUpdates(updates) - this.rclient.setList(this.historyOpsKey, rawUpdates) - const result = await this.RedisManager.promises.getOldestDocUpdates( - this.projectId, - 75 - ) - expect(result).to.deep.equal(rawUpdates.slice(0, 75)) + it('should call the callback with the unparsed ops', function () { + this.result.should.equal(this.rawUpdates) }) }) describe('parseDocUpdates', function () { it('should return the parsed ops', function () { - const updates = makeUpdates(12) - const rawUpdates = makeRawUpdates(updates) - this.RedisManager.parseDocUpdates(rawUpdates).should.deep.equal(updates) + this.RedisManager.parseDocUpdates(this.rawUpdates).should.deep.equal( + this.updates + ) }) }) @@ -104,26 +96,40 @@ describe('RedisManager', function () { describe('single batch smaller than batch size', function () { beforeEach(async function () { - this.updates = makeUpdates(2) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) + this.rclient.lrange.resolves(this.rawUpdates) + this.batchSize = 3 await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 3, + this.project_id, + this.batchSize, this.runner ) }) + it('requests a single batch of updates', function () { + this.rclient.lrange.should.have.been.calledOnce + this.rclient.lrange.should.have.been.calledWith( + this.historyOpsKey, + 0, + this.batchSize - 1 + ) + }) + it('calls the runner once', function () { this.runner.callCount.should.equal(1) }) it('calls the runner with the updates', function () { - this.runner.should.have.been.calledWith(this.updates) + this.runner.calledWith(this.updates).should.equal(true) }) it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + for (const update of this.rawUpdates) { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + update + ) + } }) it('deletes the first op timestamp', function () { @@ -135,26 +141,35 @@ describe('RedisManager', function () { describe('single batch at batch size', function () { beforeEach(async function () { - this.updates = makeUpdates(123) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves([]) await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, + this.project_id, + 2, this.runner ) }) + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + it('calls the runner once', function () { this.runner.callCount.should.equal(1) }) it('calls the runner with the updates', function () { - this.runner.should.have.been.calledWith(this.updates) + this.runner.calledWith(this.updates).should.equal(true) }) it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + for (const update of this.rawUpdates) { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + update + ) + } }) it('deletes the first op timestamp', function () { @@ -165,161 +180,25 @@ describe('RedisManager', function () { }) describe('single batch exceeding size limit on updates', function () { - beforeEach(async function () { - this.updates = makeUpdates(2, [ - 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD), - ]) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, - this.runner - ) - }) - - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the first update', function () { - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, 1)) - }) - - it('calls the runner with the second update', function () { - this.runner - .getCall(1) - .should.have.been.calledWith(this.updates.slice(1)) - }) - - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) - }) - }) - - describe('two batches with first update below and second update above the size limit on updates', function () { - beforeEach(async function () { - this.updates = makeUpdates(2, [ - 'x'.repeat(this.RedisManager.RAW_UPDATE_SIZE_THRESHOLD / 2), - ]) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, - this.runner - ) - }) - - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the first update', function () { - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, 1)) - }) - - it('calls the runner with the second update', function () { - this.runner - .getCall(1) - .should.have.been.calledWith(this.updates.slice(1)) - }) - - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) - }) - }) - - describe('single batch exceeding op count limit on updates', function () { - beforeEach(async function () { - const ops = Array(this.RedisManager.MAX_UPDATE_OP_LENGTH + 1).fill('op') - this.updates = makeUpdates(2, { op: ops }) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, - this.runner - ) - }) - - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the first update', function () { - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, 1)) - }) - - it('calls the runner with the second update', function () { - this.runner - .getCall(1) - .should.have.been.calledWith(this.updates.slice(1)) - }) - - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) - }) - }) - - describe('single batch exceeding doc content count', function () { - beforeEach(async function () { - this.updates = makeUpdates( - this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT + 3, - { resyncDocContent: 123 } - ) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, - this.runner - ) - }) - - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the first batch of updates', function () { - this.runner.should.have.been.calledWith( - this.updates.slice(0, this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT) - ) - }) - - it('calls the runner with the second batch of updates', function () { - this.runner.should.have.been.calledWith( - this.updates.slice(this.RedisManager.MAX_NEW_DOC_CONTENT_COUNT) - ) - }) - - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) - }) - }) - - describe('two batches with first update below and second update above the ops length limit on updates', function () { beforeEach(async function () { // set the threshold below the size of the first update - this.updates = makeUpdates(2, { op: ['op1', 'op2'] }) - this.updates[1].op = Array( - this.RedisManager.MAX_UPDATE_OP_LENGTH + 2 - ).fill('op') - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) + this.RedisManager.setRawUpdateSizeThreshold( + this.rawUpdates[0].length - 1 + ) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) + await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 123, + this.project_id, + 2, this.runner ) }) + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) @@ -328,84 +207,270 @@ describe('RedisManager', function () { this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) }) + it('deletes the first update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[0] + ) + }) + it('calls the runner with the second update', function () { this.runner.should.have.been.calledWith(this.updates.slice(1)) }) - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + it('deletes the second set of applied updates', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[1] + ) }) }) - describe('two batches, one partial', function () { + describe('two batches with first update below and second update above the size limit on updates', function () { beforeEach(async function () { - this.updates = makeUpdates(15) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) + // set the threshold above the size of the first update, but below the total size + this.RedisManager.setRawUpdateSizeThreshold( + this.rawUpdates[0].length + 1 + ) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 10, + this.project_id, + 2, this.runner ) }) + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the first update', function () { + this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true) + }) + + it('deletes the first set applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[0] + ) + }) + + it('calls the runner with the second update', function () { + this.runner.calledWith(this.updates.slice(1)).should.equal(true) + }) + + it('deletes the second applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[1] + ) + }) + }) + + describe('single batch exceeding op count limit on updates', function () { + beforeEach(async function () { + // set the threshold below the size of the first update + this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length - 1) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) + + await this.RedisManager.promises.getUpdatesInBatches( + this.project_id, + 2, + this.runner + ) + }) + + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the first updates', function () { + this.runner.calledWith(this.updates.slice(0, 1)).should.equal(true) + }) + + it('deletes the first applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[0] + ) + }) + + it('calls the runner with the second updates', function () { + this.runner.calledWith(this.updates.slice(1)).should.equal(true) + }) + + it('deletes the second applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[1] + ) + }) + }) + + describe('single batch exceeding doc content count', function () { + beforeEach(async function () { + this.updates = [{ resyncDocContent: 123 }, { resyncDocContent: 456 }] + this.rawUpdates = this.updates.map(update => JSON.stringify(update)) + // set the threshold below the size of the first update + this.RedisManager.setMaxNewDocContentCount(this.updates.length - 1) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) + + await this.RedisManager.promises.getUpdatesInBatches( + this.project_id, + 2, + this.runner + ) + }) + + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the first update', function () { + this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) + }) + + it('deletes the first applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[0] + ) + }) + + it('calls the runner with the second update', function () { + this.runner.should.have.been.calledWith(this.updates.slice(1)) + }) + + it('deletes the second set of applied updates', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[1] + ) + }) + }) + + describe('two batches with first update below and second update above the ops length limit on updates', function () { + beforeEach(async function () { + // set the threshold below the size of the first update + this.RedisManager.setMaxUpdateOpLength(this.updates[0].op.length + 1) + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.rawUpdates.slice(1)) + + await this.RedisManager.promises.getUpdatesInBatches( + this.project_id, + 2, + this.runner + ) + }) + + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + + it('calls the runner twice', function () { + this.runner.callCount.should.equal(2) + }) + + it('calls the runner with the first update', function () { + this.runner.should.have.been.calledWith(this.updates.slice(0, 1)) + }) + + it('deletes the first applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[0] + ) + }) + + it('calls the runner with the second update', function () { + this.runner.should.have.been.calledWith(this.updates.slice(1)) + }) + + it('deletes the second applied update', function () { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + this.rawUpdates[1] + ) + }) + }) + + describe('two batches', function () { + beforeEach(async function () { + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).resolves(this.extraRawUpdates) + await this.RedisManager.promises.getUpdatesInBatches( + this.project_id, + 2, + this.runner + ) + }) + + it('requests a second batch of updates', function () { + this.rclient.lrange.should.have.been.calledTwice + }) + it('calls the runner twice', function () { this.runner.callCount.should.equal(2) }) it('calls the runner with the updates', function () { - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, 10)) - this.runner - .getCall(1) - .should.have.been.calledWith(this.updates.slice(10)) + this.runner.should.have.been.calledWith(this.updates) + this.runner.should.have.been.calledWith(this.extraUpdates) }) - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) - }) - }) - - describe('two full batches', function () { - beforeEach(async function () { - this.updates = makeUpdates(20) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - await this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - 10, - this.runner - ) + it('deletes the first set of applied updates', function () { + for (const update of this.rawUpdates) { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + update + ) + } }) - it('calls the runner twice', function () { - this.runner.callCount.should.equal(2) - }) - - it('calls the runner with the updates', function () { - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, 10)) - this.runner - .getCall(1) - .should.have.been.calledWith(this.updates.slice(10)) - }) - - it('deletes the applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal([]) + it('deletes the second set of applied updates', function () { + for (const update of this.extraRawUpdates) { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + update + ) + } }) }) describe('error when first reading updates', function () { beforeEach(async function () { - this.updates = makeUpdates(10) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - this.rclient.throwErrorOnLrangeCall(0) + this.error = new Error('error') + this.rclient.lrange.rejects(this.error) await expect( this.RedisManager.promises.getUpdatesInBatches( - this.projectId, + this.project_id, 2, this.runner ) @@ -413,105 +478,38 @@ describe('RedisManager', function () { }) it('does not delete any updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal( - this.rawUpdates - ) + expect(this.rclient.lrem).not.to.have.been.called }) }) describe('error when reading updates for a second batch', function () { beforeEach(async function () { - this.batchSize = this.RedisManager.RAW_UPDATES_BATCH_SIZE - 1 - this.updates = makeUpdates(this.RedisManager.RAW_UPDATES_BATCH_SIZE * 2) - this.rawUpdates = makeRawUpdates(this.updates) - this.rclient.setList(this.historyOpsKey, this.rawUpdates) - this.rclient.throwErrorOnLrangeCall(1) + this.error = new Error('error') + this.rclient.lrange.onCall(0).resolves(this.rawUpdates) + this.rclient.lrange.onCall(1).rejects(this.error) + await expect( this.RedisManager.promises.getUpdatesInBatches( - this.projectId, - this.batchSize, + this.project_id, + 2, this.runner ) ).to.be.rejected }) - it('calls the runner with the first batch of updates', function () { - this.runner.should.have.been.calledOnce - this.runner - .getCall(0) - .should.have.been.calledWith(this.updates.slice(0, this.batchSize)) + it('deletes the first set of applied updates', function () { + for (const update of this.rawUpdates) { + expect(this.rclient.lrem).to.have.been.calledWith( + this.historyOpsKey, + 1, + update + ) + } }) - it('deletes only the first batch of applied updates', function () { - expect(this.rclient.getList(this.historyOpsKey)).to.deep.equal( - this.rawUpdates.slice(this.batchSize) - ) + it('deletes applied updates only once', function () { + expect(this.rclient.lrem.callCount).to.equal(this.rawUpdates.length) }) }) }) }) - -class FakeRedis { - constructor() { - this.data = new Map() - this.del = sinon.stub() - this.lrangeCallCount = -1 - } - - setList(key, list) { - this.data.set(key, list) - } - - getList(key) { - return this.data.get(key) - } - - throwErrorOnLrangeCall(callNum) { - this.lrangeCallThrowingError = callNum - } - - async lrange(key, start, stop) { - this.lrangeCallCount += 1 - if ( - this.lrangeCallThrowingError != null && - this.lrangeCallThrowingError === this.lrangeCallCount - ) { - throw new Error('LRANGE failed!') - } - const list = this.data.get(key) ?? [] - return list.slice(start, stop + 1) - } - - async lrem(key, count, elementToRemove) { - expect(count).to.be.greaterThan(0) - const original = this.data.get(key) ?? [] - const filtered = original.filter(element => { - if (count > 0 && element === elementToRemove) { - count-- - return false - } - return true - }) - this.data.set(key, filtered) - } - - async exec() { - // Nothing to do - } - - multi() { - return this - } -} - -function makeUpdates(updateCount, extraFields = {}) { - const updates = [] - for (let i = 0; i < updateCount; i++) { - updates.push({ v: i, ...extraFields }) - } - return updates -} - -function makeRawUpdates(updates) { - return updates.map(JSON.stringify) -}