diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 98a9025d8d..ff3dea4cab 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -1,20 +1,17 @@ {db, ObjectId} = require "./mongojs" PackManager = require "./PackManager" async = require "async" +_ = require "underscore" module.exports = MongoManager = getLastCompressedUpdate: (doc_id, callback = (error, update) ->) -> db.docHistory - .find(doc_id: ObjectId(doc_id.toString())) + .find(doc_id: ObjectId(doc_id.toString()), {pack: {$slice:-1}}) # only return the last entry in a pack .sort( v: -1 ) .limit(1) .toArray (error, compressedUpdates) -> return callback(error) if error? - if compressedUpdates[0]?.pack? - # cannot pop from a pack, throw error - error = new Error("last compressed update is a pack") - return callback error, null - return callback null, compressedUpdates[0] or null + callback null, compressedUpdates[0] or null peekLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) -> # under normal use we pass back the last update as @@ -55,7 +52,6 @@ module.exports = MongoManager = else callback(err,results) - modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) -> return callback() if not newUpdate? 
db.docHistory.findAndModify diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 082c1d8a9a..4e918814ff 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -451,3 +451,74 @@ module.exports = PackManager = bulk.execute callback else callback() + + insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + return callback() if newUpdates.length == 0 + + updatesToFlush = [] + updatesRemaining = newUpdates.slice() + + n = lastUpdate?.n || 0 + sz = lastUpdate?.sz || 0 + + while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE + nextUpdate = updatesRemaining[0] + nextUpdateSize = BSON.calculateObjectSize(nextUpdate) + if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0 + break + n++ + sz += nextUpdateSize + updatesToFlush.push updatesRemaining.shift() + + PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) -> + return callback(error) if error? + PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback + + flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + return callback() if newUpdates.length == 0 + if lastUpdate? 
+ PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback + else + PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback + + insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) -> + first = newUpdates[0] + last = newUpdates[newUpdates.length - 1] + n = newUpdates.length + sz = BSON.calculateObjectSize(newUpdates) + newPack = + project_id: ObjectId(project_id.toString()) + doc_id: ObjectId(doc_id.toString()) + pack: newUpdates + n: n + sz: sz + meta: + start_ts: first.meta.start_ts + end_ts: last.meta.end_ts + v: first.v + v_end: last.v + logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack" + db.docHistory.insert newPack, callback + + appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + first = newUpdates[0] + last = newUpdates[newUpdates.length - 1] + n = newUpdates.length + sz = BSON.calculateObjectSize(newUpdates) + query = + _id: lastUpdate._id + project_id: ObjectId(project_id.toString()) + doc_id: ObjectId(doc_id.toString()) + pack: {$exists: true} + update = + $push: + "pack": {$each: newUpdates} + $inc: + "n": n + "sz": sz + $set: + "meta.end_ts": last.meta.end_ts + "v_end": last.v + logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack" + db.docHistory.findAndModify {query, update}, callback + diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index f40a7c8503..97a630ec53 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -1,4 +1,5 @@ MongoManager = require "./MongoManager" +PackManager = require "./PackManager" RedisManager = require "./RedisManager" UpdateCompressor = require "./UpdateCompressor" LockManager = require "./LockManager" @@ -43,34 +44,50 @@ 
module.exports = UpdatesManager = else return callback error - compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates + if rawUpdates.length == 0 + return callback() - if not lastCompressedUpdate? - # no existing update, insert everything + if not lastCompressedUpdate? or lastCompressedUpdate.pack? # handle pack append as a special case + UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback + else #use the existing op code + UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback + + _updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> + compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates + PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) -> + return callback(error) if error? + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? + callback() + + _updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> + compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates + + if not lastCompressedUpdate? + # no existing update, insert everything + updateToModify = null + updatesToInsert = compressedUpdates + else + # there are existing updates, see what happens when we + # compress them together with the new ones + [firstUpdate, additionalUpdates...] 
= compressedUpdates + + if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate) + # first update version hasn't changed, skip it and insert remaining updates + # this is an optimisation, we could update the existing op with itself updateToModify = null - updatesToInsert = compressedUpdates + updatesToInsert = additionalUpdates else - # there are existing updates, see what happens when we - # compress them together with the new ones - [firstUpdate, additionalUpdates...] = compressedUpdates + # first update version did change, modify it and insert remaining updates + updateToModify = firstUpdate + updatesToInsert = additionalUpdates - if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate) - # first update version hasn't changed, skip it and insert remaining updates - # this is an optimisation, we could update the existing op with itself - updateToModify = null - updatesToInsert = additionalUpdates - else - # first update version did changed, modify it and insert remaining updates - updateToModify = firstUpdate - updatesToInsert = additionalUpdates - - MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> + MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> + return callback(error) if error? + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? + MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> return callback(error) if error? - logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? - MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> - return callback(error) if error? 
- logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" - callback() + logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" + callback() REDIS_READ_BATCH_SIZE: 100 processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 4fc85a5e19..299c4ba068 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -10,6 +10,7 @@ describe "UpdatesManager", -> @UpdatesManager = SandboxedModule.require modulePath, requires: "./UpdateCompressor": @UpdateCompressor = {} "./MongoManager" : @MongoManager = {} + "./PackManager" : @PackManager = {} "./RedisManager" : @RedisManager = {} "./LockManager" : @LockManager = {} "./WebApiManager": @WebApiManager = {} @@ -41,8 +42,7 @@ describe "UpdatesManager", -> @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) - @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) - @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback @@ -51,16 +51,12 @@ describe "UpdatesManager", -> .calledWith(@doc_id) .should.equal true - it "should compress the raw ops", -> - @UpdateCompressor.compressRawUpdates - .calledWith(null, @rawUpdates) - .should.equal true - it "should save the 
compressed ops", -> - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates, @temporary) + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary) .should.equal true + it "should call the callback", -> @callback.called.should.equal true @@ -136,8 +132,7 @@ describe "UpdatesManager", -> @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion) - @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) - @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) describe "when the raw ops start where the existing history ends", -> @@ -156,8 +151,8 @@ describe "UpdatesManager", -> .should.equal true it "should save the compressed ops", -> - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates, @temporary) + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary) .should.equal true it "should call the callback", ->