From f21208e8414e4398dbf0fa5b773ae6287147b878 Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 12 Apr 2017 14:53:03 +0100 Subject: [PATCH 1/8] Use new redis-sharelatex instead of RedisBackend for cluster abstraction --- services/document-updater/app.coffee | 18 +- .../app/coffee/RedisBackend.coffee | 206 ------- .../app/coffee/RedisKeyBuilder.coffee | 44 -- .../app/coffee/RedisManager.coffee | 5 +- .../config/settings.defaults.coffee | 36 +- services/document-updater/package.json | 2 +- .../coffee/ApplyingUpdatesToADocTests.coffee | 18 +- .../coffee/SettingADocumentTests.coffee | 7 +- .../RedisBackend/RedisBackendTests.coffee | 504 ------------------ .../RedisManager/RedisManagerTests.coffee | 30 +- .../WebRedisManagerTests.coffee | 1 + 11 files changed, 63 insertions(+), 808 deletions(-) delete mode 100644 services/document-updater/app/coffee/RedisBackend.coffee delete mode 100644 services/document-updater/app/coffee/RedisKeyBuilder.coffee delete mode 100644 services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index 36c0cb3a72..31e8ebb3b3 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -8,7 +8,6 @@ if Settings.sentry?.dsn? RedisManager = require('./app/js/RedisManager') DispatchManager = require('./app/js/DispatchManager') -Keys = require('./app/js/RedisKeyBuilder') Errors = require "./app/js/Errors" HttpController = require "./app/js/HttpController" @@ -63,15 +62,18 @@ app.get '/status', (req, res)-> else res.send('document updater is alive') -redisCheck = require("redis-sharelatex").activeHealthCheckRedis(Settings.redis.web) -app.get "/health_check/redis", (req, res, next)-> - if redisCheck.isAlive() - res.send 200 - else - res.send 500 +webRedisClient = require("redis-sharelatex").createClient(Settings.redis.web) +app.get "/health_check/redis", (req, res, next) -> + webRedisClient.healthCheck (error) -> + if error? + logger.err {err: error}, "failed redis health check" + res.send 500 + else + res.send 200 +docUpdaterRedisClient = require("redis-sharelatex").createClient(Settings.redis.documentupdater) app.get "/health_check/redis_cluster", (req, res, next) -> - RedisManager.rclient.healthCheck (error, alive) -> + docUpdaterRedisClient.healthCheck (error) -> if error? logger.err {err: error}, "failed redis cluster health check" res.send 500 diff --git a/services/document-updater/app/coffee/RedisBackend.coffee b/services/document-updater/app/coffee/RedisBackend.coffee deleted file mode 100644 index d69cd21a6e..0000000000 --- a/services/document-updater/app/coffee/RedisBackend.coffee +++ /dev/null @@ -1,206 +0,0 @@ -Settings = require "settings-sharelatex" -async = require "async" -_ = require "underscore" -logger = require "logger-sharelatex" -Metrics = require "metrics-sharelatex" - -class Client - constructor: (@clients) -> - @SECONDARY_TIMEOUT = 600 - @HEARTBEAT_TIMEOUT = 2000 - - multi: () -> - return new MultiClient( - @clients.map (client) -> { - rclient: client.rclient.multi() - key_schema: client.key_schema - primary: client.primary - driver: client.driver - } - ) - - healthCheck: (callback) -> - jobs = @clients.map (client) => - (cb) => @_healthCheckClient(client, cb) - async.parallel jobs, callback - - _healthCheckClient: (client, callback) -> - if client.driver == "ioredis" - @_healthCheckClusterClient(client, callback) - else - @_healthCheckNodeRedisClient(client, callback) - - _healthCheckNodeRedisClient: (client, callback) -> - client.healthCheck ?= require("redis-sharelatex").activeHealthCheckRedis(Settings.redis.web) - if client.healthCheck.isAlive() - return callback() - else - return callback(new Error("node-redis client failed health check")) - - _healthCheckClusterClient: (client, callback) -> - jobs = client.rclient.nodes("all").map (n) => - (cb) => @_checkNode(n, cb) - async.parallel jobs, callback - - _checkNode: (node, _callback) -> - callback = (args...) -> - _callback(args...) - _callback = () -> - timer = setTimeout () -> - error = new Error("ioredis node ping check timed out") - logger.error {err: error, key: node.options.key}, "node timed out" - callback(error) - , @HEARTBEAT_TIMEOUT - node.ping (err) -> - clearTimeout timer - callback(err) - -class MultiClient - constructor: (@clients) -> - @SECONDARY_TIMEOUT = 600 - - exec: (callback) -> - primaryError = null - primaryResult = null - jobs = @clients.map (client) => - (cb) => - cb = _.once(cb) - timer = new Metrics.Timer("redis.#{client.driver}.exec") - - timeout = null - if !client.primary - timeout = setTimeout () -> - logger.error {err: new Error("#{client.driver} backend timed out")}, "backend timed out" - cb() - , @SECONDARY_TIMEOUT - - client.rclient.exec (error, result) => - timer.done() - if client.driver == "ioredis" - # ioredis returns an results like: - # [ [null, 42], [null, "foo"] ] - # where the first entries in each 2-tuple are - # presumably errors for each individual command, - # and the second entry is the result. We need to transform - # this into the same result as the old redis driver: - # [ 42, "foo" ] - filtered_result = [] - for entry in result or [] - if entry[0]? - return cb(entry[0]) - else - filtered_result.push entry[1] - result = filtered_result - - if client.primary - primaryError = error - primaryResult = result - if timeout? - clearTimeout(timeout) - cb(error, result) - async.parallel jobs, (error, results) -> - if error? - # suppress logging of errors - # logger.error {err: error}, "error in redis backend" - else - compareResults(results, "exec") - callback(primaryError, primaryResult) - -COMMANDS = { - "get": 0, - "smembers": 0, - "set": 0, - "srem": 0, - "sadd": 0, - "del": 0, - "lrange": 0, - "llen": 0, - "rpush": 0, - "expire": 0, - "ltrim": 0, - "incr": 0, - "eval": 2 -} -for command, key_pos of COMMANDS - do (command, key_pos) -> - Client.prototype[command] = (args..., callback) -> - primaryError = null - primaryResult = [] - jobs = @clients.map (client) => - (cb) => - cb = _.once(cb) - key_builder = args[key_pos] - key = key_builder(client.key_schema) - args_with_key = args.slice(0) - args_with_key[key_pos] = key - timer = new Metrics.Timer("redis.#{client.driver}.#{command}") - - timeout = null - if !client.primary - timeout = setTimeout () -> - logger.error {err: new Error("#{client.driver} backend timed out")}, "backend timed out" - cb() - , @SECONDARY_TIMEOUT - - client.rclient[command] args_with_key..., (error, result...) => - timer.done() - if client.primary - primaryError = error - primaryResult = result - if timeout? - clearTimeout(timeout) - cb(error, result...) - async.parallel jobs, (error, results) -> - if error? - logger.error {err: error}, "error in redis backend" - else - compareResults(results, command) - callback(primaryError, primaryResult...) - - MultiClient.prototype[command] = (args...) -> - for client in @clients - key_builder = args[key_pos] - key = key_builder(client.key_schema) - args_with_key = args.slice(0) - args_with_key[key_pos] = key - client.rclient[command] args_with_key... - -compareResults = (results, command) -> - return if results.length < 2 - first = results[0] - if command == "smembers" and first? - first = first.slice().sort() - for result in results.slice(1) - if command == "smembers" and result? - result = result.slice().sort() - if not _.isEqual(first, result) - logger.error results: results, "redis backend conflict" - Metrics.inc "backend-conflict" - else - Metrics.inc "backend-match" - -module.exports = - createClient: () -> - client_configs = Settings.redis.documentupdater - unless client_configs instanceof Array - client_configs.primary = true - client_configs = [client_configs] - clients = client_configs.map (config) -> - if config.cluster? - Redis = require("ioredis") - rclient = new Redis.Cluster(config.cluster) - driver = "ioredis" - else - redis_config = {} - for key in ["host", "port", "password", "endpoints", "masterName"] - if config[key]? - redis_config[key] = config[key] - rclient = require("redis-sharelatex").createClient(redis_config) - driver = "noderedis" - return { - rclient: rclient - key_schema: config.key_schema - primary: config.primary - driver: driver - } - return new Client(clients) \ No newline at end of file diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee deleted file mode 100644 index adde3ee1c9..0000000000 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ /dev/null @@ -1,44 +0,0 @@ -# The default key schema looks like: -# doclines:foo -# DocVersion:foo -# but if we use redis cluster, we want all 'foo' keys to map to the same -# node, so we must use: -# doclines:{foo} -# DocVersion:{foo} -# since redis hashes on the contents of {...}. -# -# To transparently support different key schemas for different clients -# (potential writing/reading to both a cluster and single instance -# while we migrate), instead of keys, we now pass around functions which -# will build the key when passed a schema. -# -# E.g. -# key_schema = Settings.redis.keys -# key_schema == { docLines: ({doc_id}) -> "doclines:#{doc_id}", ... } -# key_builder = RedisKeyBuilder.docLines({doc_id: "foo"}) -# key_builder == (key_schema) -> key_schema.docLines({doc_id: "foo"}) -# key = key_builder(key_schema) -# key == "doclines:foo" -module.exports = RedisKeyBuilder = - blockingKey: ({doc_id}) -> - return (key_schema) -> key_schema.blockingKey({doc_id}) - docLines: ({doc_id}) -> - return (key_schema) -> key_schema.docLines({doc_id}) - docOps: ({doc_id}) -> - return (key_schema) -> key_schema.docOps({doc_id}) - docVersion: ({doc_id}) -> - return (key_schema) -> key_schema.docVersion({doc_id}) - docHash: ({doc_id}) -> - return (key_schema) -> key_schema.docHash({doc_id}) - projectKey: ({doc_id}) -> - return (key_schema) -> key_schema.projectKey({doc_id}) - uncompressedHistoryOp: ({doc_id}) -> - return (key_schema) -> key_schema.uncompressedHistoryOp({doc_id}) - pendingUpdates: ({doc_id}) -> - return (key_schema) -> key_schema.pendingUpdates({doc_id}) - ranges: ({doc_id}) -> - return (key_schema) -> key_schema.ranges({doc_id}) - docsInProject: ({project_id}) -> - return (key_schema) -> key_schema.docsInProject({project_id}) - docsWithHistoryOps: ({project_id}) -> - return (key_schema) -> key_schema.docsWithHistoryOps({project_id}) diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index cf8249dbd7..3359a36231 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -1,8 +1,7 @@ Settings = require('settings-sharelatex') async = require('async') -rclient = require("./RedisBackend").createClient() +rclient = require("redis-sharelatex").createClient(Settings.redis.documentupdater) _ = require('underscore') -keys = require('./RedisKeyBuilder') logger = require('logger-sharelatex') metrics = require('./Metrics') Errors = require "./Errors" @@ -25,6 +24,8 @@ logHashWriteErrors = logHashErrors?.write MEGABYTES = 1024 * 1024 MAX_RANGES_SIZE = 3 * MEGABYTES +keys = Settings.redis.documentupdater.key_schema + module.exports = RedisManager = rclient: rclient diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index ae0f9fe681..d638329622 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -20,11 +20,10 @@ module.exports = port:"6379" host:"localhost" password:"" - documentupdater: [{ - primary: true - port:"6379" - host:"localhost" - password:"" + documentupdater: + port: "6379" + host: "localhost" + password: "" key_schema: blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" docLines: ({doc_id}) -> "doclines:#{doc_id}" @@ -34,20 +33,19 @@ module.exports = projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" - # }, { - # cluster: [{ - # port: "7000" - # host: "localhost" - # }] - # key_schema: - # blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}" - # docLines: ({doc_id}) -> "doclines:{#{doc_id}}" - # docOps: ({doc_id}) -> "DocOps:{#{doc_id}}" - # docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}" - # projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}" - # docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}" - # ranges: ({doc_id}) -> "Ranges:{#{doc_id}}" - }] + # cluster: [{ + # port: "7000" + # host: "localhost" + # }] + # key_schema: + # blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}" + # docLines: ({doc_id}) -> "doclines:{#{doc_id}}" + # docOps: ({doc_id}) -> "DocOps:{#{doc_id}}" + # docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}" + # docHash: ({doc_id}) -> "DocHash:{#{doc_id}}" + # projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}" + # docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}" + # ranges: ({doc_id}) -> "Ranges:{#{doc_id}}" max_doc_length: 2 * 1024 * 1024 # 2mb diff --git a/services/document-updater/package.json b/services/document-updater/package.json index 94e8881810..fecda6f936 100644 --- a/services/document-updater/package.json +++ b/services/document-updater/package.json @@ -14,7 +14,7 @@ "logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#v1.5.6", "lynx": "0.0.11", "metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.5.0", - "redis-sharelatex": "0.0.9", + "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0", "request": "2.25.0", "sandboxed-module": "~0.2.0", "settings-sharelatex": "git+https://github.com/sharelatex/settings-sharelatex.git#v1.0.0", diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index bdfe89b990..b0cca1d18b 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -4,7 +4,9 @@ chai.should() expect = chai.expect async = require "async" Settings = require('settings-sharelatex') -rclient = require("redis-sharelatex").createClient(Settings.redis.web) +rclient_web = require("redis-sharelatex").createClient(Settings.redis.web) +rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater) +Keys = Settings.redis.documentupdater.key_schema MockTrackChangesApi = require "./helpers/MockTrackChangesApi" MockWebApi = require "./helpers/MockWebApi" @@ -47,10 +49,10 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => throw error if error? JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => throw error if error? result.should.equal 1 done() @@ -80,9 +82,9 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => result.should.equal 1 done() @@ -125,17 +127,17 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => updates = (JSON.parse(u) for u in updates) for appliedUpdate, i in @updates appliedUpdate.op.should.deep.equal updates[i].op - rclient.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => result.should.equal 1 done() it "should store the doc ops in the correct order", (done) -> - rclient.lrange "DocOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_du.lrange Keys.docOps({doc_id: @doc_id}), 0, -1, (error, updates) => updates = (JSON.parse(u) for u in updates) for appliedUpdate, i in @updates appliedUpdate.op.should.deep.equal updates[i].op diff --git a/services/document-updater/test/acceptance/coffee/SettingADocumentTests.coffee b/services/document-updater/test/acceptance/coffee/SettingADocumentTests.coffee index 1a5d790be8..97fae5cf14 100644 --- a/services/document-updater/test/acceptance/coffee/SettingADocumentTests.coffee +++ b/services/document-updater/test/acceptance/coffee/SettingADocumentTests.coffee @@ -3,7 +3,8 @@ chai = require("chai") chai.should() expect = require("chai").expect Settings = require('settings-sharelatex') -rclient = require("redis-sharelatex").createClient(Settings.redis.web) +rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater) +Keys = Settings.redis.documentupdater.key_schema MockTrackChangesApi = require "./helpers/MockTrackChangesApi" MockWebApi = require "./helpers/MockWebApi" @@ -65,7 +66,7 @@ describe "Setting a document", -> done() it "should leave the document in redis", (done) -> - rclient.get "doclines:#{@doc_id}", (error, lines) => + rclient_du.get Keys.docLines({doc_id: @doc_id}), (error, lines) => throw error if error? expect(JSON.parse(lines)).to.deep.equal @newLines done() @@ -90,7 +91,7 @@ describe "Setting a document", -> MockTrackChangesApi.flushDoc.calledWith(@doc_id).should.equal true it "should remove the document from redis", (done) -> - rclient.get "doclines:#{@doc_id}", (error, lines) => + rclient_du.get Keys.docLines({doc_id: @doc_id}), (error, lines) => throw error if error? expect(lines).to.not.exist done() diff --git a/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee b/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee deleted file mode 100644 index 4a136baae1..0000000000 --- a/services/document-updater/test/unit/coffee/RedisBackend/RedisBackendTests.coffee +++ /dev/null @@ -1,504 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisBackend.js" -SandboxedModule = require('sandboxed-module') -RedisKeyBuilder = require "../../../../app/js/RedisKeyBuilder" - -describe "RedisBackend", -> - beforeEach -> - @Settings = - redis: - documentupdater: [{ - primary: true - port: "6379" - host: "localhost" - password: "single-password" - key_schema: - blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" - docLines: ({doc_id}) -> "doclines:#{doc_id}" - docOps: ({doc_id}) -> "DocOps:#{doc_id}" - docVersion: ({doc_id}) -> "DocVersion:#{doc_id}" - docHash: ({doc_id}) -> "DocHash:#{doc_id}" - projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" - pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" - docsInProject: ({project_id}) -> "DocsIn:#{project_id}" - }, { - cluster: [{ - port: "7000" - host: "localhost" - }] - password: "cluster-password" - key_schema: - blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}" - docLines: ({doc_id}) -> "doclines:{#{doc_id}}" - docOps: ({doc_id}) -> "DocOps:{#{doc_id}}" - docVersion: ({doc_id}) -> "DocVersion:{#{doc_id}}" - docHash: ({doc_id}) -> "DocHash:{#{doc_id}}" - projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}" - pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}" - docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}" - }] - - test_context = @ - class Cluster - constructor: (@config) -> - test_context.rclient_ioredis = @ - - nodes: sinon.stub() - - @timer = timer = sinon.stub() - class Timer - constructor: (args...) -> timer(args...) - done: () -> - - @RedisBackend = SandboxedModule.require modulePath, requires: - "settings-sharelatex": @Settings - "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() } - "redis-sharelatex": @redis = - createClient: sinon.stub().returns @rclient_redis = {} - activeHealthCheck: sinon.stub() - "ioredis": @ioredis = - Cluster: Cluster - "metrics-sharelatex": - @Metrics = - inc: sinon.stub() - Timer: Timer - - @client = @RedisBackend.createClient() - - @doc_id = "mock-doc-id" - @project_id = "mock-project-id" - - it "should create a redis client", -> - @redis.createClient - .calledWith({ - port: "6379" - host: "localhost" - password: "single-password" - }) - .should.equal true - - it "should create an ioredis cluster client", -> - @rclient_ioredis.config.should.deep.equal [{ - port: "7000" - host: "localhost" - }] - - describe "individual commands", -> - describe "with the same results", -> - beforeEach (done) -> - @content = "bar" - @rclient_redis.get = sinon.stub() - @rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, @content) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, @content) - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) => - setTimeout () -> # Let all background requests complete - done(error) - - it "should return the result", -> - @result.should.equal @content - - it "should have called the redis client with the appropriate key", -> - @rclient_redis.get - .calledWith("doclines:#{@doc_id}") - .should.equal true - - it "should have called the ioredis cluster client with the appropriate key", -> - @rclient_ioredis.get - .calledWith("doclines:{#{@doc_id}}") - .should.equal true - - it "should send a metric", -> - @Metrics.inc - .calledWith("backend-match") - .should.equal true - - it "should time the commands", -> - @timer - .calledWith("redis.ioredis.get") - .should.equal true - @timer - .calledWith("redis.noderedis.get") - .should.equal true - - describe "with different results", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, "primary-result") - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, "secondary-result") - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) => - setTimeout () -> # Let all background requests complete - done(error) - - it "should return the primary result", -> - @result.should.equal "primary-result" - - it "should send a metric", -> - @Metrics.inc - .calledWith("backend-conflict") - .should.equal true - - describe "with differently ordered results from smembers", -> - beforeEach (done) -> - @rclient_redis.smembers = sinon.stub() - @rclient_redis.smembers.withArgs("DocsIn:#{@project_id}").yields(null, ["one", "two"]) - @rclient_ioredis.smembers = sinon.stub() - @rclient_ioredis.smembers.withArgs("DocsIn:{#{@project_id}}").yields(null, ["two", "one"]) - @client.smembers RedisKeyBuilder.docsInProject({project_id: @project_id}), (error, @result) => - setTimeout () -> # Let all background requests complete - done(error) - - it "should return the primary result", -> - @result.should.deep.equal ["one", "two"] - - it "should send a metric indicating a match", -> - @Metrics.inc - .calledWith("backend-match") - .should.equal true - - describe "when the secondary errors", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(null, "primary-result") - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(@error = new Error("oops")) - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) => - setTimeout () -> # Let all background requests complete - done(error) - - it "should return the primary result", -> - @result.should.equal "primary-result" - - it "should log out the secondary error", -> - @logger.error - .calledWith({ - err: @error - }, "error in redis backend") - .should.equal true - - describe "when the primary errors", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.get.withArgs("doclines:#{@doc_id}").yields(@error = new Error("oops")) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, "secondary-result") - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (@returned_error, @result) => - setTimeout () -> # Let all background requests complete - done() - - it "should return the error", -> - @returned_error.should.equal @error - - it "should log out the error", -> - @logger.error - .calledWith({ - err: @error - }, "error in redis backend") - .should.equal true - - describe "when the command has the key in a non-zero argument index", -> - beforeEach (done) -> - @script = "mock-script" - @key_count = 1 - @value = "mock-value" - @rclient_redis.eval = sinon.stub() - @rclient_redis.eval.withArgs(@script, @key_count, "Blocking:#{@doc_id}", @value).yields(null) - @rclient_ioredis.eval = sinon.stub() - @rclient_ioredis.eval.withArgs(@script, @key_count, "Blocking:{#{@doc_id}}", @value).yields(null, @content) - @client.eval @script, @key_count, RedisKeyBuilder.blockingKey({doc_id: @doc_id}), @value, (error) => - setTimeout () -> # Let all background requests complete - done(error) - - it "should have called the redis client with the appropriate key", -> - @rclient_redis.eval - .calledWith(@script, @key_count, "Blocking:#{@doc_id}", @value) - .should.equal true - - it "should have called the ioredis cluster client with the appropriate key", -> - @rclient_ioredis.eval - .calledWith(@script, @key_count, "Blocking:{#{@doc_id}}", @value) - .should.equal true - - describe "when the secondary takes longer than SECONDARY_TIMEOUT", -> - beforeEach (done) -> - @client.SECONDARY_TIMEOUT = 10 - @content = "bar" - @rclient_redis.get = (key, cb) => - key.should.equal "doclines:#{@doc_id}" - setTimeout () => - cb(null, @content) - , @client.SECONDARY_TIMEOUT * 3 # If the secondary errors first, don't affect the primary result - @rclient_ioredis.get = (key, cb) => - key.should.equal "doclines:{#{@doc_id}}" - setTimeout () => - cb(null, @content) - , @client.SECONDARY_TIMEOUT * 2 - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) => - done(error) - - it "should log out an error for the backend", -> - @logger.error - .calledWith({err: new Error("backend timed out")}, "backend timed out") - .should.equal true - - it "should return the primary result", -> - @result.should.equal @content - - describe "when the primary takes longer than SECONDARY_TIMEOUT", -> - beforeEach (done) -> - @client.SECONDARY_TIMEOUT = 10 - @content = "bar" - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.get.withArgs("doclines:{#{@doc_id}}").yields(null, @content) - @rclient_redis.get = (key, cb) => - key.should.equal "doclines:#{@doc_id}" - setTimeout () => - cb(null, @content) - , @client.SECONDARY_TIMEOUT * 2 - @client.get RedisKeyBuilder.docLines({doc_id: @doc_id}), (error, @result) => - done(error) - - it "should not consider this an error", -> - @logger.error - .called - .should.equal false - - describe "multi commands", -> - beforeEach -> - # We will test with: - # rclient.multi() - # .get("doclines:foo") - # .get("DocVersion:foo") - # .exec (...) -> - @doclines = "mock-doclines" - @version = "42" - @rclient_redis.multi = sinon.stub().returns @rclient_redis - @rclient_ioredis.multi = sinon.stub().returns @rclient_ioredis - - describe "with the same results", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version]) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = sinon.stub().yields(null, [ [null, @doclines], [null, @version] ]) - - multi = @client.multi() - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (error, @result) => - setTimeout () -> - done(error) - - it "should return the result", -> - @result.should.deep.equal [@doclines, @version] - - it "should have called the redis client with the appropriate keys", -> - @rclient_redis.get - .calledWith("doclines:#{@doc_id}") - .should.equal true - @rclient_redis.get - .calledWith("DocVersion:#{@doc_id}") - .should.equal true - @rclient_ioredis.exec - .called - .should.equal true - - it "should have called the ioredis cluster client with the appropriate keys", -> - @rclient_ioredis.get - .calledWith("doclines:{#{@doc_id}}") - .should.equal true - @rclient_ioredis.get - .calledWith("DocVersion:{#{@doc_id}}") - .should.equal true - @rclient_ioredis.exec - .called - .should.equal true - - it "should send a metric", -> - @Metrics.inc - .calledWith("backend-match") - .should.equal true - - it "should time the exec", -> - @timer - .calledWith("redis.ioredis.exec") - .should.equal true - @timer - .calledWith("redis.noderedis.exec") - .should.equal true - - describe "with different results", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version]) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = sinon.stub().yields(null, [ [null, "different-doc-lines"], [null, @version] ]) - - multi = @client.multi() - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (error, @result) => - setTimeout () -> - done(error) - - it "should return the primary result", -> - @result.should.deep.equal [@doclines, @version] - - it "should send a metric", -> - @Metrics.inc - .calledWith("backend-conflict") - .should.equal true - - describe "when the secondary errors", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = sinon.stub().yields(null, [@doclines, @version]) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = sinon.stub().yields(@error = new Error("oops")) - - multi = @client.multi() - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (error, @result) => - setTimeout () -> - done(error) - - it "should return the primary result", -> - @result.should.deep.equal [@doclines, @version] - - describe "when the primary errors", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = sinon.stub().yields(@error = new Error("oops")) - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = sinon.stub().yields([ [null, @doclines], [null, @version] ]) - - multi = @client.multi() - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (@returned_error) => - setTimeout () -> done() - - it "should return the error", -> - @returned_error.should.equal @error - - describe "when the secondary takes longer than SECONDARY_TIMEOUT", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = (cb) => - setTimeout () => - cb(null, [@doclines, @version]) - , 30 # If secondary errors first, don't affect the primary result - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = (cb) => - setTimeout () => - cb(null, [ [null, @doclines], [null, @version] ]) - , 20 - - multi = @client.multi() - multi.SECONDARY_TIMEOUT = 10 - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (error, @result) => - done(error) - - it "should log out an error for the backend", -> - @logger.error - .calledWith({err: new Error("backend timed out")}, "backend timed out") - .should.equal true - - it "should return the primary result", -> - @result.should.deep.equal [@doclines, @version] - - describe "when the primary takes longer than SECONDARY_TIMEOUT", -> - beforeEach (done) -> - @rclient_redis.get = sinon.stub() - @rclient_redis.exec = (cb) => - setTimeout () => - cb(null, [@doclines, @version]) - , 20 - @rclient_ioredis.get = sinon.stub() - @rclient_ioredis.exec = sinon.stub().yields(null, [ [null, @doclines], [null, @version] ]) - - multi = @client.multi() - multi.SECONDARY_TIMEOUT = 10 - multi.get RedisKeyBuilder.docLines({doc_id: @doc_id}) - multi.get RedisKeyBuilder.docVersion({doc_id: @doc_id}) - multi.exec (error, @result) => - done(error) - - it "should not consider this an error", -> - @logger.error - .called - .should.equal false - - describe "_healthCheckNodeRedisClient", -> - beforeEach -> - @redis.activeHealthCheckRedis = sinon.stub().returns @healthCheck = { - isAlive: sinon.stub() - } - - describe "successfully", -> - beforeEach (done) -> - @healthCheck.isAlive.returns true - @redis_client = {} - @client._healthCheckNodeRedisClient(@redis_client, done) - - it "should check the status of the node redis client", -> - @healthCheck.isAlive.called.should.equal true - - it "should only create one health check when called multiple times", (done) -> - @client._healthCheckNodeRedisClient @redis_client, () => - @redis.activeHealthCheckRedis.calledOnce.should.equal true - @healthCheck.isAlive.calledTwice.should.equal true - done() - - describe "when failing", -> - beforeEach -> - @healthCheck.isAlive.returns false - @redis_client = {} - - it "should return an error", (done) -> - @client._healthCheckNodeRedisClient @redis_client, (error) -> - error.message.should.equal "node-redis client failed health check" - done() - - describe "_healthCheckClusterClient", -> - beforeEach -> - @client.HEARTBEAT_TIMEOUT = 10 - @nodes = [{ - options: key: "node-0" - stream: destroy: sinon.stub() - }, { - options: key: "node-1" - stream: destroy: sinon.stub() - }] - @rclient_ioredis.nodes = sinon.stub().returns(@nodes) - - describe "when both clients are successful", -> - beforeEach (done) -> - @nodes[0].ping = sinon.stub().yields() - @nodes[1].ping = sinon.stub().yields() - @client._healthCheckClusterClient({ rclient: @rclient_ioredis }, done) - - it "should get all cluster nodes", -> - @rclient_ioredis.nodes - .calledWith("all") - .should.equal true - - it "should ping each cluster node", -> - for node in @nodes - node.ping.called.should.equal true - - describe "when ping fails to a node", -> - beforeEach -> - @nodes[0].ping = (cb) -> cb() - @nodes[1].ping = (cb) -> # Just hang - - it "should return an error", (done) -> - @client._healthCheckClusterClient { rclient: @rclient_ioredis }, (error) -> - error.message.should.equal "ioredis node ping check timed out" - done() diff --git a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee index 258603be9b..070abd859a 100644 --- a/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RedisManager/RedisManagerTests.coffee @@ -14,20 +14,24 @@ describe "RedisManager", -> @rclient.multi = () => @rclient @RedisManager = SandboxedModule.require modulePath, requires: - "./RedisBackend": - createClient: () => @rclient - "./RedisKeyBuilder": - blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" - docLines: ({doc_id}) -> "doclines:#{doc_id}" - docOps: ({doc_id}) -> "DocOps:#{doc_id}" - docVersion: ({doc_id}) -> "DocVersion:#{doc_id}" - docHash: ({doc_id}) -> "DocHash:#{doc_id}" - projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" - pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" - docsInProject: ({project_id}) -> "DocsIn:#{project_id}" - ranges: ({doc_id}) -> "Ranges:#{doc_id}" "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() } - "settings-sharelatex": {documentupdater: {logHashErrors: {write:true, read:true}}} + "settings-sharelatex": { + documentupdater: {logHashErrors: {write:true, read:true}} + redis: + documentupdater: + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + docLines: ({doc_id}) -> "doclines:#{doc_id}" + docOps: ({doc_id}) -> "DocOps:#{doc_id}" + docVersion: ({doc_id}) -> "DocVersion:#{doc_id}" + docHash: ({doc_id}) -> "DocHash:#{doc_id}" + projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" + pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" + docsInProject: ({project_id}) -> "DocsIn:#{project_id}" + ranges: ({doc_id}) -> "Ranges:#{doc_id}" + } + "redis-sharelatex": + createClient: () => @rclient "./Metrics": @metrics = inc: sinon.stub() Timer: class Timer diff --git a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee index f3f0d8afdc..a0f88b33f1 100644 --- a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee @@ -14,6 +14,7 @@ describe "WebRedisManager", -> @WebRedisManager = SandboxedModule.require modulePath, requires: "redis-sharelatex": createClient: () => @rclient "settings-sharelatex": redis: web: @settings = {"mock": "settings"} + "logger-sharelatex": { log: () -> } @doc_id = "doc-id-123" @project_id = "project-id-123" @callback = sinon.stub() From c5449ae282efccb32220b03161647617d7370c08 Mon Sep 17 00:00:00 2001 From: James Allen Date: Thu, 13 Apr 2017 17:00:42 +0100 Subject: [PATCH 2/8] Split out redis config for real-time and track-changes into separate cluster-compatible configs --- .../app/coffee/HistoryManager.coffee | 4 +- .../app/coffee/HistoryRedisManager.coffee | 20 ++++++ .../app/coffee/WebRedisManager.coffee | 24 ++----- .../config/settings.defaults.coffee | 11 +++ .../HistoryManager/HistoryManagerTests.coffee | 18 ++--- .../HistoryRedisManagerTests.coffee | 69 +++++++++++++++++++ .../WebRedisManagerTests.coffee | 50 ++------------ 7 files changed, 121 insertions(+), 75 deletions(-) create mode 100644 services/document-updater/app/coffee/HistoryRedisManager.coffee create mode 100644 services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee diff --git a/services/document-updater/app/coffee/HistoryManager.coffee b/services/document-updater/app/coffee/HistoryManager.coffee index 637fd2cb5f..512fd5e68b 100644 --- a/services/document-updater/app/coffee/HistoryManager.coffee +++ b/services/document-updater/app/coffee/HistoryManager.coffee @@ -2,7 +2,7 @@ settings = require "settings-sharelatex" request = require "request" logger = require "logger-sharelatex" async = require "async" -WebRedisManager = require "./WebRedisManager" +HistoryRedisManager = require "./HistoryRedisManager" module.exports = HistoryManager = flushDocChanges: (project_id, doc_id, callback = (error) ->) -> @@ -25,7 +25,7 @@ module.exports = HistoryManager = pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error) ->) -> if ops.length == 0 return callback() - WebRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> + HistoryRedisManager.pushUncompressedHistoryOps project_id, doc_id, ops, (error, length) -> return callback(error) if error? # We want to flush every 50 ops, i.e. 50, 100, 150, etc # Find out which 'block' (i.e. 0-49, 50-99) we were in before and after pushing these diff --git a/services/document-updater/app/coffee/HistoryRedisManager.coffee b/services/document-updater/app/coffee/HistoryRedisManager.coffee new file mode 100644 index 0000000000..315d9daabf --- /dev/null +++ b/services/document-updater/app/coffee/HistoryRedisManager.coffee @@ -0,0 +1,20 @@ +Settings = require('settings-sharelatex') +rclient = require("redis-sharelatex").createClient(Settings.redis.history) +Keys = Settings.redis.history.key_schema +async = require "async" +logger = require('logger-sharelatex') + +module.exports = HistoryRedisManager = + pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> + if ops.length == 0 + return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush + opVersions = ops.map (op) -> op?.v + logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops" + jsonOps = ops.map (op) -> JSON.stringify op + async.parallel [ + (cb) -> rclient.rpush Keys.uncompressedHistoryOps({doc_id}), jsonOps..., cb + (cb) -> rclient.sadd Keys.docsWithHistoryOps({project_id}), doc_id, cb + ], (error, results) -> + return callback(error) if error? + [length, _] = results + callback(error, length) \ No newline at end of file diff --git a/services/document-updater/app/coffee/WebRedisManager.coffee b/services/document-updater/app/coffee/WebRedisManager.coffee index f500c62daf..6b7326d73a 100644 --- a/services/document-updater/app/coffee/WebRedisManager.coffee +++ b/services/document-updater/app/coffee/WebRedisManager.coffee @@ -1,13 +1,13 @@ Settings = require('settings-sharelatex') rclient = require("redis-sharelatex").createClient(Settings.redis.web) -async = require "async" +Keys = Settings.redis.web.key_schema logger = require('logger-sharelatex') module.exports = WebRedisManager = getPendingUpdatesForDoc : (doc_id, callback)-> multi = rclient.multi() - multi.lrange "PendingUpdates:#{doc_id}", 0 , -1 - multi.del "PendingUpdates:#{doc_id}" + multi.lrange Keys.pendingUpdates({doc_id}), 0 , -1 + multi.del Keys.pendingUpdates({doc_id}) multi.exec (error, replys) -> return callback(error) if error? jsonUpdates = replys[0] @@ -21,21 +21,7 @@ module.exports = WebRedisManager = callback error, updates getUpdatesLength: (doc_id, callback)-> - rclient.llen "PendingUpdates:#{doc_id}", callback + rclient.llen Keys.pendingUpdates({doc_id}), callback - pushUncompressedHistoryOps: (project_id, doc_id, ops = [], callback = (error, length) ->) -> - if ops.length == 0 - return callback(new Error("cannot push no ops")) # This should never be called with no ops, but protect against a redis error if we sent an empty array to rpush - opVersions = ops.map (op) -> op?.v - logger.log project_id: project_id, doc_id: doc_id, op_versions: opVersions, "pushing uncompressed history ops" - jsonOps = ops.map (op) -> JSON.stringify op - async.parallel [ - (cb) -> rclient.rpush "UncompressedHistoryOps:#{doc_id}", jsonOps..., cb - (cb) -> rclient.sadd "DocsWithHistoryOps:#{project_id}", doc_id, cb - ], (error, results) -> - return callback(error) if error? - [length, _] = results - callback(error, length) - sendData: (data) -> - rclient.publish "applied-ops", JSON.stringify(data) \ No newline at end of file + rclient.publish "applied-ops", JSON.stringify(data) diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index d638329622..973cfca970 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -20,6 +20,8 @@ module.exports = port:"6379" host:"localhost" password:"" + key_schema: + pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" documentupdater: port: "6379" host: "localhost" @@ -33,6 +35,15 @@ module.exports = projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" + history: + port:"6379" + host:"localhost" + password:"" + key_schema: + uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" + docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" + + # cluster: [{ # port: "7000" # host: "localhost" diff --git a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee index c33a18d4e6..66ffd98e80 100644 --- a/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/HistoryManager/HistoryManagerTests.coffee @@ -9,7 +9,7 @@ describe "HistoryManager", -> "request": @request = {} "settings-sharelatex": @Settings = {} "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } - "./WebRedisManager": @WebRedisManager = {} + "./HistoryRedisManager": @HistoryRedisManager = {} @project_id = "mock-project-id" @doc_id = "mock-doc-id" @callback = sinon.stub() @@ -47,11 +47,11 @@ describe "HistoryManager", -> describe "pushing the op", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) + @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback it "should push the ops into redis", -> - @WebRedisManager.pushUncompressedHistoryOps + @HistoryRedisManager.pushUncompressedHistoryOps .calledWith(@project_id, @doc_id, @ops) .should.equal true @@ -63,7 +63,7 @@ describe "HistoryManager", -> describe "when we hit a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -75,7 +75,7 @@ describe "HistoryManager", -> describe "when we go over a multiple of FLUSH_EVERY_N_OPS ops", -> beforeEach -> @ops = ["op1", "op2", "op3"] - @WebRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS + 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -86,7 +86,7 @@ describe "HistoryManager", -> describe "when HistoryManager errors", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOps = + @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 2 * @HistoryManager.FLUSH_EVERY_N_OPS) @HistoryManager.flushDocChanges = sinon.stub().callsArgWith(2, @error = new Error("oops")) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, @callback @@ -103,10 +103,10 @@ describe "HistoryManager", -> describe "with no ops", -> beforeEach -> - @WebRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) + @HistoryRedisManager.pushUncompressedHistoryOps = sinon.stub().callsArgWith(3, null, 1) @HistoryManager.pushUncompressedHistoryOps @project_id, @doc_id, [], @callback - it "should not call WebRedisManager.pushUncompressedHistoryOps", -> - @WebRedisManager.pushUncompressedHistoryOps.called.should.equal false + it "should not call HistoryRedisManager.pushUncompressedHistoryOps", -> + @HistoryRedisManager.pushUncompressedHistoryOps.called.should.equal false diff --git a/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee new file mode 100644 index 0000000000..f51942c1e1 --- /dev/null +++ b/services/document-updater/test/unit/coffee/HistoryRedisManager/HistoryRedisManagerTests.coffee @@ -0,0 +1,69 @@ +sinon = require('sinon') +chai = require('chai') +should = chai.should() +modulePath = "../../../../app/js/HistoryRedisManager.js" +SandboxedModule = require('sandboxed-module') +Errors = require "../../../../app/js/Errors" + +describe "HistoryRedisManager", -> + beforeEach -> + @rclient = + auth: () -> + exec: sinon.stub() + @rclient.multi = () => @rclient + @HistoryRedisManager = SandboxedModule.require modulePath, requires: + "redis-sharelatex": createClient: () => @rclient + "settings-sharelatex": + redis: + history: @settings = + key_schema: + uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" + docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" + "logger-sharelatex": { log: () -> } + @doc_id = "doc-id-123" + @project_id = "project-id-123" + @callback = sinon.stub() + + describe "pushUncompressedHistoryOps", -> + beforeEach -> + @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] + @rclient.rpush = sinon.stub().yields(null, @length = 42) + @rclient.sadd = sinon.stub().yields() + + describe "with ops", -> + beforeEach (done) -> + @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) => + @callback(args...) + done() + + it "should push the doc op into the doc ops list as JSON", -> + @rclient.rpush + .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) + .should.equal true + + it "should add the doc_id to the set of which records the project docs", -> + @rclient.sadd + .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) + .should.equal true + + it "should call the callback with the length", -> + @callback.calledWith(null, @length).should.equal true + + describe "with no ops", -> + beforeEach (done) -> + @HistoryRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) => + @callback(args...) + done() + + it "should not push the doc op into the doc ops list as JSON", -> + @rclient.rpush + .called + .should.equal false + + it "should not add the doc_id to the set of which records the project docs", -> + @rclient.sadd + .called + .should.equal false + + it "should call the callback with an error", -> + @callback.calledWith(new Error("cannot push no ops")).should.equal true diff --git a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee index a0f88b33f1..e1fb89eaed 100644 --- a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee @@ -13,7 +13,11 @@ describe "WebRedisManager", -> @rclient.multi = () => @rclient @WebRedisManager = SandboxedModule.require modulePath, requires: "redis-sharelatex": createClient: () => @rclient - "settings-sharelatex": redis: web: @settings = {"mock": "settings"} + "settings-sharelatex": + redis: + web: @settings = + key_schema: + pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" "logger-sharelatex": { log: () -> } @doc_id = "doc-id-123" @project_id = "project-id-123" @@ -70,47 +74,3 @@ describe "WebRedisManager", -> it "should return the length", -> @callback.calledWith(null, @length).should.equal true - - describe "pushUncompressedHistoryOps", -> - beforeEach -> - @ops = [{ op: [{ i: "foo", p: 4 }] },{ op: [{ i: "bar", p: 56 }] }] - @rclient.rpush = sinon.stub().yields(null, @length = 42) - @rclient.sadd = sinon.stub().yields() - - describe "with ops", -> - beforeEach (done) -> - @WebRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, @ops, (args...) => - @callback(args...) - done() - - it "should push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .calledWith("UncompressedHistoryOps:#{@doc_id}", JSON.stringify(@ops[0]), JSON.stringify(@ops[1])) - .should.equal true - - it "should add the doc_id to the set of which records the project docs", -> - @rclient.sadd - .calledWith("DocsWithHistoryOps:#{@project_id}", @doc_id) - .should.equal true - - it "should call the callback with the length", -> - @callback.calledWith(null, @length).should.equal true - - describe "with no ops", -> - beforeEach (done) -> - @WebRedisManager.pushUncompressedHistoryOps @project_id, @doc_id, [], (args...) => - @callback(args...) - done() - - it "should not push the doc op into the doc ops list as JSON", -> - @rclient.rpush - .called - .should.equal false - - it "should not add the doc_id to the set of which records the project docs", -> - @rclient.sadd - .called - .should.equal false - - it "should call the callback with an error", -> - @callback.calledWith(new Error("cannot push no ops")).should.equal true From dc77bc207d92e90639ad5bceaf998775e240488f Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 24 Apr 2017 16:31:23 +0100 Subject: [PATCH 3/8] change acceptance test to use redis history client --- .../coffee/ApplyingUpdatesToADocTests.coffee | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index b0cca1d18b..abb10cc95a 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -4,7 +4,7 @@ chai.should() expect = chai.expect async = require "async" Settings = require('settings-sharelatex') -rclient_web = require("redis-sharelatex").createClient(Settings.redis.web) +rclient_history = require("redis-sharelatex").createClient(Settings.redis.history) rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater) Keys = Settings.redis.documentupdater.key_schema @@ -49,10 +49,10 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => throw error if error? JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => throw error if error? result.should.equal 1 done() @@ -82,9 +82,9 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => result.should.equal 1 done() @@ -127,12 +127,12 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_web.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => updates = (JSON.parse(u) for u in updates) for appliedUpdate, i in @updates appliedUpdate.op.should.deep.equal updates[i].op - rclient_web.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => result.should.equal 1 done() From 64aef0b55a7884e0e12d17ba010759f15b3a1fc4 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 27 Apr 2017 10:42:43 +0100 Subject: [PATCH 4/8] fix acceptance test to work with redis cluster too --- .../coffee/ApplyingUpdatesToADocTests.coffee | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee index abb10cc95a..d06119c690 100644 --- a/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee +++ b/services/document-updater/test/acceptance/coffee/ApplyingUpdatesToADocTests.coffee @@ -7,6 +7,7 @@ Settings = require('settings-sharelatex') rclient_history = require("redis-sharelatex").createClient(Settings.redis.history) rclient_du = require("redis-sharelatex").createClient(Settings.redis.documentupdater) Keys = Settings.redis.documentupdater.key_schema +HistoryKeys = Settings.redis.history.key_schema MockTrackChangesApi = require "./helpers/MockTrackChangesApi" MockWebApi = require "./helpers/MockWebApi" @@ -49,10 +50,10 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) => throw error if error? JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) => throw error if error? result.should.equal 1 done() @@ -82,9 +83,9 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) => JSON.parse(updates[0]).op.should.deep.equal @update.op - rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) => result.should.equal 1 done() @@ -127,12 +128,12 @@ describe "Applying updates to a doc", -> done() it "should push the applied updates to the track changes api", (done) -> - rclient_history.lrange "UncompressedHistoryOps:#{@doc_id}", 0, -1, (error, updates) => + rclient_history.lrange HistoryKeys.uncompressedHistoryOps({@doc_id}), 0, -1, (error, updates) => updates = (JSON.parse(u) for u in updates) for appliedUpdate, i in @updates appliedUpdate.op.should.deep.equal updates[i].op - rclient_history.sismember "DocsWithHistoryOps:#{@project_id}", @doc_id, (error, result) => + rclient_history.sismember HistoryKeys.docsWithHistoryOps({@project_id}), @doc_id, (error, result) => result.should.equal 1 done() From ed493d8ad3ec6b355d0a1c1ecb4c9bb25f548ea6 Mon Sep 17 00:00:00 2001 From: James Allen Date: Tue, 2 May 2017 15:38:33 +0100 Subject: [PATCH 5/8] Rename web -> realtime for consistency with realtime --- services/document-updater/app.coffee | 2 +- .../app/coffee/DocumentManager.coffee | 2 +- ...ager.coffee => RealTimeRedisManager.coffee} | 6 +++--- .../app/coffee/ShareJsUpdateManager.coffee | 4 ++-- .../app/coffee/UpdateManager.coffee | 8 ++++---- .../config/settings.defaults.coffee | 2 +- .../DocumentManagerTests.coffee | 2 +- .../RealTimeRedisManagerTests.coffee} | 14 +++++++------- .../ShareJsUpdateManagerTests.coffee | 6 +++--- .../UpdateManager/UpdateManagerTests.coffee | 18 +++++++++--------- 10 files changed, 32 insertions(+), 32 deletions(-) rename services/document-updater/app/coffee/{WebRedisManager.coffee => RealTimeRedisManager.coffee} (88%) rename services/document-updater/test/unit/coffee/{WebRedisManager/WebRedisManagerTests.coffee => RealTimeRedisManager/RealTimeRedisManagerTests.coffee} (83%) diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index 31e8ebb3b3..eb0ea771aa 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -62,7 +62,7 @@ app.get '/status', (req, res)-> else res.send('document updater is alive') -webRedisClient = require("redis-sharelatex").createClient(Settings.redis.web) +webRedisClient = require("redis-sharelatex").createClient(Settings.redis.realtime) app.get "/health_check/redis", (req, res, next) -> webRedisClient.healthCheck (error) -> if error? diff --git a/services/document-updater/app/coffee/DocumentManager.coffee b/services/document-updater/app/coffee/DocumentManager.coffee index be47ec4c8c..c155de58fe 100644 --- a/services/document-updater/app/coffee/DocumentManager.coffee +++ b/services/document-updater/app/coffee/DocumentManager.coffee @@ -4,7 +4,7 @@ DiffCodec = require "./DiffCodec" logger = require "logger-sharelatex" Metrics = require "./Metrics" HistoryManager = require "./HistoryManager" -WebRedisManager = require "./WebRedisManager" +RealTimeRedisManager = require "./RealTimeRedisManager" Errors = require "./Errors" RangesManager = require "./RangesManager" diff --git a/services/document-updater/app/coffee/WebRedisManager.coffee b/services/document-updater/app/coffee/RealTimeRedisManager.coffee similarity index 88% rename from services/document-updater/app/coffee/WebRedisManager.coffee rename to services/document-updater/app/coffee/RealTimeRedisManager.coffee index 6b7326d73a..197a4708c1 100644 --- a/services/document-updater/app/coffee/WebRedisManager.coffee +++ b/services/document-updater/app/coffee/RealTimeRedisManager.coffee @@ -1,9 +1,9 @@ Settings = require('settings-sharelatex') -rclient = require("redis-sharelatex").createClient(Settings.redis.web) -Keys = Settings.redis.web.key_schema +rclient = require("redis-sharelatex").createClient(Settings.redis.realtime) +Keys = Settings.redis.realtime.key_schema logger = require('logger-sharelatex') -module.exports = WebRedisManager = +module.exports = RealTimeRedisManager = getPendingUpdatesForDoc : (doc_id, callback)-> multi = rclient.multi() multi.lrange Keys.pendingUpdates({doc_id}), 0 , -1 diff --git a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee index f175796467..3ec90e4f62 100644 --- a/services/document-updater/app/coffee/ShareJsUpdateManager.coffee +++ b/services/document-updater/app/coffee/ShareJsUpdateManager.coffee @@ -6,7 +6,7 @@ Settings = require('settings-sharelatex') Keys = require "./UpdateKeys" {EventEmitter} = require "events" util = require "util" -WebRedisManager = require "./WebRedisManager" +RealTimeRedisManager = require "./RealTimeRedisManager" ShareJsModel:: = {} util.inherits ShareJsModel, EventEmitter @@ -52,5 +52,5 @@ module.exports = ShareJsUpdateManager = ShareJsUpdateManager._sendOp(project_id, doc_id, opData) _sendOp: (project_id, doc_id, op) -> - WebRedisManager.sendData {project_id, doc_id, op} + RealTimeRedisManager.sendData {project_id, doc_id, op} diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index b6a5f98c4c..5022f6bb38 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -1,6 +1,6 @@ LockManager = require "./LockManager" RedisManager = require "./RedisManager" -WebRedisManager = require "./WebRedisManager" +RealTimeRedisManager = require "./RealTimeRedisManager" ShareJsUpdateManager = require "./ShareJsUpdateManager" HistoryManager = require "./HistoryManager" Settings = require('settings-sharelatex') @@ -30,7 +30,7 @@ module.exports = UpdateManager = UpdateManager.continueProcessingUpdatesWithLock project_id, doc_id, callback continueProcessingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> - WebRedisManager.getUpdatesLength doc_id, (error, length) => + RealTimeRedisManager.getUpdatesLength doc_id, (error, length) => return callback(error) if error? if length > 0 UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, callback @@ -38,7 +38,7 @@ module.exports = UpdateManager = callback() fetchAndApplyUpdates: (project_id, doc_id, callback = (error) ->) -> - WebRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) => + RealTimeRedisManager.getPendingUpdatesForDoc doc_id, (error, updates) => return callback(error) if error? if updates.length == 0 return callback() @@ -49,7 +49,7 @@ module.exports = UpdateManager = applyUpdate: (project_id, doc_id, update, _callback = (error) ->) -> callback = (error) -> if error? - WebRedisManager.sendData {project_id, doc_id, error: error.message || error} + RealTimeRedisManager.sendData {project_id, doc_id, error: error.message || error} _callback(error) UpdateManager._sanitizeUpdate update diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index 973cfca970..ae5ff0522b 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -16,7 +16,7 @@ module.exports = url: "http://localhost:3015" redis: - web: + realtime: port:"6379" host:"localhost" password:"" diff --git a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee index 47fbde021b..e781546ec6 100644 --- a/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/DocumentManager/DocumentManagerTests.coffee @@ -16,7 +16,7 @@ describe "DocumentManager", -> "./Metrics": @Metrics = Timer: class Timer done: sinon.stub() - "./WebRedisManager": @WebRedisManager = {} + "./RealTimeRedisManager": @RealTimeRedisManager = {} "./DiffCodec": @DiffCodec = {} "./UpdateManager": @UpdateManager = {} "./RangesManager": @RangesManager = {} diff --git a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee b/services/document-updater/test/unit/coffee/RealTimeRedisManager/RealTimeRedisManagerTests.coffee similarity index 83% rename from services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee rename to services/document-updater/test/unit/coffee/RealTimeRedisManager/RealTimeRedisManagerTests.coffee index e1fb89eaed..b6aa35ac72 100644 --- a/services/document-updater/test/unit/coffee/WebRedisManager/WebRedisManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/RealTimeRedisManager/RealTimeRedisManagerTests.coffee @@ -1,21 +1,21 @@ sinon = require('sinon') chai = require('chai') should = chai.should() -modulePath = "../../../../app/js/WebRedisManager.js" +modulePath = "../../../../app/js/RealTimeRedisManager.js" SandboxedModule = require('sandboxed-module') Errors = require "../../../../app/js/Errors" -describe "WebRedisManager", -> +describe "RealTimeRedisManager", -> beforeEach -> @rclient = auth: () -> exec: sinon.stub() @rclient.multi = () => @rclient - @WebRedisManager = SandboxedModule.require modulePath, requires: + @RealTimeRedisManager = SandboxedModule.require modulePath, requires: "redis-sharelatex": createClient: () => @rclient "settings-sharelatex": redis: - web: @settings = + realtime: @settings = key_schema: pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" "logger-sharelatex": { log: () -> } @@ -36,7 +36,7 @@ describe "WebRedisManager", -> ] @jsonUpdates = @updates.map (update) -> JSON.stringify update @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates]) - @WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback + @RealTimeRedisManager.getPendingUpdatesForDoc @doc_id, @callback it "should get the pending updates", -> @rclient.lrange @@ -58,7 +58,7 @@ describe "WebRedisManager", -> "broken json" ] @rclient.exec = sinon.stub().callsArgWith(0, null, [@jsonUpdates]) - @WebRedisManager.getPendingUpdatesForDoc @doc_id, @callback + @RealTimeRedisManager.getPendingUpdatesForDoc @doc_id, @callback it "should return an error to the callback", -> @callback.calledWith(new Error("JSON parse error")).should.equal true @@ -67,7 +67,7 @@ describe "WebRedisManager", -> describe "getUpdatesLength", -> beforeEach -> @rclient.llen = sinon.stub().yields(null, @length = 3) - @WebRedisManager.getUpdatesLength @doc_id, @callback + @RealTimeRedisManager.getUpdatesLength @doc_id, @callback it "should look up the length", -> @rclient.llen.calledWith("PendingUpdates:#{@doc_id}").should.equal true diff --git a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee index 42ba3f331b..b7364b00a4 100644 --- a/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/ShareJsUpdateManager/ShareJsUpdateManagerTests.coffee @@ -17,7 +17,7 @@ describe "ShareJsUpdateManager", -> "./ShareJsDB" : @ShareJsDB = { mockDB: true } "redis-sharelatex" : createClient: () => @rclient = auth:-> "logger-sharelatex": @logger = { log: sinon.stub() } - "./WebRedisManager": @WebRedisManager = {} + "./RealTimeRedisManager": @RealTimeRedisManager = {} globals: clearTimeout: @clearTimeout = sinon.stub() @@ -105,11 +105,11 @@ describe "ShareJsUpdateManager", -> @opData = op: {t: "foo", p: 1} meta: source: "bar" - @WebRedisManager.sendData = sinon.stub() + @RealTimeRedisManager.sendData = sinon.stub() @callback("#{@project_id}:#{@doc_id}", @opData) it "should publish the op to redis", -> - @WebRedisManager.sendData + @RealTimeRedisManager.sendData .calledWith({project_id: @project_id, doc_id: @doc_id, op: @opData}) .should.equal true diff --git a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee index fbf9b21ddc..3e659ee078 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/UpdateManagerTests.coffee @@ -12,7 +12,7 @@ describe "UpdateManager", -> @UpdateManager = SandboxedModule.require modulePath, requires: "./LockManager" : @LockManager = {} "./RedisManager" : @RedisManager = {} - "./WebRedisManager" : @WebRedisManager = {} + "./RealTimeRedisManager" : @RealTimeRedisManager = {} "./ShareJsUpdateManager" : @ShareJsUpdateManager = {} "./HistoryManager" : @HistoryManager = {} "logger-sharelatex": @logger = { log: sinon.stub() } @@ -94,7 +94,7 @@ describe "UpdateManager", -> describe "continueProcessingUpdatesWithLock", -> describe "when there are outstanding updates", -> beforeEach -> - @WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3) + @RealTimeRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 3) @UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2) @UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback @@ -106,7 +106,7 @@ describe "UpdateManager", -> describe "when there are no outstanding updates", -> beforeEach -> - @WebRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0) + @RealTimeRedisManager.getUpdatesLength = sinon.stub().callsArgWith(1, null, 0) @UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2) @UpdateManager.continueProcessingUpdatesWithLock @project_id, @doc_id, @callback @@ -122,12 +122,12 @@ describe "UpdateManager", -> @updates = [{p: 1, t: "foo"}] @updatedDocLines = ["updated", "lines"] @version = 34 - @WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) + @RealTimeRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) @UpdateManager.applyUpdate = sinon.stub().callsArgWith(3, null, @updatedDocLines, @version) @UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback it "should get the pending updates", -> - @WebRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true + @RealTimeRedisManager.getPendingUpdatesForDoc.calledWith(@doc_id).should.equal true it "should apply the updates", -> for update in @updates @@ -141,7 +141,7 @@ describe "UpdateManager", -> describe "when there are no updates", -> beforeEach -> @updates = [] - @WebRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) + @RealTimeRedisManager.getPendingUpdatesForDoc = sinon.stub().callsArgWith(1, null, @updates) @UpdateManager.applyUpdate = sinon.stub() @RedisManager.setDocument = sinon.stub() @UpdateManager.fetchAndApplyUpdates @project_id, @doc_id, @callback @@ -165,7 +165,7 @@ describe "UpdateManager", -> @RangesManager.applyUpdate = sinon.stub().yields(null, @updated_ranges) @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(null, @updatedDocLines, @version, @appliedOps) @RedisManager.updateDocument = sinon.stub().yields() - @WebRedisManager.sendData = sinon.stub() + @RealTimeRedisManager.sendData = sinon.stub() @HistoryManager.pushUncompressedHistoryOps = sinon.stub().callsArg(3) describe "normally", -> @@ -214,8 +214,8 @@ describe "UpdateManager", -> @ShareJsUpdateManager.applyUpdate = sinon.stub().yields(@error) @UpdateManager.applyUpdate @project_id, @doc_id, @update, @callback - it "should call WebRedisManager.sendData with the error", -> - @WebRedisManager.sendData + it "should call RealTimeRedisManager.sendData with the error", -> + @RealTimeRedisManager.sendData .calledWith({ project_id: @project_id, doc_id: @doc_id, From 4104ca48894d0b064939ff285e101a435448c027 Mon Sep 17 00:00:00 2001 From: James Allen Date: Wed, 3 May 2017 16:27:32 +0100 Subject: [PATCH 6/8] Add in separate redis config for the lock and fix a few web -> realtime --- services/document-updater/app.coffee | 4 -- .../app/coffee/DispatchManager.coffee | 2 +- .../app/coffee/LockManager.coffee | 6 +-- .../config/settings.defaults.coffee | 41 +++++++++++++++---- .../coffee/helpers/DocUpdaterClient.coffee | 7 ++-- .../DispatchManagerTests.coffee | 2 +- 6 files changed, 40 insertions(+), 22 deletions(-) diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index eb0ea771aa..d036beda55 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -11,10 +11,6 @@ DispatchManager = require('./app/js/DispatchManager') Errors = require "./app/js/Errors" HttpController = require "./app/js/HttpController" -redis = require("redis-sharelatex") -rclient = redis.createClient(Settings.redis.web) - - Path = require "path" Metrics = require "metrics-sharelatex" Metrics.initialize("doc-updater") diff --git a/services/document-updater/app/coffee/DispatchManager.coffee b/services/document-updater/app/coffee/DispatchManager.coffee index 28397185dc..b7e50291b0 100644 --- a/services/document-updater/app/coffee/DispatchManager.coffee +++ b/services/document-updater/app/coffee/DispatchManager.coffee @@ -8,7 +8,7 @@ Metrics = require('./Metrics') module.exports = DispatchManager = createDispatcher: () -> - client = redis.createClient(Settings.redis.web) + client = redis.createClient(Settings.redis.realtime) worker = { client: client _waitForUpdateThenDispatchWorker: (callback = (error) ->) -> diff --git a/services/document-updater/app/coffee/LockManager.coffee b/services/document-updater/app/coffee/LockManager.coffee index 289075bca9..d237b51feb 100644 --- a/services/document-updater/app/coffee/LockManager.coffee +++ b/services/document-updater/app/coffee/LockManager.coffee @@ -1,7 +1,8 @@ metrics = require('./Metrics') Settings = require('settings-sharelatex') redis = require("redis-sharelatex") -rclient = redis.createClient(Settings.redis.web) +rclient = redis.createClient(Settings.redis.lock) +keys = Settings.redis.lock.key_schema logger = require "logger-sharelatex" os = require "os" crypto = require "crypto" @@ -11,9 +12,6 @@ PID = process.pid RND = crypto.randomBytes(4).toString('hex') COUNT = 0 -keys = - blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" - module.exports = LockManager = LOCK_TEST_INTERVAL: 50 # 50ms between each test of the lock MAX_LOCK_WAIT_TIME: 10000 # 10s maximum time to spend trying to get the lock diff --git a/services/document-updater/config/settings.defaults.coffee b/services/document-updater/config/settings.defaults.coffee index ae5ff0522b..b06b9d8bf9 100755 --- a/services/document-updater/config/settings.defaults.coffee +++ b/services/document-updater/config/settings.defaults.coffee @@ -22,6 +22,12 @@ module.exports = password:"" key_schema: pendingUpdates: ({doc_id}) -> "PendingUpdates:#{doc_id}" + # cluster: [{ + # port: "7000" + # host: "localhost" + # }] + # key_schema: + # pendingUpdates: ({doc_id}) -> "PendingUpdates:{#{doc_id}}" documentupdater: port: "6379" host: "localhost" @@ -35,15 +41,6 @@ module.exports = projectKey: ({doc_id}) -> "ProjectId:#{doc_id}" docsInProject: ({project_id}) -> "DocsIn:#{project_id}" ranges: ({doc_id}) -> "Ranges:#{doc_id}" - history: - port:"6379" - host:"localhost" - password:"" - key_schema: - uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" - docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" - - # cluster: [{ # port: "7000" # host: "localhost" @@ -57,6 +54,32 @@ module.exports = # projectKey: ({doc_id}) -> "ProjectId:{#{doc_id}}" # docsInProject: ({project_id}) -> "DocsIn:{#{project_id}}" # ranges: ({doc_id}) -> "Ranges:{#{doc_id}}" + history: + port:"6379" + host:"localhost" + password:"" + key_schema: + uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:#{doc_id}" + docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:#{project_id}" + # cluster: [{ + # port: "7000" + # host: "localhost" + # }] + # key_schema: + # uncompressedHistoryOps: ({doc_id}) -> "UncompressedHistoryOps:{#{doc_id}}" + # docsWithHistoryOps: ({project_id}) -> "DocsWithHistoryOps:{#{project_id}}" + lock: + port:"6379" + host:"localhost" + password:"" + key_schema: + blockingKey: ({doc_id}) -> "Blocking:#{doc_id}" + # cluster: [{ + # port: "7000" + # host: "localhost" + # }] + # key_schema: + # blockingKey: ({doc_id}) -> "Blocking:{#{doc_id}}" max_doc_length: 2 * 1024 * 1024 # 2mb diff --git a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee index 7755b656f1..4b57e0659f 100644 --- a/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee +++ b/services/document-updater/test/acceptance/coffee/helpers/DocUpdaterClient.coffee @@ -1,9 +1,10 @@ Settings = require('settings-sharelatex') -rclient = require("redis-sharelatex").createClient(Settings.redis.web) +rclient = require("redis-sharelatex").createClient(Settings.redis.realtime) +keys = Settings.redis.realtime.key_schema request = require("request").defaults(jar: false) async = require "async" -rclient_sub = require("redis-sharelatex").createClient(Settings.redis.web) +rclient_sub = require("redis-sharelatex").createClient(Settings.redis.realtime) rclient_sub.subscribe "applied-ops" rclient_sub.setMaxListeners(0) @@ -17,7 +18,7 @@ module.exports = DocUpdaterClient = rclient_sub.on "message", callback sendUpdate: (project_id, doc_id, update, callback = (error) ->) -> - rclient.rpush "PendingUpdates:#{doc_id}", JSON.stringify(update), (error)-> + rclient.rpush keys.pendingUpdates({doc_id}), JSON.stringify(update), (error)-> return callback(error) if error? doc_key = "#{project_id}:#{doc_id}" rclient.sadd "DocsWithPendingUpdates", doc_key, (error) -> diff --git a/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee b/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee index eddb1eaddb..a82a40af04 100644 --- a/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee +++ b/services/document-updater/test/unit/coffee/DispatchManager/DispatchManagerTests.coffee @@ -11,7 +11,7 @@ describe "DispatchManager", -> "logger-sharelatex": @logger = { log: sinon.stub() } "settings-sharelatex": @settings = redis: - web: {} + realtime: {} "redis-sharelatex": @redis = {} @callback = sinon.stub() From 5f93640077ad1ffcf8c334b9e0f97a91883fbaf2 Mon Sep 17 00:00:00 2001 From: James Allen Date: Thu, 4 May 2017 11:14:17 +0100 Subject: [PATCH 7/8] Add scripts for testing cluster failover scenarios --- .../coffee/test_blpop_failover.coffee | 41 +++++++++++++++++++ .../coffee/test_pubsub_failover.coffee | 33 +++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 services/document-updater/test/cluster_failover/coffee/test_blpop_failover.coffee create mode 100644 services/document-updater/test/cluster_failover/coffee/test_pubsub_failover.coffee diff --git a/services/document-updater/test/cluster_failover/coffee/test_blpop_failover.coffee b/services/document-updater/test/cluster_failover/coffee/test_blpop_failover.coffee new file mode 100644 index 0000000000..72a11164a4 --- /dev/null +++ b/services/document-updater/test/cluster_failover/coffee/test_blpop_failover.coffee @@ -0,0 +1,41 @@ +redis = require "redis-sharelatex" +rclient1 = redis.createClient(cluster: [{ + port: "7000" + host: "localhost" +}]) + +rclient2 = redis.createClient(cluster: [{ + port: "7000" + host: "localhost" +}]) + +counter = 0 +sendPing = (cb = () ->) -> + rclient1.rpush "test-blpop", counter, (error) -> + console.error "[SENDING ERROR]", error.message if error? + if !error? + counter += 1 + cb() + +previous = null +listenForPing = (cb) -> + rclient2.blpop "test-blpop", 200, (error, result) -> + return cb(error) if error? + [key, value] = result + value = parseInt(value, 10) + if value % 10 == 0 + console.log "." + if previous? and value != previous + 1 + error = new Error("Counter not in order. Got #{value}, expected #{previous + 1}") + previous = value + return cb(error, value) + +PING_DELAY = 100 +do sendPings = () -> + sendPing () -> + setTimeout sendPings, PING_DELAY + +do listenInBackground = (cb = () ->) -> + listenForPing (error, value) -> + console.error "[RECEIVING ERROR]", error.message if error + setTimeout listenInBackground \ No newline at end of file diff --git a/services/document-updater/test/cluster_failover/coffee/test_pubsub_failover.coffee b/services/document-updater/test/cluster_failover/coffee/test_pubsub_failover.coffee new file mode 100644 index 0000000000..31bddb5bca --- /dev/null +++ b/services/document-updater/test/cluster_failover/coffee/test_pubsub_failover.coffee @@ -0,0 +1,33 @@ +redis = require "redis-sharelatex" +rclient1 = redis.createClient(cluster: [{ + port: "7000" + host: "localhost" +}]) + +rclient2 = redis.createClient(cluster: [{ + port: "7000" + host: "localhost" +}]) + +counter = 0 +sendPing = (cb = () ->) -> + rclient1.publish "test-pubsub", counter, (error) -> + console.error "[SENDING ERROR]", error.message if error? + if !error? + counter += 1 + cb() + +previous = null +rclient2.subscribe "test-pubsub" +rclient2.on "message", (channel, value) -> + value = parseInt(value, 10) + if value % 10 == 0 + console.log "." + if previous? and value != previous + 1 + console.error "[RECEIVING ERROR]", "Counter not in order. Got #{value}, expected #{previous + 1}" + previous = value + +PING_DELAY = 100 +do sendPings = () -> + sendPing () -> + setTimeout sendPings, PING_DELAY From 7456238a71a64fbfdefc1244f1337deecd36c92b Mon Sep 17 00:00:00 2001 From: James Allen Date: Thu, 4 May 2017 15:42:10 +0100 Subject: [PATCH 8/8] Bump redis-sharelatex version --- services/document-updater/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/document-updater/package.json b/services/document-updater/package.json index fecda6f936..fd566f7b74 100644 --- a/services/document-updater/package.json +++ b/services/document-updater/package.json @@ -14,7 +14,7 @@ "logger-sharelatex": "git+https://github.com/sharelatex/logger-sharelatex.git#v1.5.6", "lynx": "0.0.11", "metrics-sharelatex": "git+https://github.com/sharelatex/metrics-sharelatex.git#v1.5.0", - "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.0", + "redis-sharelatex": "git+https://github.com/sharelatex/redis-sharelatex.git#v1.0.1", "request": "2.25.0", "sandboxed-module": "~0.2.0", "settings-sharelatex": "git+https://github.com/sharelatex/settings-sharelatex.git#v1.0.0",