From 8779f3f686c0cce325bdb00858ab897a9699be85 Mon Sep 17 00:00:00 2001 From: James Allen Date: Tue, 23 Aug 2016 16:00:46 +0100 Subject: [PATCH] Only write DocOps atomically with version and lines, after all docs are applied --- .../app/coffee/DocOpsManager.coffee | 17 ---- .../app/coffee/DocumentManager.coffee | 3 +- .../app/coffee/RedisManager.coffee | 33 ++++---- .../app/coffee/ShareJsDB.coffee | 28 +++---- .../app/coffee/ShareJsUpdateManager.coffee | 8 +- .../app/coffee/TrackChangesManager.coffee | 16 +++- .../app/coffee/UpdateManager.coffee | 8 +- .../app/coffee/WebRedisManager.coffee | 6 +- .../coffee/ApplyingUpdatesToADocTests.coffee | 16 +++- .../DocOpsManager/DocOpsManagerTests.coffee | 52 ------------- .../getDocAndRecentOpsTests.coffee | 13 ++-- .../RedisManager/RedisManagerTests.coffee | 78 +++++++++++++------ .../unit/coffee/ShareJsDB/GetOpsTests.coffee | 20 ++--- .../coffee/ShareJsDB/GetSnapshotTests.coffee | 9 ++- .../coffee/ShareJsDB/WriteOpsTests.coffee | 22 ++---- .../ShareJsUpdateManagerTests.coffee | 11 ++- .../TrackChangesManagerTests.coffee | 36 ++++++--- .../UpdateManager/ApplyingUpdates.coffee | 17 +++- .../lockUpdatesAndDoTests.coffee | 5 ++ .../WebRedisManagerTests.coffee | 10 +-- 20 files changed, 205 insertions(+), 203 deletions(-) delete mode 100644 services/document-updater/app/coffee/DocOpsManager.coffee delete mode 100644 services/document-updater/test/unit/coffee/DocOpsManager/DocOpsManagerTests.coffee diff --git a/services/document-updater/app/coffee/DocOpsManager.coffee b/services/document-updater/app/coffee/DocOpsManager.coffee deleted file mode 100644 index a85a1e18ee..0000000000 --- a/services/document-updater/app/coffee/DocOpsManager.coffee +++ /dev/null @@ -1,17 +0,0 @@ -RedisManager = require "./RedisManager" -TrackChangesManager = require "./TrackChangesManager" - -module.exports = DocOpsManager = - getPreviousDocOps: (project_id, doc_id, start, end, callback = (error, ops) ->) -> - RedisManager.getPreviousDocOps doc_id, start, end, (error, ops) -> - return callback(error) if error? - callback null, ops - - pushDocOp: (project_id, doc_id, op, callback = (error) ->) -> - RedisManager.pushDocOp doc_id, op, (error, version) -> - return callback(error) if error? - TrackChangesManager.pushUncompressedHistoryOp project_id, doc_id, op, (error) -> - return callback(error) if error? - callback null, version - - diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index 69311bd979..ddeaceea2f 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -1,6 +1,5 @@ RedisManager = require "./RedisManager" PersistenceManager = require "./PersistenceManager" -DocOpsManager = require "./DocOpsManager" DiffCodec = require "./DiffCodec" logger = require "logger-sharelatex" Metrics = require "./Metrics" @@ -37,7 +36,7 @@ module.exports = DocumentManager = if fromVersion == -1 callback null, lines, version, [] else - DocOpsManager.getPreviousDocOps project_id, doc_id, fromVersion, version, (error, ops) -> + RedisManager.getPreviousDocOps doc_id, fromVersion, version, (error, ops) -> return callback(error) if error? callback null, lines, version, ops diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index cb2e28b296..6e474c9b96 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -66,12 +66,6 @@ module.exports = RedisManager = version = parseInt(version, 10) callback null, version - setDocument : (doc_id, docLines, version, callback = (error) ->)-> - multi = rclient.multi() - multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) - multi.set keys.docVersion(doc_id:doc_id), version - multi.exec (error, replys) -> callback(error) - getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) -> rclient.llen keys.docOps(doc_id: doc_id), (error, length) -> return callback(error) if error? @@ -104,18 +98,23 @@ module.exports = RedisManager = DOC_OPS_TTL: 60 * minutes DOC_OPS_MAX_LENGTH: 100 - pushDocOp: (doc_id, op, callback = (error, new_version) ->) -> - jsonOp = JSON.stringify op - multi = rclient.multi() - multi.rpush keys.docOps(doc_id: doc_id), jsonOp - multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL - multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 - multi.incr keys.docVersion(doc_id: doc_id) - multi.exec (error, replys) -> - [_, __, ___, version] = replys + updateDocument : (doc_id, docLines, newVersion, appliedOps = [], callback = (error) ->)-> + RedisManager.getDocVersion doc_id, (error, currentVersion) -> return callback(error) if error? - version = parseInt(version, 10) - callback null, version + if currentVersion + appliedOps.length != newVersion + error = new Error("Version mismatch. '#{doc_id}' is corrupted.") + logger.error {err: error, doc_id, currentVersion, newVersion, opsLength: appliedOps.length}, "version mismatch" + return callback(error) + jsonOps = appliedOps.map (op) -> JSON.stringify op + multi = rclient.multi() + multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) + multi.set keys.docVersion(doc_id:doc_id), newVersion + multi.rpush keys.docOps(doc_id: doc_id), jsonOps... # TODO: Really double check that these are going onto the array in the correct order + multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL + multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 + multi.exec (error, replys) -> + return callback(error) if error? + return callback() getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback diff --git a/services/document-updater/app/coffee/ShareJsDB.coffee b/services/document-updater/app/coffee/ShareJsDB.coffee index f9527ccc0b..3d80c680cb 100644 --- a/services/document-updater/app/coffee/ShareJsDB.coffee +++ b/services/document-updater/app/coffee/ShareJsDB.coffee @@ -2,11 +2,16 @@ Keys = require('./UpdateKeys') Settings = require('settings-sharelatex') DocumentManager = require "./DocumentManager" RedisManager = require "./RedisManager" -DocOpsManager = require "./DocOpsManager" Errors = require "./Errors" logger = require "logger-sharelatex" -module.exports = ShareJsDB = +module.exports = class ShareJsDB + constructor: () -> + @appliedOps = {} + # ShareJS calls this detacted from the instance, so we need + # bind it to keep our context that can access @appliedOps + @writeOp = @_writeOp.bind(@) + getOps: (doc_key, start, end, callback) -> if start == end return callback null, [] @@ -18,21 +23,12 @@ module.exports = ShareJsDB = end = -1 [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) - DocOpsManager.getPreviousDocOps project_id, doc_id, start, end, (error, ops) -> - return callback error if error? - callback null, ops + RedisManager.getPreviousDocOps doc_id, start, end, callback - writeOp: (doc_key, opData, callback) -> - [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) - DocOpsManager.pushDocOp project_id, doc_id, opData, (error, version) -> - return callback error if error? - - if version == opData.v + 1 - callback() - else - error = new Error("Version mismatch. '#{doc_id}' is corrupted.") - logger.error err: error, doc_id: doc_id, project_id: project_id, opVersion: opData.v, expectedVersion: version, "doc is corrupt" - callback error + _writeOp: (doc_key, opData, callback) -> + @appliedOps[doc_key] ?= [] + @appliedOps[doc_key].push opData + callback() getSnapshot: (doc_key, callback) -> [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) diff --git a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee index eb7ad92720..ca00a04ea9 100644 --- a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee +++ b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee @@ -15,7 +15,11 @@ ShareJsModel:: = {} util.inherits ShareJsModel, EventEmitter module.exports = ShareJsUpdateManager = - getNewShareJsModel: () -> new ShareJsModel(ShareJsDB, maxDocLength: Settings.max_doc_length) + getNewShareJsModel: () -> + db = new ShareJsDB() + model = new ShareJsModel(db, maxDocLength: Settings.max_doc_length) + model.db = db + return model applyUpdates: (project_id, doc_id, updates, callback = (error, updatedDocLines) ->) -> logger.log project_id: project_id, doc_id: doc_id, updates: updates, "applying sharejs updates" @@ -51,7 +55,7 @@ module.exports = ShareJsUpdateManager = @_sendError(project_id, doc_id, error) return callback(error) docLines = data.snapshot.split(/\r\n|\n|\r/) - callback(null, docLines, data.v) + callback(null, docLines, data.v, model.db.appliedOps[doc_key] or []) _listenForOps: (model) -> model.on "applyOp", (doc_key, opData) -> diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee index cc61bdb0ae..7dfc98115a 100644 --- a/services/document-updater/app/coffee/TrackChangesManager.coffee +++ b/services/document-updater/app/coffee/TrackChangesManager.coffee @@ -1,6 +1,7 @@ settings = require "settings-sharelatex" request = require "request" logger = require "logger-sharelatex" +async = require "async" WebRedisManager = require "./WebRedisManager" module.exports = TrackChangesManager = @@ -21,14 +22,21 @@ module.exports = TrackChangesManager = return callback(error) FLUSH_EVERY_N_OPS: 50 - pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) -> - WebRedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) -> + pushUncompressedHistoryOps: (project_id, doc_id, ops, callback = (error) ->) -> + WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> return callback(error) if error? - if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 + # We want to flush every 50 ops, i.e. 50, 100, 150, etc + # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these + # ops. If we've changed, then we've gone over a multiple of 50 and should flush. + # (Most of the time, we will only hit 50 and then flushing will put us back to 0) + previousLength = length - ops.length + prevBlock = Math.floor(previousLength / TrackChangesManager.FLUSH_EVERY_N_OPS) + newBlock = Math.floor(length / TrackChangesManager.FLUSH_EVERY_N_OPS) + if newBlock != prevBlock # Do this in the background since it uses HTTP and so may be too # slow to wait for when processing a doc update. logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" TrackChangesManager.flushDocChanges project_id, doc_id, (error) -> if error? logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" - callback() + callback() \ No newline at end of file diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 219c52848b..0b5da21c8f 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -2,6 +2,7 @@ LockManager = require "./LockManager" RedisManager = require "./RedisManager" WebRedisManager = require "./WebRedisManager" ShareJsUpdateManager = require "./ShareJsUpdateManager" +TrackChangesManager = require "./TrackChangesManager" Settings = require('settings-sharelatex') async = require("async") logger = require('logger-sharelatex') @@ -43,10 +44,13 @@ module.exports = UpdateManager = applyUpdates: (project_id, doc_id, updates, callback = (error) ->) -> for update in updates or [] UpdateManager._sanitizeUpdate update - ShareJsUpdateManager.applyUpdates project_id, doc_id, updates, (error, updatedDocLines, version) -> + ShareJsUpdateManager.applyUpdates project_id, doc_id, updates, (error, updatedDocLines, version, appliedOps) -> return callback(error) if error? logger.log doc_id: doc_id, version: version, "updating doc via sharejs" - RedisManager.setDocument doc_id, updatedDocLines, version, callback + # TODO: Do these in parallel? Worry about consistency here? + RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, (error) -> + return callback(error) if error? + TrackChangesManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/app/coffee/WebRedisManager.coffee b/services/document-updater/app/coffee/WebRedisManager.coffee index a14c2d6c86..85f301752f 100644 --- a/services/document-updater/app/coffee/WebRedisManager.coffee +++ b/services/document-updater/app/coffee/WebRedisManager.coffee @@ -22,10 +22,10 @@ module.exports = WebRedisManager = getUpdatesLength: (doc_id, callback)-> rclient.llen "PendingUpdates:#{doc_id}", callback - pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) -> - jsonOp = JSON.stringify op + pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + jsonOps = ops.map (op) -> JSON.stringify op async.parallel [ - (cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp, cb + (cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOps..., cb (cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb ], (error, results) -> return callback(error) if error? diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index 3dbd2f0a3d..94463f4ad6 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -141,6 +141,13 @@ describe "Applying updates to a doc", -> rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => result.should.equal 1 done() + + it "should store the doc ops in the correct order", (done) -> + rclient.lrange "DocOps:#{@doc_id}", 0, -1, (error, updates) => + updates = (JSON.parse(u) for u in updates) + for appliedUpdate, i in @updates + appliedUpdate.op.should.deep.equal updates[i].op + done() describe "when older ops come in after the delete", -> before (done) -> @@ -208,7 +215,14 @@ describe "Applying updates to a doc", -> MockWebApi.insertDoc @project_id, @doc_id, lines: @lines db.docOps.insert doc_id: ObjectId(@doc_id), version: 0, (error) => throw error if error? - DocUpdaterClient.sendUpdates @project_id, @doc_id, updates, (error) => + + # Send updates in chunks to causes multiple flushes + actions = [] + for i in [0..9] + do (i) => + actions.push (cb) => + DocUpdaterClient.sendUpdates @project_id, @doc_id, updates.slice(i*10, (i+1)*10), cb + async.series actions, (error) => throw error if error? setTimeout done, 2000 diff --git a/services/document-updater/test/unit/coffee/DocOpsManager/DocOpsManagerTests.coffee b/services/document-updater/test/unit/coffee/DocOpsManager/DocOpsManagerTests.coffee deleted file mode 100644 index a215b0ccd4..0000000000 --- a/services/document-updater/test/unit/coffee/DocOpsManager/DocOpsManagerTests.coffee +++ /dev/null @@ -1,52 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/DocOpsManager.js" -SandboxedModule = require('sandboxed-module') -{ObjectId} = require "mongojs" - -describe "DocOpsManager", -> - beforeEach -> - @doc_id = ObjectId().toString() - @project_id = ObjectId().toString() - @callback = sinon.stub() - @DocOpsManager = SandboxedModule.require modulePath, requires: - "./RedisManager": @RedisManager = {} - "./TrackChangesManager": @TrackChangesManager = {} - - describe "getPreviousDocOps", -> - beforeEach -> - @ops = [ "mock-op-1", "mock-op-2" ] - @start = 30 - @end = 32 - @RedisManager.getPreviousDocOps = sinon.stub().callsArgWith(3, null, @ops) - @DocOpsManager.getPreviousDocOps @project_id, @doc_id, @start, @end, @callback - - it "should get the previous doc ops", -> - @RedisManager.getPreviousDocOps - .calledWith(@doc_id, @start, @end) - .should.equal true - - it "should call the callback with the ops", -> - @callback.calledWith(null, @ops).should.equal true - - describe "pushDocOp", -> - beforeEach -> - @op = "mock-op" - @RedisManager.pushDocOp = sinon.stub().callsArgWith(2, null, @version = 42) - @TrackChangesManager.pushUncompressedHistoryOp = sinon.stub().callsArg(3) - @DocOpsManager.pushDocOp @project_id, @doc_id, @op, @callback - - it "should push the op in to the docOps list", -> - @RedisManager.pushDocOp - .calledWith(@doc_id, @op) - .should.equal true - - it "should push the op into the pushUncompressedHistoryOp", -> - @TrackChangesManager.pushUncompressedHistoryOp - .calledWith(@project_id, @doc_id, @op) - .should.equal true - - it "should call the callback with the version", -> - @callback.calledWith(null, @version).should.equal true - diff --git a/services/document-updater/test/unit/coffee/DocumentManager/getDocAndRecentOpsTests.coffee b/services/document-updater/test/unit/coffee/DocumentManager/getDocAndRecentOpsTests.coffee index 8c54b2b854..c77af9a77c 100644 --- a/services/document-updater/test/unit/coffee/DocumentManager/getDocAndRecentOpsTests.coffee +++ b/services/document-updater/test/unit/coffee/DocumentManager/getDocAndRecentOpsTests.coffee @@ -4,12 +4,11 @@ should = chai.should() modulePath = "../../../../app/js/DocumentManager.js" SandboxedModule = require('sandboxed-module') -describe "DocumentUpdater.getDocAndRecentOps", -> +describe "DocumentManager.getDocAndRecentOps", -> beforeEach -> @DocumentManager = SandboxedModule.require modulePath, requires: "./RedisManager": @RedisManager = {} "./PersistenceManager": @PersistenceManager = {} - "./DocOpsManager": @DocOpsManager = {} "logger-sharelatex": @logger = {log: sinon.stub()} "./Metrics": @Metrics = Timer: class Timer @@ -27,7 +26,7 @@ describe "DocumentUpdater.getDocAndRecentOps", -> describe "with a previous version specified", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version) - @DocOpsManager.getPreviousDocOps = sinon.stub().callsArgWith(4, null, @ops) + @RedisManager.getPreviousDocOps = sinon.stub().callsArgWith(3, null, @ops) @DocumentManager.getDocAndRecentOps @project_id, @doc_id, @fromVersion, @callback it "should get the doc", -> @@ -36,8 +35,8 @@ describe "DocumentUpdater.getDocAndRecentOps", -> .should.equal true it "should get the doc ops", -> - @DocOpsManager.getPreviousDocOps - .calledWith(@project_id, @doc_id, @fromVersion, @version) + @RedisManager.getPreviousDocOps + .calledWith(@doc_id, @fromVersion, @version) .should.equal true it "should call the callback with the doc info", -> @@ -49,7 +48,7 @@ describe "DocumentUpdater.getDocAndRecentOps", -> describe "with no previous version specified", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version) - @DocOpsManager.getPreviousDocOps = sinon.stub().callsArgWith(4, null, @ops) + @RedisManager.getPreviousDocOps = sinon.stub().callsArgWith(3, null, @ops) @DocumentManager.getDocAndRecentOps @project_id, @doc_id, -1, @callback it "should get the doc", -> @@ -58,7 +57,7 @@ describe "DocumentUpdater.getDocAndRecentOps", -> .should.equal true it "should not need to get the doc ops", -> - @DocOpsManager.getPreviousDocOps.called.should.equal false + @RedisManager.getPreviousDocOps.called.should.equal false it "should call the callback with the doc info", -> @callback.calledWith(null, @lines, @version, []).should.equal true diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 105b391b33..0e122e63fb 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -140,40 +140,70 @@ describe "RedisManager", -> it "should log out the problem", -> @logger.warn.called.should.equal true - describe "pushDocOp", -> + describe "updateDocument", -> beforeEach -> + @rclient.set = sinon.stub() @rclient.rpush = sinon.stub() @rclient.expire = sinon.stub() - @rclient.incr = sinon.stub() @rclient.ltrim = sinon.stub() - @op = { op: [{ i: "foo", p: 4 }] } + @RedisManager.getDocVersion = sinon.stub() + + @lines = ["one", "two", "three"] + @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 8 }] }] @version = 42 - _ = null - @rclient.exec = sinon.stub().callsArgWith(0, null, [_, _, _, @version]) - @RedisManager.pushDocOp @doc_id, @op, @callback - it "should push the doc op into the doc ops list", -> - @rclient.rpush - .calledWith("DocOps:#{@doc_id}", JSON.stringify(@op)) - .should.equal true + @rclient.exec = sinon.stub().callsArg(0) - it "should renew the expiry ttl on the doc ops array", -> - @rclient.expire - .calledWith("DocOps:#{@doc_id}", @RedisManager.DOC_OPS_TTL) - .should.equal true + describe "with a consistent version", -> + beforeEach -> + @RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length) + @RedisManager.updateDocument @doc_id, @lines, @version, @ops, @callback + + it "should get the current doc version to check for consistency", -> + @RedisManager.getDocVersion + .calledWith(@doc_id) + .should.equal true + + it "should set the doclines", -> + @rclient.set + .calledWith("doclines:#{@doc_id}", JSON.stringify(@lines)) + .should.equal true + + it "should set the version", -> + @rclient.set + .calledWith("DocVersion:#{@doc_id}", @version) + .should.equal true - it "should truncate the list to 100 members", -> - @rclient.ltrim - .calledWith("DocOps:#{@doc_id}", -@RedisManager.DOC_OPS_MAX_LENGTH, -1) - .should.equal true + it "should push the doc op into the doc ops list", -> + @rclient.rpush + .calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) + .should.equal true - it "should increment the version number", -> - @rclient.incr - .calledWith("DocVersion:#{@doc_id}") - .should.equal true + it "should renew the expiry ttl on the doc ops array", -> + @rclient.expire + .calledWith("DocOps:#{@doc_id}", @RedisManager.DOC_OPS_TTL) + .should.equal true - it "should call the callback with the version number", -> - @callback.calledWith(null, parseInt(@version, 10)).should.equal true + it "should truncate the list to 100 members", -> + @rclient.ltrim + .calledWith("DocOps:#{@doc_id}", -@RedisManager.DOC_OPS_MAX_LENGTH, -1) + .should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + describe "with an inconsistent version", -> + beforeEach -> + @RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length - 1) + @RedisManager.updateDocument @doc_id, @lines, @version, @ops, @callback + + it "should not call multi.exec", -> + @rclient.exec.called.should.equal false + + it "should call the callback with an error", -> + @callback + .calledWith(new Error("Version mismatch. '#{@doc_id}' is corrupted.")) + .should.equal true describe "putDocInMemory", -> beforeEach (done) -> diff --git a/services/document-updater/test/unit/coffee/ShareJsDB/GetOpsTests.coffee b/services/document-updater/test/unit/coffee/ShareJsDB/GetOpsTests.coffee index 31830e5afc..5621f39a85 100644 --- a/services/document-updater/test/unit/coffee/ShareJsDB/GetOpsTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsDB/GetOpsTests.coffee @@ -14,14 +14,14 @@ describe "ShareJsDB.getOps", -> @redis_ops = (JSON.stringify(op) for op in @ops) @ShareJsDB = SandboxedModule.require modulePath, requires: "./RedisManager": @RedisManager = {} - "./DocOpsManager": @DocOpsManager = {} "./DocumentManager":{} "logger-sharelatex": {} + @db = new @ShareJsDB() describe "with start == end", -> beforeEach -> @start = @end = 42 - @ShareJsDB.getOps @doc_key, @start, @end, @callback + @db.getOps @doc_key, @start, @end, @callback it "should return an empty array", -> @callback.calledWith(null, []).should.equal true @@ -30,12 +30,12 @@ describe "ShareJsDB.getOps", -> beforeEach -> @start = 35 @end = 42 - @DocOpsManager.getPreviousDocOps = sinon.stub().callsArgWith(4, null, @ops) - @ShareJsDB.getOps @doc_key, @start, @end, @callback + @RedisManager.getPreviousDocOps = sinon.stub().callsArgWith(3, null, @ops) + @db.getOps @doc_key, @start, @end, @callback it "should get the range from redis", -> - @DocOpsManager.getPreviousDocOps - .calledWith(@project_id, @doc_id, @start, @end-1) + @RedisManager.getPreviousDocOps + .calledWith(@doc_id, @start, @end-1) .should.equal true it "should return the ops", -> @@ -45,11 +45,11 @@ describe "ShareJsDB.getOps", -> beforeEach -> @start = 35 @end = null - @DocOpsManager.getPreviousDocOps = sinon.stub().callsArgWith(4, null, @ops) - @ShareJsDB.getOps @doc_key, @start, @end, @callback + @RedisManager.getPreviousDocOps = sinon.stub().callsArgWith(3, null, @ops) + @db.getOps @doc_key, @start, @end, @callback it "should get until the end of the list", -> - @DocOpsManager.getPreviousDocOps - .calledWith(@project_id, @doc_id, @start, -1) + @RedisManager.getPreviousDocOps + .calledWith(@doc_id, @start, -1) .should.equal true diff --git a/services/document-updater/test/unit/coffee/ShareJsDB/GetSnapshotTests.coffee b/services/document-updater/test/unit/coffee/ShareJsDB/GetSnapshotTests.coffee index 1cd1e62c4e..f2527b01a2 100644 --- a/services/document-updater/test/unit/coffee/ShareJsDB/GetSnapshotTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsDB/GetSnapshotTests.coffee @@ -17,6 +17,7 @@ describe "ShareJsDB.getSnapshot", -> "./RedisManager": {} "./DocOpsManager": {} "logger-sharelatex": {} + @db = new @ShareJsDB() @version = 42 @@ -27,7 +28,7 @@ describe "ShareJsDB.getSnapshot", -> describe "successfully", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version) - @ShareJsDB.getSnapshot @doc_key, @callback + @db.getSnapshot @doc_key, @callback it "should get the doc", -> @DocumentManager.getDoc @@ -46,7 +47,7 @@ describe "ShareJsDB.getSnapshot", -> describe "when the doclines do not exist", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, null, null) - @ShareJsDB.getSnapshot @doc_key, @callback + @db.getSnapshot @doc_key, @callback it "should return the callback with a NotFoundError", -> @callback.calledWith(new Errors.NotFoundError("not found")).should.equal true @@ -54,7 +55,7 @@ describe "ShareJsDB.getSnapshot", -> describe "when getDoc returns an error", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, @error = new Error("oops"), null, null) - @ShareJsDB.getSnapshot @doc_key, @callback + @db.getSnapshot @doc_key, @callback it "should return the callback with an error", -> @callback.calledWith(@error).should.equal true @@ -66,7 +67,7 @@ describe "ShareJsDB.getSnapshot", -> describe "successfully", -> beforeEach -> @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version) - @ShareJsDB.getSnapshot @doc_key, @callback + @db.getSnapshot @doc_key, @callback it "should get the doc", -> @DocumentManager.getDoc diff --git a/services/document-updater/test/unit/coffee/ShareJsDB/WriteOpsTests.coffee b/services/document-updater/test/unit/coffee/ShareJsDB/WriteOpsTests.coffee index 30e92bad3c..838f63034e 100644 --- a/services/document-updater/test/unit/coffee/ShareJsDB/WriteOpsTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsDB/WriteOpsTests.coffee @@ -1,5 +1,6 @@ sinon = require('sinon') chai = require('chai') +expect = chai.expect should = chai.should() modulePath = "../../../../app/js/ShareJsDB.js" SandboxedModule = require('sandboxed-module') @@ -18,34 +19,21 @@ describe "ShareJsDB.writeOps", -> "./DocOpsManager": @DocOpsManager = {} "./DocumentManager": {} "logger-sharelatex": @logger = {error: sinon.stub()} + @db = new @ShareJsDB() describe "writing an op", -> beforeEach -> @version = 42 @opData.v = @version - @DocOpsManager.pushDocOp = sinon.stub().callsArgWith(3, null, @version+1) - @ShareJsDB.writeOp @doc_key, @opData, @callback + @db.writeOp @doc_key, @opData, @callback - it "should write the op to redis", -> - @DocOpsManager.pushDocOp - .calledWith(@project_id, @doc_id, @opData) - .should.equal true + it "should write into appliedOps", -> + expect(@db.appliedOps[@doc_key]).to.deep.equal [@opData] it "should call the callback without an error", -> @callback.called.should.equal true (@callback.args[0][0]?).should.equal false - describe "writing an op at the wrong version", -> - beforeEach -> - @version = 42 - @mismatch = 5 - @opData.v = @version - @DocOpsManager.pushDocOp = sinon.stub().callsArgWith(3, null, @version + @mismatch) - @ShareJsDB.writeOp @doc_key, @opData, @callback - - it "should call the callback with an error", -> - @callback.calledWith(new Error()).should.equal true - diff --git a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee index 6d21ca3889..8d967ec2ee 100644 --- a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee @@ -26,6 +26,8 @@ describe "ShareJsUpdateManager", -> @model = applyOp: sinon.stub().callsArg(2) getSnapshot: sinon.stub() + db: + appliedOps: {} @ShareJsUpdateManager.getNewShareJsModel = sinon.stub().returns(@model) @ShareJsUpdateManager._listenForOps = sinon.stub() @ShareJsUpdateManager.removeDocFromCache = sinon.stub().callsArg(1) @@ -38,8 +40,9 @@ describe "ShareJsUpdateManager", -> describe "successfully", -> beforeEach (done) -> @model.getSnapshot.callsArgWith(1, null, {snapshot: @updatedDocLines.join("\n"), v: @version}) - @ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version) => - @callback(err, docLines, version) + @model.db.appliedOps["#{@project_id}:#{@doc_id}"] = @appliedOps = ["mock-ops"] + @ShareJsUpdateManager.applyUpdates @project_id, @doc_id, @updates, (err, docLines, version, appliedOps) => + @callback(err, docLines, version, appliedOps) done() it "should create a new ShareJs model", -> @@ -61,8 +64,8 @@ describe "ShareJsUpdateManager", -> .calledWith("#{@project_id}:#{@doc_id}") .should.equal true - it "should return the updated doc lines", -> - @callback.calledWith(null, @updatedDocLines, @version).should.equal true + it "should return the updated doc lines, version and ops", -> + @callback.calledWith(null, @updatedDocLines, @version, @appliedOps).should.equal true describe "when applyOp fails", -> beforeEach (done) -> diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee index 574795f3bb..143f01d1ee 100644 --- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee @@ -40,19 +40,19 @@ describe "TrackChangesManager", -> it "should return the callback with an error", -> @callback.calledWith(new Error("track changes api return non-success code: 500")).should.equal true - describe "pushUncompressedHistoryOp", -> + describe "pushUncompressedHistoryOps", -> beforeEach -> - @op = "mock-op" + @ops = ["mock-ops"] @TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2) describe "pushing the op", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + @WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) + @TrackChangesManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback - it "should push the op into redis", -> - @WebRedisManager.pushUncompressedHistoryOp - .calledWith(@project_id, @doc_id, @op) + it "should push the ops into redis", -> + @WebRedisManager.pushUncompressedHistoryOps + .calledWith(@project_id, @doc_id, @ops) .should.equal true it "should call the callback", -> @@ -61,11 +61,23 @@ describe "TrackChangesManager", -> it "should not try to flush the op", -> @TrackChangesManager.flushDocChanges.called.should.equal false - describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> + describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOp = + @WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + @TrackChangesManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback + + it "should tell the track changes api to flush", -> + @TrackChangesManager.flushDocChanges + .calledWith(@project_id, @doc_id) + .should.equal true + + describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", -> + beforeEach -> + @ops = ["op1", "op2", "op3"] + @WebRedisManager.pushUncompressedHistoryOps = + sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS + 1) + @TrackChangesManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback it "should tell the track changes api to flush", -> @TrackChangesManager.flushDocChanges @@ -74,10 +86,10 @@ describe "TrackChangesManager", -> describe "when TrackChangesManager errors", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOp = + @WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) @TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + @TrackChangesManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback it "should log out the error", -> @logger.error diff --git a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee index 19094794bb..e5c4cf9118 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee @@ -14,10 +14,12 @@ describe "UpdateManager", -> "./RedisManager" : @RedisManager = {} "./WebRedisManager" : @WebRedisManager = {} "./ShareJsUpdateManager" : @ShareJsUpdateManager = {} + "./TrackChangesManager" : @TrackChangesManager = {} "logger-sharelatex": @logger = { log: sinon.stub() } "./Metrics": @Metrics = Timer: class Timer done: sinon.stub() + "settings-sharelatex": Settings = {} describe "processOutstandingUpdates", -> beforeEach -> @@ -152,8 +154,10 @@ describe "UpdateManager", -> @updates = [{op: [{p: 42, i: "foo"}]}] @updatedDocLines = ["updated", "lines"] @version = 34 - @ShareJsUpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version) - @RedisManager.setDocument = sinon.stub().callsArg(3) + @appliedOps = ["mock-applied-ops"] + @ShareJsUpdateManager.applyUpdates = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version, @appliedOps) + @RedisManager.updateDocument = sinon.stub().callsArg(4) + @TrackChangesManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) describe "normally", -> beforeEach -> @@ -165,8 +169,13 @@ describe "UpdateManager", -> .should.equal true it "should save the document", -> - @RedisManager.setDocument - .calledWith(@doc_id, @updatedDocLines, @version) + @RedisManager.updateDocument + .calledWith(@doc_id, @updatedDocLines, @version, @appliedOps) + .should.equal true + + it "should push the applied ops into the track changes queue", -> + @TrackChangesManager.pushUncompressedHistoryOps + .calledWith(@project_id, @doc_id, @appliedOps) .should.equal true it "should call the callback", -> diff --git a/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee index fa9ca76356..a4b455d219 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/lockUpdatesAndDoTests.coffee @@ -11,7 +11,12 @@ describe 'UpdateManager - lockUpdatesAndDo', -> "./RedisManager" : @RedisManager = {} "./WebRedisManager" : @WebRedisManager = {} "./ShareJsUpdateManager" : @ShareJsUpdateManager = {} + "./TrackChangesManager" : @TrackChangesManager = {} "logger-sharelatex": @logger = { log: sinon.stub() } + "./Metrics": @Metrics = + Timer: class Timer + done: sinon.stub() + "settings-sharelatex": Settings = {} @project_id = "project-id-123" @doc_id = "doc-id-123" @method = sinon.stub().callsArgWith(3, null, @response_arg1) diff --git a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee index 932cb92e26..cd0ce7e9fe 100644 --- a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee @@ -70,18 +70,18 @@ describe "WebRedisManager", -> it "should return the length", -> @callback.calledWith(null, @length).should.equal true - describe "pushUncompressedHistoryOp", -> + describe "pushUncompressedHistoryOps", -> beforeEach (done) -> - @op = { op: [{ i: "foo", p: 4 }] } + @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] @rclient.rpush = sinon.stub().yields(null, @length = 42) @rclient.sadd = sinon.stub().yields() - @WebRedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, (args...) => + @WebRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) => @callback(args...) done() - it "should push the doc op into the doc ops list", -> + it "should push the doc op into the doc ops list as JSON", -> @rclient.rpush - .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op)) + .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) .should.equal true it "should add the doc_id to the set of which records the project docs", ->