From 2bbbf3c005d828b954b16757b40861c9c74b8623 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 6 Oct 2017 12:23:23 +0100 Subject: [PATCH] add unflushed time to doc in redis --- .../app/coffee/DocumentManager.coffee | 29 +++++++-- .../app/coffee/ProjectManager.coffee | 35 +++------- .../app/coffee/RedisManager.coffee | 15 ++++- .../config/settings.defaults.coffee | 1 + .../DocumentManagerTests.coffee | 65 ++++++++++++++++++- .../ProjectManager/getProjectDocsTests.coffee | 24 +++---- .../RedisManager/RedisManagerTests.coffee | 36 ++++++++-- 7 files changed, 151 insertions(+), 54 deletions(-) diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index 054baca47e..50e08741cd 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -8,14 +8,16 @@ RealTimeRedisManager = require "./RealTimeRedisManager" Errors = require "./Errors" RangesManager = require "./RangesManager" +MAX_UNFLUSHED_AGE = 300 * 1000 # 5 mins, document should be flushed to mongo this time after a change + module.exports = DocumentManager = - getDoc: (project_id, doc_id, _callback = (error, lines, version, alreadyLoaded) ->) -> + getDoc: (project_id, doc_id, _callback = (error, lines, version, ranges, alreadyLoaded, unflushedTime) ->) -> timer = new Metrics.Timer("docManager.getDoc") callback = (args...) -> timer.done() _callback(args...) - RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges) -> + RedisManager.getDoc project_id, doc_id, (error, lines, version, ranges, unflushedTime) -> return callback(error) if error? if !lines? or !version? logger.log {project_id, doc_id}, "doc not in redis so getting from persistence API" @@ -24,9 +26,9 @@ module.exports = DocumentManager = logger.log {project_id, doc_id, lines, version}, "got doc from persistence API" RedisManager.putDocInMemory project_id, doc_id, lines, version, ranges, (error) -> return callback(error) if error? - callback null, lines, version, ranges, false + callback null, lines, version, ranges, false, null else - callback null, lines, version, ranges, true + callback null, lines, version, ranges, true, unflushedTime getDocAndRecentOps: (project_id, doc_id, fromVersion, _callback = (error, lines, version, recentOps, ranges) ->) -> timer = new Metrics.Timer("docManager.getDocAndRecentOps") @@ -103,7 +105,7 @@ module.exports = DocumentManager = logger.log project_id: project_id, doc_id: doc_id, version: version, "flushing doc" PersistenceManager.setDoc project_id, doc_id, lines, version, ranges, (error) -> return callback(error) if error? - callback null + RedisManager.clearUnflushedTime doc_id, callback flushAndDeleteDoc: (project_id, doc_id, _callback = (error) ->) -> timer = new Metrics.Timer("docManager.flushAndDeleteDoc") @@ -156,6 +158,17 @@ module.exports = DocumentManager = return callback(error) if error? callback() + getDocAndFlushIfOld: (project_id, doc_id, callback = (error, doc) ->) -> + DocumentManager.getDoc project_id, doc_id, (error, lines, version, ranges, alreadyLoaded, unflushedTime) -> + return callback(error) if error? + # if doc was already loaded see if it needs to be flushed + if alreadyLoaded and unflushedTime? and (Date.now() - unflushedTime) > MAX_UNFLUSHED_AGE + DocumentManager.flushDocIfLoaded project_id, doc_id, (error) -> + return callback(error) if error? + callback(null, lines, version) + else + callback(null, lines, version) + getDocWithLock: (project_id, doc_id, callback = (error, lines, version) ->) -> UpdateManager = require "./UpdateManager" UpdateManager.lockUpdatesAndDo DocumentManager.getDoc, project_id, doc_id, callback @@ -163,7 +176,11 @@ module.exports = DocumentManager = getDocAndRecentOpsWithLock: (project_id, doc_id, fromVersion, callback = (error, lines, version) ->) -> UpdateManager = require "./UpdateManager" UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndRecentOps, project_id, doc_id, fromVersion, callback - + + getDocAndFlushIfOldWithLock: (project_id, doc_id, callback = (error, doc) ->) -> + UpdateManager = require "./UpdateManager" + UpdateManager.lockUpdatesAndDo DocumentManager.getDocAndFlushIfOld, project_id, doc_id, callback + setDocWithLock: (project_id, doc_id, lines, source, user_id, undoing, callback = (error) ->) -> UpdateManager = require "./UpdateManager" UpdateManager.lockUpdatesAndDo DocumentManager.setDoc, project_id, doc_id, lines, source, user_id, undoing, callback diff --git a/services/document-updater/app/coffee/ProjectManager.coffee b/services/document-updater/app/coffee/ProjectManager.coffee index 7c290840c8..4a48351a1b 100644 --- a/services/document-updater/app/coffee/ProjectManager.coffee +++ b/services/document-updater/app/coffee/ProjectManager.coffee @@ -69,40 +69,25 @@ module.exports = ProjectManager = logger.error err: error, project_id: project_id, "error getting/setting project state in getProjectDocs" return callback(error) # we can't return docs if project structure has changed - return callback Errors.ProjectStateChangedError("project state changed") if projectStateChanged + if projectStateChanged + return callback Errors.ProjectStateChangedError("project state changed") # project structure hasn't changed, return doc content from redis RedisManager.getDocIdsInProject project_id, (error, doc_ids) -> if error? logger.error err: error, project_id: project_id, "error getting doc ids in getProjectDocs" return callback(error) jobs = [] - docs = [] for doc_id in doc_ids or [] do (doc_id) -> jobs.push (cb) -> - # check the doc version first - RedisManager.getDocVersion doc_id, (error, version) -> - if error? - logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc version in getProjectDocs" - return cb(error) - # skip getting the doc if we already have that version - if version? and version is excludeVersions[doc_id] - # not currently using excludeVersions so we shouldn't get here! - # change to logger.log when this code path is in use - logger.error err: error, project_id: project_id, doc_id: doc_id, version: version, "skipping doc version in getProjectDocs" - return cb() - # otherwise get the doc lines from redis - RedisManager.getDocLines doc_id, (error, lines) -> - if error? - logger.error err: error, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocs" - return cb(error) - try - docs.push {_id: doc_id, lines: JSON.parse(lines), v: version} - catch e - logger.error err: e, project_id: project_id, doc_id: doc_id, lines: lines, version: version, "error parsing doc lines in getProjectDocs" - return cb(e) - cb() - async.series jobs, (error) -> + # get the doc lines from redis + DocumentManager.getDocAndFlushIfOldWithLock project_id, doc_id, (err, lines, version) -> + if err? + logger.error err:err, project_id: project_id, doc_id: doc_id, "error getting project doc lines in getProjectDocs" + return cb(err) + doc = {_id:doc_id, lines:lines, v:version} # create a doc object to return + cb(null, doc) + async.series jobs, (error, docs) -> return callback(error) if error? callback(null, docs) diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index b233cdf7e7..b873f9bd6a 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -86,6 +86,7 @@ module.exports = RedisManager = multi.del keys.docVersion(doc_id:doc_id) multi.del keys.docHash(doc_id:doc_id) multi.del keys.ranges(doc_id:doc_id) + multi.del keys.unflushedTime(doc_id:doc_id) multi.exec (error) -> return callback(error) if error? multi = rclient.multi() @@ -105,7 +106,7 @@ module.exports = RedisManager = clearProjectState: (project_id, callback = (error) ->) -> rclient.del keys.projectState(project_id:project_id), callback - getDoc : (project_id, doc_id, callback = (error, lines, version, ranges) ->)-> + getDoc : (project_id, doc_id, callback = (error, lines, version, ranges, unflushedTime) ->)-> timer = new metrics.Timer("redis.get-doc") multi = rclient.multi() multi.get keys.docLines(doc_id:doc_id) @@ -113,7 +114,8 @@ module.exports = RedisManager = multi.get keys.docHash(doc_id:doc_id) multi.get keys.projectKey(doc_id:doc_id) multi.get keys.ranges(doc_id:doc_id) - multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges])-> + multi.get keys.unflushedTime(doc_id:doc_id) + multi.exec (error, [docLines, version, storedHash, doc_project_id, ranges, unflushedTime])-> timeSpan = timer.done() return callback(error) if error? # check if request took too long and bail out. only do this for @@ -149,7 +151,7 @@ module.exports = RedisManager = return callback(error) if error? if result isnt 0 # doc should already be in set logger.error project_id: project_id, doc_id: doc_id, doc_project_id: doc_project_id, "doc missing from docsInProject set" - callback null, docLines, version, ranges + callback null, docLines, version, ranges, unflushedTime getDocVersion: (doc_id, callback = (error, version) ->) -> rclient.get keys.docVersion(doc_id: doc_id), (error, version) -> @@ -247,6 +249,10 @@ module.exports = RedisManager = # expire must come after rpush since before it will be a no-op if the list is empty multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL # index 6 multi.rpush historyKeys.uncompressedHistoryOps(doc_id: doc_id), jsonOps... # index 7 + # Set the unflushed timestamp to the current time if the doc + # hasn't been modified before (the content in mongo has been + # valid up to this point). Otherwise leave it alone ("NX" flag). + multi.set keys.unflushedTime(doc_id: doc_id), Date.now(), "NX" multi.exec (error, result) -> return callback(error) if error? # check the hash computed on the redis server @@ -257,6 +263,9 @@ module.exports = RedisManager = uncompressedHistoryOpsLength = result?[7] return callback(null, uncompressedHistoryOpsLength) + clearUnflushedTime: (doc_id, callback = (error) ->) -> + rclient.del keys.unflushedTime(doc_id:doc_id), callback + getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index 838ffa19a8..9b800f2729 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -42,6 +42,7 @@ module.exports = docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" projectState: ({project_id}) -> "ProjectState:#{project_id}" + unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}" # cluster: [{ # port: "7000" # host: "localhost" diff --git a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee index b2eaa321d6..5bff4fda63 100644 --- a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee @@ -4,6 +4,7 @@ should = chai.should() modulePath = "../../../../app/js/DocumentManager.js" SandboxedModule = require('sandboxed-module') Errors = require "../../../../app/js/Errors" +tk = require "timekeeper" describe "DocumentManager", -> beforeEach -> @@ -60,6 +61,7 @@ describe "DocumentManager", -> describe "when the doc is in Redis", -> beforeEach -> @RedisManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges) + @RedisManager.clearUnflushedTime = sinon.stub().callsArgWith(1, null) @PersistenceManager.setDoc = sinon.stub().yields() @DocumentManager.flushDocIfLoaded @project_id, @doc_id, @callback @@ -373,4 +375,65 @@ describe "DocumentManager", -> it "should call the callback with a not found error", -> error = new Errors.NotFoundError("document not found: #{@doc_id}") - @callback.calledWith(error).should.equal true \ No newline at end of file + @callback.calledWith(error).should.equal true + + describe "getDocAndFlushIfOld", -> + beforeEach -> + tk.freeze(new Date()) + @DocumentManager.flushDocIfLoaded = sinon.stub().callsArg(2) + + afterEach -> + tk.reset() + + describe "when the doc is in Redis", -> + describe "and has changes to be flushed", -> + beforeEach -> + @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, true, Date.now() - 1e9) + @DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback + + it "should get the doc", -> + @DocumentManager.getDoc + .calledWith(@project_id, @doc_id) + .should.equal true + + it "should flush the doc", -> + @DocumentManager.flushDocIfLoaded + .calledWith(@project_id, @doc_id) + .should.equal true + + it "should call the callback with the lines and versions", -> + @callback.calledWith(null, @lines, @version).should.equal true + + describe "and has only changes that don't need to be flushed", -> + beforeEach -> + @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, true, Date.now() - 100) + @DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback + + it "should get the doc", -> + @DocumentManager.getDoc + .calledWith(@project_id, @doc_id) + .should.equal true + + it "should not flush the doc", -> + @DocumentManager.flushDocIfLoaded + .called.should.equal false + + it "should call the callback with the lines and versions", -> + @callback.calledWith(null, @lines, @version).should.equal true + + describe "when the doc is not in Redis", -> + beforeEach -> + @DocumentManager.getDoc = sinon.stub().callsArgWith(2, null, @lines, @version, @ranges, false) + @DocumentManager.getDocAndFlushIfOld @project_id, @doc_id, @callback + + it "should get the doc", -> + @DocumentManager.getDoc + .calledWith(@project_id, @doc_id) + .should.equal true + + it "should not flush the doc", -> + @DocumentManager.flushDocIfLoaded + .called.should.equal false + + it "should call the callback with the lines and versions", -> + @callback.calledWith(null, @lines, @version).should.equal true diff --git a/services/document-updater/test/unit/coffee/ProjectManager/getProjectDocsTests.coffee b/services/document-updater/test/unit/coffee/ProjectManager/getProjectDocsTests.coffee index d5b12c63d8..99c249d6cb 100644 --- a/services/document-updater/test/unit/coffee/ProjectManager/getProjectDocsTests.coffee +++ b/services/document-updater/test/unit/coffee/ProjectManager/getProjectDocsTests.coffee @@ -29,14 +29,13 @@ describe "ProjectManager - getProjectDocs", -> ] @RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null) @RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids) - @RedisManager.getDocVersion = sinon.stub() - @RedisManager.getDocVersion.withArgs(@doc_ids[0]).callsArgWith(1, null, @doc_versions[0]) - @RedisManager.getDocVersion.withArgs(@doc_ids[1]).callsArgWith(1, null, @doc_versions[1]) - @RedisManager.getDocVersion.withArgs(@doc_ids[2]).callsArgWith(1, null, @doc_versions[2]) - @RedisManager.getDocLines = sinon.stub() - @RedisManager.getDocLines.withArgs(@doc_ids[0]).callsArgWith(1, null, JSON.stringify(@doc_lines[0])) - @RedisManager.getDocLines.withArgs(@doc_ids[1]).callsArgWith(1, null, JSON.stringify(@doc_lines[1])) - @RedisManager.getDocLines.withArgs(@doc_ids[2]).callsArgWith(1, null, JSON.stringify(@doc_lines[2])) + @DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub() + @DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[0]) + .callsArgWith(2, null, @doc_lines[0], @doc_versions[0]) + @DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[1]) + .callsArgWith(2, null, @doc_lines[1], @doc_versions[1]) + @DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, @doc_ids[2]) + .callsArgWith(2, null, @doc_lines[2], @doc_versions[2]) @ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) => @callback(error, docs) done() @@ -81,10 +80,11 @@ describe "ProjectManager - getProjectDocs", -> @doc_ids = ["doc-id-1", "doc-id-2", "doc-id-3"] @RedisManager.checkOrSetProjectState = sinon.stub().callsArgWith(2, null) @RedisManager.getDocIdsInProject = sinon.stub().callsArgWith(1, null, @doc_ids) - @RedisManager.getDocVersion = sinon.stub().callsArgWith(1, null) - @RedisManager.getDocLines = sinon.stub() - @RedisManager.getDocLines.withArgs("doc-id-1").callsArgWith(1, null, JSON.stringify(["test doc content"])) - @RedisManager.getDocLines.withArgs("doc-id-2").callsArgWith(1, @error = new Error("oops")) # trigger an error + @DocumentManager.getDocAndFlushIfOldWithLock = sinon.stub() + @DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-1") + .callsArgWith(2, null, ["test doc content"], @doc_versions[1]) + @DocumentManager.getDocAndFlushIfOldWithLock.withArgs(@project_id, "doc-id-2") + .callsArgWith(2, @error = new Error("oops")) # trigger an error @ProjectManager.getProjectDocs @project_id, @projectStateHash, @excludeVersions, (error, docs) => @callback(error) done() diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 0a5149c552..f0b37ae986 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -5,6 +5,7 @@ modulePath = "../../../../app/js/RedisManager.js" SandboxedModule = require('sandboxed-module') Errors = require "../../../../app/js/Errors" crypto = require "crypto" +tk = require "timekeeper" describe "RedisManager", -> beforeEach -> @@ -30,6 +31,7 @@ describe "RedisManager", -> docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" projectState: ({project_id}) -> "ProjectState:#{project_id}" + unflushedTime: ({doc_id}) -> "UnflushedTime:#{doc_id}" history: key_schema: uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" @@ -61,8 +63,9 @@ describe "RedisManager", -> @hash = crypto.createHash('sha1').update(@jsonlines,'utf8').digest('hex') @ranges = { comments: "mock", entries: "mock" } @json_ranges = JSON.stringify @ranges + @unflushed_time = 12345 @rclient.get = sinon.stub() - @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges]) + @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @hash, @project_id, @json_ranges, @unflushed_time]) @rclient.sadd = sinon.stub().yields(null, 0) describe "successfully", -> @@ -89,6 +92,11 @@ describe "RedisManager", -> .calledWith("Ranges:#{@doc_id}") .should.equal true + it "should get the unflushed time", -> + @rclient.get + .calledWith("UnflushedTime:#{@doc_id}") + .should.equal true + it "should check if the document is in the DocsIn set", -> @rclient.sadd .calledWith("DocsIn:#{@project_id}") @@ -96,7 +104,7 @@ describe "RedisManager", -> it 'should return the document', -> @callback - .calledWith(null, @lines, @version, @ranges) + .calledWithExactly(null, @lines, @version, @ranges, @unflushed_time) .should.equal true it 'should not log any errors', -> @@ -116,7 +124,7 @@ describe "RedisManager", -> it 'should return an empty result', -> @callback - .calledWith(null, null, 0, {}) + .calledWithExactly(null, null, 0, {}) .should.equal true it 'should not log any errors', -> @@ -134,7 +142,7 @@ describe "RedisManager", -> it 'should return the document', -> @callback - .calledWith(null, @lines, @version, @ranges) + .calledWithExactly(null, @lines, @version, @ranges, @unflushed_time) .should.equal true describe "with a corrupted document", -> @@ -155,11 +163,11 @@ describe "RedisManager", -> describe "with a slow request to redis", -> beforeEach -> - @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges]) + @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @badHash, @project_id, @json_ranges, @unflushed_time]) @clock = sinon.useFakeTimers(); @rclient.exec = (cb) => @clock.tick(6000); - cb(null, [@jsonlines, @version, @another_project_id, @json_ranges]) + cb(null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time]) @RedisManager.getDoc @project_id, @doc_id, @callback @@ -174,7 +182,7 @@ describe "RedisManager", -> describe "getDoc with an invalid project id", -> beforeEach -> @another_project_id = "project-id-456" - @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges]) + @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonlines, @version, @another_project_id, @json_ranges, @unflushed_time]) @RedisManager.getDoc @project_id, @doc_id, @callback it 'should return an error', -> @@ -317,6 +325,10 @@ describe "RedisManager", -> beforeEach -> @RedisManager.getDocVersion.withArgs(@doc_id).yields(null, @version - @ops.length) @RedisManager.updateDocument @doc_id, @lines, @version, @ops, @ranges, @callback + tk.freeze(new Date()) + + afterEach -> + tk.reset() it "should get the current doc version to check for consistency", -> @RedisManager.getDocVersion @@ -343,6 +355,11 @@ describe "RedisManager", -> .calledWith("Ranges:#{@doc_id}", JSON.stringify(@ranges)) .should.equal true + it "should set the unflushed time", -> + @rclient.set + .calledWith("UnflushedTime:#{@doc_id}", Date.now(), "NX") + .should.equal true + it "should push the doc op into the doc ops list", -> @rclient.rpush .calledWith("DocOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) @@ -570,6 +587,11 @@ describe "RedisManager", -> @rclient.del .calledWith("DocHash:#{@doc_id}") .should.equal true + + it "should delete the unflushed time", -> + @rclient.del + .calledWith("UnflushedTime:#{@doc_id}") + .should.equal true it "should delete the project_id for the doc", -> @rclient.del