diff --git a/services/track-changes/app/coffee/ConversionManager.coffee b/services/track-changes/app/coffee/ConversionManager.coffee new file mode 100644 index 0000000000..5e450fc01d --- /dev/null +++ b/services/track-changes/app/coffee/ConversionManager.coffee @@ -0,0 +1,62 @@ +{db, ObjectId} = require "./mongojs" +ConcatManager = require "./ConcatManager" + +module.exports = ConversionManager = + OPS_TO_LEAVE: 10 + + removeLatestCompressedUpdate: (doc_id, callback = (error) ->) -> + db.docHistory.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: 1 } }, callback + + getLatestCompressedUpdate: (doc_id, callback = (error) ->) -> + db.docHistory.find { doc_id: ObjectId(doc_id) }, { docOps: { $slice: -1 } }, (error, history) -> + return callback(error) if error? + history = history[0] or { docOps: [] } + callback null, history.docOps.slice(-1)[0] + + insertCompressedUpdates: (doc_id, updates, callback = (error) ->) -> + db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback + + trimLastRawUpdate: (doc_id, tailVersion, callback = (error) ->) -> + db.docOps.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: -1 }, $set: { tailVersion: tailVersion + 1 } }, callback + + getLastRawUpdateAndVersion: (doc_id, callback = (error, update, currentVersion, tailVersion) ->) -> + db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true, docOps: { $slice: 1 } }, (error, docs) -> + return callback(error) if error? + return callback(new Error("doc not found")) if docs.length == 0 + doc = docs[0] + callback null, doc.docOps[0], doc.version, doc.tailVersion or 0 + + convertOldestRawUpdate: (doc_id, callback = (error, converted) ->) -> + ConversionManager.getLastRawUpdateAndVersion doc_id, (error, rawUpdate, currentVersion, tailVersion) -> + return callback(error) if error? + + rawUpdates = ConcatManager.normalizeUpdate(rawUpdate) + + if currentVersion - tailVersion > ConcatManager.OPS_TO_LEAVE + ConversonManager.getLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) -> + return callback(error) if error? + + removeAndModifyPreviousCompressedUpdate = (callback, compressedUpdates) -> + if lastCompressedUpdate? + compressedUpdates = [lastCompressedUpdate] + for rawUpdate in rawUpdates + lastCompressedUpdate = compressedUpdates.pop() + compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate + ConversionManager.removeLatestCompressedUpdate doc_id, (error) -> + return callback(error) if error? + callback null, compressUpdates + else + callback null, rawUpdates + + removeAndModifyPreviousCompressedUpdate (error, newCompressedUpdates) -> + return callback(error) if error? + ConversionManager.insertCompressedUpdates doc_id, newCompressedUpdates, (error) -> + return callback(error) if error? + ConversionManager.trimLastRawUpdate doc_id, tailVersion, (error) -> + return callback(error) if error? + console.log "Pushed op", tailVersion + callback null, true + + else + console.log "Up to date" + callback null, false diff --git a/services/track-changes/compressHistory.coffee b/services/track-changes/compressHistory.coffee index ae2cbde219..d3ed3eae58 100644 --- a/services/track-changes/compressHistory.coffee +++ b/services/track-changes/compressHistory.coffee @@ -1,26 +1,9 @@ -{db, ObjectId} = require "./app/js/mongojs" -ConcatManager = require "./app/js/ConcatManager" +{db, ObjectId} = require "./app/coffee/mongojs" +ConversionManager = require "./app/coffee/ConversionManager" doc_id = process.argv.pop() console.log "DOC ID", doc_id -OPS_TO_LEAVE = 10 - -removeLatestCompressedUpdate = (doc_id, callback = (error) ->) -> - db.docHistory.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: 1 } }, callback - -getLatestCompressedUpdate = (doc_id, callback = (error) ->) -> - db.docHistory.find { doc_id: ObjectId(doc_id) }, { docOps: { $slice: -1 } }, (error, history) -> - return callback(error) if error? - history = history[0] or { docOps: [] } - callback null, history.docOps.slice(-1)[0] - -insertCompressedUpdates = (doc_id, updates, callback = (error) ->) -> - db.docHistory.update { doc_id: ObjectId(doc_id) }, { $push: { docOps: { $each: updates } } }, { upsert: true }, callback - -trimLastRawUpdate = (doc_id, tailVersion, callback = (error) ->) -> - db.docOps.update { doc_id: ObjectId(doc_id) }, { $pop: { docOps: -1 }, $set: { tailVersion: tailVersion + 1 } }, callback - done = () -> console.log "DONE! Here's the history:" db.docHistory.find { doc_id: ObjectId(doc_id) }, (error, docs) -> @@ -35,40 +18,11 @@ done = () -> process.exit() do next = () -> - db.docOps.find { doc_id: ObjectId(doc_id) }, { version: true, tailVersion: true, docOps: { $slice: 1 } }, (error, docs) -> + ConversionManager.convertOldestRawUpdate doc_id, (error, converted) -> throw error if error? - throw "doc not found" if docs.length < 1 - doc = docs[0] - tailVersion = doc.tailVersion or 0 - version = doc.version - - rawUpdate = doc.docOps[0] - rawUpdates = ConcatManager.normalizeUpdate(rawUpdate) - - if version - tailVersion > OPS_TO_LEAVE - getLatestCompressedUpdate doc_id, (error, lastCompressedUpdate) -> - throw error if error? - if lastCompressedUpdate? - compressedUpdates = [lastCompressedUpdate] - for rawUpdate in rawUpdates - lastCompressedUpdate = compressedUpdates.pop() - compressedUpdates = compressedUpdates.concat ConcatManager.concatTwoUpdates lastCompressedUpdate, rawUpdate - removeLatestCompressedUpdate doc_id, (error) -> - throw error if error? - insertCompressedUpdates doc_id, compressedUpdates, (error) -> - throw error if error? - trimLastRawUpdate doc_id, tailVersion, (error) -> - throw error if error? - console.log "Pushed compressed op" - next() - else - insertCompressedUpdates doc_id, rawUpdates, (error) -> - trimLastRawUpdate doc_id, tailVersion, (error) -> - throw error if error? - console.log "Pushed first op" - next() + if converted + next() else - console.log "Up to date" done() - +