From c823e06912a1f8ffda7d5762da2925ec1c1cd2b8 Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 8 Jun 2016 16:42:09 +0100 Subject: [PATCH] Don't run redis commands in parallel for easier consistency reasoning --- .../app/coffee/RedisManager.coffee | 49 +++++++++---------- .../app/coffee/TrackChangesManager.coffee | 8 +-- .../TrackChangesManagerTests.coffee | 14 +++--- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index d5908be12e..75312298bb 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -11,40 +11,37 @@ Errors = require "./Errors" minutes = 60 # seconds for Redis expire module.exports = RedisManager = - putDocInMemory : (project_id, doc_id, docLines, version, callback)-> + putDocInMemory : (project_id, doc_id, docLines, version, _callback)-> timer = new metrics.Timer("redis.put-doc") - logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis" - async.parallel [ - (cb) -> - multi = rclient.multi() - multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) - multi.set keys.projectKey({doc_id:doc_id}), project_id - multi.set keys.docVersion(doc_id:doc_id), version - multi.exec cb - (cb) -> - rclient.sadd keys.docsInProject(project_id:project_id), doc_id, cb - ], (err) -> + callback = (error) -> timer.done() - callback(err) + _callback(error) + logger.log project_id:project_id, doc_id:doc_id, version: version, "putting doc in redis" + multi = rclient.multi() + multi.set keys.docLines(doc_id:doc_id), JSON.stringify(docLines) + multi.set keys.projectKey({doc_id:doc_id}), project_id + multi.set keys.docVersion(doc_id:doc_id), version + multi.exec (error) -> + return callback(error) if error? + rclient.sadd keys.docsInProject(project_id:project_id), doc_id, callback - removeDocFromMemory : (project_id, doc_id, callback)-> + removeDocFromMemory : (project_id, doc_id, _callback)-> logger.log project_id:project_id, doc_id:doc_id, "removing doc from redis" - async.parallel [ - (cb) -> - multi = rclient.multi() - multi.del keys.docLines(doc_id:doc_id) - multi.del keys.projectKey(doc_id:doc_id) - multi.del keys.docVersion(doc_id:doc_id) - multi.exec cb - (cb) -> - rclient.srem keys.docsInProject(project_id:project_id), doc_id, cb - ], (err) -> + callback = (err) -> if err? logger.err project_id:project_id, doc_id:doc_id, err:err, "error removing doc from redis" - callback(err, null) + _callback(err) else logger.log project_id:project_id, doc_id:doc_id, "removed doc from redis" - callback() + _callback() + + multi = rclient.multi() + multi.del keys.docLines(doc_id:doc_id) + multi.del keys.projectKey(doc_id:doc_id) + multi.del keys.docVersion(doc_id:doc_id) + multi.exec (error) -> + return callback(error) if error? + rclient.srem keys.docsInProject(project_id:project_id), doc_id, callback getDoc : (doc_id, callback = (error, lines, version) ->)-> timer = new metrics.Timer("redis.get-doc") diff --git a/services/document-updater/app/coffee/TrackChangesManager.coffee b/services/document-updater/app/coffee/TrackChangesManager.coffee index 43d2314149..7661a52320 100644 --- a/services/document-updater/app/coffee/TrackChangesManager.coffee +++ b/services/document-updater/app/coffee/TrackChangesManager.coffee @@ -25,10 +25,10 @@ module.exports = TrackChangesManager = FLUSH_EVERY_N_OPS: 50 pushUncompressedHistoryOp: (project_id, doc_id, op, callback = (error) ->) -> jsonOp = JSON.stringify op - async.parallel [ - (cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp, cb - (cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb - ], (error, results) -> + multi = rclient.multi() + multi.rpush "UncompressedHistoryOps:#{doc_id}", jsonOp + multi.sadd "DocsWithHistoryOps:#{project_id}", doc_id + multi.exec (error, results) -> return callback(error) if error? [length, _] = results if length > 0 and length % TrackChangesManager.FLUSH_EVERY_N_OPS == 0 diff --git a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee index f43a3a0c43..bd72db3669 100644 --- a/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/TrackChangesManager/TrackChangesManagerTests.coffee @@ -43,8 +43,10 @@ describe "TrackChangesManager", -> describe "pushUncompressedHistoryOp", -> beforeEach -> @op = { op: [{ i: "foo", p: 4 }] } - @rclient.rpush = sinon.stub().yields(null, @length = 42) - @rclient.sadd = sinon.stub().yields() + @rclient.multi = sinon.stub().returns(@multi = {}) + @multi.rpush = sinon.stub() + @multi.sadd = sinon.stub() + @multi.exec = sinon.stub().yields(null, [@length = 42, "foo"]) @TrackChangesManager.flushDocChanges = sinon.stub().callsArg(2) describe "pushing the op", -> @@ -52,10 +54,10 @@ describe "TrackChangesManager", -> @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback it "should push the op into redis", -> - @rclient.rpush + @multi.rpush .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify @op) .should.equal true - @rclient.sadd + @multi.sadd .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) .should.equal true @@ -67,7 +69,7 @@ describe "TrackChangesManager", -> describe "when there are a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> - @rclient.rpush = sinon.stub().yields(null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) + @multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"]) @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback it "should tell the track changes api to flush", -> @@ -77,7 +79,7 @@ describe "TrackChangesManager", -> describe "when TrackChangesManager errors", -> beforeEach -> - @rclient.rpush = sinon.stub().yields(null, 2 * @TrackChangesManager.FLUSH_EVERY_N_OPS) + @multi.exec = sinon.stub().yields(null, [2 * @TrackChangesManager.FLUSH_EVERY_N_OPS, "foo"]) @TrackChangesManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) @TrackChangesManager.pushUncompressedHistoryOp @project_id, @doc_id, @op, @callback