From befe4be5171392e7a92c87cece2501e68890228b Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 19 Mar 2019 10:55:12 +0000 Subject: [PATCH 1/6] add check for duplicate events --- .../real-time/app/coffee/EventLogger.coffee | 46 ++++++++++++++++ .../app/coffee/WebsocketLoadBalancer.coffee | 3 ++ .../test/unit/coffee/EventLoggerTests.coffee | 52 +++++++++++++++++++ .../coffee/WebsocketLoadBalancerTests.coffee | 1 + 4 files changed, 102 insertions(+) create mode 100644 services/real-time/app/coffee/EventLogger.coffee create mode 100644 services/real-time/test/unit/coffee/EventLoggerTests.coffee diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee new file mode 100644 index 0000000000..773c9dfa9b --- /dev/null +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -0,0 +1,46 @@ +logger = require 'logger-sharelatex' + +# keep track of message counters to detect duplicate and out of order events +# messsage ids have the format "UNIQUEHOSTKEY-COUNTER" + +EVENT_LOG_COUNTER = {} +EVENT_LOG_TIMESTAMP = {} +EVENT_COUNT = 0 + +module.exports = EventLogger = + + MAX_EVENTS_BEFORE_CLEAN: 100000 + MAX_STALE_TIME_IN_MS: 3600 * 1000 + + checkEventOrder: (message_id, message) -> + return if typeof(message_id) isnt 'string' + [key, count] = message_id.split("-", 2) + count = parseInt(count, 10) + if !count # ignore checks if counter is not present + return + # store the last count in a hash for each host + previous = EventLogger._storeEventCount(key, count) + if !previous? || count == (previous + 1) + return # order is ok + if (count == previous) + logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event" + return "duplicate" + else + logger.error {key:key, previous: previous, count:count, message:message}, "events out of order" + return # out of order + + _storeEventCount: (key, count) -> + previous = EVENT_LOG_COUNTER[key] + now = Date.now() + EVENT_LOG_COUNTER[key] = count + EVENT_LOG_TIMESTAMP[key] = now + # periodically remove old counts + if (++EVENT_COUNT % EventLogger.MAX_EVENTS_BEFORE_CLEAN) == 0 + EventLogger._cleanEventStream(now) + return previous + + _cleanEventStream: (now) -> + for key, timestamp of EVENT_LOG_TIMESTAMP + if (now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS + delete EVENT_LOG_COUNTER[key] + delete EVENT_LOG_TIMESTAMP[key] \ No newline at end of file diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index c30a3d3e85..13137805fd 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -4,6 +4,7 @@ redis = require("redis-sharelatex") SafeJsonParse = require "./SafeJsonParse" rclientPub = redis.createClient(Settings.redis.realtime) rclientSub = redis.createClient(Settings.redis.realtime) +EventLogger = require "./EventLogger" module.exports = WebsocketLoadBalancer = rclientPub: rclientPub @@ -36,6 +37,8 @@ module.exports = WebsocketLoadBalancer = if message.room_id == "all" io.sockets.emit(message.message, message.payload...) else if message.room_id? + if message._id? + EventLogger.checkEventOrder(message._id, message) io.sockets.in(message.room_id).emit(message.message, message.payload...) else if message.health_check? logger.debug {message}, "got health check message in editor events channel" diff --git a/services/real-time/test/unit/coffee/EventLoggerTests.coffee b/services/real-time/test/unit/coffee/EventLoggerTests.coffee new file mode 100644 index 0000000000..ce955a8e7d --- /dev/null +++ b/services/real-time/test/unit/coffee/EventLoggerTests.coffee @@ -0,0 +1,52 @@ +require('chai').should() +expect = require("chai").expect +SandboxedModule = require('sandboxed-module') +modulePath = '../../../app/js/EventLogger' +sinon = require("sinon") +tk = require "timekeeper" + +describe 'EventLogger', -> + beforeEach -> + @start = Date.now() + tk.freeze(new Date(@start)) + @EventLogger = SandboxedModule.require modulePath, requires: + "logger-sharelatex": @logger = {error: sinon.stub()} + @id_1 = "abc-1" + @message_1 = "message-1" + @id_2 = "abc-2" + @message_2 = "message-2" + + afterEach -> + tk.reset() + + describe 'checkEventOrder', -> + + it 'should accept events in order', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + status = @EventLogger.checkEventOrder(@id_2, @message_2) + expect(status).to.be.undefined + + it 'should return "duplicate" for the same event', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.equal "duplicate" + + it 'should log an error for out of order events', -> + @EventLogger.checkEventOrder(@id_1, @message_1) + @EventLogger.checkEventOrder(@id_2, @message_2) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.be.undefined + + it 'should flush old entries', -> + @EventLogger.MAX_EVENTS_BEFORE_CLEAN = 10 + @EventLogger.checkEventOrder(@id_1, @message_1) + for i in [1..8] + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.equal "duplicate" + # the next event should flush the old entries aboce + @EventLogger.MAX_STALE_TIME_IN_MS=1000 + tk.freeze(new Date(@start + 5 * 1000)) + # because we flushed the entries this should not be a duplicate + @EventLogger.checkEventOrder('other-1', @message_2) + status = @EventLogger.checkEventOrder(@id_1, @message_1) + expect(status).to.be.undefined \ No newline at end of file diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 5cae81a31d..ad0c70832c 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -12,6 +12,7 @@ describe "WebsocketLoadBalancer", -> "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } "./SafeJsonParse": @SafeJsonParse = parse: (data, cb) => cb null, JSON.parse(data) + "./EventLogger": {checkEventOrder: sinon.stub()} @io = {} @WebsocketLoadBalancer.rclientPub = publish: sinon.stub() @WebsocketLoadBalancer.rclientSub = From 9b25374cd34275dc1a6eb3599d24412be8a60e43 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 Mar 2019 14:48:51 +0000 Subject: [PATCH 2/6] use time-based cleaning of event log --- services/real-time/app/coffee/EventLogger.coffee | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee index 773c9dfa9b..bdc91f940a 100644 --- a/services/real-time/app/coffee/EventLogger.coffee +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -5,11 +5,10 @@ logger = require 'logger-sharelatex' EVENT_LOG_COUNTER = {} EVENT_LOG_TIMESTAMP = {} -EVENT_COUNT = 0 +EVENT_LAST_CLEAN_TIMESTAMP = 0 module.exports = EventLogger = - MAX_EVENTS_BEFORE_CLEAN: 100000 MAX_STALE_TIME_IN_MS: 3600 * 1000 checkEventOrder: (message_id, message) -> @@ -35,8 +34,9 @@ module.exports = EventLogger = EVENT_LOG_COUNTER[key] = count EVENT_LOG_TIMESTAMP[key] = now # periodically remove old counts - if (++EVENT_COUNT % EventLogger.MAX_EVENTS_BEFORE_CLEAN) == 0 + if (now - EVENT_LAST_CLEAN_TIMESTAMP) > EventLogger.MAX_STALE_TIME_IN_MS EventLogger._cleanEventStream(now) + EVENT_LAST_CLEAN_TIMESTAMP = now return previous _cleanEventStream: (now) -> From 57c5ec14bdd3885c22ce5f9beadc7a0f85f53982 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 Mar 2019 14:49:25 +0000 Subject: [PATCH 3/6] check for a valid counter value in event log --- services/real-time/app/coffee/EventLogger.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee index bdc91f940a..29e79740b9 100644 --- a/services/real-time/app/coffee/EventLogger.coffee +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -15,7 +15,7 @@ module.exports = EventLogger = return if typeof(message_id) isnt 'string' [key, count] = message_id.split("-", 2) count = parseInt(count, 10) - if !count # ignore checks if counter is not present + if !(count >= 0)# ignore checks if counter is not present return # store the last count in a hash for each host previous = EventLogger._storeEventCount(key, count) From 8c82faa966658823e77ea10fa23dcdf9a77e9ca7 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 Mar 2019 14:50:27 +0000 Subject: [PATCH 4/6] check order of messages on applied-ops channel --- services/real-time/app/coffee/DocumentUpdaterController.coffee | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 8396be263a..b809bd9086 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -3,6 +3,7 @@ settings = require 'settings-sharelatex' redis = require("redis-sharelatex") rclient = redis.createClient(settings.redis.documentupdater) SafeJsonParse = require "./SafeJsonParse" +EventLogger = require "./EventLogger" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -21,6 +22,8 @@ module.exports = DocumentUpdaterController = logger.error {err: error, channel}, "error parsing JSON" return if message.op? + if message._id? + EventLogger.checkEventOrder(message._id, message) DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op) else if message.error? DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message) From e91b967bdb3c2188067892cf743b4bbe8debcf5f Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 Mar 2019 14:56:31 +0000 Subject: [PATCH 5/6] use per-channel event metrics --- .../app/coffee/DocumentUpdaterController.coffee | 2 +- services/real-time/app/coffee/EventLogger.coffee | 10 +++++++--- .../real-time/app/coffee/WebsocketLoadBalancer.coffee | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index b809bd9086..dcdd8d142c 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -23,7 +23,7 @@ module.exports = DocumentUpdaterController = return if message.op? if message._id? - EventLogger.checkEventOrder(message._id, message) + EventLogger.checkEventOrder("applied-ops", message._id, message) DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op) else if message.error? DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message) diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee index 29e79740b9..9a52ccb842 100644 --- a/services/real-time/app/coffee/EventLogger.coffee +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -1,4 +1,5 @@ logger = require 'logger-sharelatex' +metrics = require 'metrics-sharelatex' # keep track of message counters to detect duplicate and out of order events # messsage ids have the format "UNIQUEHOSTKEY-COUNTER" @@ -11,7 +12,7 @@ module.exports = EventLogger = MAX_STALE_TIME_IN_MS: 3600 * 1000 - checkEventOrder: (message_id, message) -> + checkEventOrder: (channel, message_id, message) -> return if typeof(message_id) isnt 'string' [key, count] = message_id.split("-", 2) count = parseInt(count, 10) @@ -20,12 +21,15 @@ module.exports = EventLogger = # store the last count in a hash for each host previous = EventLogger._storeEventCount(key, count) if !previous? || count == (previous + 1) + metrics.inc "event.#{channel}.valid" return # order is ok if (count == previous) - logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event" + metrics.inc "event.#{channel}.duplicate" + # logger.error {key:key, previous: previous, count:count, message:message}, "duplicate event" return "duplicate" else - logger.error {key:key, previous: previous, count:count, message:message}, "events out of order" + metrics.inc "event.#{channel}.out-of-order" + # logger.error {key:key, previous: previous, count:count, message:message}, "events out of order" return # out of order _storeEventCount: (key, count) -> diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index 13137805fd..eeedb25916 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -38,7 +38,7 @@ module.exports = WebsocketLoadBalancer = io.sockets.emit(message.message, message.payload...) else if message.room_id? if message._id? - EventLogger.checkEventOrder(message._id, message) + EventLogger.checkEventOrder("editor-events", message._id, message) io.sockets.in(message.room_id).emit(message.message, message.payload...) else if message.health_check? logger.debug {message}, "got health check message in editor events channel" From 1ab5e526993423367ecbc1ad3df2c252d4e90689 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 21 Mar 2019 15:52:53 +0000 Subject: [PATCH 6/6] down-sample valid events by 1000 --- services/real-time/app/coffee/EventLogger.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/EventLogger.coffee b/services/real-time/app/coffee/EventLogger.coffee index 9a52ccb842..cf412c1904 100644 --- a/services/real-time/app/coffee/EventLogger.coffee +++ b/services/real-time/app/coffee/EventLogger.coffee @@ -21,7 +21,7 @@ module.exports = EventLogger = # store the last count in a hash for each host previous = EventLogger._storeEventCount(key, count) if !previous? || count == (previous + 1) - metrics.inc "event.#{channel}.valid" + metrics.inc "event.#{channel}.valid", 0.001 return # order is ok if (count == previous) metrics.inc "event.#{channel}.duplicate"