diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 4ed177e91c..f12691027b 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -48,12 +48,6 @@ app.post "/project/:project_id/flush", HttpController.flushProject app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpController.restore -app.post "/doc/:doc_id/pack", HttpController.packDoc -app.get "/doc/list", HttpController.listDocs - -app.post '/project/:project_id/archive', HttpController.archiveProject -app.post '/project/:project_id/unarchive', HttpController.unArchiveProject - packWorker = null # use a single packing worker app.post "/pack", (req, res, next) -> diff --git a/services/track-changes/app/coffee/DiffManager.coffee b/services/track-changes/app/coffee/DiffManager.coffee index bb110b1824..dfdb1a80a2 100644 --- a/services/track-changes/app/coffee/DiffManager.coffee +++ b/services/track-changes/app/coffee/DiffManager.coffee @@ -14,7 +14,6 @@ module.exports = DiffManager = callback(null, content, version, updates) getDiff: (project_id, doc_id, fromVersion, toVersion, callback = (error, diff) ->) -> - logger.log project_id: project_id, doc_id: doc_id, from: fromVersion, to: toVersion, "getting diff" DiffManager.getDocumentBeforeVersion project_id, doc_id, fromVersion, (error, startingContent, updates) -> if error? 
if error.message == "broken-history" diff --git a/services/track-changes/app/coffee/DocArchiveManager.coffee b/services/track-changes/app/coffee/DocArchiveManager.coffee deleted file mode 100644 index d557278b8e..0000000000 --- a/services/track-changes/app/coffee/DocArchiveManager.coffee +++ /dev/null @@ -1,88 +0,0 @@ -MongoManager = require "./MongoManager" -MongoAWS = require "./MongoAWS" -LockManager = require "./LockManager" -DocstoreHandler = require "./DocstoreHandler" -logger = require "logger-sharelatex" -_ = require "underscore" -async = require "async" -settings = require("settings-sharelatex") - -module.exports = DocArchiveManager = - - archiveAllDocsChanges: (project_id, callback = (error, docs) ->) -> - DocstoreHandler.getAllDocs project_id, (error, docs) -> - if error? - return callback(error) - else if !docs? - return callback new Error("No docs for project #{project_id}") - jobs = _.map docs, (doc) -> - (cb)-> DocArchiveManager.archiveDocChangesWithLock project_id, doc._id, cb - async.series jobs, callback - - archiveDocChangesWithLock: (project_id, doc_id, callback = (error) ->) -> - job = (releaseLock) -> - DocArchiveManager.archiveDocChanges project_id, doc_id, releaseLock - LockManager.runWithLock("HistoryLock:#{doc_id}", job, callback) - - archiveDocChanges: (project_id, doc_id, callback)-> - MongoManager.getArchivedDocStatus doc_id, (error, result) -> - return callback(error) if error? - if result?.inS3 is true - logger.log {project_id, doc_id}, "document history is already archived" - return callback() - if result?.inS3? - logger.log {project_id, doc_id}, "document history archive is already in progress" - return callback() - MongoManager.getDocChangesCount doc_id, (error, count) -> - return callback(error) if error? - if count == 0 - logger.log {project_id, doc_id}, "document history is empty, not archiving" - return callback() - MongoManager.peekLastCompressedUpdate doc_id, (error, update, lastVersion) -> - return callback(error) if error? 
- logger.log {doc_id, project_id}, "archiving got last compressed update" - MongoManager.markDocHistoryAsArchiveInProgress doc_id, lastVersion, (error) -> - return callback(error) if error? - logger.log {doc_id, project_id}, "marked doc history as archive in progress" - MongoAWS.archiveDocHistory project_id, doc_id, update, (error) -> - if error? - logger.log {doc_id, project_id, error}, "error exporting document to S3" - MongoManager.clearDocHistoryAsArchiveInProgress doc_id, update, (err) -> - return callback(err) if err? - logger.log {doc_id, project_id}, "cleared archive in progress flag" - callback(error) - else - logger.log doc_id:doc_id, project_id:project_id, "exported document to S3" - MongoManager.markDocHistoryAsArchived doc_id, lastVersion, (error) -> - return callback(error) if error? - logger.log {doc_id, project_id}, "marked doc history as archived" - callback() - - unArchiveAllDocsChanges: (project_id, callback = (error, docs) ->) -> - DocstoreHandler.getAllDocs project_id, (error, docs) -> - if error? - return callback(error) - else if !docs? - return callback new Error("No docs for project #{project_id}") - jobs = _.map docs, (doc) -> - (cb)-> DocArchiveManager.unArchiveDocChangesWithLock project_id, doc._id, cb - async.parallelLimit jobs, 4, callback - - unArchiveDocChangesWithLock: (project_id, doc_id, callback = (error) ->) -> - job = (releaseLock) -> - DocArchiveManager.unArchiveDocChanges project_id, doc_id, releaseLock - LockManager.runWithLock("HistoryLock:#{doc_id}", job, callback) - - unArchiveDocChanges: (project_id, doc_id, callback)-> - MongoManager.getArchivedDocStatus doc_id, (error, result) -> - return callback(error) if error? - if result?.inS3 isnt true - logger.log {project_id, doc_id}, "no changes marked as in s3, not unarchiving" - return callback() - else - MongoAWS.unArchiveDocHistory project_id, doc_id, (error) -> - return callback(error) if error? 
- logger.log doc_id:doc_id, project_id:project_id, "imported document from S3" - MongoManager.markDocHistoryAsUnarchived doc_id, (error) -> - return callback(error) if error? - callback() diff --git a/services/track-changes/app/coffee/DocstoreHandler.coffee b/services/track-changes/app/coffee/DocstoreHandler.coffee deleted file mode 100644 index 9e3b10d258..0000000000 --- a/services/track-changes/app/coffee/DocstoreHandler.coffee +++ /dev/null @@ -1,21 +0,0 @@ -request = require("request").defaults(jar: false) -logger = require "logger-sharelatex" -settings = require "settings-sharelatex" - -module.exports = DocstoreHandler = - - getAllDocs: (project_id, callback = (error) ->) -> - logger.log project_id: project_id, "getting all docs for project in docstore api" - url = "#{settings.apis.docstore.url}/project/#{project_id}/doc" - request.get { - url: url - json: true - }, (error, res, docs) -> - return callback(error) if error? - logger.log {error, res, docs: if docs?.length then docs.map (d) -> d._id else []}, "docstore response" - if 200 <= res.statusCode < 300 - callback(null, docs) - else - error = new Error("docstore api responded with non-success code: #{res.statusCode}") - logger.error err: error, project_id: project_id, "error getting all docs from docstore" - callback(error) diff --git a/services/track-changes/app/coffee/HttpController.coffee b/services/track-changes/app/coffee/HttpController.coffee index 0b4cd658e9..6c357c505f 100644 --- a/services/track-changes/app/coffee/HttpController.coffee +++ b/services/track-changes/app/coffee/HttpController.coffee @@ -3,7 +3,6 @@ DiffManager = require "./DiffManager" PackManager = require "./PackManager" RestoreManager = require "./RestoreManager" logger = require "logger-sharelatex" -DocArchiveManager = require "./DocArchiveManager" HealthChecker = require "./HealthChecker" _ = require "underscore" @@ -23,23 +22,6 @@ module.exports = HttpController = return next(error) if error? 
res.send 204 - listDocs: (req, res, next = (error) ->) -> - logger.log "listing packing doc history" - limit = +req.query?.limit || 100 - doc_id = req.query?.doc_id if req.query?.doc_id?.match(/^[0-9a-f]{24}$/) - PackManager.listDocs {limit, doc_id}, (error, doc_ids) -> - return next(error) if error? - ids = (doc.doc_id.toString() for doc in doc_ids) - output = _.uniq(ids).join("\n") + "\n" - res.send output - - packDoc: (req, res, next = (error) ->) -> - doc_id = req.params.doc_id - logger.log doc_id: doc_id, "packing doc history" - PackManager.packDocHistory doc_id, (error) -> - return next(error) if error? - res.send 204 - checkDoc: (req, res, next = (error) ->) -> doc_id = req.params.doc_id project_id = req.params.project_id @@ -68,7 +50,7 @@ module.exports = HttpController = else to = null - logger.log project_id, doc_id: doc_id, from: from, to: to, "getting diff" + logger.log {project_id, doc_id, from, to}, "getting diff" DiffManager.getDiff project_id, doc_id, from, to, (error, diff) -> return next(error) if error? res.send JSON.stringify(diff: diff) @@ -95,20 +77,6 @@ module.exports = HttpController = return next(error) if error? res.send 204 - archiveProject: (req, res, next = (error) ->) -> - project_id = req.params.project_id - logger.log project_id: project_id, "archiving all track changes to s3" - DocArchiveManager.archiveAllDocsChanges project_id, (error) -> - return next(error) if error? - res.send 204 - - unArchiveProject: (req, res, next = (error) ->) -> - project_id = req.params.project_id - logger.log project_id: project_id, "unarchiving all track changes from s3" - DocArchiveManager.unArchiveAllDocsChanges project_id, (error) -> - return next(error) if error? - res.send 204 - healthCheck: (req, res)-> HealthChecker.check (err)-> if err? 
diff --git a/services/track-changes/app/coffee/MongoAWS.coffee b/services/track-changes/app/coffee/MongoAWS.coffee index 692e82bca7..cfa256e0f6 100644 --- a/services/track-changes/app/coffee/MongoAWS.coffee +++ b/services/track-changes/app/coffee/MongoAWS.coffee @@ -5,110 +5,106 @@ S3S = require 's3-streams' {db, ObjectId} = require "./mongojs" JSONStream = require "JSONStream" ReadlineStream = require "byline" +zlib = require "zlib" + +DAYS = 24 * 3600 * 1000 # one day in milliseconds + +AWS_CONFIG = { + accessKeyId: settings.trackchanges.s3.key + secretAccessKey: settings.trackchanges.s3.secret +} + +createStream = (streamConstructor, project_id, doc_id, pack_id) -> + return streamConstructor new AWS.S3(AWS_CONFIG), { + "Bucket": settings.trackchanges.stores.doc_history, + "Key": project_id+"/changes-"+doc_id+"/pack-"+pack_id + } module.exports = MongoAWS = - MAX_SIZE: 1024*1024 # almost max size - MAX_COUNT: 512 # almost max count - - archiveDocHistory: (project_id, doc_id, update, _callback = (error) ->) -> + archivePack: (project_id, doc_id, pack_id, _callback = (error) ->) -> callback = (args...) -> _callback(args...) _callback = () -> query = { + _id: ObjectId(pack_id) doc_id: ObjectId(doc_id) - v: {$lte: update.v} - expiresAt: {$exists : false} } - AWS.config.update { - accessKeyId: settings.filestore.s3.key - secretAccessKey: settings.filestore.s3.secret - } + return callback new Error("invalid project id") if not project_id? + return callback new Error("invalid doc id") if not doc_id? + return callback new Error("invalid pack id") if not pack_id? 
- logger.log {project_id, doc_id}, "uploading data to s3" + logger.log {project_id, doc_id, pack_id}, "uploading data to s3" - upload = S3S.WriteStream new AWS.S3(), { - "Bucket": settings.filestore.stores.user_files, - "Key": project_id+"/changes-"+doc_id - } + upload = createStream S3S.WriteStream, project_id, doc_id, pack_id - db.docHistory.find(query) - .on 'error', (err) -> - callback(err) - .pipe JSONStream.stringify() - .pipe upload - .on 'error', (err) -> - callback(err) - .on 'finish', () -> - return callback(null) - - unArchiveDocHistory: (project_id, doc_id, _callback = (error) ->) -> + db.docHistory.findOne query, (err, result) -> + return callback(err) if err? + return callback new Error("cannot find pack to send to s3") if not result? + return callback new Error("refusing to send pack with TTL to s3") if result.expiresAt? + uncompressedData = JSON.stringify(result) + zlib.gzip uncompressedData, (err, buf) -> + return callback(err) if err? + logger.log {project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack" + upload.on 'error', (err) -> + callback(err) + upload.on 'finish', () -> + logger.log {project_id, doc_id, pack_id}, "upload to s3 completed" + callback(null) + upload.write buf + upload.end() + readArchivedPack: (project_id, doc_id, pack_id, _callback = (error, result) ->) -> callback = (args...) -> _callback(args...) _callback = () -> - AWS.config.update { - accessKeyId: settings.filestore.s3.key - secretAccessKey: settings.filestore.s3.secret - } + return callback new Error("invalid project id") if not project_id? + return callback new Error("invalid doc id") if not doc_id? + return callback new Error("invalid pack id") if not pack_id? 
- logger.log {project_id, doc_id}, "downloading data from s3" + logger.log {project_id, doc_id, pack_id}, "downloading data from s3" - download = S3S.ReadStream new AWS.S3(), { - "Bucket": settings.filestore.stores.user_files, - "Key": project_id+"/changes-"+doc_id - }, { - encoding: "utf8" - } - - lineStream = new ReadlineStream(); - ops = [] - sz = 0 + download = createStream S3S.ReadStream, project_id, doc_id, pack_id inputStream = download .on 'open', (obj) -> return 1 .on 'error', (err) -> callback(err) - .pipe lineStream - inputStream.on 'data', (line) -> - if line.length > 2 - try - ops.push(JSON.parse(line)) - catch err - return callback(err) - sz += line.length - if ops.length >= MongoAWS.MAX_COUNT || sz >= MongoAWS.MAX_SIZE - inputStream.pause() - MongoAWS.handleBulk ops.slice(0), sz, () -> - inputStream.resume() - ops.splice(0,ops.length) - sz = 0 - .on 'end', () -> - MongoAWS.handleBulk ops, sz, callback - .on 'error', (err) -> + gunzip = zlib.createGunzip() + gunzip.setEncoding('utf8') + gunzip.on 'error', (err) -> + logger.log {project_id, doc_id, pack_id, err}, "error uncompressing gzip stream" + callback(err) + + outputStream = inputStream.pipe gunzip + parts = [] + outputStream.on 'error', (err) -> return callback(err) + outputStream.on 'end', () -> + logger.log {project_id, doc_id, pack_id}, "download from s3 completed" + try + object = JSON.parse parts.join('') + catch e + return callback(e) + object._id = ObjectId(object._id) + object.doc_id = ObjectId(object.doc_id) + object.project_id = ObjectId(object.project_id) + for op in object.pack + op._id = ObjectId(op._id) if op._id? 
+ callback null, object + outputStream.on 'data', (data) -> + parts.push data - handleBulk: (ops, size, cb) -> - bulk = db.docHistory.initializeUnorderedBulkOp(); - - for op in ops - op._id = ObjectId(op._id) - op.doc_id = ObjectId(op.doc_id) - op.project_id = ObjectId(op.project_id) - bulk.find({_id:op._id}).upsert().updateOne(op) - - if ops.length > 0 - bulk.execute (err, result) -> - if err? - logger.error err:err, "error bulking ReadlineStream" - else - logger.log count:ops.length, result:result, size: size, "bulked ReadlineStream" - cb(err) - else - cb() + unArchivePack: (project_id, doc_id, pack_id, callback = (error) ->) -> + MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, object) -> + return callback(err) if err? + # allow the object to expire, we can always retrieve it again + object.expiresAt = new Date(Date.now() + 7 * DAYS) + logger.log {project_id, doc_id, pack_id}, "inserting object from s3" + db.docHistory.insert object, callback diff --git a/services/track-changes/app/coffee/MongoAWSexternal.coffee b/services/track-changes/app/coffee/MongoAWSexternal.coffee deleted file mode 100644 index f422a583b5..0000000000 --- a/services/track-changes/app/coffee/MongoAWSexternal.coffee +++ /dev/null @@ -1,123 +0,0 @@ -settings = require "settings-sharelatex" -child_process = require "child_process" -mongoUri = require "mongo-uri"; -logger = require "logger-sharelatex" -AWS = require 'aws-sdk' -fs = require 'fs' -S3S = require 's3-streams' - -module.exports = MongoAWSexternal = - - archiveDocHistory: (project_id, doc_id, callback = (error) ->) -> - MongoAWS.mongoExportDocHistory doc_id, (error, filepath) -> - MongoAWS.s3upStream project_id, doc_id, filepath, callback - #delete temp file? - - - unArchiveDocHistory: (project_id, doc_id, callback = (error) ->) -> - MongoAWS.s3downStream project_id, doc_id, (error, filepath) -> - if error == null - MongoAWS.mongoImportDocHistory filepath, callback - #delete temp file? 
- else - callback - - mongoExportDocHistory: (doc_id, callback = (error, filepath) ->) -> - uriData = mongoUri.parse(settings.mongo.url); - filepath = settings.path.dumpFolder + '/' + doc_id + '.jsonUp' - - args = [] - args.push '-h' - args.push uriData.hosts[0] - args.push '-d' - args.push uriData.database - args.push '-c' - args.push 'docHistory' - args.push '-q' - args.push "{doc_id: ObjectId('#{doc_id}') , expiresAt: {$exists : false} }" - args.push '-o' - args.push filepath - - proc = child_process.spawn "mongoexport", args - - proc.on "error", callback - - stderr = "" - proc.stderr.on "data", (chunk) -> stderr += chunk.toString() - - proc.on "close", (code) -> - if code == 0 - return callback(null,filepath) - else - return callback(new Error("mongodump failed: #{stderr}"),null) - - mongoImportDocHistory: (filepath, callback = (error) ->) -> - - uriData = mongoUri.parse(settings.mongo.url); - - args = [] - args.push '-h' - args.push uriData.hosts[0] - args.push '-d' - args.push uriData.database - args.push '-c' - args.push 'docHistory' - args.push '--file' - args.push filepath - - proc = child_process.spawn "mongoimport", args - - proc.on "error", callback - - stderr = "" - proc.stderr.on "data", (chunk) -> stderr += chunk.toString() - - proc.on "close", (code) -> - if code == 0 - return callback(null,filepath) - else - return callback(new Error("mongodump failed: #{stderr}"),null) - - s3upStream: (project_id, doc_id, filepath, callback = (error) ->) -> - - AWS.config.update { - accessKeyId: settings.filestore.s3.key - secretAccessKey: settings.filestore.s3.secret - } - - upload = S3S.WriteStream new AWS.S3(), { - "Bucket": settings.filestore.stores.user_files, - "Key": project_id+"/changes-"+doc_id - } - - fs.createReadStream(filepath) - .on 'open', (obj) -> - return 1 - .pipe(upload) - .on 'finish', () -> - return callback(null) - .on 'error', (err) -> - return callback(err) - - s3downStream: (project_id, doc_id, callback = (error, filepath) ->) -> - - 
filepath = settings.path.dumpFolder + '/' + doc_id + '.jsonDown' - - AWS.config.update { - accessKeyId: settings.filestore.s3.key - secretAccessKey: settings.filestore.s3.secret - } - - download = S3S.ReadStream new AWS.S3(), { - "Bucket": settings.filestore.stores.user_files, - "Key": project_id+"/changes-"+doc_id - } - - download - .on 'open', (obj) -> - return 1 - .pipe(fs.createWriteStream(filepath)) - .on 'finish', () -> - return callback(null, filepath) - .on 'error', (err) -> - return callback(err, null) diff --git a/services/track-changes/app/coffee/MongoManager.coffee b/services/track-changes/app/coffee/MongoManager.coffee index 13df864c7b..dfb8c2a2e1 100644 --- a/services/track-changes/app/coffee/MongoManager.coffee +++ b/services/track-changes/app/coffee/MongoManager.coffee @@ -31,81 +31,11 @@ module.exports = MongoManager = else return callback null, update, update.v else - MongoManager.getArchivedDocStatus doc_id, (error, status) -> + PackManager.getLastPackFromIndex doc_id, (error, pack) -> return callback(error) if error? - return callback(null, null, status.lastVersion) if status?.inS3? and status?.lastVersion? + return callback(null, null, pack.v_end) if pack?.inS3? and pack?.v_end? callback null, null - insertCompressedUpdates: (project_id, doc_id, updates, temporary, callback = (error) ->) -> - jobs = [] - for update in updates - do (update) -> - jobs.push (callback) -> MongoManager.insertCompressedUpdate project_id, doc_id, update, temporary, callback - async.series jobs, (err, results) -> - if not temporary - # keep track of updates to be packed - db.docHistoryStats.update {doc_id:ObjectId(doc_id)}, { - $inc:{update_count:updates.length}, - $currentDate:{last_update:true} - }, {upsert:true}, () -> - callback(err,results) - else - callback(err,results) - - modifyCompressedUpdate: (lastUpdate, newUpdate, callback = (error) ->) -> - return callback() if not newUpdate? 
- db.docHistory.findAndModify - query: lastUpdate, - update: - $set : - op: newUpdate.op - meta: newUpdate.meta - v: newUpdate.v - new: true - , (err, result, lastErrorObject) -> - return callback(error) if error? - return new Error("could not modify existing op") if not result? - callback(err, result) - - insertCompressedUpdate: (project_id, doc_id, update, temporary, callback = (error) ->) -> - update = { - doc_id: ObjectId(doc_id.toString()) - project_id: ObjectId(project_id.toString()) - op: update.op - meta: update.meta - v: update.v - } - - if temporary - seconds = 1000 - minutes = 60 * seconds - hours = 60 * minutes - days = 24 * hours - update.expiresAt = new Date(Date.now() + 7 * days) - # may need to roll over a pack here if we are inserting packs - db.docHistory.insert update, callback - - getDocUpdates:(doc_id, options = {}, callback = (error, updates) ->) -> - query = - doc_id: ObjectId(doc_id.toString()) - if options.from? - query["v"] ||= {} - query["v"]["$gte"] = options.from - if options.to? - query["v"] ||= {} - query["v"]["$lte"] = options.to - - PackManager.findDocResults(db.docHistory, query, options.limit, callback) - - getProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) -> - query = - project_id: ObjectId(project_id.toString()) - - if options.before? 
- query["meta.end_ts"] = { $lt: options.before } - - PackManager.findProjectResults(db.docHistory, query, options.limit, callback) - backportProjectId: (project_id, doc_id, callback = (error) ->) -> db.docHistory.update { doc_id: ObjectId(doc_id.toString()) @@ -143,31 +73,3 @@ module.exports = MongoManager = db.projectHistoryMetaData.ensureIndex { project_id: 1 }, { background: true } # TTL index for auto deleting week old temporary ops db.docHistory.ensureIndex { expiresAt: 1 }, { expireAfterSeconds: 0, background: true } - # For finding documents which need packing - db.docHistoryStats.ensureIndex { doc_id: 1 }, { background: true } - db.docHistoryStats.ensureIndex { updates: -1, doc_id: 1 }, { background: true } - - getArchivedDocStatus: (doc_id, callback)-> - db.docHistoryStats.findOne {doc_id: ObjectId(doc_id.toString()), inS3: {$exists:true}}, {inS3: true, lastVersion: true}, callback - - getDocChangesCount: (doc_id, callback)-> - db.docHistory.count { doc_id : ObjectId(doc_id.toString())}, callback - - markDocHistoryAsArchiveInProgress: (doc_id, lastVersion, callback) -> - db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$set : {inS3: false, lastVersion: lastVersion}}, {upsert:true}, callback - - clearDocHistoryAsArchiveInProgress: (doc_id, update, callback) -> - db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$unset : {inS3: true, lastVersion: true}}, callback - - markDocHistoryAsArchived: (doc_id, lastVersion, callback)-> - db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, {$set : {inS3: true}}, {upsert:true}, (error)-> - return callback(error) if error? - # clear the archived entries from the docHistory now we have finally succeeded - db.docHistory.remove { doc_id : ObjectId(doc_id.toString()), v: {$lte : lastVersion}, expiresAt: {$exists : false} }, (error)-> - return callback(error) if error? 
- callback(error) - - markDocHistoryAsUnarchived: (doc_id, callback)-> - # note this removes any inS3 field, regardless of its value (true/false/null) - db.docHistoryStats.update {doc_id: ObjectId(doc_id.toString())}, { $unset : { inS3: true, lastVersion: true} }, (error)-> - callback(error) diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index 29ec6fa584..4eba1bff2d 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -3,463 +3,49 @@ _ = require "underscore" {db, ObjectId, BSON} = require "./mongojs" logger = require "logger-sharelatex" LockManager = require "./LockManager" +MongoAWS = require "./MongoAWS" +ProjectIterator = require "./ProjectIterator" + +# Sharejs operations are stored in a 'pack' object +# +# e.g. a single sharejs update looks like +# +# { +# "doc_id" : 549dae9e0a2a615c0c7f0c98, +# "project_id" : 549dae9c0a2a615c0c7f0c8c, +# "op" : [ {"p" : 6981, "d" : "?" } ], +# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, +# "v" : 17082 +# } +# +# and a pack looks like this +# +# { +# "doc_id" : 549dae9e0a2a615c0c7f0c98, +# "project_id" : 549dae9c0a2a615c0c7f0c8c, +# "pack" : [ U1, U2, U3, ...., UN], +# "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, +# "v" : 17082 +# "v_end" : ... +# } +# +# where U1, U2, U3, .... are single updates stripped of their +# doc_id and project_id fields (which are the same for all the +# updates in the pack). +# +# The pack itself has v and meta fields, this makes it possible to +# treat packs and single updates in a similar way. +# +# The v field of the pack itself is from the first entry U1, the +# v_end field from UN. The meta.end_ts field of the pack itself is +# from the last entry UN, the meta.start_ts field from U1. 
DAYS = 24 * 3600 * 1000 # one day in milliseconds module.exports = PackManager = - # The following functions implement methods like a mongo find, but - # expands any documents containing a 'pack' field into multiple - # values - # - # e.g. a single update looks like - # - # { - # "doc_id" : 549dae9e0a2a615c0c7f0c98, - # "project_id" : 549dae9c0a2a615c0c7f0c8c, - # "op" : [ {"p" : 6981, "d" : "?" } ], - # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, - # "v" : 17082 - # } - # - # and a pack looks like this - # - # { - # "doc_id" : 549dae9e0a2a615c0c7f0c98, - # "project_id" : 549dae9c0a2a615c0c7f0c8c, - # "pack" : [ U1, U2, U3, ...., UN], - # "meta" : { "user_id" : 52933..., "start_ts" : 1422310693931, "end_ts" : 1422310693931 }, - # "v" : 17082 - # } - # - # where U1, U2, U3, .... are single updates stripped of their - # doc_id and project_id fields (which are the same for all the - # updates in the pack). - # - # The pack itself has v and meta fields, this makes it possible to - # treat packs and single updates in the same way. - # - # The v field of the pack itself is from the first entry U1 - # The meta.end_ts field of the pack itself is from the last entry UN. - - findDocResults: (collection, query, limit, callback) -> - # query - the mongo query selector, includes both the doc_id/project_id and - # the range on v - # limit - the mongo limit, we need to apply it after unpacking any - # packs - - sort = {} - sort['v'] = -1; - cursor = collection - .find( query ) - .sort( sort ) - # if we have packs, we will trim the results more later after expanding them - if limit? - cursor.limit(limit) - - # take the part of the query which selects the range over the parameter - rangeQuery = query['v'] - - # helper function to check if an item from a pack is inside the - # desired range - filterFn = (item) -> - return false if rangeQuery?['$gte']? && item['v'] < rangeQuery['$gte'] - return false if rangeQuery?['$lte']? 
&& item['v'] > rangeQuery['$lte'] - return false if rangeQuery?['$lt']? && item['v'] >= rangeQuery['$lt'] - return false if rangeQuery?['$gt']? && item['v'] <= rangeQuery['$gt'] - return true - - versionOrder = (a, b) -> - b.v - a.v - - # create a query which can be used to select the entries BEFORE - # the range because we sometimes need to find extra ones (when the - # boundary falls in the middle of a pack) - extraQuery = _.clone(query) - # The pack uses its first entry for its metadata and v, so the - # only queries where we might not get all the packs are those for - # $gt and $gte (i.e. we need to find packs which start before our - # range but end in it) - if rangeQuery?['$gte']? - extraQuery['v'] = {'$lt' : rangeQuery['$gte']} - else if rangeQuery?['$gt'] - extraQuery['v'] = {'$lte' : rangeQuery['$gt']} - else - delete extraQuery['v'] - - needMore = false # keep track of whether we need to load more data - updates = [] # used to accumulate the set of results - - # FIXME: packs are big so we should accumulate the results - # incrementally instead of using .toArray() to avoid reading all - # of the changes into memory - cursor.toArray (err, result) -> - unpackedSet = PackManager._unpackResults(result) - updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit) - # check if we need to retrieve more data, because there is a - # pack that crosses into our range - last = if unpackedSet.length then unpackedSet[unpackedSet.length-1] else null - if limit? && updates.length == limit - needMore = false - else if extraQuery['v']? && last? && filterFn(last) - needMore = true - else if extraQuery['v']? && updates.length == 0 - needMore = true - if needMore - # we do need an extra result set - extra = collection - .find(extraQuery) - .sort(sort) - .limit(1) - extra.toArray (err, result2) -> - if err? 
- return callback err, updates.sort versionOrder - else - extraSet = PackManager._unpackResults(result2) - updates = PackManager._filterAndLimit(updates, extraSet, filterFn, limit) - callback err, updates.sort versionOrder - return - if err? - callback err, result - else - callback err, updates.sort versionOrder - - findProjectResults: (collection, query, limit, callback) -> - # query - the mongo query selector, includes both the doc_id/project_id and - # the range on meta.end_ts - # limit - the mongo limit, we need to apply it after unpacking any - # packs - - sort = {} - sort['meta.end_ts'] = -1; - - projection = {"op":false, "pack.op": false} - cursor = collection - .find( query, projection ) # no need to return the op only need version info - .sort( sort ) - # if we have packs, we will trim the results more later after expanding them - if limit? - cursor.limit(limit) - - # take the part of the query which selects the range over the parameter - before = query['meta.end_ts']?['$lt'] # may be null - - updates = [] # used to accumulate the set of results - - # FIXME: packs are big so we should accumulate the results - # incrementally instead of using .toArray() to avoid reading all - # of the changes into memory - cursor.toArray (err, result) -> - if err? - return callback err, result - if result.length == 0 && not before? # no results and no time range specified - return callback err, result - - unpackedSet = PackManager._unpackResults(result) - if limit? - unpackedSet = unpackedSet.slice(0, limit) - # find the end time of the last result, we will take all the - # results up to this, and then all the changes at that time - # (without imposing a limit) and any overlapping packs - cutoff = if unpackedSet.length then unpackedSet[unpackedSet.length-1].meta.end_ts else null - - filterFn = (item) -> - ts = item?.meta?.end_ts - return false if before? && ts >= before - return false if cutoff? 
&& ts < cutoff - return true - - timeOrder = (a, b) -> - (b.meta.end_ts - a.meta.end_ts) || documentOrder(a, b) - - documentOrder = (a, b) -> - x = a.doc_id.valueOf() - y = b.doc_id.valueOf() - if x > y then 1 else if x < y then -1 else 0 - - updates = PackManager._filterAndLimit(updates, unpackedSet, filterFn, limit) - - # get all elements on the lower bound (cutoff) - tailQuery = _.clone(query) - tailQuery['meta.end_ts'] = cutoff - tail = collection - .find(tailQuery, projection) - .sort(sort) - - # now find any packs that overlap with the time window from outside - # cutoff before - # --|-----wanted-range--|------------------ time=> - # |-------------|pack(end_ts) - # - # these were not picked up by the original query because - # end_ts>before but the beginning of the pack may be in the time range - overlapQuery = _.clone(query) - if before? && cutoff? - overlapQuery['meta.end_ts'] = {"$gte": before} - overlapQuery['meta.start_ts'] = {"$lte": before } - else if before? && not cutoff? - overlapQuery['meta.end_ts'] = {"$gte": before} - overlapQuery['meta.start_ts'] = {"$lte": before } - else if not before? && cutoff? - overlapQuery['meta.end_ts'] = {"$gte": cutoff} # we already have these?? - else if not before? && not cutoff? - overlapQuery['meta.end_ts'] = {"$gte": 0 } # shouldn't happen?? - - overlap = collection - .find(overlapQuery, projection) - .sort(sort) - - # we don't specify a limit here, as there could be any number of overlaps - # NB. need to catch items in original query and followup query for duplicates - - applyAndUpdate = (result) -> - extraSet = PackManager._unpackResults(result) - # note: final argument is null, no limit applied because we - # need all the updates at the final time to avoid breaking - # the changeset into parts - updates = PackManager._filterAndLimit(updates, extraSet, filterFn, null) - tail.toArray (err, result2) -> - if err? 
- return callback err, updates.sort timeOrder - else - applyAndUpdate result2 - overlap.toArray (err, result3) -> - if err? - return callback err, updates.sort timeOrder - else - applyAndUpdate result3 - callback err, updates.sort timeOrder - - _unpackResults: (updates) -> - # iterate over the updates, if there's a pack, expand it into ops and - # insert it into the array at that point - result = [] - updates.forEach (item) -> - if item.pack? - all = PackManager._explodePackToOps item - result = result.concat all - else - result.push item - return result - - _explodePackToOps: (packObj) -> - # convert a pack into an array of ops - doc_id = packObj.doc_id - project_id = packObj.project_id - result = packObj.pack.map (item) -> - item.doc_id = doc_id - item.project_id = project_id - item - return result.reverse() - - _filterAndLimit: (results, extra, filterFn, limit) -> - # update results with extra docs, after filtering and limiting - filtered = extra.filter(filterFn) - newResults = results.concat filtered - # remove duplicates - seen = {} - newResults = newResults.filter (item) -> - key = item.doc_id + ' ' + item.v - if seen[key] - return false - else - seen[key] = true - return true - newResults.slice(0, limit) if limit? - return newResults MAX_SIZE: 1024*1024 # make these configurable parameters - MAX_COUNT: 512 - - convertDocsToPacks: (docs, callback) -> - packs = [] - top = null - docs.forEach (d,i) -> - # skip existing packs - if d.pack? - top = null - return - sz = BSON.calculateObjectSize(d) - # decide if this doc can be added to the current pack - validLength = top? && (top.pack.length < PackManager.MAX_COUNT) - validSize = top? && (top.sz + sz < PackManager.MAX_SIZE) - bothPermanent = top? && (top.expiresAt? is false) && (d.expiresAt? is false) - bothTemporary = top? && (top.expiresAt? is true) && (d.expiresAt? is true) - within1Day = bothTemporary && (d.meta.start_ts - top.meta.start_ts < 24 * 3600 * 1000) - if top? 
&& validLength && validSize && (bothPermanent || (bothTemporary && within1Day)) - top.pack = top.pack.concat {v: d.v, meta: d.meta, op: d.op, _id: d._id} - top.sz += sz - top.n += 1 - top.v_end = d.v - top.meta.end_ts = d.meta.end_ts - top.expiresAt = d.expiresAt if top.expiresAt? - return - else - # create a new pack - top = _.clone(d) - top.pack = [ {v: d.v, meta: d.meta, op: d.op, _id: d._id} ] - top.meta = { start_ts: d.meta.start_ts, end_ts: d.meta.end_ts } - top.sz = sz - top.n = 1 - top.v_end = d.v - delete top.op - delete top._id - packs.push top - - callback(null, packs) - - checkHistory: (docs, callback) -> - errors = [] - prev = null - error = (args...) -> - errors.push args - docs.forEach (d,i) -> - if d.pack? - n = d.pack.length - last = d.pack[n-1] - error('bad pack v_end', d) if d.v_end != last.v - error('bad pack start_ts', d) if d.meta.start_ts != d.pack[0].meta.start_ts - error('bad pack end_ts', d) if d.meta.end_ts != last.meta.end_ts - d.pack.forEach (p, i) -> - prev = v - v = p.v - error('bad version', v, 'in', p) if v <= prev - #error('expired op', p, 'in pack') if p.expiresAt? - else - prev = v - v = d.v - error('bad version', v, 'in', d) if v <= prev - if errors.length - callback(errors) - else - callback() - - insertPack: (packObj, callback) -> - bulk = db.docHistory.initializeOrderedBulkOp() - doc_id = packObj.doc_id - expect_nInserted = 1 - expect_nRemoved = packObj.pack.length - logger.log {doc_id: doc_id}, "adding pack, removing #{expect_nRemoved} ops" - bulk.insert packObj - ids = (op._id for op in packObj.pack) - bulk.find({_id:{$in:ids}}).remove() - bulk.execute (err, result) -> - if err? 
- logger.error {doc_id: doc_id}, "error adding pack" - callback(err, result) - else if result.nInserted != expect_nInserted or result.nRemoved != expect_nRemoved - logger.error {doc_id: doc_id, result}, "unexpected result adding pack" - callback(new Error( - msg: 'unexpected result' - expected: {expect_nInserted, expect_nRemoved} - ), result) - else - db.docHistoryStats.update {doc_id:doc_id}, { - $inc:{update_count:-expect_nRemoved}, - $currentDate:{last_packed:true} - }, {upsert:true}, () -> - callback(err, result) - - # retrieve document ops/packs and check them - getDocHistory: (doc_id, callback) -> - db.docHistory.find({doc_id:ObjectId(doc_id)}).sort {v:1}, (err, docs) -> - return callback(err) if err? - # for safety, do a consistency check of the history - logger.log {doc_id}, "checking history for document" - PackManager.checkHistory docs, (err) -> - return callback(err) if err? - callback(err, docs) - #PackManager.deleteExpiredPackOps docs, (err) -> - # return callback(err) if err? - # callback err, docs - - packDocHistory: (doc_id, options, callback) -> - if typeof callback == "undefined" and typeof options == 'function' - callback = options - options = {} - LockManager.runWithLock( - "HistoryLock:#{doc_id}", - (releaseLock) -> - PackManager._packDocHistory(doc_id, options, releaseLock) - , callback - ) - - _packDocHistory: (doc_id, options, callback) -> - logger.log {doc_id},"starting pack operation for document history" - - PackManager.getDocHistory doc_id, (err, docs) -> - return callback(err) if err? - origDocs = 0 - origPacks = 0 - for d in docs - if d.pack? then origPacks++ else origDocs++ - PackManager.convertDocsToPacks docs, (err, packs) -> - return callback(err) if err? 
- total = 0 - for p in packs - total = total + p.pack.length - logger.log {doc_id, origDocs, origPacks, newPacks: packs.length, totalOps: total}, "document stats" - if packs.length - if options['dry-run'] - logger.log {doc_id}, 'dry-run, skipping write packs' - return callback() - PackManager.savePacks packs, (err) -> - return callback(err) if err? - # check the history again - PackManager.getDocHistory doc_id, callback - else - logger.log {doc_id}, "no packs to write" - # keep a record that we checked this one to avoid rechecking it - db.docHistoryStats.update {doc_id:doc_id}, { - $currentDate:{last_checked:true} - }, {upsert:true}, () -> - callback null, null - - DB_WRITE_DELAY: 100 - - savePacks: (packs, callback) -> - async.eachSeries packs, PackManager.safeInsert, (err, result) -> - if err? - logger.log {err, result}, "error writing packs" - callback err, result - else - callback() - - safeInsert: (packObj, callback) -> - PackManager.insertPack packObj, (err, result) -> - setTimeout () -> - callback(err,result) - , PackManager.DB_WRITE_DELAY - - deleteExpiredPackOps: (docs, callback) -> - now = Date.now() - toRemove = [] - toUpdate = [] - docs.forEach (d,i) -> - if d.pack? - newPack = d.pack.filter (op) -> - if op.expiresAt? 
then op.expiresAt > now else true - if newPack.length == 0 - toRemove.push d - else if newPack.length < d.pack.length - # adjust the pack properties - d.pack = newPack - first = d.pack[0] - last = d.pack[d.pack.length - 1] - d.v_end = last.v - d.meta.start_ts = first.meta.start_ts - d.meta.end_ts = last.meta.end_ts - toUpdate.push d - if toRemove.length or toUpdate.length - bulk = db.docHistory.initializeOrderedBulkOp() - toRemove.forEach (pack) -> - console.log "would remove", pack - #bulk.find({_id:pack._id}).removeOne() - toUpdate.forEach (pack) -> - console.log "would update", pack - #bulk.find({_id:pack._id}).updateOne(pack); - bulk.execute callback - else - callback() + MAX_COUNT: 1024 insertCompressedUpdates: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> return callback() if newUpdates.length == 0 @@ -509,7 +95,12 @@ module.exports = PackManager = if temporary newPack.expiresAt = new Date(Date.now() + 7 * DAYS) logger.log {project_id, doc_id, newUpdates}, "inserting updates into new pack" - db.docHistory.insert newPack, callback + db.docHistory.save newPack, (err, result) -> + return callback(err) if err? + if temporary + return callback() + else + PackManager.updateIndex project_id, doc_id, callback appendUpdatesToExistingPack: (project_id, doc_id, lastUpdate, newUpdates, temporary, callback = (error) ->) -> first = newUpdates[0] @@ -533,11 +124,330 @@ module.exports = PackManager = if lastUpdate.expiresAt and temporary update.$set.expiresAt = new Date(Date.now() + 7 * DAYS) logger.log {project_id, doc_id, lastUpdate, newUpdates}, "appending updates to existing pack" - db.docHistory.findAndModify {query, update}, callback + db.docHistory.findAndModify {query, update, new:true, fields:{meta:1,v_end:1}}, callback - listDocs: (options, callback) -> - query = {"op.p":{$exists:true}} - query.doc_id = {$gt: ObjectId(options.doc_id)} if options.doc_id? 
- db.docHistory.find(query, {doc_id:true}).sort({doc_id:1}).limit (options.limit||100), (err, docs) -> + # Retrieve all changes for a document + + getOpsByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback = (error, updates) ->) -> + PackManager.loadPacksByVersionRange project_id, doc_id, fromVersion, toVersion, (error) -> + query = {doc_id:ObjectId(doc_id.toString())} + query.v = {$lte:toVersion} if toVersion? + query.v_end = {$gte:fromVersion} if fromVersion? + #console.log "query:", query + db.docHistory.find(query).sort {v:-1}, (err, result) -> + return callback(err) if err? + #console.log "getOpsByVersionRange:", err, result + updates = [] + opInRange = (op, from, to) -> + return false if fromVersion? and op.v < fromVersion + return false if toVersion? and op.v > toVersion + return true + for docHistory in result + #console.log 'adding', docHistory.pack + for op in docHistory.pack.reverse() when opInRange(op, fromVersion, toVersion) + op.project_id = docHistory.project_id + op.doc_id = docHistory.doc_id + #console.log "added op", op.v, fromVersion, toVersion + updates.push op + callback(null, updates) + + loadPacksByVersionRange: (project_id, doc_id, fromVersion, toVersion, callback) -> + PackManager.getIndex doc_id, (err, indexResult) -> return callback(err) if err? - callback(null, docs) + indexPacks = indexResult?.packs or [] + packInRange = (pack, from, to) -> + return false if fromVersion? and pack.v_end < fromVersion + return false if toVersion? and pack.v > toVersion + return true + neededIds = (pack._id for pack in indexPacks when packInRange(pack, fromVersion, toVersion)) + PackManager.fetchPacksIfNeeded project_id, doc_id, neededIds, callback + + fetchPacksIfNeeded: (project_id, doc_id, pack_ids, callback) -> + db.docHistory.find {_id: {$in: (ObjectId(id) for id in pack_ids)}}, {_id:1}, (err, loadedPacks) -> + return callback(err) if err? 
+ allPackIds = (id.toString() for id in pack_ids) + loadedPackIds = (pack._id.toString() for pack in loadedPacks) + packIdsToFetch = _.difference allPackIds, loadedPackIds + logger.log {loadedPackIds, allPackIds, packIdsToFetch}, "analysed packs" + async.eachLimit packIdsToFetch, 4, (pack_id, cb) -> + MongoAWS.unArchivePack project_id, doc_id, pack_id, cb + , (err) -> + return callback(err) if err? + logger.log "done unarchiving" + callback() + + # Retrieve all changes across a project + + makeProjectIterator: (project_id, before, callback) -> + # get all the docHistory Entries + db.docHistory.find({project_id: ObjectId(project_id)},{pack:false}).sort {"meta.end_ts":-1}, (err, packs) -> + return callback(err) if err? + allPacks = [] + seenIds = {} + for pack in packs + allPacks.push pack + seenIds[pack._id] = true + db.docHistoryIndex.find {project_id: ObjectId(project_id)}, (err, indexes) -> + return callback(err) if err? + for index in indexes + for pack in index.packs when not seenIds[pack._id] + pack.project_id = index.project_id + pack.doc_id = index._id + pack.fromIndex = true + allPacks.push pack + seenIds[pack._id] = true + callback(null, new ProjectIterator(allPacks, before, PackManager.getPackById)) + + getPackById: (project_id, doc_id, pack_id, callback) -> + db.docHistory.findOne {_id: pack_id}, (err, pack) -> + return callback(err) if err? + if not pack? + MongoAWS.unArchivePack project_id, doc_id, pack_id, callback + else if pack.expiresAt? 
+ # we only need to touch the TTL on the listing of changes in the project + # because diffs on individual documents are always done after that + PackManager.increaseTTL pack, callback + else + callback(null, pack) + + increaseTTL: (pack, callback) -> + if pack.expiresAt < new Date(Date.now() + 6 * DAYS) + # update cache expiry since we are using this pack + db.docHistory.findAndModify { + query: {_id: pack._id} + update: {$set: {expiresAt: new Date(Date.now() + 7 * DAYS)}} + }, (err) -> + return callback(err, pack) + else + callback(null, pack) + + # Manage docHistoryIndex collection + + getIndex: (doc_id, callback) -> + db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString())}, callback + + getPackFromIndex: (doc_id, pack_id, callback) -> + db.docHistoryIndex.findOne {_id:ObjectId(doc_id.toString()), "packs._id": pack_id}, {"packs.$":1}, callback + + getLastPackFromIndex: (doc_id, callback) -> + db.docHistoryIndex.findOne {_id: ObjectId(doc_id.toString())}, {packs:{$slice:-1}}, (err, indexPack) -> + return callback(err) if err? + return callback() if not indexPack? + callback(null,indexPack[0]) + + getIndexWithKeys: (doc_id, callback) -> + PackManager.getIndex doc_id, (err, index) -> + return callback(err) if err? + return callback() if not index? + for pack in index?.packs or [] + index[pack._id] = pack + callback(null, index) + + initialiseIndex: (project_id, doc_id, callback) -> + PackManager.findCompletedPacks project_id, doc_id, (err, packs) -> + #console.log 'err', err, 'packs', packs, packs?.length + return callback(err) if err? + return callback() if not packs? + PackManager.insertPacksIntoIndexWithLock project_id, doc_id, packs, callback + + updateIndex: (project_id, doc_id, callback) -> + # find all packs prior to current pack + PackManager.findUnindexedPacks project_id, doc_id, (err, newPacks) -> + return callback(err) if err? + return callback() if not newPacks? 
+ PackManager.insertPacksIntoIndexWithLock project_id, doc_id, newPacks, (err) -> + return callback(err) if err? + logger.log {project_id, doc_id, newPacks}, "added new packs to index" + callback() + + findCompletedPacks: (project_id, doc_id, callback) -> + query = { doc_id: ObjectId(doc_id.toString()) } + db.docHistory.find(query, {pack:false}).sort {v:1}, (err, packs) -> + return callback(err) if err? + return callback() if not packs? + return callback() if packs?.length <= 1 + packs.pop() # discard the last pack, it's still in progress + callback(null, packs) + + # findPacks: (project_id, doc_id, queryFilter, callback) -> + # query = { doc_id: ObjectId(doc_id.toString()) } + # query = _.defaults query, queryFilter if queryFilter? + # db.docHistory.find(query, {pack:false}).sort {v:1}, callback + + findUnindexedPacks: (project_id, doc_id, callback) -> + PackManager.getIndexWithKeys doc_id, (err, indexResult) -> + return callback(err) if err? + PackManager.findCompletedPacks project_id, doc_id, (err, historyPacks) -> + return callback(err) if err? + return callback() if not historyPacks? + # select only the new packs not already in the index + newPacks = (pack for pack in historyPacks when not indexResult[pack._id]?) 
+ newPacks = (_.omit(pack, 'doc_id', 'project_id', 'n', 'sz') for pack in newPacks) + logger.log {project_id, doc_id, n: newPacks.length}, "found new packs" + callback(null, newPacks) + + insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) -> + LockManager.runWithLock( + "HistoryIndexLock:#{doc_id}", + (releaseLock) -> + PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock + callback + ) + + _insertPacksIntoIndex: (project_id, doc_id, newPacks, callback) -> + db.docHistoryIndex.findAndModify { + query: {_id:ObjectId(doc_id.toString())} + update: + $setOnInsert: project_id: ObjectId(project_id.toString()) + $push: + packs: {$each: newPacks, $sort: {v: 1}} + upsert: true + }, callback + + # Archiving packs to S3 + + archivePack: (project_id, doc_id, pack_id, callback) -> + clearFlagOnError = (err, cb) -> + if err? # clear the inS3 flag on error + PackManager.clearPackAsArchiveInProgress project_id, doc_id, pack_id, (err2) -> + return cb(err2) if err2? + return cb(err) + else + cb() + async.series [ + (cb) -> + PackManager.checkArchiveNotInProgress project_id, doc_id, pack_id, cb + (cb) -> + PackManager.markPackAsArchiveInProgress project_id, doc_id, pack_id, cb + (cb) -> + MongoAWS.archivePack project_id, doc_id, pack_id, (err) -> + clearFlagOnError(err, cb) + (cb) -> + PackManager.checkArchivedPack project_id, doc_id, pack_id, (err) -> + clearFlagOnError(err, cb) + (cb) -> + PackManager.markPackAsArchived project_id, doc_id, pack_id, cb + (cb) -> + PackManager.setTTLOnArchivedPack project_id, doc_id, pack_id, callback + ], callback + + + checkArchivedPack: (project_id, doc_id, pack_id, callback) -> + db.docHistory.findOne {_id: pack_id}, (err, pack) -> + return callback(err) if err? + return callback new Error("pack not found") if not pack? 
+ MongoAWS.readArchivedPack project_id, doc_id, pack_id, (err, result) -> + delete result.last_checked + delete pack.last_checked + # need to compare ids as ObjectIds with .equals() + for key in ['_id', 'project_id', 'doc_id'] + result[key] = pack[key] if result[key].equals(pack[key]) + for op, i in result.pack + op._id = pack.pack[i]._id if op._id? and op._id.equals(pack.pack[i]._id) + if _.isEqual pack, result + callback() + else + logger.err {pack, result, jsondiff: JSON.stringify(pack) is JSON.stringify(result)}, "difference when comparing packs" + callback new Error("pack retrieved from s3 does not match pack in mongo") + + # Processing old packs via worker + + processOldPack: (project_id, doc_id, pack_id, callback) -> + markAsChecked = (err) -> + PackManager.markPackAsChecked project_id, doc_id, pack_id, (err2) -> + return callback(err2) if err2? + callback(err) + logger.log {project_id, doc_id}, "processing old packs" + db.docHistory.findOne {_id:pack_id}, (err, pack) -> + return markAsChecked(err) if err? + return markAsChecked() if not pack? + return callback() if pack.expiresAt? # return directly + PackManager.updateIndexIfNeeded project_id, doc_id, (err) -> + return markAsChecked(err) if err? + PackManager.findUnarchivedPacks project_id, doc_id, (err, unarchivedPacks) -> + return markAsChecked(err) if err? + if not unarchivedPacks?.length + logger.log "no packs need archiving" + return markAsChecked() + async.eachSeries unarchivedPacks, (pack, cb) -> + PackManager.archivePack project_id, doc_id, pack._id, cb + , (err) -> + return markAsChecked(err) if err? + logger.log "done processing" + markAsChecked() + + updateIndexIfNeeded: (project_id, doc_id, callback) -> + logger.log {project_id, doc_id}, "archiving old packs" + PackManager.getIndexWithKeys doc_id, (err, index) -> + return callback(err) if err? + if not index? 
+ PackManager.initialiseIndex project_id, doc_id, callback + else + PackManager.updateIndex project_id, doc_id, callback + + markPackAsChecked: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id, pack_id}, "marking pack as checked" + db.docHistory.findAndModify { + query: {_id: pack_id} + update: {$currentDate: {"last_checked":true}} + }, callback + + findUnarchivedPacks: (project_id, doc_id, callback) -> + PackManager.getIndex doc_id, (err, indexResult) -> + return callback(err) if err? + indexPacks = indexResult?.packs or [] + unArchivedPacks = (pack for pack in indexPacks when not pack.inS3?) + logger.log {project_id, doc_id, n: unArchivedPacks.length}, "find unarchived packs" + callback(null, unArchivedPacks) + + # Archive locking flags + + checkArchiveNotInProgress: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id, pack_id}, "checking if archive in progress" + PackManager.getPackFromIndex doc_id, pack_id, (err, result) -> + return callback(err) if err? + return callback new Error("pack not found in index") if not result? + if result.inS3? + callback new Error("pack archiving already in progress") + else + callback() + + markPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id}, "marking pack as archive in progress status" + db.docHistoryIndex.findAndModify { + query: {_id:ObjectId(doc_id.toString()), packs: {$elemMatch: {"_id": pack_id, inS3: {$exists:false}}}} + fields: { "packs.$": 1 } + update: {$set: {"packs.$.inS3":false}} + }, (err, result) -> + return callback(err) if err? + return callback new Error("archive is already in progress") if not result? 
+ logger.log {project_id, doc_id, pack_id}, "marked as archive in progress" + callback() + + clearPackAsArchiveInProgress: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id, pack_id}, "clearing as archive in progress" + db.docHistoryIndex.findAndModify { + query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}} + fields: { "packs.$": 1 } + update: {$unset: {"packs.$.inS3":true}} + }, callback + + markPackAsArchived: (project_id, doc_id, pack_id, callback) -> + logger.log {project_id, doc_id, pack_id}, "marking pack as archived" + db.docHistoryIndex.findAndModify { + query: {_id:ObjectId(doc_id.toString()), "packs" : {$elemMatch: {"_id": pack_id, inS3: false}}} + fields: { "packs.$": 1 } + update: {$set: {"packs.$.inS3":true}} + }, (err, result) -> + return callback(err) if err? + return callback new Error("archive is not marked as progress") if not result? + logger.log {project_id, doc_id, pack_id}, "marked as archived" + callback() + + setTTLOnArchivedPack: (project_id, doc_id, pack_id, callback) -> + db.docHistory.findAndModify { + query: {_id: pack_id} + update: {$set: {expiresAt: new Date(Date.now() + 1*DAYS)}} + }, (err) -> + logger.log {project_id, doc_id, pack_id}, "set expiry on pack" + callback() diff --git a/services/track-changes/app/coffee/PackWorker.coffee b/services/track-changes/app/coffee/PackWorker.coffee index 7b37969229..a6d31b8436 100644 --- a/services/track-changes/app/coffee/PackWorker.coffee +++ b/services/track-changes/app/coffee/PackWorker.coffee @@ -7,11 +7,13 @@ logger.initialize("track-changes-packworker") if Settings.sentry?.dsn? 
logger.initializeErrorReporting(Settings.sentry.dsn) +DAYS = 24 * 3600 * 1000 + LockManager = require "./LockManager" PackManager = require "./PackManager" # this worker script is forked by the main process to look for -# document histories which can be packed +# document histories which can be archived LIMIT = Number(process.argv[2]) || 1000 DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000 @@ -24,7 +26,7 @@ shutDownTimer = setTimeout () -> shutDownRequested = true # do a hard shutdown after a further 5 minutes setTimeout () -> - logger.error "HARD TIMEOUT in pack worker" + logger.error "HARD TIMEOUT in pack archive worker" process.exit() , 5*60*1000 , TIMEOUT @@ -47,54 +49,59 @@ finish = () -> db.close () -> logger.log 'closing LockManager Redis Connection' LockManager.close () -> - logger.log 'ready to exit from pack worker' + logger.log 'ready to exit from pack archive worker' hardTimeout = setTimeout () -> - logger.error 'hard exit from pack worker' + logger.error 'hard exit from pack archive worker' process.exit(1) , 5*1000 hardTimeout.unref() process.on 'exit', (code) -> - logger.log {code}, 'pack worker exited' + logger.log {code}, 'pack archive worker exited' processUpdates = (pending) -> - async.eachSeries pending, (doc_id, callback) -> - PackManager.packDocHistory doc_id, (err, result) -> + async.eachSeries pending, (result, callback) -> + {_id, project_id, doc_id} = result + if not project_id? or not doc_id? + logger.log {project_id, doc_id}, "skipping pack, missing project/doc id" + return callback() + PackManager.processOldPack project_id, doc_id, _id, (err, result) -> if err? - logger.error {err, result}, "error in pack worker" + logger.error {err, result}, "error in pack archive worker" return callback(err) if shutDownRequested - logger.error "shutting down pack worker" + logger.error "shutting down pack archive worker" return callback(new Error("shutdown")) setTimeout () -> callback(err, result) , DOCUMENT_PACK_DELAY , (err, results) -> if err? 
and err.message != "shutdown" - logger.error {err}, 'error in pack worker processUpdates' + logger.error {err}, 'error in pack archive worker processUpdates' finish() -# find the documents which can be packed, by checking the number of -# unpacked updates in the docHistoryStats collection +# find the packs which can be archived -db.docHistoryStats.find({ - update_count: {$gt : PackManager.MIN_COUNT} -}).sort({ - update_count:-1 +ObjectIdFromDate = (date) -> + id = Math.floor(date.getTime() / 1000).toString(16) + "0000000000000000"; + return ObjectId(id) + +# new approach, two passes +# find packs to be marked as finalised:true, those which have a newer pack present +# then only consider finalised:true packs for archiving + +db.docHistory.find({ + expiresAt: {$exists: false} + project_id: {$exists: true} + v_end: {$exists: true} + _id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))} +}, {_id:1, doc_id:1, project_id:1}).sort({ + last_checked:1 }).limit LIMIT, (err, results) -> if err? logger.log {err}, 'error checking for updates' finish() return - results = _.filter results, (doc) -> - if doc.last_checked? and doc.last_checked > doc.last_update - # skip documents which don't have any updates since last check - return false - else if doc.last_packed? 
and doc.last_packed > doc.last_update - # skip documents which don't have any updates since last pack - return false - else - return true - pending = _.pluck results, 'doc_id' - logger.log "found #{pending.length} documents to pack" + pending = _.uniq results, false, (result) -> result.doc_id.toString() + logger.log "found #{pending.length} documents to archive" processUpdates pending diff --git a/services/track-changes/app/coffee/ProjectIterator.coffee b/services/track-changes/app/coffee/ProjectIterator.coffee new file mode 100644 index 0000000000..b0bed523df --- /dev/null +++ b/services/track-changes/app/coffee/ProjectIterator.coffee @@ -0,0 +1,66 @@ +async = require "async" +_ = require "underscore" +{db, ObjectId, BSON} = require "./mongojs" +logger = require "logger-sharelatex" +Heap = require "heap" + +module.exports = ProjectIterator = + + class ProjectIterator + constructor: (packs, @before, @getPackByIdFn) -> + byEndTs = (a,b) -> (b.meta.end_ts - a.meta.end_ts) || (a.fromIndex - b.fromIndex) + @packs = packs.slice().sort byEndTs + @queue = new Heap(byEndTs) + + next: (callback) -> + # what's up next + #console.log ">>> top item", iterator.packs[0] + iterator = this + before = @before + queue = iterator.queue + opsToReturn = [] + nextPack = iterator.packs[0] + lowWaterMark = nextPack?.meta.end_ts || 0 + nextItem = queue.peek() + + #console.log "queue empty?", queue.empty() + #console.log "nextItem", nextItem + #console.log "nextItem.meta.end_ts", nextItem?.meta.end_ts + #console.log "lowWaterMark", lowWaterMark + + while before? and nextPack?.meta.start_ts > before + # discard pack that is outside range + iterator.packs.shift() + nextPack = iterator.packs[0] + lowWaterMark = nextPack?.meta.end_ts || 0 + + if (queue.empty() or nextItem?.meta.end_ts <= lowWaterMark) and nextPack? + # retrieve the next pack and populate the queue + return @getPackByIdFn nextPack.project_id, nextPack.doc_id, nextPack._id, (err, pack) -> + return callback(err) if err? 
+ iterator.packs.shift() # have now retrieved this pack, remove it + #console.log "got pack", pack + for op in pack.pack when (not before? or op.meta.end_ts < before) + #console.log "adding op", op + op.doc_id = nextPack.doc_id + op.project_id = nextPack.project_id + queue.push op + # now try again + return iterator.next(callback) + + #console.log "nextItem", nextItem, "lowWaterMark", lowWaterMark + while nextItem? and (nextItem?.meta.end_ts > lowWaterMark) + opsToReturn.push nextItem + queue.pop() + nextItem = queue.peek() + + #console.log "queue empty?", queue.empty() + #console.log "nextPack", nextPack? + + if queue.empty() and not nextPack? # got everything + iterator._done = true + + callback(null, opsToReturn) + + done: () -> + return @_done diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index bffd7f540e..537d87868d 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -7,7 +7,6 @@ WebApiManager = require "./WebApiManager" UpdateTrimmer = require "./UpdateTrimmer" logger = require "logger-sharelatex" async = require "async" -DocArchiveManager = require "./DocArchiveManager" _ = require "underscore" Settings = require "settings-sharelatex" @@ -17,6 +16,8 @@ module.exports = UpdatesManager = if length == 0 return callback() + # FIXME: we no longer need the lastCompressedUpdate, so change functions not to need it + # CORRECTION: we do use it to log the time in case of error MongoManager.peekLastCompressedUpdate doc_id, (error, lastCompressedUpdate, lastVersion) -> # lastCompressedUpdate is the most recent update in Mongo, and # lastVersion is its sharejs version number. @@ -58,46 +59,10 @@ module.exports = UpdatesManager = logger.error err: error, doc_id: doc_id, project_id: project_id, size: size, rawUpdate: rawUpdate, "dropped op - too big" rawUpdate.op = [] - if (not lastCompressedUpdate?) 
or lastCompressedUpdate.pack? # handle pack append as a special case - UpdatesManager._updatePack project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback - else #use the existing op code - UpdatesManager._updateOp project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback - - _updatePack: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> - compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates - PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) -> - return callback(error) if error? - logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? - callback() - - _updateOp: (project_id, doc_id, rawUpdates, temporary, lastCompressedUpdate, lastVersion, callback) -> - compressedUpdates = UpdateCompressor.compressRawUpdates lastCompressedUpdate, rawUpdates - - if not lastCompressedUpdate? - # no existing update, insert everything - updateToModify = null - updatesToInsert = compressedUpdates - else - # there are existing updates, see what happens when we - # compress them together with the new ones - [firstUpdate, additionalUpdates...] = compressedUpdates - - if firstUpdate.v == lastCompressedUpdate.v and _.isEqual(firstUpdate, lastCompressedUpdate) - # first update version hasn't changed, skip it and insert remaining updates - # this is an optimisation, we could update the existing op with itself - updateToModify = null - updatesToInsert = additionalUpdates - else - # first update version did changed, modify it and insert remaining updates - updateToModify = firstUpdate - updatesToInsert = additionalUpdates - - MongoManager.modifyCompressedUpdate lastCompressedUpdate, updateToModify, (error, result) -> - return callback(error) if error? 
- logger.log {project_id, doc_id, orig_v: lastCompressedUpdate.v, new_v: result.v}, "applied update in-place" if result? - MongoManager.insertCompressedUpdates project_id, doc_id, updatesToInsert, temporary,(error) -> + compressedUpdates = UpdateCompressor.compressRawUpdates null, rawUpdates + PackManager.insertCompressedUpdates project_id, doc_id, lastCompressedUpdate, compressedUpdates, temporary, (error, result) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, rawUpdatesLength: rawUpdates.length, compressedUpdatesLength: compressedUpdates.length, "compressed doc updates" + logger.log {project_id, doc_id, orig_v: lastCompressedUpdate?.v, new_v: result.v}, "inserted updates into pack" if result? callback() REDIS_READ_BATCH_SIZE: 100 @@ -151,7 +116,12 @@ module.exports = UpdatesManager = getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> return callback(error) if error? - MongoManager.getDocUpdates doc_id, options, callback + #console.log "options", options + PackManager.getOpsByVersionRange project_id, doc_id, options.from, options.to, (error, updates) -> + return callback(error) if error? + UpdatesManager.fillUserInfo updates, (err, results) -> + return callback(err) if err? + callback null, results getDocUpdatesWithUserInfo: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.getDocUpdates project_id, doc_id, options, (error, updates) -> @@ -160,98 +130,83 @@ module.exports = UpdatesManager = return callback(error) if error? callback null, updates - getProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) -> - UpdatesManager.processUncompressedUpdatesForProject project_id, (error) -> - return callback(error) if error? 
- MongoManager.getProjectUpdates project_id, options, (error, updates) -> - jobs = [] - for update in updates - if update.inS3 - do (update) -> - jobs.push (callback) -> DocArchiveManager.unArchiveDocChanges update.project_id, update.doc_id, callback - if jobs.length? - async.series jobs, (err) -> - MongoManager.getProjectUpdates project_id, options, callback - else - callback(error, updates) - - getProjectUpdatesWithUserInfo: (project_id, options = {}, callback = (error, updates) ->) -> - UpdatesManager.getProjectUpdates project_id, options, (error, updates) -> - return callback(error) if error? - UpdatesManager.fillUserInfo updates, (error, updates) -> - return callback(error) if error? - callback null, updates - getSummarizedProjectUpdates: (project_id, options = {}, callback = (error, updates) ->) -> options.min_count ||= 25 summarizedUpdates = [] before = options.before - do fetchNextBatch = () -> - UpdatesManager._extendBatchOfSummarizedUpdates project_id, summarizedUpdates, before, options.min_count, (error, updates, nextBeforeUpdate) -> - return callback(error) if error? - if !nextBeforeUpdate? or updates.length >= options.min_count - callback null, updates, nextBeforeUpdate - else - before = nextBeforeUpdate - summarizedUpdates = updates - fetchNextBatch() - - _extendBatchOfSummarizedUpdates: ( - project_id, - existingSummarizedUpdates, - before, desiredLength, - callback = (error, summarizedUpdates, endOfDatabase) -> - ) -> - UpdatesManager.getProjectUpdatesWithUserInfo project_id, { before: before, limit: 3 * desiredLength }, (error, updates) -> + nextBeforeTimestamp = null + UpdatesManager.processUncompressedUpdatesForProject project_id, (error) -> return callback(error) if error? + PackManager.makeProjectIterator project_id, before, (err, iterator) -> + return callback(err) if err? 
+ # repeatedly get updates and pass them through the summariser to get an final output with user info + async.whilst () -> + #console.log "checking iterator.done", iterator.done() + return summarizedUpdates.length < options.min_count and not iterator.done() + , (cb) -> + iterator.next (err, partialUpdates) -> + return callback(err) if err? + #logger.log {partialUpdates}, 'got partialUpdates' + return cb() if partialUpdates.length is 0 ## FIXME should try to avoid this happening + nextBeforeTimestamp = partialUpdates[partialUpdates.length - 1].meta.end_ts + # add the updates to the summary list + summarizedUpdates = UpdatesManager._summarizeUpdates partialUpdates, summarizedUpdates + cb() + , () -> + # finally done all updates + #console.log 'summarized Updates', summarizedUpdates + UpdatesManager.fillSummarizedUserInfo summarizedUpdates, (err, results) -> + return callback(err) if err? + callback null, results, if not iterator.done() then nextBeforeTimestamp else undefined - # Suppose in this request we have fetch the solid updates. In the next request we need - # to fetch the dotted updates. These are defined by having an end timestamp less than - # the last update's end timestamp (updates are ordered by descending end_ts). I.e. - # start_ts--v v--end_ts - # doc1: |......| |...| |-------| - # doc2: |------------------| - # ^----- Next time, fetch all updates with an - # end_ts less than this - # - if updates? 
and updates.length > 0 - nextBeforeTimestamp = updates[updates.length - 1].meta.end_ts - if nextBeforeTimestamp >= before - error = new Error("history order is broken") - logger.error err: error, project_id:project_id, nextBeforeTimestamp: nextBeforeTimestamp, before:before, "error in project history" - return callback(error) - else - nextBeforeTimestamp = null - - summarizedUpdates = UpdatesManager._summarizeUpdates( - updates, existingSummarizedUpdates - ) - callback null, - summarizedUpdates, - nextBeforeTimestamp - - fillUserInfo: (updates, callback = (error, updates) ->) -> - users = {} - for update in updates - if UpdatesManager._validUserId(update.meta.user_id) - users[update.meta.user_id] = true - + fetchUserInfo: (users, callback = (error, fetchedUserInfo) ->) -> jobs = [] + fetchedUserInfo = {} for user_id of users do (user_id) -> jobs.push (callback) -> WebApiManager.getUserInfo user_id, (error, userInfo) -> return callback(error) if error? - users[user_id] = userInfo + fetchedUserInfo[user_id] = userInfo callback() - async.series jobs, (error) -> + async.series jobs, (err) -> + return callback(err) if err? + callback(null, fetchedUserInfo) + + fillUserInfo: (updates, callback = (error, updates) ->) -> + users = {} + for update in updates + user_id = update.meta.user_id + if UpdatesManager._validUserId(user_id) + users[user_id] = true + + UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) -> return callback(error) if error? 
for update in updates user_id = update.meta.user_id delete update.meta.user_id if UpdatesManager._validUserId(user_id) - update.meta.user = users[user_id] + update.meta.user = fetchedUserInfo[user_id] + callback null, updates + + fillSummarizedUserInfo: (updates, callback = (error, updates) ->) -> + users = {} + for update in updates + user_ids = update.meta.user_ids or [] + for user_id in user_ids + if UpdatesManager._validUserId(user_id) + users[user_id] = true + + UpdatesManager.fetchUserInfo users, (error, fetchedUserInfo) -> + return callback(error) if error? + for update in updates + user_ids = update.meta.user_ids or [] + update.meta.users = [] + delete update.meta.user_ids + for user_id in user_ids + if UpdatesManager._validUserId(user_id) + update.meta.users.push fetchedUserInfo[user_id] callback null, updates _validUserId: (user_id) -> @@ -260,7 +215,6 @@ module.exports = UpdatesManager = else return !!user_id.match(/^[a-f0-9]{24}$/) - TIME_BETWEEN_DISTINCT_UPDATES: fiveMinutes = 5 * 60 * 1000 _summarizeUpdates: (updates, existingSummarizedUpdates = []) -> summarizedUpdates = existingSummarizedUpdates.slice() @@ -269,13 +223,7 @@ module.exports = UpdatesManager = if earliestUpdate and earliestUpdate.meta.start_ts - update.meta.end_ts < @TIME_BETWEEN_DISTINCT_UPDATES # check if the user in this update is already present in the earliest update, # if not, add them to the users list of the earliest update - userExists = false - for user in earliestUpdate.meta.users - if (!user and !update.meta.user) or (user?.id == update.meta.user?.id) - userExists = true - break - if !userExists - earliestUpdate.meta.users.push update.meta.user + earliestUpdate.meta.user_ids = _.union earliestUpdate.meta.user_ids, update.meta.user_id doc_id = update.doc_id.toString() doc = earliestUpdate.docs[doc_id] @@ -292,7 +240,7 @@ module.exports = UpdatesManager = else newUpdate = meta: - users: [] + user_ids: [] start_ts: update.meta.start_ts end_ts: update.meta.end_ts docs: {} @@ 
-300,7 +248,7 @@ module.exports = UpdatesManager = newUpdate.docs[update.doc_id.toString()] = fromV: update.v toV: update.v - newUpdate.meta.users.push update.meta.user + newUpdate.meta.user_ids.push update.meta.user_id summarizedUpdates.push newUpdate return summarizedUpdates diff --git a/services/track-changes/app/coffee/mongojs.coffee b/services/track-changes/app/coffee/mongojs.coffee index be29816b82..87867330bb 100644 --- a/services/track-changes/app/coffee/mongojs.coffee +++ b/services/track-changes/app/coffee/mongojs.coffee @@ -1,7 +1,7 @@ Settings = require "settings-sharelatex" mongojs = require "mongojs" bson = require "bson" -db = mongojs(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryStats"]) +db = mongojs(Settings.mongo.url, ["docHistory", "projectHistoryMetaData", "docHistoryIndex"]) module.exports = db: db ObjectId: mongojs.ObjectId diff --git a/services/track-changes/package.json b/services/track-changes/package.json index 581e97c79d..be0c3487a0 100644 --- a/services/track-changes/package.json +++ b/services/track-changes/package.json @@ -23,7 +23,8 @@ "underscore": "~1.7.0", "mongo-uri": "^0.1.2", "s3-streams": "^0.3.0", - "JSONStream": "^1.0.4" + "JSONStream": "^1.0.4", + "heap": "^0.2.6" }, "devDependencies": { "chai": "~1.9.0", diff --git a/services/track-changes/test/unit/coffee/DocArchive/MongoAWS.coffee b/services/track-changes/test/unit/coffee/DocArchive/MongoAWS.coffee index 5667f0fada..167524ffc6 100644 --- a/services/track-changes/test/unit/coffee/DocArchive/MongoAWS.coffee +++ b/services/track-changes/test/unit/coffee/DocArchive/MongoAWS.coffee @@ -9,12 +9,12 @@ describe "MongoAWS", -> beforeEach -> @MongoAWS = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = - filestore: + trackchanges: s3: secret: "s3-secret" key: "s3-key" stores: - user_files: "s3-bucket" + doc_history: "s3-bucket" "child_process": @child_process = {} "mongo-uri": @mongouri = {} "logger-sharelatex": @logger = 
{log: sinon.stub(), error: sinon.stub(), err:->} diff --git a/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee b/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee index d1787c1649..6e7caa7b0e 100644 --- a/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/MongoManager/MongoManagerTests.coffee @@ -101,66 +101,6 @@ describe "MongoManager", -> @callback.calledWith(null, null, @update.v).should.equal true - describe "insertCompressedUpdate", -> - beforeEach -> - @update = { op: "op", meta: "meta", v: "v"} - @db.docHistory = - insert: sinon.stub().callsArg(1) - - describe "temporarly", -> - beforeEach -> - @MongoManager.insertCompressedUpdate @project_id, @doc_id, @update, true, @callback - - it "should insert the update with a expiresAt field one week away", -> - @db.docHistory.insert - .calledWith({ - project_id: ObjectId(@project_id), - doc_id: ObjectId(@doc_id), - op: @update.op, - meta: @update.meta, - v: @update.v - expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000) - }) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true - - describe "permanenty", -> - beforeEach -> - @MongoManager.insertCompressedUpdate @project_id, @doc_id, @update, false, @callback - - it "should insert the update with no expiresAt field", -> - @db.docHistory.insert - .calledWith({ - project_id: ObjectId(@project_id), - doc_id: ObjectId(@doc_id), - op: @update.op, - meta: @update.meta, - v: @update.v - }) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true - - describe "insertCompressedUpdates", -> - beforeEach (done) -> - @updates = [ "mock-update-1", "mock-update-2" ] - @MongoManager.insertCompressedUpdate = sinon.stub().callsArg(4) - @MongoManager.insertCompressedUpdates @project_id, @doc_id, @updates, @temporary = true, (args...) => - @callback(args...) 
- done() - - it "should insert each update", -> - for update in @updates - @MongoManager.insertCompressedUpdate - .calledWith(@project_id, @doc_id, update, @temporary) - .should.equal true - - it "should call the callback", -> - @callback.called.should.equal true - describe "getDocUpdates", -> beforeEach -> @results = [ diff --git a/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee index 9ac4e5b96b..ad77427b19 100644 --- a/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee @@ -233,184 +233,3 @@ describe "PackManager", -> it "should call the callback", -> @callback.called.should.equal true - describe "convertDocsToPacks", -> - describe "with several small ops", -> - beforeEach -> - @ops = [ - { _id: "3", op: "op-3", meta: {start_ts: "ts3_s", end_ts: "ts3_e", user_id: "u3"}, v: 3}, - { _id: "4", op: "op-4", meta: {start_ts: "ts4_s", end_ts: "ts4_e", user_id: "u4"}, v: 4}, - ] - @PackManager.convertDocsToPacks @ops, @callback - - it "should create a single pack", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - assert.deepEqual packs, [ { - pack: [ - { _id: "3", op: "op-3", meta: {start_ts: "ts3_s", end_ts: "ts3_e", user_id: "u3"}, v: 3}, - { _id: "4", op: "op-4", meta: {start_ts: "ts4_s", end_ts: "ts4_e", user_id: "u4"}, v: 4}, - ] - v: 3 - v_end: 4 - meta: {start_ts: "ts3_s", end_ts: "ts4_e"} - n: 2 - sz: 202 - }] - done() - - it "should call the callback", -> - @callback.called.should.equal true - - describe "with many small ops", -> - beforeEach -> - @ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: "ts#{n}_s", end_ts: "ts#{n}_e", user_id: "u#{n}"}, v: n} for n in [0...1024]) - @PackManager.convertDocsToPacks @ops, @callback - - it "should create two packs", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - 
assert.deepEqual packs, [ { - pack: @ops[0...512] - v: 0 - v_end: 511 - meta: {start_ts: "ts0_s", end_ts: "ts511_e"} - n: 512 - sz: 56282 - }, { - pack: @ops[512...1024] - v: 512 - v_end: 1023 - meta: {start_ts: "ts512_s", end_ts: "ts1023_e"} - n: 512 - sz: 56952 - }] - done() - - it "should call the callback", -> - @callback.called.should.equal true - - describe "with many temporary ops", -> - beforeEach -> - @ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n, end_ts: n+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...1024]) - @PackManager.convertDocsToPacks @ops, @callback - - it "should create two packs", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - assert.deepEqual packs, [ { - pack: (_.omit(op, "expiresAt") for op in @ops[0...512]) - v: 0 - v_end: 511 - meta: {start_ts: 0 , end_ts: 512} - n: 512 - sz: 55990 - expiresAt: @ops[511].expiresAt - }, { - pack: (_.omit(op, "expiresAt") for op in @ops[512...1024]) - v: 512 - v_end: 1023 - meta: {start_ts: 512, end_ts: 1024} - n: 512 - sz: 56392 - expiresAt: @ops[1023].expiresAt - }] - done() - - it "should call the callback", -> - @callback.called.should.equal true - - describe "with temporary ops spanning more than 1 day", -> - beforeEach -> - @ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48]) - @PackManager.convertDocsToPacks @ops, @callback - - it "should create two packs", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - assert.deepEqual packs, [ { - pack: (_.omit(op, "expiresAt") for op in @ops[0...24]) - v: 0 - v_end: 23 - meta: {start_ts: 0 , end_ts: 23*3600*1000+1} - n: 24 - sz: 2538 - expiresAt: @ops[23].expiresAt - }, { - pack: (_.omit(op, "expiresAt") for op in @ops[24...48]) - v: 24 - v_end: 47 - meta: {start_ts: 24*3600*1000, end_ts: 47*3600*1000+1} - n: 24 - sz: 2568 - expiresAt: @ops[47].expiresAt - }] - done() - - it "should 
call the callback", -> - @callback.called.should.equal true - - describe "with mixture of temporary and permanent ops", -> - beforeEach -> - @ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48]) - for n in [10...48] - delete @ops[n].expiresAt - @PackManager.convertDocsToPacks @ops, @callback - - it "should create two packs", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - assert.deepEqual packs, [ { - pack: (_.omit(op, "expiresAt") for op in @ops[0...10]) - v: 0 - v_end: 9 - meta: {start_ts: 0 , end_ts: 9*3600*1000+1} - n: 10 - sz: 1040 - expiresAt: @ops[9].expiresAt - }, { - pack: (_.omit(op, "expiresAt") for op in @ops[10...48]) - v: 10 - v_end: 47 - meta: {start_ts: 10*3600*1000, end_ts: 47*3600*1000+1} - n: 38 - sz: 3496 - }] - done() - - it "should call the callback", -> - @callback.called.should.equal true - - describe "with mixture of temporary and permanent ops and an existing pack", -> - beforeEach -> - @ops = ({ _id: "#{n}", op: "op-#{n}", meta: {start_ts: n*3600*1000, end_ts: n*3600*1000+1, user_id: "u#{n}"}, v: n, expiresAt: n+24*3600*1000 } for n in [0...48]) - for n in [10...48] - delete @ops[n].expiresAt - # convert op 16 into a pack - @ops[16].pack = [ @ops[16].op ] - delete @ops[16].op - @PackManager.convertDocsToPacks @ops, @callback - - it "should create three packs", (done) -> - @PackManager.convertDocsToPacks @ops, (err, packs) => - assert.deepEqual packs, [ { - pack: (_.omit(op, "expiresAt") for op in @ops[0...10]) - v: 0 - v_end: 9 - meta: {start_ts: 0 , end_ts: 9*3600*1000+1} - n: 10 - sz: 1040 - expiresAt: @ops[9].expiresAt - }, { - pack: @ops[10...16] - v: 10 - v_end: 15 - meta: {start_ts: 10*3600*1000, end_ts: 15*3600*1000+1} - n: 6 - sz: 552 - }, { - pack: @ops[17...48] - v: 17 - v_end: 47 - meta: {start_ts: 17*3600*1000, end_ts: 47*3600*1000+1} - n: 31 - sz: 2852 - }] - done() - - it "should call the 
callback", -> - @callback.called.should.equal true diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index 64ed839bdc..2657944461 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -26,12 +26,10 @@ describe "UpdatesManager", -> describe "when there are no raw ops", -> beforeEach -> @MongoManager.peekLastCompressedUpdate = sinon.stub() - @MongoManager.insertCompressedUpdates = sinon.stub() @UpdatesManager.compressAndSaveRawUpdates @project_id, @doc_id, [], @temporary, @callback it "should not need to access the database", -> @MongoManager.peekLastCompressedUpdate.called.should.equal false - @MongoManager.insertCompressedUpdates.called.should.equal false it "should call the callback", -> @callback.called.should.equal true @@ -65,8 +63,6 @@ describe "UpdatesManager", -> @compressedUpdates = [ { v: 12, op: "compressed-op-11+12" }, { v: 13, op: "compressed-op-12" } ] @MongoManager.peekLastCompressedUpdate = sinon.stub().callsArgWith(1, null, @lastCompressedUpdate, @lastCompressedUpdate.v) - @MongoManager.modifyCompressedUpdate = sinon.stub().callsArg(2) - @MongoManager.insertCompressedUpdates = sinon.stub().callsArg(4) @PackManager.insertCompressedUpdates = sinon.stub().callsArg(5) @UpdateCompressor.compressRawUpdates = sinon.stub().returns(@compressedUpdates) @@ -80,19 +76,14 @@ describe "UpdatesManager", -> .calledWith(@doc_id) .should.equal true - it "should compress the last compressed op and the raw ops", -> + it "should compress the raw ops", -> @UpdateCompressor.compressRawUpdates - .calledWith(@lastCompressedUpdate, @rawUpdates) + .calledWith(null, @rawUpdates) .should.equal true - it "should update the existing op", -> - @MongoManager.modifyCompressedUpdate - .calledWith(@lastCompressedUpdate, 
@compressedUpdates[0]) - .should.equal true - - it "should save the new compressed ops", -> - @MongoManager.insertCompressedUpdates - .calledWith(@project_id, @doc_id, @compressedUpdates[1..], @temporary) + it "should save the new compressed ops into a pack", -> + @PackManager.insertCompressedUpdates + .calledWith(@project_id, @doc_id, @lastCompressedUpdate, @compressedUpdates, @temporary) .should.equal true it "should call the callback", -> @@ -130,7 +121,7 @@ describe "UpdatesManager", -> it "should only compress the more recent raw ops", -> @UpdateCompressor.compressRawUpdates - .calledWith(@lastCompressedUpdate, @rawUpdates.slice(-2)) + .calledWith(null, @rawUpdates.slice(-2)) .should.equal true describe "when the raw ops do not follow from the last compressed op version", -> @@ -143,11 +134,8 @@ describe "UpdatesManager", -> .calledWith(new Error("Tried to apply raw op at version 13 to last compressed update with version 11")) .should.equal true - it "should not modify any update in mongo", -> - @MongoManager.modifyCompressedUpdate.called.should.equal false - it "should not insert any update into mongo", -> - @MongoManager.insertCompressedUpdates.called.should.equal false + @PackManager.insertCompressedUpdates.called.should.equal false describe "when the raw ops need appending to existing history which is in S3", -> beforeEach ->