From f707783aba69a8f4d09134883ea7d8858eb4dfb8 Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 1 Jun 2016 11:28:23 +0100 Subject: [PATCH] Remove old methods of triggering doc updates Remove the old pub/sub listener which is no longer used. Also remove the DocsWithPendingUpdates set, which used to track docs waiting to be updated. This was necessary incase messages were missed on the pub/sub channel, so we knew which docs still had pending updates. However, now we use the BLPOP queue, these updates just sit in the queue until a consumer comes back to continue consuming them. --- services/document-updater/app.coffee | 12 ------- .../app/coffee/RedisKeyBuilder.coffee | 2 -- .../app/coffee/RedisManager.coffee | 15 -------- .../app/coffee/UpdateManager.coffee | 20 ++--------- .../clearDocFromPendingUpdatesSetTests.coffee | 29 --------------- .../getDocsWithPendingUpdatesTests.coffee | 35 ------------------- .../UpdateManager/ApplyingUpdates.coffee | 32 ----------------- 7 files changed, 3 insertions(+), 142 deletions(-) delete mode 100644 services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee delete mode 100644 services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index 41c7a01958..eafe03e402 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -4,7 +4,6 @@ Settings = require('settings-sharelatex') logger = require('logger-sharelatex') logger.initialize("documentupdater") RedisManager = require('./app/js/RedisManager') -UpdateManager = require('./app/js/UpdateManager') DispatchManager = require('./app/js/DispatchManager') Keys = require('./app/js/RedisKeyBuilder') Errors = require "./app/js/Errors" @@ -26,19 +25,8 @@ app.configure -> app.use express.bodyParser() app.use app.router -rclient.subscribe("pending-updates") -rclient.on "message", (channel, doc_key) -> - [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) - if !Settings.shuttingDown - UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) -> - logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error? - else - logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update" - DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10) -UpdateManager.resumeProcessing() - app.get '/project/:project_id/doc/:doc_id', HttpController.getDoc app.post '/project/:project_id/doc/:doc_id', HttpController.setDoc app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee index 391fd0ace3..e55ae99bde 100644 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ b/services/document-updater/app/coffee/RedisKeyBuilder.coffee @@ -6,7 +6,6 @@ PENDINGUPDATESKEY = "PendingUpdates" DOCLINES = "doclines" DOCOPS = "DocOps" DOCVERSION = "DocVersion" -DOCIDSWITHPENDINGUPDATES = "DocsWithPendingUpdates" DOCSWITHHISTORYOPS = "DocsWithHistoryOps" UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps" @@ -20,7 +19,6 @@ module.exports = changeQue : (op)-> CHANGEQUE+":"+op.project_id docsInProject : (op)-> DOCSINPROJECT+":"+op.project_id pendingUpdates : (op)-> PENDINGUPDATESKEY+":"+op.doc_id - docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}" splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":") docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index bcb5d28ae0..1215471ce1 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -88,21 +88,6 @@ module.exports = RedisManager = getUpdatesLength: (doc_id, callback)-> rclient.llen keys.pendingUpdates(doc_id:doc_id), callback - getDocsWithPendingUpdates: (callback = (error, docs) ->) -> - rclient.smembers keys.docsWithPendingUpdates, (error, doc_keys) -> - return callback(error) if error? - docs = doc_keys.map (doc_key) -> - [project_id, doc_id] = keys.splitProjectIdAndDocId(doc_key) - return { - doc_id: doc_id - project_id: project_id - } - callback null, docs - - clearDocFromPendingUpdatesSet: (project_id, doc_id, callback = (error) ->) -> - doc_key = keys.combineProjectIdAndDocId(project_id, doc_id) - rclient.srem keys.docsWithPendingUpdates, doc_key, callback - getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) -> rclient.llen keys.docOps(doc_id: doc_id), (error, length) -> return callback(error) if error? diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 2ab74feda1..97c33e8b6f 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -7,26 +7,12 @@ logger = require('logger-sharelatex') Metrics = require "./Metrics" module.exports = UpdateManager = - resumeProcessing: (callback = (error) ->) -> - RedisManager.getDocsWithPendingUpdates (error, docs) => - return callback(error) if error? - jobs = for doc in (docs or []) - do (doc) => - (callback) => @processOutstandingUpdatesWithLock doc.project_id, doc.doc_id, callback - - async.parallelLimit jobs, 5, callback - - processOutstandingUpdates: (project_id, doc_id, _callback = (error) ->) -> + processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) -> timer = new Metrics.Timer("updateManager.processOutstandingUpdates") - callback = (args...) -> + UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) -> timer.done() - _callback(args...) - - UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) => return callback(error) if error? - RedisManager.clearDocFromPendingUpdatesSet project_id, doc_id, (error) => - return callback(error) if error? - callback() + callback() processOutstandingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> LockManager.tryLock doc_id, (error, gotLock, lockValue) => diff --git a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee deleted file mode 100644 index c89842f7bc..0000000000 --- a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee +++ /dev/null @@ -1,29 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisManager" -SandboxedModule = require('sandboxed-module') - -describe "RedisManager.clearDocFromPendingUpdatesSet", -> - beforeEach -> - @project_id = "project-id" - @doc_id = "document-id" - @callback = sinon.stub() - @RedisManager = SandboxedModule.require modulePath, requires: - "redis-sharelatex" : createClient: () => - @rclient ?= auth:-> # only assign one rclient - "logger-sharelatex": {} - "./ZipManager": {} - - @rclient.srem = sinon.stub().callsArg(2) - @RedisManager.clearDocFromPendingUpdatesSet(@project_id, @doc_id, @callback) - - it "should get the docs with pending updates", -> - @rclient.srem - .calledWith("DocsWithPendingUpdates", "#{@project_id}:#{@doc_id}") - .should.equal true - - it "should return the callback", -> - @callback.called.should.equal true - - diff --git a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee deleted file mode 100644 index 45efa4c984..0000000000 --- a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee +++ /dev/null @@ -1,35 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisManager" -SandboxedModule = require('sandboxed-module') - -describe "RedisManager.getDocsWithPendingUpdates", -> - beforeEach -> - @callback = sinon.stub() - @RedisManager = SandboxedModule.require modulePath, requires: - "./ZipManager": {} - "redis-sharelatex" : createClient: () => - @rclient ?= auth:-> - "logger-sharelatex": {} - - @docs = [{ - doc_id: "doc-id-1" - project_id: "project-id-1" - }, { - doc_id: "doc-id-2" - project_id: "project-id-2" - }] - @doc_keys = @docs.map (doc) -> "#{doc.project_id}:#{doc.doc_id}" - - @rclient.smembers = sinon.stub().callsArgWith(1, null, @doc_keys) - @RedisManager.getDocsWithPendingUpdates(@callback) - - it "should get the docs with pending updates", -> - @rclient.smembers - .calledWith("DocsWithPendingUpdates") - .should.equal true - - it "should return the docs with pending updates", -> - @callback.calledWith(null, @docs).should.equal true - diff --git a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee index 7b41647f7b..249740973f 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee @@ -18,46 +18,14 @@ describe "UpdateManager", -> Timer: class Timer done: sinon.stub() - describe "resumeProcessing", -> - beforeEach (done) -> - @docs = [{ - doc_id: "doc-1" - project_id: "project-1" - }, { - doc_id: "doc-2" - project_id: "project-2" - }, { - doc_id: "doc-3" - project_id: "project-3" - }] - @RedisManager.getDocsWithPendingUpdates = sinon.stub().callsArgWith(0, null, @docs) - @UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2) - @UpdateManager.resumeProcessing(done) - - it "should the docs that haven't been processed yet", -> - @RedisManager.getDocsWithPendingUpdates - .called.should.equal true - - it "should call processOutstandingUpdatesWithLock for each doc", -> - for doc in @docs - @UpdateManager.processOutstandingUpdatesWithLock - .calledWith(doc.project_id, doc.doc_id) - .should.equal true - describe "processOutstandingUpdates", -> beforeEach -> @UpdateManager.fetchAndApplyUpdates = sinon.stub().callsArg(2) - @RedisManager.clearDocFromPendingUpdatesSet = sinon.stub().callsArg(2) @UpdateManager.processOutstandingUpdates @project_id, @doc_id, @callback it "should apply the updates", -> @UpdateManager.fetchAndApplyUpdates.calledWith(@project_id, @doc_id).should.equal true - it "should clear the doc from the process pending set", -> - @RedisManager.clearDocFromPendingUpdatesSet - .calledWith(@project_id, @doc_id) - .should.equal true - it "should call the callback", -> @callback.called.should.equal true