diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 512fd5e68b..9f78b5af4b 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -21,16 +21,17 @@ module.exports = HistoryManager = error = new Error("track changes api returned a failure status code: #{res.statusCode}") return callback(error) - FLUSH_EVERY_N_OPS: 50 - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> + FLUSH_EVERY_N_OPS: 100 + recordAndFlushHistoryOps: (project_id, doc_id, ops = [], length, callback = (error) ->) -> if ops.length == 0 return callback() - HistoryRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> + HistoryRedisManager.recordDocHasHistoryOps project_id, doc_id, ops, (error) -> return callback(error) if error? - # We want to flush every 50 ops, i.e. 50, 100, 150, etc - # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these - # ops. If we've changed, then we've gone over a multiple of 50 and should flush. - # (Most of the time, we will only hit 50 and then flushing will put us back to 0) + return callback() if not length? # don't flush unless we know the length + # We want to flush every 100 ops, i.e. 100, 200, 300, etc + # Find out which 'block' (i.e. 0-99, 100-199) we were in before and after pushing these + # ops. If we've changed, then we've gone over a multiple of 100 and should flush. + # (Most of the time, we will only hit 100 and then flushing will put us back to 0) previousLength = length - ops.length prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS) newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS) @@ -41,4 +42,4 @@ module.exports = HistoryManager = HistoryManager.flushDocChanges project_id, doc_id, (error) -> if error? logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" - callback() \ No newline at end of file + callback() diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee index 315d9daabf..0ac8723359 100644 --- a/services/document-updater/app/coffee/HistoryRedisManager.coffee +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -5,16 +5,10 @@ async = require "async" logger = require('logger-sharelatex') module.exports = HistoryRedisManager = - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + recordDocHasHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> if ops.length == 0 return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush - opVersions = ops.map (op) -> op?.v - logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops" - jsonOps = ops.map (op) -> JSON.stringify op - async.parallel [ - (cb) -> rclient.rpush Keys.uncompressedHistoryOps({doc_id}), jsonOps..., cb - (cb) -> rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, cb - ], (error, results) -> + logger.log project_id: project_id, doc_id: doc_id, "marking doc in project for history ops" + rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, (error) -> return callback(error) if error? - [length, _] = results - callback(error, length) \ No newline at end of file + callback() diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 3359a36231..4aad7ec109 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -25,6 +25,7 @@ MEGABYTES = 1024 * 1024 MAX_RANGES_SIZE = 3 * MEGABYTES keys = Settings.redis.documentupdater.key_schema +historyKeys = Settings.redis.history.key_schema module.exports = RedisManager = rclient: rclient @@ -167,32 +168,37 @@ module.exports = RedisManager = logger.error err: error, doc_id: doc_id, newDocLines: newDocLines, error.message return callback(error) newHash = RedisManager._computeHash(newDocLines) - - logger.log doc_id: doc_id, version: newVersion, hash: newHash, "updating doc in redis" - + + opVersions = appliedOps.map (op) -> op?.v + logger.log doc_id: doc_id, version: newVersion, hash: newHash, op_versions: opVersions, "updating doc in redis" + RedisManager._serializeRanges ranges, (error, ranges) -> if error? logger.error {err: error, doc_id}, error.message return callback(error) multi = rclient.multi() - multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines - multi.set keys.docVersion(doc_id:doc_id), newVersion - multi.set keys.docHash(doc_id:doc_id), newHash - if jsonOps.length > 0 - multi.rpush keys.docOps(doc_id: doc_id), jsonOps... - multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL - multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 + multi.eval setScript, 1, keys.docLines(doc_id:doc_id), newDocLines # index 0 + multi.set keys.docVersion(doc_id:doc_id), newVersion # index 1 + multi.set keys.docHash(doc_id:doc_id), newHash # index 2 + multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 3 + multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 # index 4 if ranges? - multi.set keys.ranges(doc_id:doc_id), ranges + multi.set keys.ranges(doc_id:doc_id), ranges # index 5 else - multi.del keys.ranges(doc_id:doc_id) + multi.del keys.ranges(doc_id:doc_id) # also index 5 + # push the ops last so we can get the lengths at fixed index positions 6 and 7 + if jsonOps.length > 0 + multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # index 6 + multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7 multi.exec (error, result) -> return callback(error) if error? # check the hash computed on the redis server writeHash = result?[0] if logHashWriteErrors and writeHash? and writeHash isnt newHash logger.error doc_id: doc_id, writeHash: writeHash, origHash: newHash, docLines:newDocLines, "hash mismatch on updateDocument" - return callback() + # return length of uncompressedHistoryOps queue (index 7) + uncompressedHistoryOpsLength = result?[7] + return callback(null, uncompressedHistoryOpsLength) getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 5022f6bb38..269b16ee67 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -61,9 +61,9 @@ module.exports = UpdateManager = return callback(error) if error? RangesManager.applyUpdate project_id, doc_id, ranges, appliedOps, updatedDocLines, (error, new_ranges) -> return callback(error) if error? - RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error) -> + RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_ranges, (error, historyOpsLength) -> return callback(error) if error? - HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback + HistoryManager.recordAndFlushHistoryOps project_id, doc_id, appliedOps, historyOpsLength, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index d06119c690..a2eba4c063 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -207,7 +207,7 @@ describe "Applying updates to a doc", -> before (done) -> [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()] updates = [] - for v in [0..99] # Should flush after 50 ops + for v in [0..199] # Should flush after 100 ops updates.push doc_id: @doc_id, op: [i: v.toString(), p: 0] @@ -219,7 +219,7 @@ describe "Applying updates to a doc", -> # Send updates in chunks to causes multiple flushes actions = [] - for i in [0..9] + for i in [0..19] do (i) => actions.push (cb) => DocUpdaterClient.sendUpdates @project_id, @doc_id, updates.slice(i*10, (i+1)*10), cb diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index 66ffd98e80..37e35ca285 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -40,18 +40,18 @@ describe "HistoryManager", -> it "should return the callback with an error", -> @callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true - describe "pushUncompressedHistoryOps", -> + describe "recordAndFlushHistoryOps", -> beforeEach -> @ops = ["mock-ops"] @HistoryManager.flushDocChanges = sinon.stub().callsArg(2) describe "pushing the op", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 1, @callback it "should push the ops into redis", -> - @HistoryRedisManager.pushUncompressedHistoryOps + @HistoryRedisManager.recordDocHasHistoryOps .calledWith(@project_id, @doc_id, @ops) .should.equal true @@ -63,9 +63,9 @@ describe "HistoryManager", -> describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryRedisManager.recordDocHasHistoryOps = + sinon.stub().callsArgWith(3, null) + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS,@callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -75,9 +75,9 @@ describe "HistoryManager", -> describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> @ops = ["op1", "op2", "op3"] - @HistoryRedisManager.pushUncompressedHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryRedisManager.recordDocHasHistoryOps = + sinon.stub().callsArgWith(3, null) + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1, @callback it "should tell the track changes api to flush", -> @HistoryManager.flushDocChanges @@ -86,10 +86,10 @@ describe "HistoryManager", -> describe "when HistoryManager errors", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = - sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) + @HistoryRedisManager.recordDocHasHistoryOps = + sinon.stub().callsArgWith(3, null) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, @ops, 2 * @HistoryManager.FLUSH_EVERY_N_OPS, @callback it "should log out the error", -> @logger.error @@ -103,10 +103,10 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> - @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) - @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback + @HistoryRedisManager.recordDocHasHistoryOps = sinon.stub().callsArgWith(3, null) + @HistoryManager.recordAndFlushHistoryOps @project_id, @doc_id, [], 1, @callback - it "should not call HistoryRedisManager.pushUncompressedHistoryOps", -> - @HistoryRedisManager.pushUncompressedHistoryOps.called.should.equal false + it "should not call HistoryRedisManager.recordDocHasHistoryOps", -> + @HistoryRedisManager.recordDocHasHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee index f51942c1e1..ca3937d4c5 100644 --- a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -24,42 +24,28 @@ describe "HistoryRedisManager", -> @project_id = "project-id-123" @callback = sinon.stub() - describe "pushUncompressedHistoryOps", -> + describe "recordDocHasHistoryOps", -> beforeEach -> @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] - @rclient.rpush = sinon.stub().yields(null, @length = 42) @rclient.sadd = sinon.stub().yields() describe "with ops", -> beforeEach (done) -> - @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) => + @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, @ops, (args...) => @callback(args...) done() - it "should push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) - .should.equal true - it "should add the doc_id to the set of which records the project docs", -> @rclient.sadd .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) .should.equal true - it "should call the callback with the length", -> - @callback.calledWith(null, @length).should.equal true - describe "with no ops", -> beforeEach (done) -> - @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) => + @HistoryRedisManager.recordDocHasHistoryOps @project_id, @doc_id, [], (args...) => @callback(args...) done() - it "should not push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .called - .should.equal false - it "should not add the doc_id to the set of which records the project docs", -> @rclient.sadd .called diff --git a/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee b/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee index 5c6b6a6381..36c458cb71 100644 --- a/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee +++ b/services/document-updater/test/unit/coffee/LockManager/ReleasingTheLock.coffee @@ -19,6 +19,12 @@ describe 'LockManager - releasing the lock', ()-> error:-> "redis-sharelatex": createClient : () => @client + "settings-sharelatex": { + redis: + lock: + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + } "./Metrics": {inc: () ->} @LockManager = SandboxedModule.require(modulePath, requires: mocks) @lockValue = "lock-value-stub" diff --git a/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee b/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee index 33c3eb3d51..b3ff7cdd7e 100644 --- a/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee +++ b/services/document-updater/test/unit/coffee/LockManager/tryLockTests.coffee @@ -13,6 +13,12 @@ describe 'LockManager - trying the lock', -> auth:-> set: @set = sinon.stub() "./Metrics": {inc: () ->} + "settings-sharelatex": { + redis: + lock: + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + } @callback = sinon.stub() @doc_id = "doc-id-123" diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 070abd859a..26eaaf0892 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -19,7 +19,7 @@ describe "RedisManager", -> documentupdater: {logHashErrors: {write:true, read:true}} redis: documentupdater: - key_schema: + key_schema: blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" docLines: ({doc_id}) -> "doclines:#{doc_id}" docOps: ({doc_id}) -> "DocOps:#{doc_id}" @@ -29,6 +29,10 @@ describe "RedisManager", -> pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" + history: + key_schema: + uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" + docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" } "redis-sharelatex": createClient: () => @rclient diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index 3e659ee078..2de6e93e44 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -166,7 +166,7 @@ describe "UpdateManager", -> @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().yields() @RealTimeRedisManager.sendData = sinon.stub() - @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) + @HistoryManager.recordAndFlushHistoryOps = sinon.stub().callsArg(4) describe "normally", -> beforeEach -> @@ -188,7 +188,7 @@ describe "UpdateManager", -> .should.equal true it "should push the applied ops into the history queue", -> - @HistoryManager.pushUncompressedHistoryOps + @HistoryManager.recordAndFlushHistoryOps .calledWith(@project_id, @doc_id, @appliedOps) .should.equal true