diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index af0ad242c2..df26b81a2f 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -46,6 +46,7 @@ app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLo app.delete '/project/:project_id/doc/:doc_id', HttpController.flushAndDeleteDoc app.delete '/project/:project_id', HttpController.deleteProject app.post '/project/:project_id/flush', HttpController.flushProject +app.post '/project/:project_id/track_changes', HttpController.setTrackChanges app.get '/total', (req, res)-> timer = new Metrics.Timer("http.allDocList") diff --git a/services/document-updater/app/coffee/ChangesTracker.coffee b/services/document-updater/app/coffee/ChangesTracker.coffee new file mode 100644 index 0000000000..8bc4cf9380 --- /dev/null +++ b/services/document-updater/app/coffee/ChangesTracker.coffee @@ -0,0 +1,457 @@ +load = (EventEmitter) -> + class ChangesTracker extends EventEmitter + # The purpose of this class is to track a set of inserts and deletes to a document, like + # track changes in Word. We store these as a set of ShareJs style ranges: + # {i: "foo", p: 42} # Insert 'foo' at offset 42 + # {d: "bar", p: 37} # Delete 'bar' at offset 37 + # We only track the inserts and deletes, not the whole document, but by being given all + # updates that are applied to a document, we can update these appropriately. + # + # Note that the set of inserts and deletes we store applies to the document as-is at the moment. + # So inserts correspond to text which is in the document, while deletes correspond to text which + # is no longer there, so their lengths do not affect the position of later offsets. + # E.g. + # this is the current text of the document + # |-----| | + # {i: "current ", p:12} -^ ^- {d: "old ", p: 31} + # + # Track changes rules (should be consistent with Word): + # * When text is inserted at a delete, the text goes to the left of the delete + # I.e. "foo|bar" -> "foobaz|bar", where | is the delete, and 'baz' is inserted + # * Deleting content flagged as 'inserted' does not create a new delete marker, it only + # removes the insert marker. E.g. + # * "abdefghijkl" -> "abfghijkl" when 'de' is deleted. No delete marker added + # |---| <- inserted |-| <- inserted + # * Deletes overlapping regular text and inserted text will insert a delete marker for the + # regular text: + # "abcdefghijkl" -> "abcdejkl" when 'fghi' is deleted + # |----| |--|| + # ^- inserted 'bcdefg' \ ^- deleted 'hi' + # \--inserted 'bcde' + # * Deletes overlapping other deletes are merged. E.g. + # "abcghijkl" -> "ahijkl" when 'bcg is deleted' + # | <- delete 'def' | <- delete 'bcdefg' + # * Deletes by another user will consume deletes by the first user + # * Inserts by another user will not combine with inserts by the first user. If they are in the + # middle of a previous insert by the first user, the original insert will be split into two. + constructor: (@changes = [], @comments = []) -> + # Change objects have the following structure: + # { + # id: ... # Uniquely generated by us + # op: { # ShareJs style op tracking the offset (p) and content inserted (i) or deleted (d) + # i: "..." + # p: 42 + # } + # } + # + # Ids are used to uniquely identify a change, e.g. for updating it in the database, or keeping in + # sync with Ace ranges. + @id = 0 + + addComment: (offset, length, metadata) -> + # TODO: Don't allow overlapping comments? + @comments.push comment = { + id: @_newId() + offset, length, metadata + } + @emit "comment:added", comment + return comment + + getComment: (comment_id) -> + comment = null + for c in @comments + if c.id == comment_id + comment = c + break + return comment + + resolveCommentId: (comment_id, resolved_data) -> + comment = @getComment(comment_id) + return if !comment? + comment.metadata.resolved = true + comment.metadata.resolved_data = resolved_data + @emit "comment:resolved", comment + + unresolveCommentId: (comment_id) -> + comment = @getComment(comment_id) + return if !comment? + comment.metadata.resolved = false + @emit "comment:unresolved", comment + + removeCommentId: (comment_id) -> + comment = @getComment(comment_id) + return if !comment? + @comments = @comments.filter (c) -> c.id != comment_id + @emit "comment:removed", comment + + getChange: (change_id) -> + change = null + for c in @changes + if c.id == change_id + change = c + break + return change + + removeChangeId: (change_id) -> + change = @getChange(change_id) + return if !change? + @_removeChange(change) + + applyOp: (op, metadata = {}) -> + metadata.ts ?= new Date() + # Apply an op that has been applied to the document to our changes to keep them up to date + if op.i? + @applyInsertToChanges(op, metadata) + @applyInsertToComments(op) + else if op.d? + @applyDeleteToChanges(op, metadata) + @applyDeleteToComments(op) + + applyInsertToComments: (op) -> + for comment in @comments + if op.p <= comment.offset + comment.offset += op.i.length + @emit "comment:moved", comment + else if op.p < comment.offset + comment.length + comment.length += op.i.length + @emit "comment:moved", comment + + applyDeleteToComments: (op) -> + op_start = op.p + op_length = op.d.length + op_end = op.p + op_length + for comment in @comments + comment_end = comment.offset + comment.length + if op_end <= comment.offset + # delete is fully before comment + comment.offset -= op_length + @emit "comment:moved", comment + else if op_start >= comment_end + # delete is fully after comment, nothing to do + else + # delete and comment overlap + delete_length_before = Math.max(0, comment.offset - op_start) + delete_length_after = Math.max(0, op_end - comment_end) + delete_length_overlapping = op_length - delete_length_before - delete_length_after + comment.offset = Math.min(comment.offset, op_start) + comment.length -= delete_length_overlapping + @emit "comment:moved", comment + + applyInsertToChanges: (op, metadata) -> + op_start = op.p + op_length = op.i.length + op_end = op.p + op_length + + already_merged = false + previous_change = null + moved_changes = [] + remove_changes = [] + new_changes = [] + for change in @changes + change_start = change.op.p + + if change.op.d? + # Shift any deletes after this along by the length of this insert + if op_start < change_start + change.op.p += op_length + moved_changes.push change + else if op_start == change_start + # If the insert matches the start of the delete, just remove it from the delete instead + if change.op.d.length >= op.i.length and change.op.d.slice(0, op.i.length) == op.i + change.op.d = change.op.d.slice(op.i.length) + change.op.p += op.i.length + if change.op.d == "" + remove_changes.push change + else + moved_changes.push change + already_merged = true + else + change.op.p += op_length + moved_changes.push change + else if change.op.i? + change_end = change_start + change.op.i.length + is_change_overlapping = (op_start >= change_start and op_start <= change_end) + + # Only merge inserts if they are from the same user + is_same_user = metadata.user_id == change.metadata.user_id + + # If there is a delete at the start of the insert, and we're inserting + # at the start, we SHOULDN'T merge since the delete acts as a partition. + # The previous op will be the delete, but it's already been shifted by this insert + # + # I.e. + # Originally: |-- existing insert --| + # | <- existing delete at same offset + # + # Now: |-- existing insert --| <- not shifted yet + # |-- this insert --|| <- existing delete shifted along to end of this op + # + # After: |-- existing insert --| + # |-- this insert --|| <- existing delete + # + # Without the delete, the inserts would be merged. + is_insert_blocked_by_delete = (previous_change? and previous_change.op.d? and previous_change.op.p == op_end) + + # If the insert is overlapping another insert, either at the beginning in the middle or touching the end, + # then we merge them into one. + if @track_changes and + is_change_overlapping and + !is_insert_blocked_by_delete and + !already_merged and + is_same_user + offset = op_start - change_start + change.op.i = change.op.i.slice(0, offset) + op.i + change.op.i.slice(offset) + change.metadata.ts = metadata.ts + already_merged = true + moved_changes.push change + else if op_start <= change_start + # If we're fully before the other insert we can just shift the other insert by our length. + # If they are touching, and should have been merged, they will have been above. + # If not merged above, then it must be blocked by a delete, and will be after this insert, so we shift it along as well + change.op.p += op_length + moved_changes.push change + else if (!is_same_user or !@track_changes) and change_start < op_start < change_end + # This user is inserting inside a change by another user, so we need to split the + # other user's change into one before and after this one. + offset = op_start - change_start + before_content = change.op.i.slice(0, offset) + after_content = change.op.i.slice(offset) + + # The existing change can become the 'before' change + change.op.i = before_content + moved_changes.push change + + # Create a new op afterwards + after_change = { + op: { + i: after_content + p: change_start + offset + op_length + } + metadata: {} + } + after_change.metadata[key] = value for key, value of change.metadata + new_changes.push after_change + + previous_change = change + + if @track_changes and !already_merged + @_addOp op, metadata + for {op, metadata} in new_changes + @_addOp op, metadata + + for change in remove_changes + @_removeChange change + + if moved_changes.length > 0 + @emit "changes:moved", moved_changes + + applyDeleteToChanges: (op, metadata) -> + op_start = op.p + op_length = op.d.length + op_end = op.p + op_length + remove_changes = [] + moved_changes = [] + + # We might end up modifying our delete op if it merges with existing deletes, or cancels out + # with an existing insert. Since we might do multiple modifications, we record them and do + # all the modifications after looping through the existing changes, so as not to mess up the + # offset indexes as we go. + op_modifications = [] + for change in @changes + if change.op.i? + change_start = change.op.p + change_end = change_start + change.op.i.length + if op_end <= change_start + # Shift ops after us back by our length + change.op.p -= op_length + moved_changes.push change + else if op_start >= change_end + # Delete is after insert, nothing to do + else + # When the new delete overlaps an insert, we should remove the part of the insert that + # is now deleted, and also remove the part of the new delete that overlapped. I.e. + # the two cancel out where they overlap. + if op_start >= change_start + # |-- existing insert --| + # insert_remaining_before -> |.....||-- new delete --| + delete_remaining_before = "" + insert_remaining_before = change.op.i.slice(0, op_start - change_start) + else + # delete_remaining_before -> |.....||-- existing insert --| + # |-- new delete --| + delete_remaining_before = op.d.slice(0, change_start - op_start) + insert_remaining_before = "" + + if op_end <= change_end + # |-- existing insert --| + # |-- new delete --||.....| <- insert_remaining_after + delete_remaining_after = "" + insert_remaining_after = change.op.i.slice(op_end - change_start) + else + # |-- existing insert --||.....| <- delete_remaining_after + # |-- new delete --| + delete_remaining_after = op.d.slice(change_end - op_start) + insert_remaining_after = "" + + insert_remaining = insert_remaining_before + insert_remaining_after + if insert_remaining.length > 0 + change.op.i = insert_remaining + change.op.p = Math.min(change_start, op_start) + change.metadata.ts = metadata.ts + moved_changes.push change + else + remove_changes.push change + + # We know what we want to preserve of our delete op before (delete_remaining_before) and what we want to preserve + # afterwards (delete_remaining_before). Now we need to turn that into a modification which deletes the + # chunk in the middle not covered by these. + delete_removed_length = op.d.length - delete_remaining_before.length - delete_remaining_after.length + delete_removed_start = delete_remaining_before.length + modification = { + d: op.d.slice(delete_removed_start, delete_removed_start + delete_removed_length) + p: delete_removed_start + } + if modification.d.length > 0 + op_modifications.push modification + else if change.op.d? + change_start = change.op.p + if op_end < change_start or (!@track_changes and op_end == change_start) + # Shift ops after us back by our length. + # If we're tracking changes, it must be strictly before, since we'll merge + # below if they are touching. Otherwise, touching is fine. + change.op.p -= op_length + moved_changes.push change + else if op_start <= change_start <= op_end + if @track_changes + # If we overlap a delete, add it in our content, and delete the existing change. + # It's easier to do it this way, rather than modifying the existing delete in case + # we overlap many deletes and we'd need to track that. We have a workaround to + # update the delete in place if possible below. + offset = change_start - op_start + op_modifications.push { i: change.op.d, p: offset } + remove_changes.push change + else + change.op.p = op_start + moved_changes.push change + + # Copy rather than modify because we still need to apply it to comments + op = { + p: op.p + d: @_applyOpModifications(op.d, op_modifications) + } + + for change in remove_changes + # This is a bit of hack to avoid removing one delete and replacing it with another. + # If we don't do this, it causes the UI to flicker + if op.d.length > 0 and change.op.d? and op.p <= change.op.p <= op.p + op.d.length + change.op.p = op.p + change.op.d = op.d + change.metadata = metadata + moved_changes.push change + op.d = "" # stop it being added + else + @_removeChange change + + if @track_changes and op.d.length > 0 + @_addOp op, metadata + else + # It's possible that we deleted an insert between two other inserts. I.e. + # If we delete 'user_2 insert' in: + # |-- user_1 insert --||-- user_2 insert --||-- user_1 insert --| + # it becomes: + # |-- user_1 insert --||-- user_1 insert --| + # We need to merge these together again + results = @_scanAndMergeAdjacentUpdates() + moved_changes = moved_changes.concat(results.moved_changes) + for change in results.remove_changes + @_removeChange change + moved_changes = moved_changes.filter (c) -> c != change + + if moved_changes.length > 0 + @emit "changes:moved", moved_changes + + _newId: () -> + (@id++).toString() + + _addOp: (op, metadata) -> + change = { + id: @_newId() + op: op + metadata: metadata + } + @changes.push change + + # Keep ops in order of offset, with deletes before inserts + @changes.sort (c1, c2) -> + result = c1.op.p - c2.op.p + if result != 0 + return result + else if c1.op.i? and c2.op.d? + return 1 + else + return -1 + + if op.d? + @emit "delete:added", change + else if op.i? + @emit "insert:added", change + + _removeChange: (change) -> + @changes = @changes.filter (c) -> c.id != change.id + if change.op.d? + @emit "delete:removed", change + else if change.op.i? + @emit "insert:removed", change + + _applyOpModifications: (content, op_modifications) -> + # Put in descending position order, with deleting first if at the same offset + # (Inserting first would modify the content that the delete will delete) + op_modifications.sort (a, b) -> + result = b.p - a.p + if result != 0 + return result + else if a.i? and b.d? + return 1 + else + return -1 + + for modification in op_modifications + if modification.i? + content = content.slice(0, modification.p) + modification.i + content.slice(modification.p) + else if modification.d? + if content.slice(modification.p, modification.p + modification.d.length) != modification.d + throw new Error("deleted content does not match. content: #{JSON.stringify(content)}; modification: #{JSON.stringify(modification)}") + content = content.slice(0, modification.p) + content.slice(modification.p + modification.d.length) + return content + + _scanAndMergeAdjacentUpdates: () -> + # This should only need calling when deleting an update between two + # other updates. There's no other way to get two adjacent updates from the + # same user, since they would be merged on insert. + previous_change = null + remove_changes = [] + moved_changes = [] + for change in @changes + if previous_change?.op.i? and change.op.i? + previous_change_end = previous_change.op.p + previous_change.op.i.length + previous_change_user_id = previous_change.metadata.user_id + change_start = change.op.p + change_user_id = change.metadata.user_id + if previous_change_end == change_start and previous_change_user_id == change_user_id + remove_changes.push change + previous_change.op.i += change.op.i + moved_changes.push previous_change + else if previous_change?.op.d? and change.op.d? and previous_change.op.p == change.op.p + # Merge adjacent deletes + previous_change.op.d += change.op.d + remove_changes.push change + moved_changes.push previous_change + else # Only update to the current change if we haven't removed it. + previous_change = change + return { moved_changes, remove_changes } + +if define? + define ["utils/EventEmitter"], load +else + EventEmitter = require("events").EventEmitter + module.exports = load(EventEmitter) \ No newline at end of file diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index ebbdc3a66e..9c7277c469 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -3,7 +3,8 @@ PersistenceManager = require "./PersistenceManager" DiffCodec = require "./DiffCodec" logger = require "logger-sharelatex" Metrics = require "./Metrics" -TrackChangesManager = require "./TrackChangesManager" +HistoryManager = require "./HistoryManager" +WebRedisManager = require "./WebRedisManager" module.exports = DocumentManager = getDoc: (project_id, doc_id, _callback = (error, lines, version, alreadyLoaded) ->) -> @@ -12,18 +13,18 @@ module.exports = DocumentManager = timer.done() _callback(args...) - RedisManager.getDoc project_id, doc_id, (error, lines, version) -> + RedisManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) -> return callback(error) if error? if !lines? or !version? - logger.log project_id: project_id, doc_id: doc_id, "doc not in redis so getting from persistence API" - PersistenceManager.getDoc project_id, doc_id, (error, lines, version) -> + logger.log {project_id, doc_id, track_changes}, "doc not in redis so getting from persistence API" + PersistenceManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) -> return callback(error) if error? - logger.log project_id: project_id, doc_id: doc_id, lines: lines, version: version, "got doc from persistence API" - RedisManager.putDocInMemory project_id, doc_id, lines, version, (error) -> + logger.log {project_id, doc_id, lines, version, track_changes}, "got doc from persistence API" + RedisManager.putDocInMemory project_id, doc_id, lines, version, track_changes, track_changes_entries, (error) -> return callback(error) if error? - callback null, lines, version, false + callback null, lines, version, track_changes, track_changes_entries, false else - callback null, lines, version, true + callback null, lines, version, track_changes, track_changes_entries, true getDocAndRecentOps: (project_id, doc_id, fromVersion, _callback = (error, lines, version, recentOps) ->) -> timer = new Metrics.Timer("docManager.getDocAndRecentOps") @@ -50,7 +51,7 @@ module.exports = DocumentManager = return callback(new Error("No lines were provided to setDoc")) UpdateManager = require "./UpdateManager" - DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, alreadyLoaded) -> + DocumentManager.getDoc project_id, doc_id, (error, oldLines, version, track_changes, alreadyLoaded) -> return callback(error) if error? if oldLines? and oldLines.length > 0 and oldLines[0].text? @@ -89,14 +90,14 @@ module.exports = DocumentManager = callback = (args...) -> timer.done() _callback(args...) - RedisManager.getDoc project_id, doc_id, (error, lines, version) -> + RedisManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) -> return callback(error) if error? if !lines? or !version? logger.log project_id: project_id, doc_id: doc_id, "doc is not loaded so not flushing" callback null # TODO: return a flag to bail out, as we go on to remove doc from memory? else logger.log project_id: project_id, doc_id: doc_id, version: version, "flushing doc" - PersistenceManager.setDoc project_id, doc_id, lines, version, (error) -> + PersistenceManager.setDoc project_id, doc_id, lines, version, track_changes, track_changes_entries, (error) -> return callback(error) if error? callback null @@ -111,13 +112,19 @@ module.exports = DocumentManager = # Flush in the background since it requires and http request # to track changes - TrackChangesManager.flushDocChanges project_id, doc_id, (err) -> + HistoryManager.flushDocChanges project_id, doc_id, (err) -> if err? logger.err {err, project_id, doc_id}, "error flushing to track changes" RedisManager.removeDocFromMemory project_id, doc_id, (error) -> return callback(error) if error? callback null + + setTrackChanges: (project_id, doc_id, track_changes_on, callback = (error) ->) -> + RedisManager.setTrackChanges project_id, doc_id, track_changes_on, (error) -> + return callback(error) if error? + WebRedisManager.sendData {project_id, doc_id, track_changes_on} + callback() getDocWithLock: (project_id, doc_id, callback = (error, lines, version) ->) -> UpdateManager = require "./UpdateManager" @@ -138,3 +145,7 @@ module.exports = DocumentManager = flushAndDeleteDocWithLock: (project_id, doc_id, callback = (error) ->) -> UpdateManager = require "./UpdateManager" UpdateManager.lockUpdatesAndDo DocumentManager.flushAndDeleteDoc, project_id, doc_id, callback + + setTrackChangesWithLock: (project_id, doc_id, track_changes_on, callback = (error) ->) -> + UpdateManager = require "./UpdateManager" + UpdateManager.lockUpdatesAndDo DocumentManager.setTrackChanges, project_id, doc_id, track_changes_on, callback \ No newline at end of file diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee new file mode 100644 index 0000000000..637fd2cb5f --- /dev/null +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -0,0 +1,44 @@ +settings = require "settings-sharelatex" +request = require "request" +logger = require "logger-sharelatex" +async = require "async" +WebRedisManager = require "./WebRedisManager" + +module.exports = HistoryManager = + flushDocChanges: (project_id, doc_id, callback = (error) ->) -> + if !settings.apis?.trackchanges? + logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing" + return callback() + + url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush" + logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api" + request.post url, (error, res, body)-> + if error? + return callback(error) + else if res.statusCode >= 200 and res.statusCode < 300 + return callback(null) + else + error = new Error("track changes api returned a failure status code: #{res.statusCode}") + return callback(error) + + FLUSH_EVERY_N_OPS: 50 + pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> + if ops.length == 0 + return callback() + WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> + return callback(error) if error? + # We want to flush every 50 ops, i.e. 50, 100, 150, etc + # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these + # ops. If we've changed, then we've gone over a multiple of 50 and should flush. + # (Most of the time, we will only hit 50 and then flushing will put us back to 0) + previousLength = length - ops.length + prevBlock = Math.floor(previousLength / HistoryManager.FLUSH_EVERY_N_OPS) + newBlock = Math.floor(length / HistoryManager.FLUSH_EVERY_N_OPS) + if newBlock != prevBlock + # Do this in the background since it uses HTTP and so may be too + # slow to wait for when processing a doc update. + logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" + HistoryManager.flushDocChanges project_id, doc_id, (error) -> + if error? + logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" + callback() \ No newline at end of file diff --git a/services/document-updater/app/coffee/HttpController.coffee b/services/document-updater/app/coffee/HttpController.coffee index ee6359b104..0366746d56 100644 --- a/services/document-updater/app/coffee/HttpController.coffee +++ b/services/document-updater/app/coffee/HttpController.coffee @@ -96,3 +96,15 @@ module.exports = HttpController = return next(error) if error? logger.log project_id: project_id, "deleted project via http" res.send 204 # No Content + + setTrackChanges: (req, res, next = (error) ->) -> + project_id = req.params.project_id + track_changes_on = req.body.on + if !track_changes_on? + return res.send 400 + track_changes_on = !!track_changes_on # Make boolean + logger.log {project_id, track_changes_on}, "turning on track changes via http" + ProjectManager.setTrackChangesWithLocks project_id, track_changes_on, (error) -> + return next(error) if error? + res.send 204 + diff --git a/services/document-updater/app/coffee/PersistenceManager.coffee b/services/document-updater/app/coffee/PersistenceManager.coffee index 605425eb5e..8d5578b4cf 100644 --- a/services/document-updater/app/coffee/PersistenceManager.coffee +++ b/services/document-updater/app/coffee/PersistenceManager.coffee @@ -12,14 +12,14 @@ MAX_HTTP_REQUEST_LENGTH = 5000 # 5 seconds module.exports = PersistenceManager = getDoc: (project_id, doc_id, callback = (error, lines, version) ->) -> - PersistenceManager.getDocFromWeb project_id, doc_id, (error, lines) -> + PersistenceManager.getDocFromWeb project_id, doc_id, (error, lines, track_changes, track_changes_entries) -> return callback(error) if error? PersistenceManager.getDocVersionInMongo doc_id, (error, version) -> return callback(error) if error? - callback null, lines, version + callback null, lines, version, track_changes, track_changes_entries - setDoc: (project_id, doc_id, lines, version, callback = (error) ->) -> - PersistenceManager.setDocInWeb project_id, doc_id, lines, (error) -> + setDoc: (project_id, doc_id, lines, version, track_changes, track_changes_entries, callback = (error) ->) -> + PersistenceManager.setDocInWeb project_id, doc_id, lines, track_changes, track_changes_entries, (error) -> return callback(error) if error? PersistenceManager.setDocVersionInMongo doc_id, version, (error) -> return callback(error) if error? @@ -50,13 +50,13 @@ module.exports = PersistenceManager = body = JSON.parse body catch e return callback(e) - return callback null, body.lines + return callback null, body.lines, body.track_changes, body.track_changes_entries else if res.statusCode == 404 return callback(new Errors.NotFoundError("doc not not found: #{url}")) else return callback(new Error("error accessing web API: #{url} #{res.statusCode}")) - setDocInWeb: (project_id, doc_id, lines, _callback = (error) ->) -> + setDocInWeb: (project_id, doc_id, lines, track_changes, track_changes_entries, _callback = (error) ->) -> timer = new Metrics.Timer("persistenceManager.setDoc") callback = (args...) -> timer.done() @@ -68,6 +68,8 @@ module.exports = PersistenceManager = method: "POST" body: JSON.stringify lines: lines + track_changes: track_changes + track_changes_entries: track_changes_entries headers: "content-type": "application/json" auth: diff --git a/services/document-updater/app/coffee/ProjectManager.coffee b/services/document-updater/app/coffee/ProjectManager.coffee index f0f62b6d1b..a38fe08397 100644 --- a/services/document-updater/app/coffee/ProjectManager.coffee +++ b/services/document-updater/app/coffee/ProjectManager.coffee @@ -57,4 +57,30 @@ module.exports = ProjectManager = else callback(null) + setTrackChangesWithLocks: (project_id, track_changes_on, _callback = (error) ->) -> + timer = new Metrics.Timer("projectManager.toggleTrackChangesWithLocks") + callback = (args...) -> + timer.done() + _callback(args...) + + RedisManager.getDocIdsInProject project_id, (error, doc_ids) -> + return callback(error) if error? + jobs = [] + errors = [] + for doc_id in (doc_ids or []) + do (doc_id) -> + jobs.push (callback) -> + DocumentManager.setTrackChangesWithLock project_id, doc_id, track_changes_on, (error) -> + if error? + logger.error {err: error, project_id, doc_ids, track_changes_on}, "error toggle track changes for doc" + errors.push(error) + callback() + # TODO: If no docs, turn on track changes in Mongo manually + + logger.log {project_id, doc_ids, track_changes_on}, "toggling track changes for docs" + async.series jobs, () -> + if errors.length > 0 + callback new Error("Errors toggling track changes for docs. See log for details") + else + callback(null) diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee index 0e9e59e8f1..c09fb43f00 100644 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ b/services/document-updater/app/coffee/RedisKeyBuilder.coffee @@ -34,6 +34,10 @@ module.exports = RedisKeyBuilder = return (key_schema) -> key_schema.uncompressedHistoryOp({doc_id}) pendingUpdates: ({doc_id}) -> return (key_schema) -> key_schema.pendingUpdates({doc_id}) + trackChangesEnabled: ({doc_id}) -> + return (key_schema) -> key_schema.trackChangesEnabled({doc_id}) + trackChangesEntries: ({doc_id}) -> + return (key_schema) -> key_schema.trackChangesEntries({doc_id}) docsInProject: ({project_id}) -> return (key_schema) -> key_schema.docsInProject({project_id}) docsWithHistoryOps: ({project_id}) -> diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index 87e31b7826..6ee764cb7e 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -13,7 +13,7 @@ minutes = 60 # seconds for Redis expire module.exports = RedisManager = rclient: rclient - putDocInMemory : (project_id, doc_id, docLines, version, _callback)-> + putDocInMemory : (project_id, doc_id, docLines, version, track_changes, track_changes_entries, _callback)-> timer = new metrics.Timer("redis.put-doc") callback = (error) -> timer.done() @@ -23,6 +23,8 @@ module.exports = RedisManager = multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) multi.set keys.projectKey({doc_id:doc_id}), project_id multi.set keys.docVersion(doc_id:doc_id), version + multi.set keys.trackChangesEnabled(doc_id:doc_id), if track_changes then "1" else "0" + multi.set keys.trackChangesEntries(doc_id:doc_id), JSON.stringify(track_changes_entries) multi.exec (error) -> return callback(error) if error? rclient.sadd keys.docsInProject(project_id:project_id), doc_id, callback @@ -41,30 +43,36 @@ module.exports = RedisManager = multi.del keys.docLines(doc_id:doc_id) multi.del keys.projectKey(doc_id:doc_id) multi.del keys.docVersion(doc_id:doc_id) + multi.del keys.trackChangesEnabled(doc_id:doc_id) + multi.del keys.trackChangesEntries(doc_id:doc_id) multi.exec (error) -> return callback(error) if error? rclient.srem keys.docsInProject(project_id:project_id), doc_id, callback - getDoc : (project_id, doc_id, callback = (error, lines, version, project_id) ->)-> + getDoc : (project_id, doc_id, callback = (error, lines, version, track_changes, track_changes_entries) ->)-> timer = new metrics.Timer("redis.get-doc") multi = rclient.multi() multi.get keys.docLines(doc_id:doc_id) multi.get keys.docVersion(doc_id:doc_id) multi.get keys.projectKey(doc_id:doc_id) + multi.get keys.trackChangesEnabled(doc_id:doc_id) + multi.get keys.trackChangesEntries(doc_id:doc_id) multi.exec (error, result)-> timer.done() return callback(error) if error? try docLines = JSON.parse result[0] + track_changes_entries = JSON.parse result[4] catch e return callback(e) version = parseInt(result[1] or 0, 10) doc_project_id = result[2] + track_changes = (result[3] == "1") # check doc is in requested project if doc_project_id? and doc_project_id isnt project_id logger.error project_id: project_id, doc_id: doc_id, doc_project_id: doc_project_id, "doc not in project" return callback(new Errors.NotFoundError("document not found")) - callback null, docLines, version, project_id + callback null, docLines, version, track_changes, track_changes_entries getDocVersion: (doc_id, callback = (error, version) ->) -> rclient.get keys.docVersion(doc_id: doc_id), (error, version) -> @@ -104,7 +112,7 @@ module.exports = RedisManager = DOC_OPS_TTL: 60 * minutes DOC_OPS_MAX_LENGTH: 100 - updateDocument : (doc_id, docLines, newVersion, appliedOps = [], callback = (error) ->)-> + updateDocument : (doc_id, docLines, newVersion, appliedOps = [], track_changes_entries, callback = (error) ->)-> RedisManager.getDocVersion doc_id, (error, currentVersion) -> return callback(error) if error? if currentVersion + appliedOps.length != newVersion @@ -119,6 +127,7 @@ module.exports = RedisManager = multi.rpush keys.docOps(doc_id: doc_id), jsonOps... multi.expire keys.docOps(doc_id: doc_id), RedisManager.DOC_OPS_TTL multi.ltrim keys.docOps(doc_id: doc_id), -RedisManager.DOC_OPS_MAX_LENGTH, -1 + multi.set keys.trackChangesEntries(doc_id:doc_id), JSON.stringify(track_changes_entries) multi.exec (error, replys) -> return callback(error) if error? return callback() @@ -126,3 +135,6 @@ module.exports = RedisManager = getDocIdsInProject: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers keys.docsInProject(project_id: project_id), callback + setTrackChanges: (project_id, doc_id, track_changes_on, callback = (error) ->) -> + value = (if track_changes_on then "1" else "0") + rclient.set keys.trackChangesEnabled({doc_id}), value, callback diff --git a/services/document-updater/app/coffee/ShareJsDB.coffee b/services/document-updater/app/coffee/ShareJsDB.coffee index 3d80c680cb..a21c8aea7f 100644 --- a/services/document-updater/app/coffee/ShareJsDB.coffee +++ b/services/document-updater/app/coffee/ShareJsDB.coffee @@ -1,12 +1,11 @@ Keys = require('./UpdateKeys') Settings = require('settings-sharelatex') -DocumentManager = require "./DocumentManager" RedisManager = require "./RedisManager" Errors = require "./Errors" logger = require "logger-sharelatex" module.exports = class ShareJsDB - constructor: () -> + constructor: (@project_id, @doc_id, @lines, @version) -> @appliedOps = {} # ShareJS calls this detacted from the instance, so we need # bind it to keep our context that can access @appliedOps @@ -31,22 +30,14 @@ module.exports = class ShareJsDB callback() getSnapshot: (doc_key, callback) -> - [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) - DocumentManager.getDoc project_id, doc_id, (error, lines, version) -> - return callback(error) if error? - if !lines? or !version? - return callback(new Errors.NotFoundError("document not found: #{doc_id}")) - - if lines.length > 0 and lines[0].text? - type = "json" - snapshot = lines: lines - else - type = "text" - snapshot = lines.join("\n") - callback null, - snapshot: snapshot - v: parseInt(version, 10) - type: type + if doc_key != Keys.combineProjectIdAndDocId(@project_id, @doc_id) + return callback(new Errors.NotFoundError("unexpected doc_key #{doc_key}, expected #{Keys.combineProjectIdAndDocId(@project_id, @doc_id)}")) + else + return callback null, { + snapshot: @lines.join("\n") + v: parseInt(@version, 10) + type: "text" + } # To be able to remove a doc from the ShareJS memory # we need to called Model::delete, which calls this diff --git a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee index 985d03094a..876d56e71b 100644 --- a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee +++ b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee @@ -6,22 +6,19 @@ Settings = require('settings-sharelatex') Keys = require "./UpdateKeys" {EventEmitter} = require "events" util = require "util" - -redis = require("redis-sharelatex") -rclient = redis.createClient(Settings.redis.web) - +WebRedisManager = require "./WebRedisManager" ShareJsModel:: = {} util.inherits ShareJsModel, EventEmitter module.exports = ShareJsUpdateManager = - getNewShareJsModel: () -> - db = new ShareJsDB() + getNewShareJsModel: (project_id, doc_id, lines, version) -> + db = new ShareJsDB(project_id, doc_id, lines, version) model = new ShareJsModel(db, maxDocLength: Settings.max_doc_length) model.db = db return model - applyUpdate: (project_id, doc_id, update, callback = (error, updatedDocLines) ->) -> + applyUpdate: (project_id, doc_id, update, lines, version, callback = (error, updatedDocLines) ->) -> logger.log project_id: project_id, doc_id: doc_id, update: update, "applying sharejs updates" jobs = [] @@ -30,7 +27,7 @@ module.exports = ShareJsUpdateManager = # getting stuck due to queued callbacks (line 260 of sharejs/server/model.coffee) # This adds a small but hopefully acceptable overhead (~12ms per 1000 updates on # my 2009 MBP). - model = @getNewShareJsModel() + model = @getNewShareJsModel(project_id, doc_id, lines, version) @_listenForOps(model) doc_key = Keys.combineProjectIdAndDocId(project_id, doc_id) model.applyOp doc_key, update, (error) -> @@ -55,18 +52,9 @@ module.exports = ShareJsUpdateManager = [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) ShareJsUpdateManager._sendOp(project_id, doc_id, opData) - _sendOp: (project_id, doc_id, opData) -> - data = - project_id: project_id - doc_id: doc_id - op: opData - data = JSON.stringify data - rclient.publish "applied-ops", data + _sendOp: (project_id, doc_id, op) -> + WebRedisManager.sendData {project_id, doc_id, op} _sendError: (project_id, doc_id, error) -> - data = JSON.stringify - project_id: project_id - doc_id: doc_id - error: error.message || error - rclient.publish "applied-ops", data + WebRedisManager.sendData {project_id, doc_id, error: error.message || error} diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee index 9aa1c0ad47..94f8a11ca1 100644 --- a/services/document-updater/app/coffee/TrackChangesManager.coffee +++ b/services/document-updater/app/coffee/TrackChangesManager.coffee @@ -1,44 +1,12 @@ -settings = require "settings-sharelatex" -request = require "request" -logger = require "logger-sharelatex" -async = require "async" -WebRedisManager = require "./WebRedisManager" +ChangesTracker = require "./ChangesTracker" module.exports = TrackChangesManager = - flushDocChanges: (project_id, doc_id, callback = (error) ->) -> - if !settings.apis?.trackchanges? - logger.warn doc_id: doc_id, "track changes API is not configured, so not flushing" - return callback() - - url = "#{settings.apis.trackchanges.url}/project/#{project_id}/doc/#{doc_id}/flush" - logger.log project_id: project_id, doc_id: doc_id, url: url, "flushing doc in track changes api" - request.post url, (error, res, body)-> - if error? - return callback(error) - else if res.statusCode >= 200 and res.statusCode < 300 - return callback(null) - else - error = new Error("track changes api returned a failure status code: #{res.statusCode}") - return callback(error) - - FLUSH_EVERY_N_OPS: 50 - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> - if ops.length == 0 - return callback() - WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> - return callback(error) if error? - # We want to flush every 50 ops, i.e. 50, 100, 150, etc - # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these - # ops. If we've changed, then we've gone over a multiple of 50 and should flush. - # (Most of the time, we will only hit 50 and then flushing will put us back to 0) - previousLength = length - ops.length - prevBlock = Math.floor(previousLength / TrackChangesManager.FLUSH_EVERY_N_OPS) - newBlock = Math.floor(length / TrackChangesManager.FLUSH_EVERY_N_OPS) - if newBlock != prevBlock - # Do this in the background since it uses HTTP and so may be too - # slow to wait for when processing a doc update. - logger.log length: length, doc_id: doc_id, project_id: project_id, "flushing track changes api" - TrackChangesManager.flushDocChanges project_id, doc_id, (error) -> - if error? - logger.error err: error, doc_id: doc_id, project_id: project_id, "error flushing doc to track changes api" - callback() \ No newline at end of file + applyUpdate: (project_id, doc_id, entries = {}, updates = [], track_changes, callback = (error, new_entries) ->) -> + {changes, comments} = entries + changesTracker = new ChangesTracker(changes, comments) + changesTracker.track_changes = track_changes + for update in updates + for op in update.op + changesTracker.applyOp(op, { user_id: update.meta?.user_id, }) + {changes, comments} = changesTracker + callback null, {changes, comments} \ No newline at end of file diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index f35bc1a9b7..d08d9a62f3 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -2,11 +2,14 @@ LockManager = require "./LockManager" RedisManager = require "./RedisManager" WebRedisManager = require "./WebRedisManager" ShareJsUpdateManager = require "./ShareJsUpdateManager" -TrackChangesManager = require "./TrackChangesManager" +HistoryManager = require "./HistoryManager" Settings = require('settings-sharelatex') async = require("async") logger = require('logger-sharelatex') Metrics = require "./Metrics" +Errors = require "./Errors" +DocumentManager = require "./DocumentManager" +TrackChangesManager = require "./TrackChangesManager" module.exports = UpdateManager = processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) -> @@ -45,12 +48,18 @@ module.exports = UpdateManager = applyUpdate: (project_id, doc_id, update, callback = (error) ->) -> UpdateManager._sanitizeUpdate update - ShareJsUpdateManager.applyUpdate project_id, doc_id, update, (error, updatedDocLines, version, appliedOps) -> + DocumentManager.getDoc project_id, doc_id, (error, lines, version, track_changes, track_changes_entries) -> return callback(error) if error? - logger.log doc_id: doc_id, version: version, "updating doc in redis" - RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, (error) -> + if !lines? or !version? + return callback(new Errors.NotFoundError("document not found: #{doc_id}")) + ShareJsUpdateManager.applyUpdate project_id, doc_id, update, lines, version, (error, updatedDocLines, version, appliedOps) -> return callback(error) if error? - TrackChangesManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback + TrackChangesManager.applyUpdate project_id, doc_id, track_changes_entries, appliedOps, track_changes, (error, new_track_changes_entries) -> + return callback(error) if error? + logger.log doc_id: doc_id, version: version, "updating doc in redis" + RedisManager.updateDocument doc_id, updatedDocLines, version, appliedOps, new_track_changes_entries, (error) -> + return callback(error) if error? + HistoryManager.pushUncompressedHistoryOps project_id, doc_id, appliedOps, callback lockUpdatesAndDo: (method, project_id, doc_id, args..., callback) -> LockManager.getLock doc_id, (error, lockValue) -> diff --git a/services/document-updater/app/coffee/WebRedisManager.coffee b/services/document-updater/app/coffee/WebRedisManager.coffee index 73c099f9da..eb3b6a583c 100644 --- a/services/document-updater/app/coffee/WebRedisManager.coffee +++ b/services/document-updater/app/coffee/WebRedisManager.coffee @@ -32,4 +32,7 @@ module.exports = WebRedisManager = ], (error, results) -> return callback(error) if error? [length, _] = results - callback(error, length) \ No newline at end of file + callback(error, length) + + sendData: (data) -> + rclient.publish "applied-ops", JSON.stringify(data) \ No newline at end of file diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index 15456db932..edb8c56ad3 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -32,6 +32,8 @@ module.exports = docVersion: ({doc_id}) -> "DocVersion:#{doc_id}" projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" + trackChangesEnabled: ({doc_id}) -> "TrackChangesEnabled:#{doc_id}" + trackChangesEntries: ({doc_id}) -> "TrackChangesEntries:#{doc_id}" # }, { # cluster: [{ # port: "7000" @@ -44,6 +46,8 @@ module.exports = # docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}" # projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}" # docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}" + # trackChangesEnabled: ({doc_id}) -> "TrackChangesEnabled:{#{doc_id}}" + # trackChangesEntries: ({doc_id}) -> "TrackChangesEntries:{#{doc_id}}" }] max_doc_length: 2 * 1024 * 1024 # 2mb diff --git a/services/document-updater/test/acceptance/coffee/TrackChangesTests.coffee b/services/document-updater/test/acceptance/coffee/TrackChangesTests.coffee new file mode 100644 index 0000000000..406f46b430 --- /dev/null +++ b/services/document-updater/test/acceptance/coffee/TrackChangesTests.coffee @@ -0,0 +1,96 @@ +sinon = require "sinon" +chai = require("chai") +chai.should() +async = require "async" +rclient = require("redis").createClient() + +MockWebApi = require "./helpers/MockWebApi" +DocUpdaterClient = require "./helpers/DocUpdaterClient" + +describe "Track changes", -> + describe "turning on track changes", -> + before (done) -> + DocUpdaterClient.subscribeToAppliedOps @appliedOpsListener = sinon.stub() + @project_id = DocUpdaterClient.randomId() + @docs = [{ + id: doc_id0 = DocUpdaterClient.randomId() + lines: ["one", "two", "three"] + updatedLines: ["one", "one and a half", "two", "three"] + }, { + id: doc_id1 = DocUpdaterClient.randomId() + lines: ["four", "five", "six"] + updatedLines: ["four", "four and a half", "five", "six"] + }] + for doc in @docs + MockWebApi.insertDoc @project_id, doc.id, { + lines: doc.lines + version: 0 + } + async.series @docs.map((doc) => + (callback) => + DocUpdaterClient.preloadDoc @project_id, doc.id, callback + ), (error) => + throw error if error? + setTimeout () => + DocUpdaterClient.setTrackChangesOn @project_id, (error, res, body) => + @statusCode = res.statusCode + done() + , 200 + + it "should return a 204 status code", -> + @statusCode.should.equal 204 + + it "should send a track changes message to real-time for each doc", -> + @appliedOpsListener.calledWith("applied-ops", JSON.stringify({ + project_id: @project_id, doc_id: @docs[0].id, track_changes_on: true + })).should.equal true + @appliedOpsListener.calledWith("applied-ops", JSON.stringify({ + project_id: @project_id, doc_id: @docs[1].id, track_changes_on: true + })).should.equal true + + it "should set the track changes key in redis", (done) -> + rclient.get "TrackChangesEnabled:#{@docs[0].id}", (error, value) => + throw error if error? + value.should.equal "1" + rclient.get "TrackChangesEnabled:#{@docs[1].id}", (error, value) -> + throw error if error? + value.should.equal "1" + done() + + describe "tracking changes", -> + before (done) -> + @project_id = DocUpdaterClient.randomId() + @doc = { + id: doc_id0 = DocUpdaterClient.randomId() + lines: ["one", "two", "three"] + } + @update = + doc: @doc.id + op: [{ + i: "one and a half\n" + p: 4 + }] + v: 0 + meta: + user_id: @user_id = DocUpdaterClient.randomId() + MockWebApi.insertDoc @project_id, @doc.id, { + lines: @doc.lines + version: 0 + } + DocUpdaterClient.preloadDoc @project_id, @doc.id, (error) => + throw error if error? + DocUpdaterClient.setTrackChangesOn @project_id, (error, res, body) => + throw error if error? + DocUpdaterClient.sendUpdate @project_id, @doc.id, @update, (error) -> + throw error if error? + setTimeout done, 200 + + it "should set the updated track changes entries in redis", (done) -> + rclient.get "TrackChangesEntries:#{@doc.id}", (error, value) => + throw error if error? + entries = JSON.parse(value) + change = entries.changes[0] + change.op.should.deep.equal @update.op[0] + change.metadata.user_id.should.equal @user_id + done() + diff --git a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee index a14f6f9364..b90e7ea82e 100644 --- a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee +++ b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee @@ -72,7 +72,18 @@ module.exports = DocUpdaterClient = deleteProject: (project_id, callback = () ->) -> request.del "http://localhost:3003/project/#{project_id}", callback - - - - + + setTrackChangesOn: (project_id, callback = () ->) -> + request.post { + url: "http://localhost:3003/project/#{project_id}/track_changes" + json: + on: true + }, callback + + setTrackChangesOff: (project_id, callback = () ->) -> + request.post { + url: "http://localhost:3003/project/#{project_id}/track_changes" + json: + on: false + }, callback +