diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 43cddca498..a83d78c7f1 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -61,7 +61,7 @@ app.post "/pack", (req, res, next) -> else logger.log "running pack" packWorker = child_process.fork(__dirname + '/app/js/PackWorker.js', - [req.query.limit, req.query.delay, req.query.timeout]) + [req.query.limit || 1000, req.query.delay || 1000, req.query.timeout || 30*60*1000]) packWorker.on 'exit', (code, signal) -> logger.log {code, signal}, "history auto pack exited" packWorker = null diff --git a/services/track-changes/app/coffee/LockManager.coffee b/services/track-changes/app/coffee/LockManager.coffee index 7e7468d1c2..18a764035c 100644 --- a/services/track-changes/app/coffee/LockManager.coffee +++ b/services/track-changes/app/coffee/LockManager.coffee @@ -1,6 +1,6 @@ Settings = require "settings-sharelatex" redis = require("redis-sharelatex") -rclient = redis.createClient(Settings.redis.web) +rclient = redis.createClient(Settings.redis.lock) os = require "os" crypto = require "crypto" logger = require "logger-sharelatex" diff --git a/services/track-changes/app/coffee/MongoAWS.coffee b/services/track-changes/app/coffee/MongoAWS.coffee index 8a0a065317..4308763a7a 100644 --- a/services/track-changes/app/coffee/MongoAWS.coffee +++ b/services/track-changes/app/coffee/MongoAWS.coffee @@ -46,6 +46,10 @@ module.exports = MongoAWS = return callback new Error("cannot find pack to send to s3") if not result? return callback new Error("refusing to send pack with TTL to s3") if result.expiresAt? uncompressedData = JSON.stringify(result) + if uncompressedData.indexOf("\u0000") != -1 + error = new Error("null bytes found in upload") + logger.error err: error, project_id: project_id, doc_id: doc_id, pack_id: pack_id, error.message + return callback(error) zlib.gzip uncompressedData, (err, buf) -> logger.log {project_id, doc_id, pack_id, origSize: uncompressedData.length, newSize: buf.length}, "compressed pack" return callback(err) if err? diff --git a/services/track-changes/app/coffee/PackManager.coffee b/services/track-changes/app/coffee/PackManager.coffee index d8069da518..d33978229d 100644 --- a/services/track-changes/app/coffee/PackManager.coffee +++ b/services/track-changes/app/coffee/PackManager.coffee @@ -6,6 +6,8 @@ LockManager = require "./LockManager" MongoAWS = require "./MongoAWS" Metrics = require "metrics-sharelatex" ProjectIterator = require "./ProjectIterator" +Settings = require "settings-sharelatex" +keys = Settings.redis.lock.key_schema # Sharejs operations are stored in a 'pack' object # @@ -319,7 +321,7 @@ module.exports = PackManager = insertPacksIntoIndexWithLock: (project_id, doc_id, newPacks, callback) -> LockManager.runWithLock( - "HistoryIndexLock:#{doc_id}", + keys.historyIndexLock({doc_id}), (releaseLock) -> PackManager._insertPacksIntoIndex project_id, doc_id, newPacks, releaseLock callback @@ -438,7 +440,7 @@ module.exports = PackManager = markPackAsFinalisedWithLock: (project_id, doc_id, pack_id, callback) -> LockManager.runWithLock( - "HistoryLock:#{doc_id}", + keys.historyLock({doc_id}), (releaseLock) -> PackManager._markPackAsFinalised project_id, doc_id, pack_id, releaseLock callback diff --git a/services/track-changes/app/coffee/PackWorker.coffee b/services/track-changes/app/coffee/PackWorker.coffee index be5c511d1f..e4c31c3b4a 100644 --- a/services/track-changes/app/coffee/PackWorker.coffee +++ b/services/track-changes/app/coffee/PackWorker.coffee @@ -21,8 +21,10 @@ PackManager = require "./PackManager" source = process.argv[2] DOCUMENT_PACK_DELAY = Number(process.argv[3]) || 1000 TIMEOUT = Number(process.argv[4]) || 30*60*1000 +COUNT = 0 # number processed +TOTAL = 0 # total number to process -if source.match(/[^0-9]/) +if !source.match(/^[0-9]+$/) file = fs.readFileSync source result = for line in file.toString().split('\n') [project_id, doc_id] = line.split(' ') @@ -37,10 +39,11 @@ shutDownTimer = setTimeout () -> # start the shutdown on the next pack shutDownRequested = true # do a hard shutdown after a further 5 minutes - setTimeout () -> + hardTimeout = setTimeout () -> logger.error "HARD TIMEOUT in pack archive worker" process.exit() , 5*60*1000 + hardTimeout.unref() , TIMEOUT logger.log "checking for updates, limit=#{LIMIT}, delay=#{DOCUMENT_PACK_DELAY}, timeout=#{TIMEOUT}" @@ -61,7 +64,7 @@ finish = () -> db.close () -> logger.log 'closing LockManager Redis Connection' LockManager.close () -> - logger.log 'ready to exit from pack archive worker' + logger.log {processedCount: COUNT, allCount: TOTAL}, 'ready to exit from pack archive worker' hardTimeout = setTimeout () -> logger.error 'hard exit from pack archive worker' process.exit(1) @@ -74,7 +77,8 @@ process.on 'exit', (code) -> processUpdates = (pending) -> async.eachSeries pending, (result, callback) -> {_id, project_id, doc_id} = result - logger.log {project_id, doc_id}, "processing" + COUNT++ + logger.log {project_id, doc_id}, "processing #{COUNT}/#{TOTAL}" if not project_id? or not doc_id? logger.log {project_id, doc_id}, "skipping pack, missing project/doc id" return callback() @@ -87,7 +91,7 @@ processUpdates = (pending) -> logger.error {err, result}, "error in pack archive worker" return callback(err) if shutDownRequested - logger.error "shutting down pack archive worker" + logger.warn "shutting down pack archive worker" return callback(new Error("shutdown")) setTimeout () -> callback(err, result) @@ -115,11 +119,13 @@ if pending? logger.log "got #{pending.length} entries from #{source}" processUpdates pending else + oneWeekAgo = new Date(Date.now() - 7 * DAYS) db.docHistory.find({ expiresAt: {$exists: false} project_id: {$exists: true} v_end: {$exists: true} - _id: {$lt: ObjectIdFromDate(new Date(Date.now() - 7 * DAYS))} + _id: {$lt: ObjectIdFromDate(oneWeekAgo)} + last_checked: {$lt: oneWeekAgo} }, {_id:1, doc_id:1, project_id:1}).sort({ last_checked:1 }).limit LIMIT, (err, results) -> @@ -128,5 +134,6 @@ else finish() return pending = _.uniq results, false, (result) -> result.doc_id.toString() - logger.log "found #{pending.length} documents to archive" + TOTAL = pending.length + logger.log "found #{TOTAL} documents to archive" processUpdates pending diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index ef64a1141d..56dcfd4372 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -2,6 +2,7 @@ Settings = require "settings-sharelatex" redis = require("redis-sharelatex") rclient = redis.createClient(Settings.redis.history) Keys = Settings.redis.history.key_schema +async = require "async" module.exports = RedisManager = @@ -33,12 +34,19 @@ module.exports = RedisManager = rclient.smembers Keys.docsWithHistoryOps({project_id}), callback # iterate over keys asynchronously using redis scan (non-blocking) + # handle all the cluster nodes or single redis server _getKeys: (pattern, callback) -> + nodes = rclient.nodes?('master') || [ rclient ]; + doKeyLookupForNode = (node, cb) -> + RedisManager._getKeysFromNode node, pattern, cb + async.concatSeries nodes, doKeyLookupForNode, callback + + _getKeysFromNode: (node, pattern, callback) -> cursor = 0 # redis iterator keySet = {} # use hash to avoid duplicate results # scan over all keys looking for pattern doIteration = (cb) -> - rclient.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) -> + node.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) -> return callback(error) if error? [cursor, keys] = reply for key in keys @@ -50,21 +58,20 @@ module.exports = RedisManager = doIteration() # extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b + # or DocsWithHistoryOps:{57fd0b1f53a8396d22b2c24b} (for redis cluster) _extractIds: (keyList) -> - ids = (key.split(":")[1] for key in keyList) + ids = for key in keyList + m = key.match(/:\{?([0-9a-f]{24})\}?/) # extract object id + m[1] return ids - # this will only work on single node redis, not redis cluster getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> - return callback(new Error("not supported")) if rclient.nodes? RedisManager._getKeys Keys.docsWithHistoryOps({project_id:"*"}), (error, project_keys) -> return callback(error) if error? project_ids = RedisManager._extractIds project_keys callback(error, project_ids) - # this will only work on single node redis, not redis cluster getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) -> - return callback(new Error("not supported")) if rclient.nodes? # return all the docids, to find dangling history entries after # everything is flushed. RedisManager._getKeys Keys.uncompressedHistoryOps({doc_id:"*"}), (error, doc_keys) -> diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index 48423a39ec..3a2cbf29c1 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -9,6 +9,7 @@ logger = require "logger-sharelatex" async = require "async" _ = require "underscore" Settings = require "settings-sharelatex" +keys = Settings.redis.lock.key_schema module.exports = UpdatesManager = compressAndSaveRawUpdates: (project_id, doc_id, rawUpdates, temporary, callback = (error) ->) -> @@ -96,7 +97,9 @@ module.exports = UpdatesManager = length = docUpdates.length # parse the redis strings into ShareJs updates RedisManager.expandDocUpdates docUpdates, (error, rawUpdates) -> - return callback(error) if error? + if error? + logger.err project_id: project_id, doc_id: doc_id, docUpdates: docUpdates, "failed to parse docUpdates" + return callback(error) logger.log project_id: project_id, doc_id: doc_id, rawUpdates: rawUpdates, "retrieved raw updates from redis" UpdatesManager.compressAndSaveRawUpdates project_id, doc_id, rawUpdates, temporary, (error) -> return callback(error) if error? @@ -126,7 +129,7 @@ module.exports = UpdatesManager = UpdatesManager._prepareDocForUpdates project_id, doc_id, (error) -> return callback(error) if error? LockManager.runWithLock( - "HistoryLock:#{doc_id}", + keys.historyLock({doc_id}), (releaseLock) -> UpdatesManager.processUncompressedUpdates project_id, doc_id, temporary, releaseLock callback diff --git a/services/track-changes/config/settings.defaults.coffee b/services/track-changes/config/settings.defaults.coffee index 084ce7de62..907d3c0033 100755 --- a/services/track-changes/config/settings.defaults.coffee +++ b/services/track-changes/config/settings.defaults.coffee @@ -18,10 +18,13 @@ module.exports = user: "sharelatex" pass: "password" redis: - web: + lock: host: "localhost" port: 6379 pass: "" + key_schema: + historyLock: ({doc_id}) -> "HistoryLock:#{doc_id}" + historyIndexLock: ({project_id}) -> "HistoryIndexLock:#{project_id}" history: port:"6379" host:"localhost" diff --git a/services/track-changes/fixdangling.coffee b/services/track-changes/fixdangling.coffee new file mode 100644 index 0000000000..1e656b6fe8 --- /dev/null +++ b/services/track-changes/fixdangling.coffee @@ -0,0 +1,54 @@ +Settings = require "settings-sharelatex" +logger = require "logger-sharelatex" +TrackChangesLogger = logger.initialize("track-changes").logger +async = require "async" +fs = require "fs" +request = require "request" +cli = require "cli" + +mongojs = require "mongojs" +bson = require "bson" +db = mongojs(Settings.mongo.url, ["docs"]) +ObjectId = mongojs.ObjectId + +options = cli.parse({ + port: ['p', 'port number for track changes', 'number'], + force: ['f', 'actually make the fix'] +}); + +if cli.args.length < 1 + console.log "fixdangling -p PORT file_of_doc_ids" + process.exit() + +file = cli.args.pop() +doc_ids = fs.readFileSync(file).toString().trim().split("\n") + +missing = 0 +errored = 0 +success = 0 + +fixDangling = (doc_id, callback) -> + # look up project id from doc id + db.docs.find {_id:ObjectId(doc_id)}, {project_id:1}, (err, result) -> + #console.log "doc_id", doc_id, "err", err, "result", result + if err? + errored++ + return callback() + if !result? or result.length == 0 + missing++ + return callback() + project_id = result[0].project_id + console.log "found project_id", project_id, "for doc_id", doc_id + url = "http://localhost:#{options.port}/project/#{project_id}/doc/#{doc_id}/flush" + if options.force + request.post url, (err, response, body) -> + if err? then errored++ else success++ + callback() + else + console.log "URL:", url + success++ + callback() + +async.eachSeries doc_ids, fixDangling, (err) -> + console.log "final result", err, "missing", missing, "errored", errored, "success", success + db.close() diff --git a/services/track-changes/npm-shrinkwrap.json b/services/track-changes/npm-shrinkwrap.json index 9cb3741716..ae4842c5d1 100644 --- a/services/track-changes/npm-shrinkwrap.json +++ b/services/track-changes/npm-shrinkwrap.json @@ -1025,10 +1025,22 @@ } }, "redis-sharelatex": { - "version": "1.0.0", - "from": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0", - "resolved": "git+https://github.com/sharelatex/redis-sharelatex.git#0cd669a9ed73c0330e3b912fc9d9a42dae3c4fbd", + "version": "1.0.2", + "from": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.2", + "resolved": "git+https://github.com/sharelatex/redis-sharelatex.git#143b7eb192675f36d835080e534a4ac4899f918a", "dependencies": { + "async": { + "version": "2.4.0", + "from": "async@>=2.4.0 <3.0.0", + "resolved": "https://registry.npmjs.org/async/-/async-2.4.0.tgz", + "dependencies": { + "lodash": { + "version": "4.17.4", + "from": "lodash@>=4.14.0 <5.0.0", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.4.tgz" + } + } + }, "chai": { "version": "1.9.1", "from": "chai@1.9.1", diff --git a/services/track-changes/package.json b/services/track-changes/package.json index 165178b418..a2656c3840 100644 --- a/services/track-changes/package.json +++ b/services/track-changes/package.json @@ -19,7 +19,7 @@ "metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.7.1", "request": "~2.33.0", "requestretry": "^1.12.0", - "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0", + "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.2", "redis": "~0.10.1", "underscore": "~1.7.0", "mongo-uri": "^0.1.2", diff --git a/services/track-changes/test/unit/coffee/LockManager/LockManagerTests.coffee b/services/track-changes/test/unit/coffee/LockManager/LockManagerTests.coffee index fd141ef426..ad4df198a9 100644 --- a/services/track-changes/test/unit/coffee/LockManager/LockManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/LockManager/LockManagerTests.coffee @@ -9,7 +9,7 @@ describe "LockManager", -> beforeEach -> @Settings = redis: - web:{} + lock:{} @LockManager = SandboxedModule.require modulePath, requires: "redis-sharelatex": createClient: () => @rclient = diff --git a/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee index dd79d0bf21..283e288315 100644 --- a/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/PackManager/PackManagerTests.coffee @@ -22,6 +22,8 @@ describe "PackManager", -> "logger-sharelatex": { log: sinon.stub(), error: sinon.stub() } 'metrics-sharelatex': {inc: ()->} "./ProjectIterator": require("../../../../app/js/ProjectIterator.js") # Cache for speed + "settings-sharelatex": + redis: lock: key_schema: {} @callback = sinon.stub() @doc_id = ObjectId().toString() @project_id = ObjectId().toString() diff --git a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee index c422fffec0..88f1c7998a 100644 --- a/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee +++ b/services/track-changes/test/unit/coffee/UpdatesManager/UpdatesManagerTests.coffee @@ -17,6 +17,9 @@ describe "UpdatesManager", -> "./UpdateTrimmer": @UpdateTrimmer = {} "./DocArchiveManager": @DocArchiveManager = {} "logger-sharelatex": { log: sinon.stub(), error: sinon.stub() } + "settings-sharelatex": + redis: lock: key_schema: + historyLock: ({doc_id}) -> "HistoryLock:#{doc_id}" @doc_id = "doc-id-123" @project_id = "project-id-123" @callback = sinon.stub()