From c0be3ef37b76a6a358defadf31c573ef9c805aa5 Mon Sep 17 00:00:00 2001 From: James Allen Date: Fri, 21 Mar 2014 12:41:05 +0000 Subject: [PATCH] Put doc_ids with history changes into project level set --- .../app/coffee/RedisKeyBuilder.coffee | 3 +- .../app/coffee/RedisManager.coffee | 16 ++-- .../app/coffee/TrackChangesManager.coffee | 27 ++---- .../coffee/ApplyingUpdatesToADocTests.coffee | 48 ++++------ .../clearDocFromPendingUpdatesSetTests.coffee | 1 + .../getDocsWithPendingUpdatesTests.coffee | 1 + .../getHistoryLoadManagerThreshold.coffee | 43 --------- .../RedisManager/prependDocOpsTests.coffee | 3 +- .../coffee/RedisManager/pushDocOpTests.coffee | 3 +- .../pushUncompressedHistoryOpTests.coffee | 13 ++- .../TrackChangesManagerTests.coffee | 95 ++++++++----------- 11 files changed, 94 insertions(+), 159 deletions(-) delete mode 100644 services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee index de2bc85443..0cfd330721 100644 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ b/services/document-updater/app/coffee/RedisKeyBuilder.coffee @@ -8,6 +8,7 @@ DOCLINES = "doclines" DOCOPS = "DocOps" DOCVERSION = "DocVersion" DOCIDSWITHPENDINGUPDATES = "DocsWithPendingUpdates" +DOCSWITHHISTORYOPS = "DocsWithHistoryOps" UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps" module.exports = @@ -25,7 +26,7 @@ module.exports = docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}" splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":") - historyLoadManagerThreshold: "HistoryLoadManagerThreshold" + docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id now : (key)-> d = new Date() d.getDate()+":"+(d.getMonth()+1)+":"+d.getFullYear()+":"+key diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 3d8efdbc70..c47e679339 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -155,9 +155,15 @@ module.exports = jsonOps = ops.map (op) -> JSON.stringify op rclient.lpush keys.docOps(doc_id: doc_id), jsonOps.reverse(), callback - pushUncompressedHistoryOp: (doc_id, op, callback = (error) ->) -> + pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error, length) ->) -> jsonOp = JSON.stringify op - rclient.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp, callback + multi = rclient.multi() + multi.rpush keys.uncompressedHistoryOp(doc_id: doc_id), jsonOp + multi.sadd keys.docsWithHistoryOps(project_id: project_id), doc_id + multi.exec (error, results) -> + return callback(error) if error? + [length, _] = results + callback(error, length) getDocOpsLength: (doc_id, callback = (error, length) ->) -> rclient.llen keys.docOps(doc_id: doc_id), callback @@ -165,12 +171,6 @@ module.exports = getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback - getHistoryLoadManagerThreshold: (callback = (error, threshold) ->) -> - rclient.get keys.historyLoadManagerThreshold, (error, value) -> - return callback(error) if error? - return callback null, 0 if !value? - callback null, parseInt(value, 10) - getDocumentsProjectId = (doc_id, callback)-> rclient.get keys.projectKey({doc_id:doc_id}), (err, project_id)-> diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee index 65b9762fe5..2d887a4f9c 100644 --- a/services/document-updater/app/coffee/TrackChangesManager.coffee +++ b/services/document-updater/app/coffee/TrackChangesManager.coffee @@ -23,22 +23,15 @@ module.exports = TrackChangesManager = FLUSH_EVERY_N_OPS: 50 pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) -> - RedisManager.getHistoryLoadManagerThreshold (error, threshold) -> + logger.log project_id: project_id, doc_id: doc_id, "pushing history op" + RedisManager.pushUncompressedHistoryOp project_id, doc_id, op, (error, length) -> return callback(error) if error? - if TrackChangesManager.getLoadManagerBucket(doc_id) < threshold - RedisManager.pushUncompressedHistoryOp doc_id, op, (error, length) -> - return callback(error) if error? - if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 - # Do this in the background since it uses HTTP and so may be too - # slow to wait for when processing a doc update. - logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" - TrackChangesManager.flushDocChanges project_id, doc_id, (error) -> - if error? - logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" - callback() - else - callback() + if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 + # Do this in the background since it uses HTTP and so may be too + # slow to wait for when processing a doc update. + logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" + TrackChangesManager.flushDocChanges project_id, doc_id, (error) -> + if error? + logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" + callback() - getLoadManagerBucket: (doc_id) -> - hash = crypto.createHash("md5").update(doc_id).digest("hex") - return parseInt(hash.slice(0,4), 16) % 100 diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index 7d5ac144e7..6b4010b305 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -12,7 +12,7 @@ MockWebApi = require "./helpers/MockWebApi" DocUpdaterClient = require "./helpers/DocUpdaterClient" describe "Applying updates to a doc", -> - before (done) -> + before -> @lines = ["one", "two", "three"] @update = doc: @doc_id @@ -22,8 +22,6 @@ describe "Applying updates to a doc", -> }] v: 0 @result = ["one", "one and a half", "two", "three"] - rclient.set "HistoryLoadManagerThreshold", 100, (error) => - done() describe "when the document is not loaded", -> before (done) -> @@ -51,8 +49,13 @@ describe "Applying updates to a doc", -> it "should push the applied updates to the track changes api", (done) -> rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + throw error if error? JSON.parse(updates[0]).op.should.deep.equal @update.op - done() + rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + throw error if error? + result.should.equal 1 + done() + describe "when the document is loaded", -> before (done) -> @@ -81,7 +84,9 @@ describe "Applying updates to a doc", -> it "should push the applied updates to the track changes api", (done) -> rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => JSON.parse(updates[0]).op.should.deep.equal @update.op - done() + rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + result.should.equal 1 + done() describe "when the document has been deleted", -> describe "when the ops come in a single linear order", -> @@ -131,7 +136,10 @@ describe "Applying updates to a doc", -> updates = (JSON.parse(u) for u in updates) for appliedUpdate, i in @updates appliedUpdate.op.should.deep.equal updates[i].op - done() + + rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + result.should.equal 1 + done() describe "when older ops come in after the delete", -> before -> @@ -235,7 +243,7 @@ describe "Applying updates to a doc", -> done() describe "with enough updates to flush to the track changes api", -> - beforeEach -> + beforeEach (done) -> [@project_id, @doc_id] = [DocUpdaterClient.randomId(), DocUpdaterClient.randomId()] MockWebApi.insertDoc @project_id, @doc_id, { lines: @lines @@ -249,27 +257,13 @@ describe "Applying updates to a doc", -> sinon.spy MockTrackChangesApi, "flushDoc" + DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => + throw error if error? + setTimeout done, 200 + afterEach -> MockTrackChangesApi.flushDoc.restore() - describe "when under the load manager threshold", -> - beforeEach (done) -> - rclient.set "HistoryLoadManagerThreshold", 100, (error) => - throw error if error? - DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => - throw error if error? - setTimeout done, 200 + it "should flush the doc twice", -> + MockTrackChangesApi.flushDoc.calledTwice.should.equal true - it "should flush the doc twice", -> - MockTrackChangesApi.flushDoc.calledTwice.should.equal true - - describe "when over the load manager threshold", -> - beforeEach (done) -> - rclient.set "HistoryLoadManagerThreshold", 0, (error) => - throw error if error? - DocUpdaterClient.sendUpdates @project_id, @doc_id, @updates, (error) => - throw error if error? - setTimeout done, 200 - - it "should not flush the doc", -> - MockTrackChangesApi.flushDoc.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee index 676d454167..016d96a2ae 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee @@ -12,6 +12,7 @@ describe "RedisManager.clearDocFromPendingUpdatesSet", -> @RedisManager = SandboxedModule.require modulePath, requires: "redis" : createClient: () => @rclient = auth:-> + "logger-sharelatex": {} @rclient.srem = sinon.stub().callsArg(2) @RedisManager.clearDocFromPendingUpdatesSet(@project_id, @doc_id, @callback) diff --git a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee index 602197ad57..d179b45f9d 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee @@ -10,6 +10,7 @@ describe "RedisManager.getDocsWithPendingUpdates", -> @RedisManager = SandboxedModule.require modulePath, requires: "redis" : createClient: () => @rclient = auth:-> + "logger-sharelatex": {} @docs = [{ doc_id: "doc-id-1" diff --git a/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee b/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee deleted file mode 100644 index d69cec370c..0000000000 --- a/services/document-updater/test/unit/coffee/RedisManager/getHistoryLoadManagerThreshold.coffee +++ /dev/null @@ -1,43 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisManager.js" -SandboxedModule = require('sandboxed-module') - -describe "RedisManager.getHistoryLoadManagerThreshold", -> - beforeEach -> - @RedisManager = SandboxedModule.require modulePath, requires: - "redis": createClient: () => - @rclient = - auth: () -> - "logger-sharelatex": @logger = {log: sinon.stub()} - @callback = sinon.stub() - - describe "with no value", -> - beforeEach -> - @rclient.get = sinon.stub().callsArgWith(1, null, null) - @RedisManager.getHistoryLoadManagerThreshold @callback - - it "should get the value", -> - @rclient.get - .calledWith("HistoryLoadManagerThreshold") - .should.equal true - - it "should call the callback with 0", -> - @callback.calledWith(null, 0).should.equal true - - describe "with a value", -> - beforeEach -> - @rclient.get = sinon.stub().callsArgWith(1, null, "42") - @RedisManager.getHistoryLoadManagerThreshold @callback - - it "should get the value", -> - @rclient.get - .calledWith("HistoryLoadManagerThreshold") - .should.equal true - - it "should call the callback with the numeric value", -> - @callback.calledWith(null, 42).should.equal true - - - diff --git a/services/document-updater/test/unit/coffee/RedisManager/prependDocOpsTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/prependDocOpsTests.coffee index b4a8192d12..dee8b2f435 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/prependDocOpsTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/prependDocOpsTests.coffee @@ -4,13 +4,14 @@ should = chai.should() modulePath = "../../../../app/js/RedisManager" SandboxedModule = require('sandboxed-module') -describe "RedisManager.clearDocFromPendingUpdatesSet", -> +describe "RedisManager.prependDocOps", -> beforeEach -> @doc_id = "document-id" @callback = sinon.stub() @RedisManager = SandboxedModule.require modulePath, requires: "redis" : createClient: () => @rclient = auth:-> + "logger-sharelatex": {} @rclient.lpush = sinon.stub().callsArg(2) @ops = [ diff --git a/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee index 0c76730437..d911af16bd 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/pushDocOpTests.coffee @@ -4,7 +4,7 @@ should = chai.should() modulePath = "../../../../app/js/RedisManager" SandboxedModule = require('sandboxed-module') -describe "RedisManager.getPreviousDocOpsTests", -> +describe "RedisManager.pushDocOp", -> beforeEach -> @callback = sinon.stub() @RedisManager = SandboxedModule.require modulePath, requires: @@ -12,6 +12,7 @@ describe "RedisManager.getPreviousDocOpsTests", -> @rclient = auth: -> multi: => @rclient + "logger-sharelatex": {} @doc_id = "doc-id-123" beforeEach -> diff --git a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee index 415f8e4572..d6e19f163e 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/pushUncompressedHistoryOpTests.coffee @@ -13,20 +13,27 @@ describe "RedisManager.pushUncompressedHistoryOp", -> multi: () => @rclient "logger-sharelatex": @logger = {log: sinon.stub()} @doc_id = "doc-id-123" + @project_id = "project-id-123" @callback = sinon.stub() - @rclient.rpush = sinon.stub() describe "successfully", -> beforeEach -> @op = { op: [{ i: "foo", p: 4 }] } - @rclient.rpush = sinon.stub().callsArgWith(2, null, @length = 42) - @RedisManager.pushUncompressedHistoryOp @doc_id, @op, @callback + @rclient.rpush = sinon.stub() + @rclient.sadd = sinon.stub() + @rclient.exec = sinon.stub().callsArgWith(0, null, [@length = 42, "1"]) + @RedisManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback it "should push the doc op into the doc ops list", -> @rclient.rpush .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@op)) .should.equal true + it "should add the doc_id to the set of which records the project docs", -> + @rclient.sadd + .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) + .should.equal true + it "should call the callback with the length", -> @callback.calledWith(null, @length).should.equal true diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee index 148eeb33bb..8fad5322e2 100644 --- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee @@ -45,69 +45,48 @@ describe "TrackChangesManager", -> @op = "mock-op" @TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2) - describe "when the doc is under the load manager threshold", -> + describe "pushing the op", -> beforeEach -> - @RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40) - @TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(30) - - describe "pushing the op", -> - beforeEach -> - @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback - - it "should push the op into redis", -> - @RedisManager.pushUncompressedHistoryOp - .calledWith(@doc_id, @op) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true - - it "should not try to flush the op", -> - @TrackChangesManager.flushDocChanges.called.should.equal false - - describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> - beforeEach -> - @RedisManager.pushUncompressedHistoryOp = - sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback - - it "should tell the track changes api to flush", -> - @TrackChangesManager.flushDocChanges - .calledWith(@project_id, @doc_id) - .should.equal true - - describe "when TrackChangesManager errors", -> - beforeEach -> - @RedisManager.pushUncompressedHistoryOp = - sinon.stub().callsArgWith(2, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) - @TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) - @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback - - it "should log out the error", -> - @logger.error - .calledWith( - err: @error - doc_id: @doc_id - project_id: @project_id - "error flushing doc to track changes api" - ) - .should.equal true - - describe "when the doc is over the load manager threshold", -> - beforeEach -> - @RedisManager.getHistoryLoadManagerThreshold = sinon.stub().callsArgWith(0, null, 40) - @TrackChangesManager.getLoadManagerBucket = sinon.stub().returns(50) - @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(2, null, 1) + @RedisManager.pushUncompressedHistoryOp = sinon.stub().callsArgWith(3, null, 1) @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback - it "should not push the op", -> - @RedisManager.pushUncompressedHistoryOp.called.should.equal false - - it "should not try to flush the op", -> - @TrackChangesManager.flushDocChanges.called.should.equal false + it "should push the op into redis", -> + @RedisManager.pushUncompressedHistoryOp + .calledWith(@project_id, @doc_id, @op) + .should.equal true it "should call the callback", -> @callback.called.should.equal true + it "should not try to flush the op", -> + @TrackChangesManager.flushDocChanges.called.should.equal false + + describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> + beforeEach -> + @RedisManager.pushUncompressedHistoryOp = + sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) + @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + + it "should tell the track changes api to flush", -> + @TrackChangesManager.flushDocChanges + .calledWith(@project_id, @doc_id) + .should.equal true + + describe "when TrackChangesManager errors", -> + beforeEach -> + @RedisManager.pushUncompressedHistoryOp = + sinon.stub().callsArgWith(3, null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) + @TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) + @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback + + it "should log out the error", -> + @logger.error + .calledWith( + err: @error + doc_id: @doc_id + project_id: @project_id + "error flushing doc to track changes api" + ) + .should.equal true +