From 5e830cbbdbaf0a9a41c85c7305efa0740e80fc4a Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 11 Dec 2015 15:56:47 +0000 Subject: [PATCH 01/11] put all new ops into packs --- .../app/coffee/MongoManager.coffee | 10 +-- .../app/coffee/PackManager.coffee | 71 +++++++++++++++++++ .../app/coffee/UpdatesManager.coffee | 63 ++++++++++------ .../UpdatesManager/UpdatesManagerTests.coffee | 21 +++--- 4 files changed, 122 insertions(+), 43 deletions(-) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 98a9025d8d..ff3dea4cab 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -1,20 +1,17 @@ {db, ObjectId} = require "./mongojs" PackManager = require "./PackManager" async = require "async" +_ = require "underscore" module.exports = MongoManager = getLastCompressedUpdate: (doc_id, callback = (error, update) ->) -> db.docHistory - .find(doc_id: ObjectId(doc_id.toString())) + .find(doc_id: ObjectId(doc_id.toString()), {pack: {$slice:-1}}) # only return the last entry in a pack .sort( v: -1 ) .limit(1) .toArray (error, compressedUpdates) -> return callback(error) if error? - if compressedUpdates[0]?.pack? - # cannot pop from a pack, throw error - error = new Error("last compressed update is a pack") - return callback error, null - return callback null, compressedUpdates[0] or null + callback null, compressedUpdates[0] or null peekLastCompressedUpdate: (doc_id, callback = (error, update, version) ->) -> # under normal use we pass back the last update as @@ -55,7 +52,6 @@ module.exports = MongoManager = else callback(err,results) - modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) -> return callback() if not newUpdate? 
db.docHistory.findAndModify diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 082c1d8a9a..4e918814ff 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -451,3 +451,74 @@ module.exports = PackManager = bulk.execute callback else callback() + + insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + return callback() if newUpdates.length == 0 + + updatesToFlush = [] + updatesRemaining = newUpdates.slice() + + n = lastUpdate?.n || 0 + sz = lastUpdate?.sz || 0 + + while updatesRemaining.length and n < PackManager.MAX_COUNT and sz < PackManager.MAX_SIZE + nextUpdate = updatesRemaining[0] + nextUpdateSize = BSON.calculateObjectSize(nextUpdate) + if nextUpdateSize + sz > PackManager.MAX_SIZE and n > 0 + break + n++ + sz += nextUpdateSize + updatesToFlush.push updatesRemaining.shift() + + PackManager.flushCompressedUpdates project_id, doc_id, lastUpdate, updatesToFlush, temporary, (error) -> + return callback(error) if error? + PackManager.insertCompressedUpdates project_id, doc_id, null, updatesRemaining, temporary, callback + + flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + return callback() if newUpdates.length == 0 + if lastUpdate? 
+ PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback + else + PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback + + insertUpdatesIntoNewPack: (project_id, doc_id, newUpdates, temporary, callback = (error) ->) -> + first = newUpdates[0] + last = newUpdates[newUpdates.length - 1] + n = newUpdates.length + sz = BSON.calculateObjectSize(newUpdates) + newPack = + project_id: ObjectId(project_id.toString()) + doc_id: ObjectId(doc_id.toString()) + pack: newUpdates + n: n + sz: sz + meta: + start_ts: first.meta.start_ts + end_ts: last.meta.end_ts + v: first.v + v_end: last.v + logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack" + db.docHistory.insert newPack, callback + + appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> + first = newUpdates[0] + last = newUpdates[newUpdates.length - 1] + n = newUpdates.length + sz = BSON.calculateObjectSize(newUpdates) + query = + _id: lastUpdate._id + project_id: ObjectId(project_id.toString()) + doc_id: ObjectId(doc_id.toString()) + pack: {$exists: true} + update = + $push: + "pack": {$each: newUpdates} + $inc: + "n": n + "sz": sz + $set: + "meta.end_ts": last.meta.end_ts + "v_end": last.v + logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack" + db.docHistory.findAndModify {query, update}, callback + diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index f40a7c8503..97a630ec53 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -1,4 +1,5 @@ MongoManager = require "./MongoManager" +PackManager = require "./PackManager" RedisManager = require "./RedisManager" UpdateCompressor = require "./UpdateCompressor" LockManager = require "./LockManager" @@ -43,34 +44,50 @@ 
module.exports = UpdatesManager = else return callback error - compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates + if rawUpdates.length == 0 + return callback() - if not lastCompressedUpdate? - # no existing update, insert everything + if not lastCompressedUpdate? or lastCompressedUpdate.pack? # handle pack append as a special case + UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback + else #use the existing op code + UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback + + _updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> + compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates + PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) -> + return callback(error) if error? + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? + callback() + + _updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> + compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates + + if not lastCompressedUpdate? + # no existing update, insert everything + updateToModify = null + updatesToInsert = compressedUpdates + else + # there are existing updates, see what happens when we + # compress them together with the new ones + [firstUpdate, additionalUpdates...] 
= compressedUpdates + + if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate) + # first update version hasn't changed, skip it and insert remaining updates + # this is an optimisation, we could update the existing op with itself updateToModify = null - updatesToInsert = compressedUpdates + updatesToInsert = additionalUpdates else - # there are existing updates, see what happens when we - # compress them together with the new ones - [firstUpdate, additionalUpdates...] = compressedUpdates + # first update version did changed, modify it and insert remaining updates + updateToModify = firstUpdate + updatesToInsert = additionalUpdates - if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate) - # first update version hasn't changed, skip it and insert remaining updates - # this is an optimisation, we could update the existing op with itself - updateToModify = null - updatesToInsert = additionalUpdates - else - # first update version did changed, modify it and insert remaining updates - updateToModify = firstUpdate - updatesToInsert = additionalUpdates - - MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> + MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> + return callback(error) if error? + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? + MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> return callback(error) if error? - logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? - MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> - return callback(error) if error? 
- logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" - callback() + logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" + callback() REDIS_READ_BATCH_SIZE: 100 processUncompressedUpdates: (project_id, doc_id, callback = (error) ->) -> diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 4fc85a5e19..299c4ba068 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -10,6 +10,7 @@ describe "UpdatesManager", -> @UpdatesManager = SandboxedModule.require modulePath, requires: "./UpdateCompressor": @UpdateCompressor = {} "./MongoManager" : @MongoManager = {} + "./PackManager" : @PackManager = {} "./RedisManager" : @RedisManager = {} "./LockManager" : @LockManager = {} "./WebApiManager": @WebApiManager = {} @@ -41,8 +42,7 @@ describe "UpdatesManager", -> @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null) - @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) - @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback @@ -51,16 +51,12 @@ describe "UpdatesManager", -> .calledWith(@doc_id) .should.equal true - it "should compress the raw ops", -> - @UpdateCompressor.compressRawUpdates - .calledWith(null, @rawUpdates) - .should.equal true - it "should save the 
compressed ops", -> - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates, @temporary) + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary) .should.equal true + it "should call the callback", -> @callback.called.should.equal true @@ -136,8 +132,7 @@ describe "UpdatesManager", -> @compressedUpdates = [ { v: 13, op: "compressed-op-12" } ] @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, null, @lastVersion) - @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) - @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) describe "when the raw ops start where the existing history ends", -> @@ -156,8 +151,8 @@ describe "UpdatesManager", -> .should.equal true it "should save the compressed ops", -> - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates, @temporary) + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary) .should.equal true it "should call the callback", -> From 0ba00a9eb73b9aec584c5b13b6e17c9554bada9a Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 17 Dec 2015 11:41:20 +0000 Subject: [PATCH 02/11] expire temporary packs and roll over to a new pack each day --- services/track-changes/app/coffee/PackManager.coffee | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 4e918814ff..f328cc0939 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -4,6 +4,8 @@ _ = require "underscore" logger = require "logger-sharelatex" LockManager = require "./LockManager" +DAYS = 24 * 
3600 * 1000 # one day in milliseconds + module.exports = PackManager = # The following functions implement methods like a mongo find, but # expands any documents containing a 'pack' field into multiple @@ -476,7 +478,7 @@ module.exports = PackManager = flushCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> return callback() if newUpdates.length == 0 - if lastUpdate? + if lastUpdate? and not (temporary and ((Date.now() - lastUpdate.meta?.start_ts) > 1 * DAYS)) PackManager.appendUpdatesToExistingPack project_id, doc_id, lastUpdate, newUpdates, temporary, callback else PackManager.insertUpdatesIntoNewPack project_id, doc_id, newUpdates, temporary, callback @@ -497,6 +499,8 @@ module.exports = PackManager = end_ts: last.meta.end_ts v: first.v v_end: last.v + if temporary + newPack.expiresAt = new Date(Date.now() + 7 * DAYS) logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack" db.docHistory.insert newPack, callback @@ -519,6 +523,8 @@ module.exports = PackManager = $set: "meta.end_ts": last.meta.end_ts "v_end": last.v + if lastUpdate.expiresAt and temporary + update.$set.expiresAt = new Date(Date.now() + 7 * DAYS) logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack" db.docHistory.findAndModify {query, update}, callback From 0532a4daaa98e4c63e8aeed7c8b0de5ac13a7299 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 24 Dec 2015 11:29:46 +0000 Subject: [PATCH 03/11] use compound index to replace separate index for packs --- .../app/coffee/MongoManager.coffee | 4 +--- .../app/coffee/PackManager.coffee | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index ff3dea4cab..13df864c7b 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ 
-136,9 +136,7 @@ module.exports = MongoManager = # For finding all updates that go into a diff for a doc db.docHistory.ensureIndex { doc_id: 1, v: 1 }, { background: true } # For finding all updates that affect a project - db.docHistory.ensureIndex { project_id: 1, "meta.end_ts": 1 }, { background: true } - # For finding all packs that affect a project (use a sparse index so only packs are included) - db.docHistory.ensureIndex { project_id: 1, "pack.0.meta.end_ts": 1, "meta.end_ts": 1}, { background: true, sparse: true } + db.docHistory.ensureIndex { project_id: 1, "meta.end_ts": 1, "meta.start_ts": -1 }, { background: true } # For finding updates that don't yet have a project_id and need it inserting db.docHistory.ensureIndex { doc_id: 1, project_id: 1 }, { background: true } # For finding project meta-data diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index f328cc0939..e838c8b001 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -178,20 +178,25 @@ module.exports = PackManager = .find(tailQuery, projection) .sort(sort) - # now find any packs that overlap with the time window + # now find any packs that overlap with the time window from outside + # cutoff before + # --|-----wanted-range--|------------------ time=> + # |-------------|pack(end_ts) + # + # these were not picked up by the original query because + # end_ts>before but the beginning of the pack may be in the time range overlapQuery = _.clone(query) if before? && cutoff? overlapQuery['meta.end_ts'] = {"$gte": before} - overlapQuery['pack.0.meta.end_ts'] = {"$lte": before } + overlapQuery['meta.start_ts'] = {"$lte": before } else if before? && not cutoff? overlapQuery['meta.end_ts'] = {"$gte": before} - overlapQuery['pack.0.meta.end_ts'] = {"$lte": before } + overlapQuery['meta.start_ts'] = {"$lte": before } else if not before? && cutoff? 
- overlapQuery['meta.end_ts'] = {"$gte": cutoff} - overlapQuery['pack.0.meta.end_ts'] = {"$gte": 0 } + overlapQuery['meta.end_ts'] = {"$gte": cutoff} # we already have these?? else if not before? && not cutoff? - overlapQuery['meta.end_ts'] = {"$gte": 0 } - overlapQuery['pack.0.meta.end_ts'] = {"$gte": 0 } + overlapQuery['meta.end_ts'] = {"$gte": 0 } # shouldn't happen?? + overlap = collection .find(overlapQuery, projection) .sort(sort) From f64969c7843a3378c006f397fc4af73661600211 Mon Sep 17 00:00:00 2001 From: Brian - Work Date: Mon, 11 Jan 2016 11:56:12 +0000 Subject: [PATCH 04/11] added comment about query memory usage for toArray --- services/track-changes/app/coffee/PackManager.coffee | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index e838c8b001..210139d468 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -88,6 +88,10 @@ module.exports = PackManager = needMore = false # keep track of whether we need to load more data updates = [] # used to accumulate the set of results + + # FIXME: packs are big so we should accumulate the results + # incrementally instead of using .toArray() to avoid reading all + # of the changes into memory cursor.toArray (err, result) -> unpackedSet = PackManager._unpackResults(result) updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit) @@ -141,6 +145,9 @@ module.exports = PackManager = updates = [] # used to accumulate the set of results + # FIXME: packs are big so we should accumulate the results + # incrementally instead of using .toArray() to avoid reading all + # of the changes into memory cursor.toArray (err, result) -> if err? 
return callback err, result From c6be12f3d5432c200fa955455c52a36600beae39 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 11 Jan 2016 12:42:22 +0000 Subject: [PATCH 05/11] set v_end on pack creation --- services/track-changes/app/coffee/PackManager.coffee | 1 + 1 file changed, 1 insertion(+) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 210139d468..17dae0cba7 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -300,6 +300,7 @@ module.exports = PackManager = top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts } top.sz = sz + top.v_end = d.v delete top.op delete top._id packs.push top From f592611cacd1ad6b6960f73eb2e442ac0e84ced7 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 11 Jan 2016 12:42:53 +0000 Subject: [PATCH 06/11] always create a new pack, never keep as op --- .../track-changes/app/coffee/PackManager.coffee | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 17dae0cba7..5f67dd4baf 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -269,15 +269,10 @@ module.exports = PackManager = MAX_SIZE: 1024*1024 # make these configurable parameters MAX_COUNT: 1024 - MIN_COUNT: 100 - KEEP_OPS: 100 convertDocsToPacks: (docs, callback) -> packs = [] top = null - # keep the last KEEP_OPS as individual ops - docs = docs.slice(0,-PackManager.KEEP_OPS) - docs.forEach (d,i) -> # skip existing packs if d.pack? 
@@ -294,7 +289,7 @@ module.exports = PackManager = top.v_end = d.v top.meta.end_ts = d.meta.end_ts return - else if sz < PackManager.MAX_SIZE + else # create a new pack top = _.clone(d) top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] @@ -304,13 +299,7 @@ module.exports = PackManager = delete top.op delete top._id packs.push top - else - # keep the op - # util.log "keeping large op unchanged (#{sz} bytes)" - # only store packs with a sufficient number of ops, discard others - packs = packs.filter (packObj) -> - packObj.pack.length > PackManager.MIN_COUNT callback(null, packs) checkHistory: (docs, callback) -> From 679a81564ee73fb7e9ded5a640278ba1e20e3073 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 11 Jan 2016 12:46:26 +0000 Subject: [PATCH 07/11] respect mongo 3 limit of 1000 bulk operations --- services/track-changes/app/coffee/PackManager.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 5f67dd4baf..5b277c3ec4 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -268,7 +268,7 @@ module.exports = PackManager = return newResults MAX_SIZE: 1024*1024 # make these configurable parameters - MAX_COUNT: 1024 + MAX_COUNT: 512 convertDocsToPacks: (docs, callback) -> packs = [] From 78b3412ca834e184d66a346b77e07541187873f6 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 11 Jan 2016 14:36:11 +0000 Subject: [PATCH 08/11] decrease delay when packing --- services/track-changes/app/coffee/PackManager.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 5b277c3ec4..c4562d48d4 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -409,7 
+409,7 @@ module.exports = PackManager = }, {upsert:true}, () -> callback null, null - DB_WRITE_DELAY: 2000 + DB_WRITE_DELAY: 100 savePacks: (packs, callback) -> async.eachSeries packs, PackManager.safeInsert, (err, result) -> From 399a8f0a297aa5e687dcc12e5804f3221730f451 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 15 Jan 2016 15:02:34 +0000 Subject: [PATCH 09/11] update tests to assume packs are created --- .../coffee/AppendingUpdatesTests.coffee | 36 +++++++++---------- .../coffee/ArchivingUpdatesTests.coffee | 7 ++-- .../coffee/FlushingUpdatesTests.coffee | 2 +- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee b/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee index a82767e7c5..3d4bf14465 100644 --- a/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee +++ b/services/track-changes/test/acceptance/coffee/AppendingUpdatesTests.coffee @@ -37,7 +37,7 @@ describe "Appending doc ops to the history", -> done() it "should insert the compressed op into mongo", -> - expect(@updates[0].op).to.deep.equal [{ + expect(@updates[0].pack[0].op).to.deep.equal [{ p: 3, i: "foo" }] @@ -99,13 +99,13 @@ describe "Appending doc ops to the history", -> throw error if error? done() - it "should combine all the updates into one", -> - expect(@updates[0].op).to.deep.equal [{ - p: 3, i: "foobar" + it "should combine all the updates into one pack", -> + expect(@updates[0].pack[1].op).to.deep.equal [{ + p: 6, i: "bar" }] it "should insert the correct version number into mongo", -> - expect(@updates[0].v).to.equal 8 + expect(@updates[0].v_end).to.equal 8 describe "when the updates are far apart", -> @@ -129,11 +129,11 @@ describe "Appending doc ops to the history", -> throw error if error? 
done() - it "should keep the updates separate", -> - expect(@updates[0].op).to.deep.equal [{ + it "should combine the updates into one pack", -> + expect(@updates[0].pack[0].op).to.deep.equal [{ p: 3, i: "foo" }] - expect(@updates[1].op).to.deep.equal [{ + expect(@updates[0].pack[1].op).to.deep.equal [{ p: 6, i: "bar" }] @@ -160,10 +160,10 @@ describe "Appending doc ops to the history", -> done() it "should concat the compressed op into mongo", -> - expect(@updates[0].op).to.deep.equal @expectedOp + expect(@updates[0].pack.length).to.deep.equal 3 # batch size is 100 it "should insert the correct version number into mongo", -> - expect(@updates[0].v).to.equal 250 + expect(@updates[0].v_end).to.equal 250 describe "when there are multiple ops in each update", -> @@ -188,16 +188,16 @@ describe "Appending doc ops to the history", -> done() it "should insert the compressed ops into mongo", -> - expect(@updates[0].op).to.deep.equal [{ + expect(@updates[0].pack[0].op).to.deep.equal [{ p: 3, i: "foo" }] - expect(@updates[1].op).to.deep.equal [{ + expect(@updates[0].pack[1].op).to.deep.equal [{ p: 6, i: "bar" }] it "should insert the correct version numbers into mongo", -> - expect(@updates[0].v).to.equal 3 - expect(@updates[1].v).to.equal 4 + expect(@updates[0].pack[0].v).to.equal 3 + expect(@updates[0].pack[1].v).to.equal 4 describe "when there is a no-op update", -> before (done) -> @@ -221,17 +221,17 @@ describe "Appending doc ops to the history", -> done() it "should insert the compressed no-op into mongo", -> - expect(@updates[0].op).to.deep.equal [] + expect(@updates[0].pack[0].op).to.deep.equal [] it "should insert the compressed next update into mongo", -> - expect(@updates[1].op).to.deep.equal [{ + expect(@updates[0].pack[1].op).to.deep.equal [{ p: 3, i: "foo" }] it "should insert the correct version numbers into mongo", -> - expect(@updates[0].v).to.equal 3 - expect(@updates[1].v).to.equal 4 + expect(@updates[0].pack[0].v).to.equal 3 + 
expect(@updates[0].pack[1].v).to.equal 4 describe "when the project has versioning enabled", -> before (done) -> diff --git a/services/track-changes/test/acceptance/coffee/ArchivingUpdatesTests.coffee b/services/track-changes/test/acceptance/coffee/ArchivingUpdatesTests.coffee index eb04a976ab..ab06b991b8 100644 --- a/services/track-changes/test/acceptance/coffee/ArchivingUpdatesTests.coffee +++ b/services/track-changes/test/acceptance/coffee/ArchivingUpdatesTests.coffee @@ -89,9 +89,10 @@ describe "Archiving updates", -> doc.lastVersion.should.equal 20 done() - it "should store twenty doc changes in S3", (done) -> + it "should store twenty doc changes in S3 in one pack", (done) -> TrackChangesClient.getS3Doc @project_id, @doc_id, (error, res, doc) => - doc.length.should.equal 20 + doc.length.should.equal 1 + doc[0].pack.length.should.equal 20 done() describe "unarchiving a doc's updates", -> @@ -103,7 +104,7 @@ describe "Archiving updates", -> it "should restore doc changes", (done) -> db.docHistory.count { doc_id: ObjectId(@doc_id) }, (error, count) -> throw error if error? 
- count.should.equal 20 + count.should.equal 1 done() it "should remove doc marked as inS3", (done) -> diff --git a/services/track-changes/test/acceptance/coffee/FlushingUpdatesTests.coffee b/services/track-changes/test/acceptance/coffee/FlushingUpdatesTests.coffee index 63ddca4da9..3f82304da1 100644 --- a/services/track-changes/test/acceptance/coffee/FlushingUpdatesTests.coffee +++ b/services/track-changes/test/acceptance/coffee/FlushingUpdatesTests.coffee @@ -31,7 +31,7 @@ describe "Flushing updates", -> it "should flush the op into mongo", (done) -> TrackChangesClient.getCompressedUpdates @doc_id, (error, updates) -> - expect(updates[0].op).to.deep.equal [{ + expect(updates[0].pack[0].op).to.deep.equal [{ p: 3, i: "f" }] done() From 61d22f027a2a08209ef222d0ddf3253b247a95d0 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 18 Jan 2016 15:17:48 +0000 Subject: [PATCH 10/11] added test for inserting as pack --- .../UpdatesManager/UpdatesManagerTests.coffee | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 299c4ba068..f742499d1b 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -51,12 +51,11 @@ describe "UpdatesManager", -> .calledWith(@doc_id) .should.equal true - it "should save the compressed ops", -> + it "should save the compressed ops as a pack", -> @PackManager.insertCompressedUpdates .calledWith(@project_id, @doc_id, null, @compressedUpdates, @temporary) .should.equal true - it "should call the callback", -> @callback.called.should.equal true @@ -98,6 +97,30 @@ describe "UpdatesManager", -> it "should call the callback", -> @callback.called.should.equal true + describe "when the raw ops start where the existing 
history ends and the history is in a pack", -> + beforeEach -> + @lastCompressedUpdate = {pack: [{ v: 11, op: "compressed-op-11" }], v:11} + @rawUpdates = [{ v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }] + @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v) + @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, @rawUpdates, @temporary, @callback + + it "should look at the last compressed op", -> + @MongoManager.peekLastCompressedUpdate + .calledWith(@doc_id) + .should.equal true + + it "should defer the compression of raw ops to PackManager", -> + @UpdateCompressor.compressRawUpdates + .should.not.be.called + + it "should save the new compressed ops into a pack", -> + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, @lastCompressedUpdate, @compressedUpdates, @temporary) + .should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + describe "when some raw ops are passed that have already been compressed", -> beforeEach -> @rawUpdates = [{ v: 10, op: "mock-op-10" }, { v: 11, op: "mock-op-11"}, { v: 12, op: "mock-op-12" }, { v: 13, op: "mock-op-13" }] From ae61d1261ee176c73fd03fdb44be50373a55cbda Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 19 Jan 2016 15:52:12 +0000 Subject: [PATCH 11/11] added tests for pack updates --- .../PackManager/PackManagerTests.coffee | 232 ++++++++++++++++++ .../UpdatesManager/UpdatesManagerTests.coffee | 3 +- 2 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee diff --git a/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee new file mode 100644 index 0000000000..c538096a0a --- /dev/null +++ b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee @@ -0,0 +1,232 @@ 
+sinon = require('sinon') +chai = require('chai') +should = chai.should() +expect = chai.expect +modulePath = "../../../../app/js/PackManager.js" +SandboxedModule = require('sandboxed-module') +{ObjectId} = require("mongojs") +bson = require("bson") +BSON = new bson.BSONPure() + +tk = require "timekeeper" + +describe "PackManager", -> + beforeEach -> + tk.freeze(new Date()) + @PackManager = SandboxedModule.require modulePath, requires: + "./mongojs" : { db: @db = {}, ObjectId: ObjectId, BSON: BSON } + "./LockManager" : {} + "logger-sharelatex": { log: sinon.stub(), error: sinon.stub() } + @callback = sinon.stub() + @doc_id = ObjectId().toString() + @project_id = ObjectId().toString() + + afterEach -> + tk.reset() + + describe "insertCompressedUpdates", -> + beforeEach -> + @lastUpdate = { + _id: "12345" + pack: [ + { op: "op-1", meta: "meta-1", v: 1}, + { op: "op-2", meta: "meta-2", v: 2} + ] + n : 2 + sz : 100 + } + @newUpdates = [ + { op: "op-3", meta: "meta-3", v: 3}, + { op: "op-4", meta: "meta-4", v: 4} + ] + @db.docHistory = + insert: sinon.stub().callsArg(1) + findAndModify: sinon.stub().callsArg(1) + + describe "with no last update", -> + beforeEach -> + @PackManager.insertUpdatesIntoNewPack = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates @project_id, @doc_id, null, @newUpdates, true, @callback + + describe "for a small update", -> + it "should insert the update into a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates, true).should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + describe "for many small updates", -> + beforeEach -> + @newUpdates = ({ op: "op-#{i}", meta: "meta-#{i}", v: i} for i in [0..2048]) + @PackManager.insertCompressedUpdates @project_id, @doc_id, null, @newUpdates, false, @callback + + it "should append the initial updates to the existing pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, 
@newUpdates[0...512], false).should.equal true + + it "should insert the first set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[512...1024], false).should.equal true + + it "should insert the second set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1024...1536], false).should.equal true + + it "should insert the third set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1536...2048], false).should.equal true + + it "should insert the final set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2048..2048], false).should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + + + describe "with an existing pack as the last update", -> + beforeEach -> + @PackManager.appendUpdatesToExistingPack = sinon.stub().callsArg(5) + @PackManager.insertUpdatesIntoNewPack = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback + + describe "for a small update", -> + it "should append the update to the existing pack", -> + @PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates, false).should.equal true + it "should not insert any new packs", -> + @PackManager.insertUpdatesIntoNewPack.called.should.equal false + it "should call the callback", -> + @callback.called.should.equal true + + describe "for many small updates", -> + beforeEach -> + @newUpdates = ({ op: "op-#{i}", meta: "meta-#{i}", v: i} for i in [0..2048]) + @PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback + + it "should append the initial updates to the existing pack", -> + 
@PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates[0...510], false).should.equal true + + it "should insert the first set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[510...1022], false).should.equal true + + it "should insert the second set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1022...1534], false).should.equal true + + it "should insert the third set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1534...2046], false).should.equal true + + it "should insert the final set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2046..2048], false).should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + describe "for many big updates", -> + beforeEach -> + longString = ("a" for [0 .. 
(0.75*@PackManager.MAX_SIZE)]).join("") + @newUpdates = ({ op: "op-#{i}-#{longString}", meta: "meta-#{i}", v: i} for i in [0..4]) + @PackManager.insertCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, false, @callback + + it "should append the initial updates to the existing pack", -> + @PackManager.appendUpdatesToExistingPack.calledWith(@project_id, @doc_id, @lastUpdate, @newUpdates[0..0], false).should.equal true + + it "should insert the first set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[1..1], false).should.equal true + + it "should insert the second set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[2..2], false).should.equal true + + it "should insert the third set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[3..3], false).should.equal true + + it "should insert the final set of remaining updates as a new pack", -> + @PackManager.insertUpdatesIntoNewPack.calledWith(@project_id, @doc_id, @newUpdates[4..4], false).should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + describe "flushCompressedUpdates", -> + describe "when there is no previous update", -> + beforeEach -> + @PackManager.flushCompressedUpdates @project_id, @doc_id, null, @newUpdates, true, @callback + + describe "for a small update that will expire", -> + it "should insert the update into mongo", -> + @db.docHistory.insert.calledWithMatch({ + pack: @newUpdates, + project_id: ObjectId(@project_id), + doc_id: ObjectId(@doc_id) + n: @newUpdates.length + v: @newUpdates[0].v + v_end: @newUpdates[@newUpdates.length-1].v + }).should.equal true + + it "should set an expiry time in the future", -> + @db.docHistory.insert.calledWithMatch({ + expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000) + }).should.equal 
true + + it "should call the callback", -> + @callback.called.should.equal true + + describe "when there is a recent previous update in mongo", -> + beforeEach -> + @lastUpdate = { + _id: "12345" + pack: [ + { op: "op-1", meta: "meta-1", v: 1}, + { op: "op-2", meta: "meta-2", v: 2} + ] + n : 2 + sz : 100 + expiresAt: new Date(Date.now()) + } + + @PackManager.flushCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, true, @callback + + describe "for a small update that will expire", -> + it "should append the update in mongo", -> + @db.docHistory.findAndModify.calledWithMatch({ + query: {_id: @lastUpdate._id} + update: { $push: {"pack" : {$each: @newUpdates}}, $set: {v_end: @newUpdates[@newUpdates.length-1].v}} + }).should.equal true + + it "should set an expiry time in the future", -> + @db.docHistory.findAndModify.calledWithMatch({ + update: {$set: {expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000)}} + }).should.equal true + + it "should call the callback", -> + @callback.called.should.equal true + + + describe "when there is an old previous update in mongo", -> + beforeEach -> + @lastUpdate = { + _id: "12345" + pack: [ + { op: "op-1", meta: "meta-1", v: 1}, + { op: "op-2", meta: "meta-2", v: 2} + ] + n : 2 + sz : 100 + meta: {start_ts: Date.now() - 30 * 24 * 3600 * 1000} + expiresAt: new Date(Date.now() - 30 * 24 * 3600 * 1000) + } + + @PackManager.flushCompressedUpdates @project_id, @doc_id, @lastUpdate, @newUpdates, true, @callback + + describe "for a small update that will expire", -> + it "should insert the update into mongo", -> + @db.docHistory.insert.calledWithMatch({ + pack: @newUpdates, + project_id: ObjectId(@project_id), + doc_id: ObjectId(@doc_id) + n: @newUpdates.length + v: @newUpdates[0].v + v_end: @newUpdates[@newUpdates.length-1].v + }).should.equal true + + it "should set an expiry time in the future", -> + @db.docHistory.insert.calledWithMatch({ + expiresAt: new Date(Date.now() + 7 * 24 * 3600 * 1000) + }).should.equal true + 
+ it "should call the callback", -> + @callback.called.should.equal true diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index f742499d1b..64ed839bdc 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -67,6 +67,7 @@ describe "UpdatesManager", -> @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v) @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) + @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) describe "when the raw ops start where the existing history ends", -> @@ -109,7 +110,7 @@ describe "UpdatesManager", -> .calledWith(@doc_id) .should.equal true - it "should defer the compression of raw ops to PackManager", -> + it "should defer the compression of raw ops until they are written in a new pack", -> @UpdateCompressor.compressRawUpdates .should.not.be.called