From 79d8fced496f366a586fd3228d7db3a0069973ee Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 8 May 2017 15:56:02 +0100 Subject: [PATCH 1/6] make history update more atomic --- .../app/coffee/HistoryRedisManager.coffee | 14 +++++--------- .../app/coffee/RedisManager.coffee | 9 ++++++--- .../HistoryRedisManagerTests.coffee | 12 +----------- .../coffee/RedisManager/RedisManagerTests.coffee | 6 +++++- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee index 315d9daabf..6d9a482ced 100644 --- a/services/document-updater/app/coffee/HistoryRedisManager.coffee +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -8,13 +8,9 @@ module.exports = HistoryRedisManager = pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> if ops.length == 0 return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush - opVersions = ops.map (op) -> op?.v - logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops" - jsonOps = ops.map (op) -> JSON.stringify op - async.parallel [ - (cb) -> rclient.rpush Keys.uncompressedHistoryOps({doc_id}), jsonOps..., cb - (cb) -> rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, cb - ], (error, results) -> + logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops" + rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) -> return callback(error) if error? - [length, _] = results - callback(error, length) \ No newline at end of file + rclient.llen Keys.uncompressedHistoryOps({doc_id}), (error, length) -> + return callback(error) if error? + callback(null, length) diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 3359a36231..1942b86dc3 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -25,6 +25,7 @@ MEGABYTES = 1024 * 1024 MAX_RANGES_SIZE = 3 * MEGABYTES keys = Settings.redis.documentupdater.key_schema +historyKeys = Settings.redis.history.key_schema module.exports = RedisManager = rclient: rclient @@ -167,9 +168,10 @@ module.exports = RedisManager = logger.error err: error, doc_id: doc_id, newDocLines: newDocLines, error.message return callback(error) newHash = RedisManager._computeHash(newDocLines) - - logger.log doc_id: doc_id, version: newVersion, hash: newHash, "updating doc in redis" - + + opVersions = appliedOps.map (op) -> op?.v + logger.log doc_id: doc_id, version: newVersion, hash: newHash, op_versions: opVersions, "updating doc in redis" + RedisManager._serializeRanges ranges, (error, ranges) -> if error? logger.error {err: error, doc_id}, error.message @@ -180,6 +182,7 @@ module.exports = RedisManager = multi.set keys.docHash(doc_id:doc_id), newHash if jsonOps.length > 0 multi.rpush keys.docOps(doc_id: doc_id), jsonOps... + multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 if ranges? diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee index f51942c1e1..0137c2128a 100644 --- a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -27,7 +27,7 @@ describe "HistoryRedisManager", -> describe "pushUncompressedHistoryOps", -> beforeEach -> @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] - @rclient.rpush = sinon.stub().yields(null, @length = 42) + @rclient.llen = sinon.stub().yields(null, @length = 42) @rclient.sadd = sinon.stub().yields() describe "with ops", -> @@ -36,11 +36,6 @@ describe "HistoryRedisManager", -> @callback(args...) done() - it "should push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) - .should.equal true - it "should add the doc_id to the set of which records the project docs", -> @rclient.sadd .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) @@ -55,11 +50,6 @@ describe "HistoryRedisManager", -> @callback(args...) done() - it "should not push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .called - .should.equal false - it "should not add the doc_id to the set of which records the project docs", -> @rclient.sadd .called diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 070abd859a..26eaaf0892 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -19,7 +19,7 @@ describe "RedisManager", -> documentupdater: {logHashErrors: {write:true, read:true}} redis: documentupdater: - key_schema: + key_schema: blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" docLines: ({doc_id}) -> "doclines:#{doc_id}" docOps: ({doc_id}) -> "DocOps:#{doc_id}" @@ -29,6 +29,10 @@ describe "RedisManager", -> pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" + history: + key_schema: + uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" + docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" } "redis-sharelatex": createClient: () => @rclient From e2f70aca1a19e628983f5652cba04d3ae3845ff8 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 8 May 2017 16:02:40 +0100 Subject: [PATCH 2/6] fix tests for redis cluster --- .../test/unit/coffee/LockManager/ReleasingTheLock.coffee | 6 ++++++ .../test/unit/coffee/LockManager/tryLockTests.coffee | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee b/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee index 5c6b6a6381..36c458cb71 100644 --- a/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee +++ b/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee @@ -19,6 +19,12 @@ describe 'LockManager - releasing the lock', ()-> error:-> "redis-sharelatex": createClient : () => @client + "settings-sharelatex": { + redis: + lock: + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + } "./Metrics": {inc: () ->} @LockManager = SandboxedModule.require(modulePath, requires: mocks) @lockValue = "lock-value-stub" diff --git a/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee b/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee index 33c3eb3d51..b3ff7cdd7e 100644 --- a/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee +++ b/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee @@ -13,6 +13,12 @@ describe 'LockManager - trying the lock', -> auth:-> set: @set = sinon.stub() "./Metrics": {inc: () ->} + "settings-sharelatex": { + redis: + lock: + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + } @callback = sinon.stub() @doc_id = "doc-id-123" From 2d158b03d747204e463976f6c3cf6a324f394678 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 9 May 2017 09:32:56 +0100 Subject: [PATCH 3/6] rename pushUncompressedHistoryOps --- .../app/coffee/HistoryManager.coffee | 4 ++-- .../app/coffee/HistoryRedisManager.coffee | 2 +- .../HistoryManager/HistoryManagerTests.coffee | 16 ++++++++-------- .../HistoryRedisManagerTests.coffee | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 512fd5e68b..6c47c24c1c 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -25,7 +25,7 @@ module.exports = HistoryManager = pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> if ops.length == 0 return callback() - HistoryRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> + HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error, length) -> return callback(error) if error? # We want to flush every 50 ops, i.e. 50, 100, 150, etc # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these @@ -41,4 +41,4 @@ module.exports = HistoryManager = HistoryManager.flushDocChanges project_id, doc_id, (error) -> if error? logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" - callback() \ No newline at end of file + callback() diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee index 6d9a482ced..2329b6f433 100644 --- a/services/document-updater/app/coffee/HistoryRedisManager.coffee +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -5,7 +5,7 @@ async = require "async" logger = require('logger-sharelatex') module.exports = HistoryRedisManager = - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> if ops.length == 0 return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops" diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index 66ffd98e80..bff896cdd8 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -47,11 +47,11 @@ describe "HistoryManager", -> describe "pushing the op", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback it "should push the ops into redis", -> - @HistoryRedisManager.pushUncompressedHistoryOps + @HistoryRedisManager.recordDocHasHistoryOps .calledWith(@project_id, @doc_id, @ops) .should.equal true @@ -63,7 +63,7 @@ describe "HistoryManager", -> describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -75,7 +75,7 @@ describe "HistoryManager", -> describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> @ops = ["op1", "op2", "op3"] - @HistoryRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -86,7 +86,7 @@ describe "HistoryManager", -> describe "when HistoryManager errors", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -103,10 +103,10 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback - it "should not call HistoryRedisManager.pushUncompressedHistoryOps", -> - @HistoryRedisManager.pushUncompressedHistoryOps.called.should.equal false + it "should not call HistoryRedisManager.recordDocHasHistoryOps", -> + @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee index 0137c2128a..f14e6da27f 100644 --- a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -24,7 +24,7 @@ describe "HistoryRedisManager", -> @project_id = "project-id-123" @callback = sinon.stub() - describe "pushUncompressedHistoryOps", -> + describe "recordDocHasHistoryOps", -> beforeEach -> @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] @rclient.llen = sinon.stub().yields(null, @length = 42) @@ -32,7 +32,7 @@ describe "HistoryRedisManager", -> describe "with ops", -> beforeEach (done) -> - @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) => + @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, @ops, (args...) => @callback(args...) done() @@ -46,7 +46,7 @@ describe "HistoryRedisManager", -> describe "with no ops", -> beforeEach (done) -> - @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) => + @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) => @callback(args...) done() From fdf5e8e0b8ab8af120140ecee2a94206adbe37ec Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 9 May 2017 10:34:31 +0100 Subject: [PATCH 4/6] get history ops length directly from redis update --- .../app/coffee/HistoryManager.coffee | 5 ++-- .../app/coffee/HistoryRedisManager.coffee | 6 ++--- .../app/coffee/RedisManager.coffee | 25 +++++++++++-------- .../app/coffee/UpdateManager.coffee | 4 +-- .../HistoryManager/HistoryManagerTests.coffee | 20 +++++++-------- .../HistoryRedisManagerTests.coffee | 4 --- .../UpdateManager/UpdateManagerTests.coffee | 2 +- 7 files changed, 32 insertions(+), 34 deletions(-) diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 6c47c24c1c..c6aa7797cf 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -22,11 +22,12 @@ module.exports = HistoryManager = return callback(error) FLUSH_EVERY_N_OPS: 50 - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> + pushUncompressedHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> if ops.length == 0 return callback() - HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error, length) -> + HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> return callback(error) if error? + return callback() if not length? # don't flush unless we know the length # We want to flush every 50 ops, i.e. 50, 100, 150, etc # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these # ops. If we've changed, then we've gone over a multiple of 50 and should flush. diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee index 2329b6f433..0ac8723359 100644 --- a/services/document-updater/app/coffee/HistoryRedisManager.coffee +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -5,12 +5,10 @@ async = require "async" logger = require('logger-sharelatex') module.exports = HistoryRedisManager = - recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> if ops.length == 0 return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops" rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) -> return callback(error) if error? - rclient.llen Keys.uncompressedHistoryOps({doc_id}), (error, length) -> - return callback(error) if error? - callback(null, length) + callback() diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 1942b86dc3..4aad7ec109 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -177,25 +177,28 @@ module.exports = RedisManager = logger.error {err: error, doc_id}, error.message return callback(error) multi = rclient.multi() - multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines - multi.set keys.docVersion(doc_id:doc_id), newVersion - multi.set keys.docHash(doc_id:doc_id), newHash - if jsonOps.length > 0 - multi.rpush keys.docOps(doc_id: doc_id), jsonOps... - multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... - multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL - multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 + multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0 + multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1 + multi.set keys.docHash(doc_id:doc_id), newHash # index 2 + multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3 + multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4 if ranges? - multi.set keys.ranges(doc_id:doc_id), ranges + multi.set keys.ranges(doc_id:doc_id), ranges # index 5 else - multi.del keys.ranges(doc_id:doc_id) + multi.del keys.ranges(doc_id:doc_id) # also index 5 + # push the ops last so we can get the lengths at fixed index positions 6 and 7 + if jsonOps.length > 0 + multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6 + multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7 multi.exec (error, result) -> return callback(error) if error? # check the hash computed on the redis server writeHash = result?[0] if logHashWriteErrors and writeHash? and writeHash isnt newHash logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument" - return callback() + # return length of uncompressedHistoryOps queue (index 7) + uncompressedHistoryOpsLength = result?[7] + return callback(null, uncompressedHistoryOpsLength) getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 5022f6bb38..b903f6615f 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -61,9 +61,9 @@ module.exports = UpdateManager = return callback(error) if error? RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) -> return callback(error) if error? - RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) -> + RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) -> return callback(error) if error? - HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback + HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index bff896cdd8..b41c9b9f7a 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -47,8 +47,8 @@ describe "HistoryManager", -> describe "pushing the op", -> beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 1, @callback it "should push the ops into redis", -> @HistoryRedisManager.recordDocHasHistoryOps @@ -64,8 +64,8 @@ describe "HistoryManager", -> describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -76,8 +76,8 @@ describe "HistoryManager", -> beforeEach -> @ops = ["op1", "op2", "op3"] @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -87,9 +87,9 @@ describe "HistoryManager", -> describe "when HistoryManager errors", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) + sinon.stub().callsArgWith(3, null) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback it "should log out the error", -> @logger.error @@ -103,8 +103,8 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> - @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], 1, @callback it "should not call HistoryRedisManager.recordDocHasHistoryOps", -> @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee index f14e6da27f..ca3937d4c5 100644 --- a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -27,7 +27,6 @@ describe "HistoryRedisManager", -> describe "recordDocHasHistoryOps", -> beforeEach -> @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] - @rclient.llen = sinon.stub().yields(null, @length = 42) @rclient.sadd = sinon.stub().yields() describe "with ops", -> @@ -41,9 +40,6 @@ describe "HistoryRedisManager", -> .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) .should.equal true - it "should call the callback with the length", -> - @callback.calledWith(null, @length).should.equal true - describe "with no ops", -> beforeEach (done) -> @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) => diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index 3e659ee078..57bd9166d1 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -166,7 +166,7 @@ describe "UpdateManager", -> @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().yields() @RealTimeRedisManager.sendData = sinon.stub() - @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) + @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(4) describe "normally", -> beforeEach -> From 7ce6285e3d39956a27a0c31f2fc1bf7de584ba1e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 9 May 2017 10:44:26 +0100 Subject: [PATCH 5/6] increase flush threshold to 100 ops --- .../document-updater/app/coffee/HistoryManager.coffee | 10 +++++----- .../coffee/ApplyingUpdatesToADocTests.coffee | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index c6aa7797cf..17b77a00e6 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -21,17 +21,17 @@ module.exports = HistoryManager = error = new Error("track changes api returned a failure status code: #{res.statusCode}") return callback(error) - FLUSH_EVERY_N_OPS: 50 + FLUSH_EVERY_N_OPS: 100 pushUncompressedHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> if ops.length == 0 return callback() HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> return callback(error) if error? return callback() if not length? # don't flush unless we know the length - # We want to flush every 50 ops, i.e. 50, 100, 150, etc - # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these - # ops. If we've changed, then we've gone over a multiple of 50 and should flush. - # (Most of the time, we will only hit 50 and then flushing will put us back to 0) + # We want to flush every 100 ops, i.e. 100, 200, 300, etc + # Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these + # ops. If we've changed, then we've gone over a multiple of 100 and should flush. + # (Most of the time, we will only hit 100 and then flushing will put us back to 0) previousLength = length - ops.length prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS) newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS) diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index d06119c690..a2eba4c063 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -207,7 +207,7 @@ describe "Applying updates to a doc", -> before (done) -> [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()] updates = [] - for v in [0..99] # Should flush after 50 ops + for v in [0..199] # Should flush after 100 ops updates.push doc_id: @doc_id, op: [i: v.toString(), p: 0] @@ -219,7 +219,7 @@ describe "Applying updates to a doc", -> # Send updates in chunks to causes multiple flushes actions = [] - for i in [0..9] + for i in [0..19] do (i) => actions.push (cb) => DocUpdaterClient.sendUpdates @project_id, @doc_id, updates.slice(i*10, (i+1)*10), cb From 36407ac726d0aa3a8e3917426b5f77f40b32f815 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 9 May 2017 12:02:27 +0100 Subject: [PATCH 6/6] rename HistoryManager pushUncompressedHistoryOps --- .../app/coffee/HistoryManager.coffee | 2 +- .../document-updater/app/coffee/UpdateManager.coffee | 2 +- .../coffee/HistoryManager/HistoryManagerTests.coffee | 12 ++++++------ .../coffee/UpdateManager/UpdateManagerTests.coffee | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 17b77a00e6..9f78b5af4b 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -22,7 +22,7 @@ module.exports = HistoryManager = return callback(error) FLUSH_EVERY_N_OPS: 100 - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> + recordAndFlushHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> if ops.length == 0 return callback() HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index b903f6615f..269b16ee67 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -63,7 +63,7 @@ module.exports = UpdateManager = return callback(error) if error? RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) -> return callback(error) if error? - HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback + HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index b41c9b9f7a..37e35ca285 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -40,7 +40,7 @@ describe "HistoryManager", -> it "should return the callback with an error", -> @callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true - describe "pushUncompressedHistoryOps", -> + describe "recordAndFlushHistoryOps", -> beforeEach -> @ops = ["mock-ops"] @HistoryManager.flushDocChanges = sinon.stub().callsArg(2) @@ -48,7 +48,7 @@ describe "HistoryManager", -> describe "pushing the op", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 1, @callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback it "should push the ops into redis", -> @HistoryRedisManager.recordDocHasHistoryOps @@ -65,7 +65,7 @@ describe "HistoryManager", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -77,7 +77,7 @@ describe "HistoryManager", -> @ops = ["op1", "op2", "op3"] @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -89,7 +89,7 @@ describe "HistoryManager", -> @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback it "should log out the error", -> @logger.error @@ -104,7 +104,7 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], 1, @callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback it "should not call HistoryRedisManager.recordDocHasHistoryOps", -> @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index 57bd9166d1..2de6e93e44 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -166,7 +166,7 @@ describe "UpdateManager", -> @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().yields() @RealTimeRedisManager.sendData = sinon.stub() - @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(4) + @HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4) describe "normally", -> beforeEach -> @@ -188,7 +188,7 @@ describe "UpdateManager", -> .should.equal true it "should push the applied ops into the history queue", -> - @HistoryManager.pushUncompressedHistoryOps + @HistoryManager.recordAndFlushHistoryOps .calledWith(@project_id, @doc_id, @appliedOps) .should.equal true