From 8ebc069ddbcea694724524085b0068637870d10e Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 8 Oct 2015 16:10:48 +0100 Subject: [PATCH] modify last compressed op in place --- .../app/coffee/MongoManager.coffee | 24 ++++++--- .../app/coffee/UpdatesManager.coffee | 41 ++++++++++----- .../MongoManager/MongoManagerTests.coffee | 36 ++----------- .../UpdatesManager/UpdatesManagerTests.coffee | 50 +++++++++++-------- 4 files changed, 78 insertions(+), 73 deletions(-) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 9b5c63c85f..2605ca17e6 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -16,10 +16,7 @@ module.exports = MongoManager = return callback error, null return callback null, compressedUpdates[0] or null - deleteCompressedUpdate: (id, callback = (error) ->) -> - db.docHistory.remove({ _id: ObjectId(id.toString()) }, callback) - - popLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) -> + peekLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) -> # under normal use we pass back the last update as # callback(null,update). # @@ -37,9 +34,7 @@ module.exports = MongoManager = # the update is marked as broken so we will force a new op return callback null, null else - MongoManager.deleteCompressedUpdate update._id, (error) -> - return callback(error) if error? - callback null, update + return callback null, update else callback null, null @@ -60,6 +55,21 @@ module.exports = MongoManager = callback(err,results) + modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) -> + return callback() if not newUpdate? + db.docHistory.findAndModify + query: lastUpdate, + update: + $set : + op: newUpdate.op + meta: newUpdate.meta + v: newUpdate.v + new: true + , (err, result, lastErrorObject) -> + return callback(error) if error? + return new Error("could not modify existing op") if not result? + callback(err, result) + insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) -> update = { doc_id: ObjectId(doc_id.toString()) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 4703ff6a1d..61a0c7ce9d 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -14,10 +14,10 @@ module.exports = UpdatesManager = if length == 0 return callback() - MongoManager.popLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) -> + MongoManager.peekLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) -> # lastCompressedUpdate is the most recent update in Mongo. # - # The popLastCompressedUpdate method may pass it back as 'null' + # The peekLastCompressedUpdate method may pass it back as 'null' # to force the start of a new compressed update, even when there # was a previous compressed update in Mongo. In this case it # passes back the lastVersion from the update to check @@ -37,21 +37,36 @@ module.exports = UpdatesManager = if rawUpdates[0]? and rawUpdates[0].v != lastVersion + 1 error = new Error("Tried to apply raw op at version #{rawUpdates[0].v} to last compressed update with version #{lastVersion}") logger.error err: error, doc_id: doc_id, project_id: project_id, "inconsistent doc versions" - # Push the update back into Mongo - if it was present - catching errors at this - # point is useless, we're already bailing - if lastCompressedUpdate? - MongoManager.insertCompressedUpdates project_id, doc_id, [lastCompressedUpdate], temporary, () -> - return callback error - else - callback error - return + return callback error compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates - MongoManager.insertCompressedUpdates project_id, doc_id, compressedUpdates, temporary,(error) -> + if not lastCompressedUpdate? + # no existing update, insert everything + updateToModify = null + updatesToInsert = compressedUpdates + else + # there are existing updates, see what happens when we + # compress them together with the new ones + [firstUpdate, additionalUpdates...] = compressedUpdates + + if firstUpdate.v == lastCompressedUpdate.v + # first update version hasn't changed, skip it and insert remaining updates + # this is an optimisation, we could update the existing op with itself + updateToModify = null + updatesToInsert = additionalUpdates + else + # first update version did changed, modify it and insert remaining updates + updateToModify = firstUpdate + updatesToInsert = additionalUpdates + + MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" - callback() + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? + MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> + return callback(error) if error? + logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" + callback() REDIS_READ_BATCH_SIZE: 100 processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> diff --git a/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee b/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee index 0556e32413..656447bfa5 100644 --- a/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee @@ -53,36 +53,18 @@ describe "MongoManager", -> it "should call the call back with the update", -> @callback.calledWith(null, @update).should.equal true - describe "deleteCompressedUpdate", -> - beforeEach -> - @update_id = ObjectId().toString() - @db.docHistory = - remove: sinon.stub().callsArg(1) - @MongoManager.deleteCompressedUpdate(@update_id, @callback) - it "should remove the update", -> - @db.docHistory.remove - .calledWith(_id: ObjectId(@update_id)) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true - - describe "popLastCompressedUpdate", -> + describe "peekLastCompressedUpdate", -> describe "when there is no last update", -> beforeEach -> @MongoManager.getLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) - @MongoManager.deleteCompressedUpdate = sinon.stub() - @MongoManager.popLastCompressedUpdate @doc_id, @callback + @MongoManager.peekLastCompressedUpdate @doc_id, @callback it "should get the last update", -> @MongoManager.getLastCompressedUpdate .calledWith(@doc_id) .should.equal true - it "should not try to delete the last update", -> - @MongoManager.deleteCompressedUpdate.called.should.equal false - it "should call the callback with no update", -> @callback.calledWith(null, null).should.equal true @@ -90,19 +72,13 @@ describe "MongoManager", -> beforeEach -> @update = { _id: Object() } @MongoManager.getLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @update) - @MongoManager.deleteCompressedUpdate = sinon.stub().callsArgWith(1, null) - @MongoManager.popLastCompressedUpdate @doc_id, @callback + @MongoManager.peekLastCompressedUpdate @doc_id, @callback it "should get the last update", -> @MongoManager.getLastCompressedUpdate .calledWith(@doc_id) .should.equal true - it "should delete the last update", -> - @MongoManager.deleteCompressedUpdate - .calledWith(@update._id) - .should.equal true - it "should call the callback with the update", -> @callback.calledWith(null, @update).should.equal true @@ -110,17 +86,13 @@ describe "MongoManager", -> beforeEach -> @update = { _id: Object(), v: 12345, inS3: true } @MongoManager.getLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @update) - @MongoManager.deleteCompressedUpdate = sinon.stub() - @MongoManager.popLastCompressedUpdate @doc_id, @callback + @MongoManager.peekLastCompressedUpdate @doc_id, @callback it "should get the last update", -> @MongoManager.getLastCompressedUpdate .calledWith(@doc_id) .should.equal true - it "should not try to delete the last update", -> - @MongoManager.deleteCompressedUpdate.called.should.equal false - it "should call the callback with a null update and the correct version", -> @callback.calledWith(null, null, @update.v).should.equal true diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 04a2111893..b4c44f3b94 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -24,12 +24,12 @@ describe "UpdatesManager", -> describe "compressAndSaveRawUpdates", -> describe "when there are no raw ops", -> beforeEach -> - @MongoManager.popLastCompressedUpdate = sinon.stub() + @MongoManager.peekLastCompressedUpdate = sinon.stub() @MongoManager.insertCompressedUpdates = sinon.stub() @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, [], @temporary, @callback it "should not need to access the database", -> - @MongoManager.popLastCompressedUpdate.called.should.equal false + @MongoManager.peekLastCompressedUpdate.called.should.equal false @MongoManager.insertCompressedUpdates.called.should.equal false it "should call the callback", -> @@ -38,15 +38,16 @@ describe "UpdatesManager", -> describe "when there is no compressed history to begin with", -> beforeEach -> @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }] - @compressedUpdates = { v: 13, op: "compressed-op-12" } + @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] - @MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) + @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) + @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback - it "should try to pop the last compressed op", -> - @MongoManager.popLastCompressedUpdate + it "should look at the last compressed op", -> + @MongoManager.peekLastCompressedUpdate .calledWith(@doc_id) .should.equal true @@ -66,9 +67,10 @@ describe "UpdatesManager", -> describe "when the raw ops need appending to existing history", -> beforeEach -> @lastCompressedUpdate = { v: 11, op: "compressed-op-11" } - @compressedUpdates = { v: 13, op: "compressed-op-12" } + @compressedUpdates = [ { v: 12, op: "compressed-op-11+12" }, { v: 13, op: "compressed-op-12" } ] - @MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate) + @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate) + @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @@ -77,8 +79,8 @@ describe "UpdatesManager", -> @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }] @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback - it "should try to pop the last compressed op", -> - @MongoManager.popLastCompressedUpdate + it "should look at the last compressed op", -> + @MongoManager.peekLastCompressedUpdate .calledWith(@doc_id) .should.equal true @@ -86,10 +88,15 @@ describe "UpdatesManager", -> @UpdateCompressor.compressRawUpdates .calledWith(@lastCompressedUpdate, @rawUpdates) .should.equal true + + it "should update the existing op", -> + @MongoManager.modifyCompressedUpdate + .calledWith(@lastCompressedUpdate, @compressedUpdates[0]) + .should.equal true - it "should save the compressed ops", -> + it "should save the new compressed ops", -> @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates, @temporary) + .calledWith(@project_id, @doc_id, @compressedUpdates[1..], @temporary) .should.equal true it "should call the callback", -> @@ -116,19 +123,20 @@ describe "UpdatesManager", -> .calledWith(new Error("Tried to apply raw op at version 13 to last compressed update with version 11")) .should.equal true - it "should put the popped update back into mongo", -> - @MongoManager.insertCompressedUpdates.calledOnce.should.equal true - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, [@lastCompressedUpdate], @temporary) - .should.equal true + it "should not modify any update in mongo", -> + @MongoManager.modifyCompressedUpdate.called.should.equal false + + it "should not insert any update into mongo", -> + @MongoManager.insertCompressedUpdates.called.should.equal false describe "when the raw ops need appending to existing history which is in S3", -> beforeEach -> @lastCompressedUpdate = null @lastVersion = 11 - @compressedUpdates = { v: 13, op: "compressed-op-12" } + @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] - @MongoManager.popLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion) + @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion) + @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @@ -137,8 +145,8 @@ describe "UpdatesManager", -> @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }] @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback - it "should try to pop the last compressed op", -> - @MongoManager.popLastCompressedUpdate + it "should try to look at the last compressed op", -> + @MongoManager.peekLastCompressedUpdate .calledWith(@doc_id) .should.equal true