From 8ef03c3d2f8a63289ffc8806e0efcb42f99ec02b Mon Sep 17 00:00:00 2001
From: James Allen
Date: Thu, 23 Jun 2016 15:38:51 +0100
Subject: [PATCH] Add in application layer monitoring of the health of each cluster node

---
Review notes (free text in this position is discarded by `git am`):

* Client#monitorAndReconnect starts a heartbeat for every client whose
  driver is "ioredis". Every HEARTBEAT_INTERVAL ms each node returned by
  rclient.nodes("all") is pinged; if the ping callback has not reported
  success within HEARTBEAT_TIMEOUT ms the node's stream is destroyed.
  A ping that calls back with an error does not cancel the timeout, so
  that node is reset as well.
* The "should get all nodes" test now takes Mocha's `done` callback and
  calls it after asserting. Previously the assertion ran inside a
  setTimeout after the test had already been reported as passed.
* The new test file now ends with a newline. The added-line count of that
  hunk is unchanged (one line added, one whitespace-only line dropped).
* Line structure and indentation of this patch were reconstructed using
  tabs. If context lines do not match the target tree exactly, apply with
  `git apply --ignore-whitespace`.

 .../app/coffee/RedisBackend.coffee            | 25 ++++++++
 .../app/coffee/RedisManager.coffee            |  2 +
 .../RedisBackend/RedisBackendTests.coffee     | 59 +++++++++++++++++++
 .../RedisManager/RedisManagerTests.coffee     |  4 +-
 4 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/services/document-updater/app/coffee/RedisBackend.coffee b/services/document-updater/app/coffee/RedisBackend.coffee
index 8df988faba..7d02ba72af 100644
--- a/services/document-updater/app/coffee/RedisBackend.coffee
+++ b/services/document-updater/app/coffee/RedisBackend.coffee
@@ -5,6 +5,8 @@ logger = require "logger-sharelatex"
 
 class Client
 	constructor: (@clients) ->
+		@HEARTBEAT_INTERVAL = 5000
+		@HEARTBEAT_TIMEOUT = 2000
 
 	multi: () ->
 		return new MultiClient(
@@ -16,6 +18,29 @@ class Client
 			}
 		)
 
+	monitorAndReconnect: () ->
+		for client in @clients
+			if client.driver == "ioredis"
+				@_monitorCluster(client.rclient)
+
+	_monitorCluster: (rclient) ->
+		setInterval () =>
+			# Nodes can come and go as the cluster moves/heals, so each heartbeat
+			# we ask again for the currently known nodes.
+			for node in rclient.nodes("all")
+				do (node) =>
+					timer = setTimeout () =>
+						logger.error {err: new Error("Node timed out, reconnecting"), key: node.options.key}
+						node.stream.destroy()
+						timer = null
+					, @HEARTBEAT_TIMEOUT
+					node.ping (err) ->
+						if !err?
+							clearTimeout timer
+							timer = null
+		, @HEARTBEAT_INTERVAL
+
+
 class MultiClient
 	constructor: (@clients) ->
 
diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee
index 226214599e..7fe03f88d8 100644
--- a/services/document-updater/app/coffee/RedisManager.coffee
+++ b/services/document-updater/app/coffee/RedisManager.coffee
@@ -10,6 +10,8 @@ Errors = require "./Errors"
 # Make times easy to read
 minutes = 60 # seconds for Redis expire
 
+rclient.monitorAndReconnect()
+
 module.exports = RedisManager =
 	putDocInMemory : (project_id, doc_id, docLines, version, _callback)->
 		timer = new metrics.Timer("redis.put-doc")
diff --git a/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee b/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee
index ca48aff7ff..263bc7deab 100644
--- a/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee
@@ -42,6 +42,8 @@ describe "RedisBackend", ->
 		class Cluster
 			constructor: (@config) ->
 				test_context.rclient_ioredis = @
+
+			nodes: sinon.stub()
 
 		@RedisBackend = SandboxedModule.require modulePath, requires:
 			"settings-sharelatex": @Settings
@@ -305,3 +307,60 @@ describe "RedisBackend", ->
 						}, "error in redis backend")
 						.should.equal true
 
+	describe "monitorAndReconnect", ->
+		beforeEach ->
+			@client._monitorCluster = sinon.stub()
+			@client.monitorAndReconnect()
+
+		it "should monitor the cluster client", ->
+			@client._monitorCluster
+				.calledWith(@rclient_ioredis)
+				.should.equal true
+
+	describe "_monitorCluster", ->
+		beforeEach ->
+			@client.HEARTBEAT_TIMEOUT = 10
+			@client.HEARTBEAT_INTERVAL = 100
+			@nodes = [{
+				options: key: "node-0"
+				stream: destroy: sinon.stub()
+			}, {
+				options: key: "node-1"
+				stream: destroy: sinon.stub()
+			}]
+			@rclient_ioredis.nodes = sinon.stub().returns(@nodes)
+
+		describe "successfully", ->
+			beforeEach ->
+				@nodes[0].ping = (cb) -> cb()
+				@nodes[1].ping = (cb) -> cb()
+				@client._monitorCluster(@rclient_ioredis)
+
+			it "should get all nodes", (done) ->
+				setTimeout () =>
+					@rclient_ioredis.nodes
+						.calledWith("all")
+						.should.equal true
+					done()
+				, 200
+
+			it "should not reset the node connections", (done) ->
+				setTimeout () =>
+					@nodes[0].stream.destroy.called.should.equal false
+					@nodes[1].stream.destroy.called.should.equal false
+					done()
+				, 200
+
+		describe "when ping fails to a node", ->
+			beforeEach ->
+				@nodes[0].ping = (cb) -> cb()
+				@nodes[1].ping = (cb) -> # Just hang
+				@client._monitorCluster(@rclient_ioredis)
+
+			it "should reset the failing node connection", (done) ->
+				setTimeout () =>
+					@nodes[0].stream.destroy.called.should.equal false
+					@nodes[1].stream.destroy.called.should.equal true
+					done()
+				, 200
+
diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
index 7ee63de648..d88dafb9bb 100644
--- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
+++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee
@@ -10,9 +10,11 @@ describe "RedisManager", ->
 		@rclient =
 			auth: () ->
 			exec: sinon.stub()
+			monitorAndReconnect: () ->
 		@rclient.multi = () => @rclient
 		@RedisManager = SandboxedModule.require modulePath, requires:
-			"./RedisBackend": createClient: () => @rclient
+			"./RedisBackend":
+				createClient: () => @rclient
 			"./RedisKeyBuilder":
 				blockingKey: ({doc_id}) -> "Blocking:#{doc_id}"
 				docLines: ({doc_id}) -> "doclines:#{doc_id}"