mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-28 19:41:33 +02:00
Remove old methods of triggering doc updates
Remove the old pub/sub listener which is no longer used. Also remove the DocsWithPendingUpdates set, which used to track docs waiting to be updated. This was necessary incase messages were missed on the pub/sub channel, so we knew which docs still had pending updates. However, now we use the BLPOP queue, these updates just sit in the queue until a consumer comes back to continue consuming them.
This commit is contained in:
@@ -4,7 +4,6 @@ Settings = require('settings-sharelatex')
|
||||
logger = require('logger-sharelatex')
|
||||
logger.initialize("documentupdater")
|
||||
RedisManager = require('./app/js/RedisManager')
|
||||
UpdateManager = require('./app/js/UpdateManager')
|
||||
DispatchManager = require('./app/js/DispatchManager')
|
||||
Keys = require('./app/js/RedisKeyBuilder')
|
||||
Errors = require "./app/js/Errors"
|
||||
@@ -26,19 +25,8 @@ app.configure ->
|
||||
app.use express.bodyParser()
|
||||
app.use app.router
|
||||
|
||||
rclient.subscribe("pending-updates")
|
||||
rclient.on "message", (channel, doc_key) ->
|
||||
[project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key)
|
||||
if !Settings.shuttingDown
|
||||
UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) ->
|
||||
logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error?
|
||||
else
|
||||
logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update"
|
||||
|
||||
DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10)
|
||||
|
||||
UpdateManager.resumeProcessing()
|
||||
|
||||
app.get '/project/:project_id/doc/:doc_id', HttpController.getDoc
|
||||
app.post '/project/:project_id/doc/:doc_id', HttpController.setDoc
|
||||
app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded
|
||||
|
||||
@@ -6,7 +6,6 @@ PENDINGUPDATESKEY = "PendingUpdates"
|
||||
DOCLINES = "doclines"
|
||||
DOCOPS = "DocOps"
|
||||
DOCVERSION = "DocVersion"
|
||||
DOCIDSWITHPENDINGUPDATES = "DocsWithPendingUpdates"
|
||||
DOCSWITHHISTORYOPS = "DocsWithHistoryOps"
|
||||
UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps"
|
||||
|
||||
@@ -20,7 +19,6 @@ module.exports =
|
||||
changeQue : (op)-> CHANGEQUE+":"+op.project_id
|
||||
docsInProject : (op)-> DOCSINPROJECT+":"+op.project_id
|
||||
pendingUpdates : (op)-> PENDINGUPDATESKEY+":"+op.doc_id
|
||||
docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES
|
||||
combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}"
|
||||
splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":")
|
||||
docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id
|
||||
|
||||
@@ -88,21 +88,6 @@ module.exports = RedisManager =
|
||||
getUpdatesLength: (doc_id, callback)->
|
||||
rclient.llen keys.pendingUpdates(doc_id:doc_id), callback
|
||||
|
||||
getDocsWithPendingUpdates: (callback = (error, docs) ->) ->
|
||||
rclient.smembers keys.docsWithPendingUpdates, (error, doc_keys) ->
|
||||
return callback(error) if error?
|
||||
docs = doc_keys.map (doc_key) ->
|
||||
[project_id, doc_id] = keys.splitProjectIdAndDocId(doc_key)
|
||||
return {
|
||||
doc_id: doc_id
|
||||
project_id: project_id
|
||||
}
|
||||
callback null, docs
|
||||
|
||||
clearDocFromPendingUpdatesSet: (project_id, doc_id, callback = (error) ->) ->
|
||||
doc_key = keys.combineProjectIdAndDocId(project_id, doc_id)
|
||||
rclient.srem keys.docsWithPendingUpdates, doc_key, callback
|
||||
|
||||
getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) ->
|
||||
rclient.llen keys.docOps(doc_id: doc_id), (error, length) ->
|
||||
return callback(error) if error?
|
||||
|
||||
@@ -7,26 +7,12 @@ logger = require('logger-sharelatex')
|
||||
Metrics = require "./Metrics"
|
||||
|
||||
module.exports = UpdateManager =
|
||||
resumeProcessing: (callback = (error) ->) ->
|
||||
RedisManager.getDocsWithPendingUpdates (error, docs) =>
|
||||
return callback(error) if error?
|
||||
jobs = for doc in (docs or [])
|
||||
do (doc) =>
|
||||
(callback) => @processOutstandingUpdatesWithLock doc.project_id, doc.doc_id, callback
|
||||
|
||||
async.parallelLimit jobs, 5, callback
|
||||
|
||||
processOutstandingUpdates: (project_id, doc_id, _callback = (error) ->) ->
|
||||
processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) ->
|
||||
timer = new Metrics.Timer("updateManager.processOutstandingUpdates")
|
||||
callback = (args...) ->
|
||||
UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) ->
|
||||
timer.done()
|
||||
_callback(args...)
|
||||
|
||||
UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) =>
|
||||
return callback(error) if error?
|
||||
RedisManager.clearDocFromPendingUpdatesSet project_id, doc_id, (error) =>
|
||||
return callback(error) if error?
|
||||
callback()
|
||||
callback()
|
||||
|
||||
processOutstandingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) ->
|
||||
LockManager.tryLock doc_id, (error, gotLock, lockValue) =>
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/RedisManager"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
|
||||
describe "RedisManager.clearDocFromPendingUpdatesSet", ->
|
||||
beforeEach ->
|
||||
@project_id = "project-id"
|
||||
@doc_id = "document-id"
|
||||
@callback = sinon.stub()
|
||||
@RedisManager = SandboxedModule.require modulePath, requires:
|
||||
"redis-sharelatex" : createClient: () =>
|
||||
@rclient ?= auth:-> # only assign one rclient
|
||||
"logger-sharelatex": {}
|
||||
"./ZipManager": {}
|
||||
|
||||
@rclient.srem = sinon.stub().callsArg(2)
|
||||
@RedisManager.clearDocFromPendingUpdatesSet(@project_id, @doc_id, @callback)
|
||||
|
||||
it "should get the docs with pending updates", ->
|
||||
@rclient.srem
|
||||
.calledWith("DocsWithPendingUpdates", "#{@project_id}:#{@doc_id}")
|
||||
.should.equal true
|
||||
|
||||
it "should return the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
sinon = require('sinon')
|
||||
chai = require('chai')
|
||||
should = chai.should()
|
||||
modulePath = "../../../../app/js/RedisManager"
|
||||
SandboxedModule = require('sandboxed-module')
|
||||
|
||||
describe "RedisManager.getDocsWithPendingUpdates", ->
|
||||
beforeEach ->
|
||||
@callback = sinon.stub()
|
||||
@RedisManager = SandboxedModule.require modulePath, requires:
|
||||
"./ZipManager": {}
|
||||
"redis-sharelatex" : createClient: () =>
|
||||
@rclient ?= auth:->
|
||||
"logger-sharelatex": {}
|
||||
|
||||
@docs = [{
|
||||
doc_id: "doc-id-1"
|
||||
project_id: "project-id-1"
|
||||
}, {
|
||||
doc_id: "doc-id-2"
|
||||
project_id: "project-id-2"
|
||||
}]
|
||||
@doc_keys = @docs.map (doc) -> "#{doc.project_id}:#{doc.doc_id}"
|
||||
|
||||
@rclient.smembers = sinon.stub().callsArgWith(1, null, @doc_keys)
|
||||
@RedisManager.getDocsWithPendingUpdates(@callback)
|
||||
|
||||
it "should get the docs with pending updates", ->
|
||||
@rclient.smembers
|
||||
.calledWith("DocsWithPendingUpdates")
|
||||
.should.equal true
|
||||
|
||||
it "should return the docs with pending updates", ->
|
||||
@callback.calledWith(null, @docs).should.equal true
|
||||
|
||||
@@ -18,46 +18,14 @@ describe "UpdateManager", ->
|
||||
Timer: class Timer
|
||||
done: sinon.stub()
|
||||
|
||||
describe "resumeProcessing", ->
|
||||
beforeEach (done) ->
|
||||
@docs = [{
|
||||
doc_id: "doc-1"
|
||||
project_id: "project-1"
|
||||
}, {
|
||||
doc_id: "doc-2"
|
||||
project_id: "project-2"
|
||||
}, {
|
||||
doc_id: "doc-3"
|
||||
project_id: "project-3"
|
||||
}]
|
||||
@RedisManager.getDocsWithPendingUpdates = sinon.stub().callsArgWith(0, null, @docs)
|
||||
@UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2)
|
||||
@UpdateManager.resumeProcessing(done)
|
||||
|
||||
it "should the docs that haven't been processed yet", ->
|
||||
@RedisManager.getDocsWithPendingUpdates
|
||||
.called.should.equal true
|
||||
|
||||
it "should call processOutstandingUpdatesWithLock for each doc", ->
|
||||
for doc in @docs
|
||||
@UpdateManager.processOutstandingUpdatesWithLock
|
||||
.calledWith(doc.project_id, doc.doc_id)
|
||||
.should.equal true
|
||||
|
||||
describe "processOutstandingUpdates", ->
|
||||
beforeEach ->
|
||||
@UpdateManager.fetchAndApplyUpdates = sinon.stub().callsArg(2)
|
||||
@RedisManager.clearDocFromPendingUpdatesSet = sinon.stub().callsArg(2)
|
||||
@UpdateManager.processOutstandingUpdates @project_id, @doc_id, @callback
|
||||
|
||||
it "should apply the updates", ->
|
||||
@UpdateManager.fetchAndApplyUpdates.calledWith(@project_id, @doc_id).should.equal true
|
||||
|
||||
it "should clear the doc from the process pending set", ->
|
||||
@RedisManager.clearDocFromPendingUpdatesSet
|
||||
.calledWith(@project_id, @doc_id)
|
||||
.should.equal true
|
||||
|
||||
it "should call the callback", ->
|
||||
@callback.called.should.equal true
|
||||
|
||||
|
||||
Reference in New Issue
Block a user