From d9ed026d9174916c58e298cb863cbc1ace731d1b Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 12 Apr 2017 16:34:28 +0100 Subject: [PATCH 1/3] simple flush for all projects does not work with redis cluster, only single redis --- services/track-changes/app.coffee | 2 ++ .../app/coffee/HttpController.coffee | 12 ++++++++++++ .../app/coffee/RedisManager.coffee | 9 +++++++++ .../app/coffee/UpdatesManager.coffee | 17 +++++++++++++++++ 4 files changed, 40 insertions(+) diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 0a43cd1503..9cd9edee75 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -50,6 +50,8 @@ app.post "/project/:project_id/doc/:doc_id/version/:version/restore", HttpContro app.post '/project/:project_id/doc/:doc_id/push', HttpController.pushDocHistory app.post '/project/:project_id/doc/:doc_id/pull', HttpController.pullDocHistory +app.post '/flush/all', HttpController.flushAll + packWorker = null # use a single packing worker app.post "/pack", (req, res, next) -> diff --git a/services/track-changes/app/coffee/HttpController.coffee b/services/track-changes/app/coffee/HttpController.coffee index eecc618330..44ba8d48ba 100644 --- a/services/track-changes/app/coffee/HttpController.coffee +++ b/services/track-changes/app/coffee/HttpController.coffee @@ -22,6 +22,18 @@ module.exports = HttpController = return next(error) if error? res.send 204 + flushAll: (req, res, next = (error) ->) -> + logger.log "flushing all projects" + UpdatesManager.flushAll (error, result) -> + return next(error) if error? + {failed, succeeded} = result + status = "#{succeeded.length} succeeded, #{failed.length} failed" + if failed.length > 0 + logger.log {failed: failed, succeeded: succeeded}, "error flushing projects" + res.status(500).send "#{status}\nfailed to flush:\n#{failed.join('\n')}\n" + else + res.status(200).send "#{status}\nflushed all #{succeeded.length} projects\n" + checkDoc: (req, res, next = (error) ->) -> doc_id = req.params.doc_id project_id = req.params.project_id diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index a634bbfed9..25719e753f 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -32,3 +32,12 @@ module.exports = RedisManager = getDocIdsWithHistoryOps: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers docsWithHistoryOpsKey(project_id), callback + + # this will only work on single node redis, not redis cluster + getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> + rclient.keys docsWithHistoryOpsKey("*"), (error, project_keys) -> + return callback(error) if error? + project_ids = for key in project_keys + [prefix, project_id] = key.split(":") + project_id + callback(error, project_ids) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index f01681e5f0..e1a7b086d8 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -144,6 +144,23 @@ module.exports = UpdatesManager = UpdatesManager._processUncompressedUpdatesForDocWithLock project_id, doc_id, temporary, cb async.parallelLimit jobs, 5, callback + # flush all outstanding changes + flushAll: (callback = (error, result) ->) -> + RedisManager.getProjectIdsWithHistoryOps (error, project_ids) -> + return callback(error) if error? + logger.log {count: project_ids?.length, project_ids: project_ids}, "found projects" + jobs = [] + for project_id in project_ids + do (project_id) -> + jobs.push (cb) -> + UpdatesManager.processUncompressedUpdatesForProject project_id, (err) -> + return cb(null, {failed: err?, project_id: project_id}) + async.series jobs, (error, result) -> + return callback(error) if error? + failedProjects = (x.project_id for x in result when x.failed) + succeededProjects = (x.project_id for x in result when not x.failed) + callback(null, {failed: failedProjects, succeeded: succeededProjects}) + getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> return callback(error) if error? From 9ce6d77cca93bc33fa567dddaedb84056d83a388 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 13 Apr 2017 11:31:45 +0100 Subject: [PATCH 2/3] add check for dangling updates --- services/track-changes/app.coffee | 1 + .../app/coffee/HttpController.coffee | 10 ++++++++++ .../app/coffee/RedisManager.coffee | 18 +++++++++++++++--- .../app/coffee/UpdatesManager.coffee | 14 ++++++++++++++ 4 files changed, 40 insertions(+), 3 deletions(-) diff --git a/services/track-changes/app.coffee b/services/track-changes/app.coffee index 9cd9edee75..43cddca498 100644 --- a/services/track-changes/app.coffee +++ b/services/track-changes/app.coffee @@ -51,6 +51,7 @@ app.post '/project/:project_id/doc/:doc_id/push', HttpController.pushDocHistory app.post '/project/:project_id/doc/:doc_id/pull', HttpController.pullDocHistory app.post '/flush/all', HttpController.flushAll +app.post '/check/dangling', HttpController.checkDanglingUpdates packWorker = null # use a single packing worker diff --git a/services/track-changes/app/coffee/HttpController.coffee b/services/track-changes/app/coffee/HttpController.coffee index 44ba8d48ba..0b59a6e1ec 100644 --- a/services/track-changes/app/coffee/HttpController.coffee +++ b/services/track-changes/app/coffee/HttpController.coffee @@ -34,6 +34,16 @@ module.exports = HttpController = else res.status(200).send "#{status}\nflushed all #{succeeded.length} projects\n" + checkDanglingUpdates: (req, res, next = (error) ->) -> + logger.log "checking dangling updates" + UpdatesManager.getDanglingUpdates (error, result) -> + return next(error) if error? + if result.length > 0 + logger.log {dangling: result}, "found dangling updates" + res.status(500).send "dangling updates:\n#{result.join('\n')}\n" + else + res.status(200).send "no dangling updates found\n" + checkDoc: (req, res, next = (error) ->) -> doc_id = req.params.doc_id project_id = req.params.project_id diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index 25719e753f..f29c9bc7ce 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -33,11 +33,23 @@ module.exports = RedisManager = getDocIdsWithHistoryOps: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers docsWithHistoryOpsKey(project_id), callback + # extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b + _extractIds: (keyList) -> + ids = (key.split(":")[1] for key in keyList) + return ids + # this will only work on single node redis, not redis cluster getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> rclient.keys docsWithHistoryOpsKey("*"), (error, project_keys) -> return callback(error) if error? - project_ids = for key in project_keys - [prefix, project_id] = key.split(":") - project_id + project_ids = RedisManager._extractIds project_keys callback(error, project_ids) + + # this will only work on single node redis, not redis cluster + getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) -> + # return all the docids, to find dangling history entries after + # everything is flushed. + rclient.keys rawUpdatesKey("*"), (error, doc_keys) -> + return callback(error) if error? + doc_ids = RedisManager._extractIds doc_keys + callback(error, doc_ids) diff --git a/services/track-changes/app/coffee/UpdatesManager.coffee b/services/track-changes/app/coffee/UpdatesManager.coffee index e1a7b086d8..ca79d26d59 100644 --- a/services/track-changes/app/coffee/UpdatesManager.coffee +++ b/services/track-changes/app/coffee/UpdatesManager.coffee @@ -159,8 +159,22 @@ module.exports = UpdatesManager = return callback(error) if error? failedProjects = (x.project_id for x in result when x.failed) succeededProjects = (x.project_id for x in result when not x.failed) + RedisManager.getAllDocIdsWithHistoryOps (error, doc_ids) -> callback(null, {failed: failedProjects, succeeded: succeededProjects}) + getDanglingUpdates: (callback = (error, doc_ids) ->) -> + RedisManager.getAllDocIdsWithHistoryOps (error, all_doc_ids) -> + return callback(error) if error? + RedisManager.getProjectIdsWithHistoryOps (error, all_project_ids) -> + return callback(error) if error? + # function to get doc_ids for each project + task = (cb) -> async.concatSeries all_project_ids, RedisManager.getDocIdsWithHistoryOps, cb + # find the dangling doc ids + task (error, project_doc_ids) -> + dangling_doc_ids = _.difference(all_doc_ids, project_doc_ids) + logger.log {all_doc_ids: all_doc_ids, all_project_ids: all_project_ids, project_doc_ids: project_doc_ids, dangling_doc_ids: dangling_doc_ids}, "checking for dangling doc ids" + callback(null, dangling_doc_ids) + getDocUpdates: (project_id, doc_id, options = {}, callback = (error, updates) ->) -> UpdatesManager.processUncompressedUpdatesWithLock project_id, doc_id, (error) -> return callback(error) if error? From b5b61b98d0d8e19d14f36cd1f4c6ef622644fcac Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 19 Apr 2017 15:39:33 +0100 Subject: [PATCH 3/3] avoid blocking when fetching redis keys use scan instead of keys method --- .../app/coffee/RedisManager.coffee | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/services/track-changes/app/coffee/RedisManager.coffee b/services/track-changes/app/coffee/RedisManager.coffee index f29c9bc7ce..b58b99f11f 100644 --- a/services/track-changes/app/coffee/RedisManager.coffee +++ b/services/track-changes/app/coffee/RedisManager.coffee @@ -33,6 +33,23 @@ module.exports = RedisManager = getDocIdsWithHistoryOps: (project_id, callback = (error, doc_ids) ->) -> rclient.smembers docsWithHistoryOpsKey(project_id), callback + # iterate over keys asynchronously using redis scan (non-blocking) + _getKeys: (pattern, callback) -> + cursor = 0 # redis iterator + keySet = {} # use hash to avoid duplicate results + # scan over all keys looking for pattern + doIteration = (cb) -> + rclient.scan cursor, "MATCH", pattern, "COUNT", 1000, (error, reply) -> + return callback(error) if error? + [cursor, keys] = reply + for key in keys + keySet[key] = true + if cursor == '0' # note redis returns string result not numeric + return callback(null, Object.keys(keySet)) + else + doIteration() + doIteration() + # extract ids from keys like DocsWithHistoryOps:57fd0b1f53a8396d22b2c24b _extractIds: (keyList) -> ids = (key.split(":")[1] for key in keyList) @@ -40,7 +57,7 @@ module.exports = RedisManager = # this will only work on single node redis, not redis cluster getProjectIdsWithHistoryOps: (callback = (error, project_ids) ->) -> - rclient.keys docsWithHistoryOpsKey("*"), (error, project_keys) -> + RedisManager._getKeys docsWithHistoryOpsKey("*"), (error, project_keys) -> return callback(error) if error? project_ids = RedisManager._extractIds project_keys callback(error, project_ids) @@ -49,7 +66,7 @@ module.exports = RedisManager = getAllDocIdsWithHistoryOps: (callback = (error, doc_ids) ->) -> # return all the docids, to find dangling history entries after # everything is flushed. - rclient.keys rawUpdatesKey("*"), (error, doc_keys) -> + RedisManager._getKeys rawUpdatesKey("*"), (error, doc_keys) -> return callback(error) if error? doc_ids = RedisManager._extractIds doc_keys callback(error, doc_ids)