From 804f4c2bd289e33e9fb28af0adee111848029a83 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 18 Jul 2019 11:25:10 +0100 Subject: [PATCH 01/20] listen on separate channels for each project/doc --- .../app/coffee/ChannelManager.coffee | 43 ++++++++++++ .../coffee/DocumentUpdaterController.coffee | 13 +++- .../real-time/app/coffee/RoomManager.coffee | 69 +++++++++++++++++++ .../app/coffee/WebsocketController.coffee | 8 ++- .../app/coffee/WebsocketLoadBalancer.coffee | 14 +++- 5 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 services/real-time/app/coffee/ChannelManager.coffee create mode 100644 services/real-time/app/coffee/RoomManager.coffee diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee new file mode 100644 index 0000000000..0e6b2fb98e --- /dev/null +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -0,0 +1,43 @@ +logger = require 'logger-sharelatex' +metrics = require "metrics-sharelatex" + +ClientMap = new Map() # for each redis client, stores a Set of subscribed channels + +# Manage redis pubsub subscriptions for individual projects and docs, ensuring +# that we never subscribe to a channel multiple times. The socket.io side is +# handled by RoomManager. + +module.exports = ChannelManager = + _createNewClientEntry: (rclient) -> + ClientMap.set(rclient, new Set()).get(rclient) + + subscribe: (rclient, baseChannel, id) -> + existingChannelSet = ClientMap.get(rclient) || @_createNewClientEntry(rclient) + channel = "#{baseChannel}:#{id}" + if existingChannelSet.has(channel) + logger.error {channel}, "already subscribed" + else + rclient.subscribe channel + existingChannelSet.add(channel) + logger.log {channel}, "subscribed to new channel" + metrics.inc "subscribe.#{baseChannel}" + + unsubscribe: (rclient, baseChannel, id) -> + existingChannelSet = ClientMap.get(rclient) + channel = "#{baseChannel}:#{id}" + if !existingChannelSet.has(channel) + logger.error {channel}, "not subscribed, cannot unsubscribe" + else + rclient.unsubscribe channel + existingChannelSet.delete(channel) + logger.log {channel}, "unsubscribed from channel" + metrics.inc "unsubscribe.#{baseChannel}" + + publish: (rclient, baseChannel, id, data) -> + if id is 'all' + channel = baseChannel + else + channel = "#{baseChannel}:#{id}" + # we publish on a different client to the subscribe, so we can't + # check for the channel existing here + rclient.publish channel, data \ No newline at end of file diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 05b95e5fac..0cc5751d7c 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -27,7 +29,16 @@ module.exports = DocumentUpdaterController = do (i) -> rclient.on "message", () -> metrics.inc "rclient-#{i}", 0.001 # per client event rate metric - + for rclient in @rclientList + @handleRoomUpdates(rclient) + + handleRoomUpdates: (rclientSub) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'doc-active', (doc_id) -> + ChannelManager.subscribe rclientSub, "applied-ops", doc_id + roomEvents.on 'doc-empty', (doc_id) -> + ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id + _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> if error? diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee new file mode 100644 index 0000000000..e9787aa1df --- /dev/null +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -0,0 +1,69 @@ +logger = require 'logger-sharelatex' +{EventEmitter} = require 'events' + +IdMap = new Map() # keep track of whether ids are from projects or docs +RoomEvents = new EventEmitter() + +# Manage socket.io rooms for individual projects and docs +# +# The first time someone joins a project or doc we emit a 'project-active' or +# 'doc-active' event. +# +# When the last person leaves a project or doc, we emit 'project-empty' or +# 'doc-empty' event. +# +# The pubsub side is handled by ChannelManager + +module.exports = RoomManager = + + joinProject: (client, project_id) -> + @_join client, "project", project_id + + joinDoc: (client, doc_id) -> + @_join client, "doc", doc_id + + leaveDoc: (client, doc_id) -> + @_leave client, "doc", doc_id + + leaveProjectAndDocs: (client) -> + # what rooms is this client in? we need to leave them all + for id in @_roomsClientIsIn(client) + entity = IdMap.get(id) + @_leave client, entity, id + + eventSource: () -> + return RoomEvents + + _clientsInRoom: (client, room) -> + nsp = client.namespace.name + name = (nsp + '/') + room; + return (client.manager?.rooms?[name] || []).length + + _roomsClientIsIn: (client) -> + roomList = for fullRoomPath of client.manager.roomClients?[client.id] when fullRoomPath isnt '' + # strip socket.io prefix from room to get original id + [prefix, room] = fullRoomPath.split('/', 2) + room + return roomList + + _join: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.join id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" + # is this a new room? if so, subscribe + if beforeCount == 0 and afterCount == 1 + logger.log {entity, id}, "room is now active" + RoomEvents.emit "#{entity}-active", id + IdMap.set(id, entity) + + _leave: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.leave id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" + # is the room now empty? if so, unsubscribe + if beforeCount == 1 and afterCount == 0 + logger.log {entity, id}, "room is now empty" + RoomEvents.emit "#{entity}-empty", id + IdMap.delete(id) \ No newline at end of file diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 470b1f2a52..6d8965883f 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -5,6 +5,7 @@ AuthorizationManager = require "./AuthorizationManager" DocumentUpdaterManager = require "./DocumentUpdaterManager" ConnectedUsersManager = require "./ConnectedUsersManager" WebsocketLoadBalancer = require "./WebsocketLoadBalancer" +RoomManager = require "./RoomManager" Utils = require "./Utils" module.exports = WebsocketController = @@ -25,7 +26,7 @@ module.exports = WebsocketController = logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project" return callback(err) - client.join project_id + RoomManager.joinProject(client, project_id) client.set("privilege_level", privilegeLevel) client.set("user_id", user_id) @@ -71,6 +72,7 @@ module.exports = WebsocketController = if err? logger.error {err, project_id, user_id, client_id: client.id}, "error marking client as disconnected" + RoomManager.leaveProjectAndDocs(client) setTimeout () -> remainingClients = io.sockets.clients(project_id) if remainingClients.length == 0 @@ -116,7 +118,7 @@ module.exports = WebsocketController = return callback(err) AuthorizationManager.addAccessToDoc client, doc_id - client.join(doc_id) + RoomManager.joinDoc(client, doc_id) callback null, escapedLines, version, ops, ranges logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" @@ -124,7 +126,7 @@ module.exports = WebsocketController = metrics.inc "editor.leave-doc" Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc" - client.leave doc_id + RoomManager.leaveDoc(client, doc_id) # we could remove permission when user leaves a doc, but because # the connection is per-project, we continue to allow access # after the initial joinDoc since we know they are already authorised. diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index a9d5052410..1bb74c6a3e 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -4,6 +4,8 @@ RedisClientManager = require "./RedisClientManager" SafeJsonParse = require "./SafeJsonParse" EventLogger = require "./EventLogger" HealthCheckManager = require "./HealthCheckManager" +RoomManager = require "./RoomManager" +ChannelManager = require "./ChannelManager" module.exports = WebsocketLoadBalancer = rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub) @@ -18,8 +20,9 @@ module.exports = WebsocketLoadBalancer = message: message payload: payload logger.log {room_id, message, payload, length: data.length}, "emitting to room" + for rclientPub in @rclientPubList - rclientPub.publish "editor-events", data + ChannelManager.publish rclientPub, "editor-events", room_id, data emitToAll: (message, payload...) -> @emitToRoom "all", message, payload... @@ -32,6 +35,15 @@ module.exports = WebsocketLoadBalancer = rclientSub.on "message", (channel, message) -> EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 WebsocketLoadBalancer._processEditorEvent io, channel, message + for rclientSub in @rclientSubList + @handleRoomUpdates(rclientSub) + + handleRoomUpdates: (rclientSub) -> + roomEvents = RoomManager.eventSource() + roomEvents.on 'project-active', (project_id) -> + ChannelManager.subscribe rclientSub, "editor-events", project_id + roomEvents.on 'project-empty', (project_id) -> + ChannelManager.unsubscribe rclientSub, "editor-events", project_id _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> From f6f6f549d9de2e88d077bc774be0d44230d8b0df Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 18 Jul 2019 12:40:14 +0100 Subject: [PATCH 02/20] don't publish on individual channels until explicitly set --- services/real-time/app/coffee/ChannelManager.coffee | 3 ++- services/real-time/config/settings.defaults.coffee | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 0e6b2fb98e..7ea676267f 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -1,5 +1,6 @@ logger = require 'logger-sharelatex' metrics = require "metrics-sharelatex" +settings = require "settings-sharelatex" ClientMap = new Map() # for each redis client, stores a Set of subscribed channels @@ -34,7 +35,7 @@ module.exports = ChannelManager = metrics.inc "unsubscribe.#{baseChannel}" publish: (rclient, baseChannel, id, data) -> - if id is 'all' + if id is 'all' or !settings.publishOnIndividualChannels channel = baseChannel else channel = "#{baseChannel}:#{id}" diff --git a/services/real-time/config/settings.defaults.coffee b/services/real-time/config/settings.defaults.coffee index ceeb65191d..a3128e84d1 100644 --- a/services/real-time/config/settings.defaults.coffee +++ b/services/real-time/config/settings.defaults.coffee @@ -54,6 +54,8 @@ settings = checkEventOrder: process.env['CHECK_EVENT_ORDER'] or false + publishOnIndividualChannels: process.env['PUBLISH_ON_INDIVIDUAL_CHANNELS'] or false + sentry: dsn: process.env.SENTRY_DSN From 3bf5dd5d6bc5223ec244d97166992a157bef0254 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Thu, 18 Jul 2019 14:25:25 +0100 Subject: [PATCH 03/20] clarify errors for subscribe/unsubscribe --- services/real-time/app/coffee/ChannelManager.coffee | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 7ea676267f..4d5c843d10 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -16,9 +16,9 @@ module.exports = ChannelManager = existingChannelSet = ClientMap.get(rclient) || @_createNewClientEntry(rclient) channel = "#{baseChannel}:#{id}" if existingChannelSet.has(channel) - logger.error {channel}, "already subscribed" + logger.error {channel}, "already subscribed - shouldn't happen" else - rclient.subscribe channel + rclient.subscribe channel # completes in the background existingChannelSet.add(channel) logger.log {channel}, "subscribed to new channel" metrics.inc "subscribe.#{baseChannel}" @@ -27,9 +27,9 @@ module.exports = ChannelManager = existingChannelSet = ClientMap.get(rclient) channel = "#{baseChannel}:#{id}" if !existingChannelSet.has(channel) - logger.error {channel}, "not subscribed, cannot unsubscribe" + logger.error {channel}, "not subscribed - shouldn't happen" else - rclient.unsubscribe channel + rclient.unsubscribe channel # completes in the background existingChannelSet.delete(channel) logger.log {channel}, "unsubscribed from channel" metrics.inc "unsubscribe.#{baseChannel}" From 40353a410fdf59f78caf94c4ee4da44e6e1b2d03 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 19 Jul 2019 08:49:57 +0100 Subject: [PATCH 04/20] fix unit tests --- .../DocumentUpdaterControllerTests.coffee | 3 +++ .../coffee/WebsocketControllerTests.coffee | 21 +++++++++++++------ .../coffee/WebsocketLoadBalancerTests.coffee | 7 +++++-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee index 24551396d9..b5574c8d16 100644 --- a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -11,6 +11,7 @@ describe "DocumentUpdaterController", -> @callback = sinon.stub() @io = { "mock": "socket.io" } @rclient = [] + @RoomEvents = { on: sinon.stub() } @EditorUpdatesController = SandboxedModule.require modulePath, requires: "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub(), warn: sinon.stub() } "settings-sharelatex": @settings = @@ -28,6 +29,8 @@ describe "DocumentUpdaterController", -> "./EventLogger": @EventLogger = {checkEventOrder: sinon.stub()} "./HealthCheckManager": {check: sinon.stub()} "metrics-sharelatex": @metrics = {inc: sinon.stub()} + "./RoomManager" : @RoomManager = { eventSource: sinon.stub().returns @RoomEvents} + "./ChannelManager": @ChannelManager = {} describe "listenForUpdatesFromDocumentUpdater", -> beforeEach -> diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index 1ddf4e6359..ab442006c2 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -36,7 +36,7 @@ describe 'WebsocketController', -> "metrics-sharelatex": @metrics = inc: sinon.stub() set: sinon.stub() - + "./RoomManager": @RoomManager = {} afterEach -> tk.reset() @@ -54,6 +54,7 @@ describe 'WebsocketController', -> @privilegeLevel = "owner" @ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4) @WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel) + @RoomManager.joinProject = sinon.stub() @WebsocketController.joinProject @client, @user, @project_id, @callback it "should load the project from web", -> @@ -62,7 +63,7 @@ describe 'WebsocketController', -> .should.equal true it "should join the project room", -> - @client.join.calledWith(@project_id).should.equal true + @RoomManager.joinProject.calledWith(@client, @project_id).should.equal true it "should set the privilege level on the client", -> @client.set.calledWith("privilege_level", @privilegeLevel).should.equal true @@ -125,6 +126,7 @@ describe 'WebsocketController', -> @DocumentUpdaterManager.flushProjectToMongoAndDelete = sinon.stub().callsArg(1) @ConnectedUsersManager.markUserAsDisconnected = sinon.stub().callsArg(2) @WebsocketLoadBalancer.emitToRoom = sinon.stub() + @RoomManager.leaveProjectAndDocs = sinon.stub() @clientsInRoom = [] @io = sockets: @@ -160,6 +162,11 @@ describe 'WebsocketController', -> it "should increment the leave-project metric", -> @metrics.inc.calledWith("editor.leave-project").should.equal true + it "should track the disconnection in RoomManager", -> + @RoomManager.leaveProjectAndDocs + .calledWith(@client) + .should.equal true + describe "when the project is not empty", -> beforeEach -> @clientsInRoom = ["mock-remaining-client"] @@ -230,6 +237,7 @@ describe 'WebsocketController', -> @AuthorizationManager.addAccessToDoc = sinon.stub() @AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null) @DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops) + @RoomManager.joinDoc = sinon.stub() describe "works", -> beforeEach -> @@ -251,8 +259,8 @@ describe 'WebsocketController', -> .should.equal true it "should join the client to room for the doc_id", -> - @client.join - .calledWith(@doc_id) + @RoomManager.joinDoc + .calledWith(@client, @doc_id) .should.equal true it "should call the callback with the lines, version, ranges and ops", -> @@ -330,11 +338,12 @@ describe 'WebsocketController', -> beforeEach -> @doc_id = "doc-id-123" @client.params.project_id = @project_id + @RoomManager.leaveDoc = sinon.stub() @WebsocketController.leaveDoc @client, @doc_id, @callback it "should remove the client from the doc_id room", -> - @client.leave - .calledWith(@doc_id).should.equal true + @RoomManager.leaveDoc + .calledWith(@client, @doc_id).should.equal true it "should call the callback", -> @callback.called.should.equal true diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 14df2df851..c4f4519790 100644 --- a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -6,6 +6,7 @@ modulePath = require('path').join __dirname, '../../../app/js/WebsocketLoadBalan describe "WebsocketLoadBalancer", -> beforeEach -> @rclient = {} + @RoomEvents = {on: sinon.stub()} @WebsocketLoadBalancer = SandboxedModule.require modulePath, requires: "./RedisClientManager": createClientList: () => [] @@ -14,6 +15,8 @@ describe "WebsocketLoadBalancer", -> parse: (data, cb) => cb null, JSON.parse(data) "./EventLogger": {checkEventOrder: sinon.stub()} "./HealthCheckManager": {check: sinon.stub()} + "./RoomManager" : @RoomManager = {eventSource: sinon.stub().returns @RoomEvents} + "./ChannelManager": @ChannelManager = {publish: sinon.stub()} @io = {} @WebsocketLoadBalancer.rclientPubList = [{publish: sinon.stub()}] @WebsocketLoadBalancer.rclientSubList = [{ @@ -30,8 +33,8 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...) it "should publish the message to redis", -> - @WebsocketLoadBalancer.rclientPubList[0].publish - .calledWith("editor-events", JSON.stringify( + @ChannelManager.publish + .calledWith(@WebsocketLoadBalancer.rclientPubList[0], "editor-events", @room_id, JSON.stringify( room_id: @room_id, message: @message payload: @payload From 616014e05d0f3b6f304253bf3ac5fa8a4b4939a8 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 19 Jul 2019 08:50:43 +0100 Subject: [PATCH 05/20] add comment about automatically leaving rooms --- services/real-time/app/coffee/RoomManager.coffee | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index e9787aa1df..be44edd002 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -27,6 +27,8 @@ module.exports = RoomManager = leaveProjectAndDocs: (client) -> # what rooms is this client in? we need to leave them all + # FIXME: socket.io will cause us to leave the rooms, so we only need + # to manage our channel subscriptions for id in @_roomsClientIsIn(client) entity = IdMap.get(id) @_leave client, entity, id From a538d104885ded31dde3b23d6533de93f1f6734b Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 19 Jul 2019 08:56:38 +0100 Subject: [PATCH 06/20] extend comment re disconnection --- services/real-time/app/coffee/RoomManager.coffee | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index be44edd002..d3a3bea5e8 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -26,9 +26,11 @@ module.exports = RoomManager = @_leave client, "doc", doc_id leaveProjectAndDocs: (client) -> - # what rooms is this client in? we need to leave them all - # FIXME: socket.io will cause us to leave the rooms, so we only need - # to manage our channel subscriptions + # what rooms is this client in? we need to leave them all. socket.io + # will cause us to leave the rooms, so we only need to manage our + # channel subscriptions... but it will be safer if we leave them + # explicitly, and then socket.io will just regard this as a client that + # has not joined any rooms and do a final disconnection. for id in @_roomsClientIsIn(client) entity = IdMap.get(id) @_leave client, entity, id @@ -36,6 +38,9 @@ module.exports = RoomManager = eventSource: () -> return RoomEvents + # internal functions below, these access socket.io rooms data directly and + # will need updating for socket.io v2 + _clientsInRoom: (client, room) -> nsp = client.namespace.name name = (nsp + '/') + room; From 9f7df5f10c5b1692f8b7b2429e5e605fb0950fd9 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Fri, 19 Jul 2019 11:58:40 +0100 Subject: [PATCH 07/20] wip unit tests --- .../test/unit/coffee/ChannelManager.coffee | 33 +++++++++++++++++ .../test/unit/coffee/RoomManagerTests.coffee | 37 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 services/real-time/test/unit/coffee/ChannelManager.coffee create mode 100644 services/real-time/test/unit/coffee/RoomManagerTests.coffee diff --git a/services/real-time/test/unit/coffee/ChannelManager.coffee b/services/real-time/test/unit/coffee/ChannelManager.coffee new file mode 100644 index 0000000000..314e9914e1 --- /dev/null +++ b/services/real-time/test/unit/coffee/ChannelManager.coffee @@ -0,0 +1,33 @@ +chai = require('chai') +should = chai.should() +sinon = require("sinon") +modulePath = "../../../app/js/ChannelManager.js" +SandboxedModule = require('sandboxed-module') + +describe 'ChannelManager', -> + beforeEach -> + @project_id = "project-id-123" + @user_id = "user-id-123" + @user = {_id: @user_id} + @callback = sinon.stub() + @ChannelManager = SandboxedModule.require modulePath, requires: + "settings-sharelatex": @settings = {} + "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + + describe "subscribe", -> + + describe "when the project room is empty", -> + + describe "when there are other clients in the project room", -> + + describe "unsubscribe", -> + + describe "when the doc room is empty", -> + + describe "when there are other clients in the doc room", -> + + describe "publish", -> + + describe "when the channel is 'all'", -> + + describe "when the channel has an specific id", -> diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee new file mode 100644 index 0000000000..75d241cf6b --- /dev/null +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -0,0 +1,37 @@ +chai = require('chai') +should = chai.should() +sinon = require("sinon") +modulePath = "../../../app/js/RoomManager.js" +SandboxedModule = require('sandboxed-module') + +describe 'RoomManager', -> + beforeEach -> + @project_id = "project-id-123" + @user_id = "user-id-123" + @user = {_id: @user_id} + @callback = sinon.stub() + @RoomManager = SandboxedModule.require modulePath, requires: + "settings-sharelatex": @settings = {} + "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + + describe "joinProject", -> + + describe "when the project room is empty", -> + + describe "when there are other clients in the project room", -> + + describe "joinDoc", -> + + describe "when the doc room is empty", -> + + describe "when there are other clients in the doc room", -> + + describe "leaveDoc", -> + + describe "when doc room will be empty after this client has left", -> + + describe "when there are other clients in the doc room", -> + + describe "leaveProjectAndDocs", -> + + describe "when the client is connected to multiple docs", -> \ No newline at end of file From 8c7b73480f7a08c1ea9dc467d697189b43bb2378 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 22 Jul 2019 11:23:02 +0100 Subject: [PATCH 08/20] upgrade sinon to 1.17.7 for onCall support --- services/real-time/npm-shrinkwrap.json | 134 ++++++++++++++++++++++--- services/real-time/package.json | 4 +- 2 files changed, 121 insertions(+), 17 deletions(-) diff --git a/services/real-time/npm-shrinkwrap.json b/services/real-time/npm-shrinkwrap.json index 4f45764207..a146f50e1f 100644 --- a/services/real-time/npm-shrinkwrap.json +++ b/services/real-time/npm-shrinkwrap.json @@ -356,18 +356,6 @@ "resolved": "https://registry.npmjs.org/bunyan/-/bunyan-0.22.3.tgz", "dev": true }, - "buster-core": { - "version": "0.6.4", - "from": "buster-core@0.6.4", - "resolved": "https://registry.npmjs.org/buster-core/-/buster-core-0.6.4.tgz", - "dev": true - }, - "buster-format": { - "version": "0.5.6", - "from": "buster-format@>=0.5.0 <0.6.0", - "resolved": "https://registry.npmjs.org/buster-format/-/buster-format-0.5.6.tgz", - "dev": true - }, "bytes": { "version": "3.0.0", "from": "bytes@3.0.0", @@ -484,6 +472,12 @@ "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-0.1.3.tgz", "dev": true }, + "define-properties": { + "version": "1.1.3", + "from": "define-properties@>=1.1.3 <2.0.0", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz", + "dev": true + }, "delay": { "version": "4.3.0", "from": "delay@>=4.0.1 <5.0.0", @@ -556,6 +550,18 @@ "from": "ent@>=2.2.0 <3.0.0", "resolved": "https://registry.npmjs.org/ent/-/ent-2.2.0.tgz" }, + "es-abstract": { + "version": "1.13.0", + "from": "es-abstract@>=1.12.0 <2.0.0", + "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.13.0.tgz", + "dev": true + }, + "es-to-primitive": { + "version": "1.2.0", + "from": "es-to-primitive@>=1.2.0 <2.0.0", + "resolved": "https://registry.npmjs.org/es-to-primitive/-/es-to-primitive-1.2.0.tgz", + "dev": true + }, "es6-promise": { "version": "4.2.8", "from": "es6-promise@>=4.0.3 <5.0.0", @@ -717,6 +723,12 @@ "from": "form-data@>=2.3.2 <2.4.0", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.3.3.tgz" }, + "formatio": { + "version": "1.1.1", + "from": "formatio@1.1.1", + "resolved": "https://registry.npmjs.org/formatio/-/formatio-1.1.1.tgz", + "dev": true + }, "forwarded": { "version": "0.1.2", "from": "forwarded@>=0.1.2 <0.2.0", @@ -733,6 +745,12 @@ "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", "dev": true }, + "function-bind": { + "version": "1.1.1", + "from": "function-bind@>=1.1.1 <2.0.0", + "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", + "dev": true + }, "gaxios": { "version": "1.8.4", "from": "gaxios@>=1.2.1 <2.0.0", @@ -800,6 +818,18 @@ "from": "har-validator@>=5.1.0 <5.2.0", "resolved": "https://registry.npmjs.org/har-validator/-/har-validator-5.1.3.tgz" }, + "has": { + "version": "1.0.3", + "from": "has@>=1.0.3 <2.0.0", + "resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz", + "dev": true + }, + "has-symbols": { + "version": "1.0.0", + "from": "has-symbols@>=1.0.0 <2.0.0", + "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.0.0.tgz", + "dev": true + }, "he": { "version": "1.1.1", "from": "he@1.1.1", @@ -880,11 +910,47 @@ "from": "is@>=3.2.0 <4.0.0", "resolved": "https://registry.npmjs.org/is/-/is-3.3.0.tgz" }, + "is-arguments": { + "version": "1.0.4", + "from": "is-arguments@>=1.0.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-arguments/-/is-arguments-1.0.4.tgz", + "dev": true + }, "is-buffer": { "version": "2.0.3", "from": "is-buffer@>=2.0.2 <3.0.0", "resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-2.0.3.tgz" }, + "is-callable": { + "version": "1.1.4", + "from": "is-callable@>=1.1.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-callable/-/is-callable-1.1.4.tgz", + "dev": true + }, + "is-date-object": { + "version": "1.0.1", + "from": "is-date-object@>=1.0.1 <2.0.0", + "resolved": "https://registry.npmjs.org/is-date-object/-/is-date-object-1.0.1.tgz", + "dev": true + }, + "is-generator-function": { + "version": "1.0.7", + "from": "is-generator-function@>=1.0.7 <2.0.0", + "resolved": "https://registry.npmjs.org/is-generator-function/-/is-generator-function-1.0.7.tgz", + "dev": true + }, + "is-regex": { + "version": "1.0.4", + "from": "is-regex@>=1.0.4 <2.0.0", + "resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.0.4.tgz", + "dev": true + }, + "is-symbol": { + "version": "1.0.2", + "from": "is-symbol@>=1.0.2 <2.0.0", + "resolved": "https://registry.npmjs.org/is-symbol/-/is-symbol-1.0.2.tgz", + "dev": true + }, "is-typedarray": { "version": "1.0.0", "from": "is-typedarray@>=1.0.0 <1.1.0", @@ -985,6 +1051,12 @@ } } }, + "lolex": { + "version": "1.3.2", + "from": "lolex@1.3.2", + "resolved": "https://registry.npmjs.org/lolex/-/lolex-1.3.2.tgz", + "dev": true + }, "long": { "version": "4.0.0", "from": "long@>=4.0.0 <5.0.0", @@ -1176,6 +1248,18 @@ "from": "oauth-sign@>=0.9.0 <0.10.0", "resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.9.0.tgz" }, + "object-keys": { + "version": "1.1.1", + "from": "object-keys@>=1.0.12 <2.0.0", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "dev": true + }, + "object.entries": { + "version": "1.1.0", + "from": "object.entries@>=1.1.0 <2.0.0", + "resolved": "https://registry.npmjs.org/object.entries/-/object.entries-1.1.0.tgz", + "dev": true + }, "on-finished": { "version": "2.3.0", "from": "on-finished@>=2.3.0 <2.4.0", @@ -1469,6 +1553,12 @@ "from": "safer-buffer@>=2.1.2 <3.0.0", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" }, + "samsam": { + "version": "1.1.2", + "from": "samsam@1.1.2", + "resolved": "https://registry.npmjs.org/samsam/-/samsam-1.1.2.tgz", + "dev": true + }, "sandboxed-module": { "version": "0.3.0", "from": "sandboxed-module@>=0.3.0 <0.4.0", @@ -1533,9 +1623,9 @@ "resolved": "https://registry.npmjs.org/shimmer/-/shimmer-1.2.1.tgz" }, "sinon": { - "version": "1.5.2", - "from": "sinon@>=1.5.2 <1.6.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-1.5.2.tgz", + "version": "1.17.7", + "from": "sinon@1.17.7", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-1.17.7.tgz", "dev": true }, "socket.io": { @@ -1715,6 +1805,20 @@ } } }, + "util": { + "version": "0.12.1", + "from": "util@>=0.10.3 <1.0.0", + "resolved": "https://registry.npmjs.org/util/-/util-0.12.1.tgz", + "dev": true, + "dependencies": { + "safe-buffer": { + "version": "5.2.0", + "from": "safe-buffer@>=5.1.2 <6.0.0", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.0.tgz", + "dev": true + } + } + }, "util-deprecate": { "version": "1.0.2", "from": "util-deprecate@>=1.0.1 <1.1.0", diff --git a/services/real-time/package.json b/services/real-time/package.json index f4548a98d7..dd2d6f0198 100644 --- a/services/real-time/package.json +++ b/services/real-time/package.json @@ -42,9 +42,9 @@ "chai": "~1.9.1", "cookie-signature": "^1.0.5", "sandboxed-module": "~0.3.0", - "sinon": "~1.5.2", + "sinon": "^1.5.2", "mocha": "^4.0.1", "uid-safe": "^1.0.1", "timekeeper": "0.0.4" } -} +} \ No newline at end of file From 92e691018078f0c9535ab5097369c6ee446583bd Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 22 Jul 2019 11:23:33 +0100 Subject: [PATCH 09/20] cleanup --- .../app/coffee/ChannelManager.coffee | 10 ++-- .../real-time/app/coffee/RoomManager.coffee | 55 ++++++++++--------- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 4d5c843d10..1eca7a9143 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -9,11 +9,13 @@ ClientMap = new Map() # for each redis client, stores a Set of subscribed channe # handled by RoomManager. module.exports = ChannelManager = - _createNewClientEntry: (rclient) -> - ClientMap.set(rclient, new Set()).get(rclient) + getClientMapEntry: (rclient) -> + # return the rclient channel set if it exists, otherwise create and + # return an empty set for the client. + ClientMap.get(rclient) || ClientMap.set(rclient, new Set()).get(rclient) subscribe: (rclient, baseChannel, id) -> - existingChannelSet = ClientMap.get(rclient) || @_createNewClientEntry(rclient) + existingChannelSet = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" if existingChannelSet.has(channel) logger.error {channel}, "already subscribed - shouldn't happen" @@ -24,7 +26,7 @@ module.exports = ChannelManager = metrics.inc "subscribe.#{baseChannel}" unsubscribe: (rclient, baseChannel, id) -> - existingChannelSet = ClientMap.get(rclient) + existingChannelSet = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" if !existingChannelSet.has(channel) logger.error {channel}, "not subscribed - shouldn't happen" diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index d3a3bea5e8..225dd37f6d 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -17,13 +17,13 @@ RoomEvents = new EventEmitter() module.exports = RoomManager = joinProject: (client, project_id) -> - @_join client, "project", project_id + @joinEntity client, "project", project_id joinDoc: (client, doc_id) -> - @_join client, "doc", doc_id + @joinEntity client, "doc", doc_id leaveDoc: (client, doc_id) -> - @_leave client, "doc", doc_id + @leaveEntity client, "doc", doc_id leaveProjectAndDocs: (client) -> # what rooms is this client in? we need to leave them all. socket.io @@ -33,11 +33,36 @@ module.exports = RoomManager = # has not joined any rooms and do a final disconnection. for id in @_roomsClientIsIn(client) entity = IdMap.get(id) - @_leave client, entity, id + @leaveEntity client, entity, id eventSource: () -> return RoomEvents + joinEntity: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.join id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" + # is this a new room? if so, subscribe + if beforeCount == 0 and afterCount == 1 + logger.log {entity, id}, "room is now active" + RoomEvents.emit "#{entity}-active", id + IdMap.set(id, entity) + + leaveEntity: (client, entity, id) -> + beforeCount = @_clientsInRoom(client, id) + client.leave id + afterCount = @_clientsInRoom(client, id) + logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" + # is the room now empty? if so, unsubscribe + if !entity? + logger.error {entity: id}, "unknown entity when leaving with id" + return + if beforeCount == 1 and afterCount == 0 + logger.log {entity, id}, "room is now empty" + RoomEvents.emit "#{entity}-empty", id + IdMap.delete(id) + # internal functions below, these access socket.io rooms data directly and # will need updating for socket.io v2 @@ -52,25 +77,3 @@ module.exports = RoomManager = [prefix, room] = fullRoomPath.split('/', 2) room return roomList - - _join: (client, entity, id) -> - beforeCount = @_clientsInRoom(client, id) - client.join id - afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" - # is this a new room? if so, subscribe - if beforeCount == 0 and afterCount == 1 - logger.log {entity, id}, "room is now active" - RoomEvents.emit "#{entity}-active", id - IdMap.set(id, entity) - - _leave: (client, entity, id) -> - beforeCount = @_clientsInRoom(client, id) - client.leave id - afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" - # is the room now empty? if so, unsubscribe - if beforeCount == 1 and afterCount == 0 - logger.log {entity, id}, "room is now empty" - RoomEvents.emit "#{entity}-empty", id - IdMap.delete(id) \ No newline at end of file From 1afebd12a14cab488b920a8421e9fbf902bea796 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 22 Jul 2019 11:23:43 +0100 Subject: [PATCH 10/20] unit tests --- .../test/unit/coffee/ChannelManager.coffee | 93 +++++++++- .../test/unit/coffee/RoomManagerTests.coffee | 175 +++++++++++++++++- 2 files changed, 255 insertions(+), 13 deletions(-) diff --git a/services/real-time/test/unit/coffee/ChannelManager.coffee b/services/real-time/test/unit/coffee/ChannelManager.coffee index 314e9914e1..4ed852ddcf 100644 --- a/services/real-time/test/unit/coffee/ChannelManager.coffee +++ b/services/real-time/test/unit/coffee/ChannelManager.coffee @@ -6,28 +6,103 @@ SandboxedModule = require('sandboxed-module') describe 'ChannelManager', -> beforeEach -> - @project_id = "project-id-123" - @user_id = "user-id-123" - @user = {_id: @user_id} - @callback = sinon.stub() + @rclient = {} + @other_rclient = {} @ChannelManager = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = {} + "metrics-sharelatex": @metrics = {inc: sinon.stub()} "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } describe "subscribe", -> - - describe "when the project room is empty", -> - describe "when there are other clients in the project room", -> + describe "when there is no existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should subscribe to the redis channel", -> + @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true + + describe "when there is an existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + @rclient.subscribe = sinon.stub() # discard the original stub + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should not subscribe to the redis channel", -> + @rclient.subscribe.called.should.equal false + + describe "when there is an existing subscription for another redis client but not this one", -> + beforeEach -> + @other_rclient.subscribe = sinon.stub() + @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" + @rclient.subscribe = sinon.stub() # discard the original stub + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should subscribe to the redis channel on this redis client", -> + @rclient.subscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true describe "unsubscribe", -> - describe "when the doc room is empty", -> + describe "when there is no existing subscription for this redis client", -> + beforeEach -> + @rclient.unsubscribe = sinon.stub() + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" - describe "when there are other clients in the doc room", -> + it "should not unsubscribe from the redis channel", -> + @rclient.unsubscribe.called.should.equal false + + + describe "when there is an existing subscription for this another redis client but not this one", -> + beforeEach -> + @other_rclient.subscribe = sinon.stub() + @rclient.unsubscribe = sinon.stub() + @ChannelManager.subscribe @other_rclient, "applied-ops", "1234567890abcdef" + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should not unsubscribe from the redis channel on this client", -> + @rclient.unsubscribe.called.should.equal false + + describe "when there is an existing subscription for this redis client", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @rclient.unsubscribe = sinon.stub() + @ChannelManager.subscribe @rclient, "applied-ops", "1234567890abcdef" + @ChannelManager.unsubscribe @rclient, "applied-ops", "1234567890abcdef" + + it "should unsubscribe from the redis channel", -> + @rclient.unsubscribe.calledWithExactly("applied-ops:1234567890abcdef").should.equal true describe "publish", -> describe "when the channel is 'all'", -> + beforeEach -> + @rclient.publish = sinon.stub() + @ChannelManager.publish @rclient, "applied-ops", "all", "random-message" + + it "should publish on the base channel", -> + @rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true describe "when the channel has an specific id", -> + + describe "when the individual channel setting is false", -> + beforeEach -> + @rclient.publish = sinon.stub() + @settings.publishOnIndividualChannels = false + @ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message" + + it "should publish on the per-id channel", -> + @rclient.publish.calledWithExactly("applied-ops", "random-message").should.equal true + @rclient.publish.calledOnce.should.equal true + + describe "when the individual channel setting is true", -> + beforeEach -> + @rclient.publish = sinon.stub() + @settings.publishOnIndividualChannels = true + @ChannelManager.publish @rclient, "applied-ops", "1234567890abcdef", "random-message" + + it "should publish on the per-id channel", -> + @rclient.publish.calledWithExactly("applied-ops:1234567890abcdef", "random-message").should.equal true + @rclient.publish.calledOnce.should.equal true + diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee index 75d241cf6b..2f78b33c52 100644 --- a/services/real-time/test/unit/coffee/RoomManagerTests.coffee +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -7,31 +7,198 @@ SandboxedModule = require('sandboxed-module') describe 'RoomManager', -> beforeEach -> @project_id = "project-id-123" - @user_id = "user-id-123" - @user = {_id: @user_id} - @callback = sinon.stub() + @doc_id = "doc-id-456" + @other_doc_id = "doc-id-789" + @client = {namespace: {name: ''}, id: "first-client"} @RoomManager = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = {} "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + @RoomManager._clientsInRoom = sinon.stub() + @RoomEvents = @RoomManager.eventSource() + sinon.spy(@RoomEvents, 'emit') describe "joinProject", -> describe "when the project room is empty", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(0) + .onSecondCall().returns(1) + @client.join = sinon.stub() + @RoomManager.joinProject @client, @project_id + + it "should join the room using the id", -> + @client.join.calledWithExactly(@project_id).should.equal true + + it "should emit a 'project-active' event with the id", -> + @RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true + describe "when there are other clients in the project room", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(123) + .onSecondCall().returns(124) + @client.join = sinon.stub() + @RoomManager.joinProject @client, @project_id + + it "should join the room using the id", -> + @client.join.called.should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + describe "joinDoc", -> describe "when the doc room is empty", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(0) + .onSecondCall().returns(1) + @client.join = sinon.stub() + @RoomManager.joinDoc @client, @doc_id + + it "should join the room using the id", -> + @client.join.calledWithExactly(@doc_id).should.equal true + + it "should emit a 'doc-active' event with the id", -> + @RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true + describe "when there are other clients in the doc room", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(124) + @client.join = sinon.stub() + @RoomManager.joinDoc @client, @doc_id + + it "should join the room using the id", -> + @client.join.called.should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + describe "leaveDoc", -> describe "when doc room will be empty after this client has left", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(1) + .onSecondCall().returns(0) + @client.leave = sinon.stub() + @RoomManager.leaveDoc @client, @doc_id + + it "should leave the room using the id", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + + it "should emit a 'doc-empty' event with the id", -> + @RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true + + describe "when there are other clients in the doc room", -> + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @client.leave = sinon.stub() + @RoomManager.leaveDoc @client, @doc_id + + it "should leave the room using the id", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false + + describe "leaveProjectAndDocs", -> - describe "when the client is connected to multiple docs", -> \ No newline at end of file + describe "when the client is connected to the project and multiple docs", -> + + beforeEach -> + @RoomManager._roomsClientIsIn = sinon.stub().returns [@project_id, @doc_id, @other_doc_id] + @client.join = sinon.stub() + @client.leave = sinon.stub() + + describe "when this is the only client connected", -> + + beforeEach -> + # first and secondc calls are for the join, + # calls 2 and 3 are for the leave + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onCall(0).returns(0) + .onSecondCall().returns(1) + .onCall(2).returns(1) + .onCall(3).returns(0) + @RoomManager._clientsInRoom + .withArgs(@client, @other_doc_id) + .onCall(0).returns(0) + .onCall(1).returns(1) + .onCall(2).returns(1) + .onCall(3).returns(0) + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onCall(0).returns(0) + .onCall(1).returns(1) + .onCall(2).returns(1) + .onCall(3).returns(0) + # put the client in the rooms + @RoomManager.joinProject(@client, @project_id) + @RoomManager.joinDoc(@client, @doc_id) + @RoomManager.joinDoc(@client, @other_doc_id) + # now leave the project + @RoomManager.leaveProjectAndDocs @client + + it "should leave all the docs", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + @client.leave.calledWithExactly(@other_doc_id).should.equal true + + it "should leave the project", -> + @client.leave.calledWithExactly(@project_id).should.equal true + + it "should emit a 'doc-empty' event with the id for each doc", -> + @RoomEvents.emit.calledWithExactly('doc-empty', @doc_id).should.equal true + @RoomEvents.emit.calledWithExactly('doc-empty', @other_doc_id).should.equal true + + it "should emit a 'project-empty' event with the id for the project", -> + @RoomEvents.emit.calledWithExactly('project-empty', @project_id).should.equal true + + describe "when other clients are still connected", -> + + beforeEach -> + @RoomManager._clientsInRoom + .withArgs(@client, @doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager._clientsInRoom + .withArgs(@client, @other_doc_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager._clientsInRoom + .withArgs(@client, @project_id) + .onFirstCall().returns(123) + .onSecondCall().returns(122) + @RoomManager.leaveProjectAndDocs @client + + it "should leave all the docs", -> + @client.leave.calledWithExactly(@doc_id).should.equal true + @client.leave.calledWithExactly(@other_doc_id).should.equal true + + it "should leave the project", -> + @client.leave.calledWithExactly(@project_id).should.equal true + + it "should not emit any events", -> + @RoomEvents.emit.called.should.equal false \ No newline at end of file From bb629c27a1936247ae47b217cadefeb6dbd65fe0 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 22 Jul 2019 11:28:49 +0100 Subject: [PATCH 11/20] rename unit test ChannelManager to ChannelManagerTests --- .../coffee/{ChannelManager.coffee => ChannelManagerTests.coffee} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename services/real-time/test/unit/coffee/{ChannelManager.coffee => ChannelManagerTests.coffee} (100%) diff --git a/services/real-time/test/unit/coffee/ChannelManager.coffee b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee similarity index 100% rename from services/real-time/test/unit/coffee/ChannelManager.coffee rename to services/real-time/test/unit/coffee/ChannelManagerTests.coffee From 84e6ff616f46d7bcdb2712239ea4ba8bfccb106d Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Mon, 22 Jul 2019 12:25:41 +0100 Subject: [PATCH 12/20] whitespace fix --- services/real-time/app/coffee/ChannelManager.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 1eca7a9143..0efef6ce96 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -43,4 +43,4 @@ module.exports = ChannelManager = channel = "#{baseChannel}:#{id}" # we publish on a different client to the subscribe, so we can't # check for the channel existing here - rclient.publish channel, data \ No newline at end of file + rclient.publish channel, data From 159b39c4915ec1821b39528e363014e30c8fba12 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Tue, 23 Jul 2019 17:02:09 +0100 Subject: [PATCH 13/20] ensure redis channel is subscribed when joining room --- .../app/coffee/ChannelManager.coffee | 13 ++- .../coffee/DocumentUpdaterController.coffee | 16 ++-- .../real-time/app/coffee/RoomManager.coffee | 28 +++--- .../app/coffee/WebsocketController.coffee | 14 +-- .../app/coffee/WebsocketLoadBalancer.coffee | 15 ++-- .../test/unit/coffee/RoomManagerTests.coffee | 88 ++++++++++++------- .../coffee/WebsocketControllerTests.coffee | 4 +- 7 files changed, 109 insertions(+), 69 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 0efef6ce96..900fd764f5 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -2,7 +2,7 @@ logger = require 'logger-sharelatex' metrics = require "metrics-sharelatex" settings = require "settings-sharelatex" -ClientMap = new Map() # for each redis client, stores a Set of subscribed channels +ClientMap = new Map() # for each redis client, stores a Map of subscribed channels (channelname -> subscribe promise) # Manage redis pubsub subscriptions for individual projects and docs, ensuring # that we never subscribe to a channel multiple times. The socket.io side is @@ -12,18 +12,23 @@ module.exports = ChannelManager = getClientMapEntry: (rclient) -> # return the rclient channel set if it exists, otherwise create and # return an empty set for the client. - ClientMap.get(rclient) || ClientMap.set(rclient, new Set()).get(rclient) + ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient) subscribe: (rclient, baseChannel, id) -> existingChannelSet = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" if existingChannelSet.has(channel) logger.error {channel}, "already subscribed - shouldn't happen" + # return the subscribe promise, so we can wait for it to resolve + return existingChannelSet.get(channel) else - rclient.subscribe channel # completes in the background - existingChannelSet.add(channel) + # get the subscribe promise and return it, the actual subscribe + # completes in the background + subscribePromise = rclient.subscribe channel + existingChannelSet.set(channel, subscribePromise) logger.log {channel}, "subscribed to new channel" metrics.inc "subscribe.#{baseChannel}" + return subscribePromise unsubscribe: (rclient, baseChannel, id) -> existingChannelSet = @getClientMapEntry(rclient) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 0cc5751d7c..e2d27fc343 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -7,6 +7,7 @@ HealthCheckManager = require "./HealthCheckManager" RoomManager = require "./RoomManager" ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" +util = require "util" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb @@ -29,15 +30,20 @@ module.exports = DocumentUpdaterController = do (i) -> rclient.on "message", () -> metrics.inc "rclient-#{i}", 0.001 # per client event rate metric - for rclient in @rclientList - @handleRoomUpdates(rclient) + @handleRoomUpdates(@rclientList) - handleRoomUpdates: (rclientSub) -> + handleRoomUpdates: (rclientSubList) -> roomEvents = RoomManager.eventSource() roomEvents.on 'doc-active', (doc_id) -> - ChannelManager.subscribe rclientSub, "applied-ops", doc_id + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "applied-ops", doc_id + subscribeResult = Promise.all(subscribePromises) + emitResult = (err) => this.emit("doc-subscribed-#{doc_id}", err) + subscribeResult.then () -> emitResult() + subscribeResult.catch (err) -> emitResult(err) roomEvents.on 'doc-empty', (doc_id) -> - ChannelManager.unsubscribe rclientSub, "applied-ops", doc_id + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "applied-ops", doc_id _processMessageFromDocumentUpdater: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index 225dd37f6d..08e65b5e52 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -16,11 +16,11 @@ RoomEvents = new EventEmitter() module.exports = RoomManager = - joinProject: (client, project_id) -> - @joinEntity client, "project", project_id + joinProject: (client, project_id, callback = () ->) -> + @joinEntity client, "project", project_id, callback - joinDoc: (client, doc_id) -> - @joinEntity client, "doc", doc_id + joinDoc: (client, doc_id, callback = () ->) -> + @joinEntity client, "doc", doc_id, callback leaveDoc: (client, doc_id) -> @leaveEntity client, "doc", doc_id @@ -38,27 +38,31 @@ module.exports = RoomManager = eventSource: () -> return RoomEvents - joinEntity: (client, entity, id) -> + joinEntity: (client, entity, id, callback) -> beforeCount = @_clientsInRoom(client, id) - client.join id - afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client joined room" # is this a new room? if so, subscribe - if beforeCount == 0 and afterCount == 1 + if beforeCount == 0 logger.log {entity, id}, "room is now active" + RoomEvents.once "#{entity}-subscribed-#{id}", (err) -> + logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel" + client.join id + callback(err) RoomEvents.emit "#{entity}-active", id IdMap.set(id, entity) + else + logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room" + client.join id + callback() leaveEntity: (client, entity, id) -> - beforeCount = @_clientsInRoom(client, id) client.leave id afterCount = @_clientsInRoom(client, id) - logger.log {client: client.id, entity, id, beforeCount, afterCount}, "client left room" + logger.log {client: client.id, entity, id, afterCount}, "client left room" # is the room now empty? if so, unsubscribe if !entity? logger.error {entity: id}, "unknown entity when leaving with id" return - if beforeCount == 1 and afterCount == 0 + if afterCount == 0 logger.log {entity, id}, "room is now empty" RoomEvents.emit "#{entity}-empty", id IdMap.delete(id) diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 6d8965883f..22ea7e7a0c 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -25,8 +25,7 @@ module.exports = WebsocketController = err = new Error("not authorized") logger.warn {err, project_id, user_id, client_id: client.id}, "user is not authorized to join project" return callback(err) - - RoomManager.joinProject(client, project_id) + client.set("privilege_level", privilegeLevel) client.set("user_id", user_id) @@ -39,8 +38,9 @@ module.exports = WebsocketController = client.set("signup_date", user?.signUpDate) client.set("login_count", user?.loginCount) - callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION - logger.log {user_id, project_id, client_id: client.id}, "user joined project" + RoomManager.joinProject client, project_id, (err) -> + logger.log {user_id, project_id, client_id: client.id}, "user joined project" + callback null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION # No need to block for setting the user as connected in the cursor tracking ConnectedUsersManager.updateUserPosition project_id, client.id, user, null, () -> @@ -118,9 +118,9 @@ module.exports = WebsocketController = return callback(err) AuthorizationManager.addAccessToDoc client, doc_id - RoomManager.joinDoc(client, doc_id) - callback null, escapedLines, version, ops, ranges - logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + RoomManager.joinDoc client, doc_id, (err) -> + logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" + callback null, escapedLines, version, ops, ranges leaveDoc: (client, doc_id, callback = (error) ->) -> metrics.inc "editor.leave-doc" diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index 1bb74c6a3e..e8ff88aa7b 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -35,15 +35,20 @@ module.exports = WebsocketLoadBalancer = rclientSub.on "message", (channel, message) -> EventLogger.debugEvent(channel, message) if Settings.debugEvents > 0 WebsocketLoadBalancer._processEditorEvent io, channel, message - for rclientSub in @rclientSubList - @handleRoomUpdates(rclientSub) + @handleRoomUpdates(@rclientSubList) - handleRoomUpdates: (rclientSub) -> + handleRoomUpdates: (rclientSubList) -> roomEvents = RoomManager.eventSource() roomEvents.on 'project-active', (project_id) -> - ChannelManager.subscribe rclientSub, "editor-events", project_id + subscribePromises = for rclient in rclientSubList + ChannelManager.subscribe rclient, "editor-events", project_id + subscribeResult = Promise.all(subscribePromises) + emitResult = (err) => this.emit("project-subscribed-#{project_id}", err) + subscribeResult.then () -> emitResult() + subscribeResult.catch (err) -> emitResult(err) roomEvents.on 'project-empty', (project_id) -> - ChannelManager.unsubscribe rclientSub, "editor-events", project_id + for rclient in rclientSubList + ChannelManager.unsubscribe rclient, "editor-events", project_id _processEditorEvent: (io, channel, message) -> SafeJsonParse.parse message, (error, message) -> diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee index 2f78b33c52..f9fde4dd83 100644 --- a/services/real-time/test/unit/coffee/RoomManagerTests.coffee +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -15,26 +15,36 @@ describe 'RoomManager', -> "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } @RoomManager._clientsInRoom = sinon.stub() @RoomEvents = @RoomManager.eventSource() - sinon.spy(@RoomEvents, 'emit') + sinon.spy(@RoomEvents, 'emit') + sinon.spy(@RoomEvents, 'once') describe "joinProject", -> describe "when the project room is empty", -> - beforeEach -> + beforeEach (done) -> @RoomManager._clientsInRoom .withArgs(@client, @project_id) .onFirstCall().returns(0) - .onSecondCall().returns(1) @client.join = sinon.stub() - @RoomManager.joinProject @client, @project_id - - it "should join the room using the id", -> - @client.join.calledWithExactly(@project_id).should.equal true + @callback = sinon.stub() + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomManager.joinProject @client, @project_id, (err) => + @callback(err) + done() it "should emit a 'project-active' event with the id", -> @RoomEvents.emit.calledWithExactly('project-active', @project_id).should.equal true + it "should listen for the 'project-subscribed-id' event", -> + @RoomEvents.once.calledWith("project-subscribed-#{@project_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@project_id).should.equal true + describe "when there are other clients in the project room", -> beforeEach -> @@ -56,20 +66,29 @@ describe 'RoomManager', -> describe "when the doc room is empty", -> - beforeEach -> + beforeEach (done) -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) .onFirstCall().returns(0) - .onSecondCall().returns(1) @client.join = sinon.stub() - @RoomManager.joinDoc @client, @doc_id - - it "should join the room using the id", -> - @client.join.calledWithExactly(@doc_id).should.equal true + @callback = sinon.stub() + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 + @RoomManager.joinDoc @client, @doc_id, (err) => + @callback(err) + done() it "should emit a 'doc-active' event with the id", -> @RoomEvents.emit.calledWithExactly('doc-active', @doc_id).should.equal true + it "should listen for the 'doc-subscribed-id' event", -> + @RoomEvents.once.calledWith("doc-subscribed-#{@doc_id}").should.equal true + + it "should join the room using the id", -> + @client.join.calledWithExactly(@doc_id).should.equal true + describe "when there are other clients in the doc room", -> beforeEach -> @@ -94,8 +113,7 @@ describe 'RoomManager', -> beforeEach -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) - .onFirstCall().returns(1) - .onSecondCall().returns(0) + .onCall(0).returns(0) @client.leave = sinon.stub() @RoomManager.leaveDoc @client, @doc_id @@ -111,8 +129,7 @@ describe 'RoomManager', -> beforeEach -> @RoomManager._clientsInRoom .withArgs(@client, @doc_id) - .onFirstCall().returns(123) - .onSecondCall().returns(122) + .onCall(0).returns(123) @client.leave = sinon.stub() @RoomManager.leaveDoc @client, @doc_id @@ -134,33 +151,36 @@ describe 'RoomManager', -> describe "when this is the only client connected", -> - beforeEach -> - # first and secondc calls are for the join, - # calls 2 and 3 are for the leave + beforeEach (done) -> + # first call is for the join, + # second for the leave @RoomManager._clientsInRoom .withArgs(@client, @doc_id) .onCall(0).returns(0) - .onSecondCall().returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) @RoomManager._clientsInRoom .withArgs(@client, @other_doc_id) .onCall(0).returns(0) - .onCall(1).returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) @RoomManager._clientsInRoom .withArgs(@client, @project_id) .onCall(0).returns(0) - .onCall(1).returns(1) - .onCall(2).returns(1) - .onCall(3).returns(0) + .onCall(1).returns(0) + @RoomEvents.on 'project-active', (id) => + setTimeout () => + @RoomEvents.emit "project-subscribed-#{id}" + , 100 + @RoomEvents.on 'doc-active', (id) => + setTimeout () => + @RoomEvents.emit "doc-subscribed-#{id}" + , 100 # put the client in the rooms - @RoomManager.joinProject(@client, @project_id) - @RoomManager.joinDoc(@client, @doc_id) - @RoomManager.joinDoc(@client, @other_doc_id) - # now leave the project - @RoomManager.leaveProjectAndDocs @client + @RoomManager.joinProject @client, @project_id, () => + @RoomManager.joinDoc @client, @doc_id, () => + @RoomManager.joinDoc @client, @other_doc_id, () => + # now leave the project + @RoomManager.leaveProjectAndDocs @client + done() it "should leave all the docs", -> @client.leave.calledWithExactly(@doc_id).should.equal true diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index ab442006c2..d0dad108e7 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -54,7 +54,7 @@ describe 'WebsocketController', -> @privilegeLevel = "owner" @ConnectedUsersManager.updateUserPosition = sinon.stub().callsArg(4) @WebApiManager.joinProject = sinon.stub().callsArgWith(2, null, @project, @privilegeLevel) - @RoomManager.joinProject = sinon.stub() + @RoomManager.joinProject = sinon.stub().callsArg(2) @WebsocketController.joinProject @client, @user, @project_id, @callback it "should load the project from web", -> @@ -237,7 +237,7 @@ describe 'WebsocketController', -> @AuthorizationManager.addAccessToDoc = sinon.stub() @AuthorizationManager.assertClientCanViewProject = sinon.stub().callsArgWith(1, null) @DocumentUpdaterManager.getDocument = sinon.stub().callsArgWith(3, null, @doc_lines, @version, @ranges, @ops) - @RoomManager.joinDoc = sinon.stub() + @RoomManager.joinDoc = sinon.stub().callsArg(2) describe "works", -> beforeEach -> From 61b3a000b406de1c3e77c786c0d5b70480641246 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 09:52:20 +0100 Subject: [PATCH 14/20] fix whitespace --- services/real-time/app/coffee/ChannelManager.coffee | 2 +- services/real-time/test/unit/coffee/RoomManagerTests.coffee | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 900fd764f5..8cec24156d 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -24,7 +24,7 @@ module.exports = ChannelManager = else # get the subscribe promise and return it, the actual subscribe # completes in the background - subscribePromise = rclient.subscribe channel + subscribePromise = rclient.subscribe channel existingChannelSet.set(channel, subscribePromise) logger.log {channel}, "subscribed to new channel" metrics.inc "subscribe.#{baseChannel}" diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee index f9fde4dd83..294365dd61 100644 --- a/services/real-time/test/unit/coffee/RoomManagerTests.coffee +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -16,7 +16,7 @@ describe 'RoomManager', -> @RoomManager._clientsInRoom = sinon.stub() @RoomEvents = @RoomManager.eventSource() sinon.spy(@RoomEvents, 'emit') - sinon.spy(@RoomEvents, 'once') + sinon.spy(@RoomEvents, 'once') describe "joinProject", -> @@ -152,7 +152,7 @@ describe 'RoomManager', -> describe "when this is the only client connected", -> beforeEach (done) -> - # first call is for the join, + # first call is for the join, # second for the leave @RoomManager._clientsInRoom .withArgs(@client, @doc_id) From cb53bfafd6038c28123bc3e43d1bfcb600b7b347 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 09:52:31 +0100 Subject: [PATCH 15/20] remove unnecessary require --- services/real-time/app/coffee/DocumentUpdaterController.coffee | 1 - 1 file changed, 1 deletion(-) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index e2d27fc343..6d6ec4b79d 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -7,7 +7,6 @@ HealthCheckManager = require "./HealthCheckManager" RoomManager = require "./RoomManager" ChannelManager = require "./ChannelManager" metrics = require "metrics-sharelatex" -util = require "util" MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 # 1Mb From e14a94906aca668af5593f0fe73b0682d6ab5bfd Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 14:17:19 +0100 Subject: [PATCH 16/20] update naming from Set -> Map --- .../app/coffee/ChannelManager.coffee | 24 +++++++++---------- .../unit/coffee/ChannelManagerTests.coffee | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 8cec24156d..1749ab2d58 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -2,7 +2,7 @@ logger = require 'logger-sharelatex' metrics = require "metrics-sharelatex" settings = require "settings-sharelatex" -ClientMap = new Map() # for each redis client, stores a Map of subscribed channels (channelname -> subscribe promise) +ClientMap = new Map() # for each redis client, store a Map of subscribed channels (channelname -> subscribe promise) # Manage redis pubsub subscriptions for individual projects and docs, ensuring # that we never subscribe to a channel multiple times. The socket.io side is @@ -10,34 +10,34 @@ ClientMap = new Map() # for each redis client, stores a Map of subscribed channe module.exports = ChannelManager = getClientMapEntry: (rclient) -> - # return the rclient channel set if it exists, otherwise create and - # return an empty set for the client. + # return the per-client channel map if it exists, otherwise create and + # return an empty map for the client. ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient) subscribe: (rclient, baseChannel, id) -> - existingChannelSet = @getClientMapEntry(rclient) + clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" - if existingChannelSet.has(channel) - logger.error {channel}, "already subscribed - shouldn't happen" - # return the subscribe promise, so we can wait for it to resolve - return existingChannelSet.get(channel) + if clientChannelMap.has(channel) + logger.warn {channel}, "subscribe already actioned" + # return the existing subscribe promise, so we can wait for it to resolve + return clientChannelMap.get(channel) else # get the subscribe promise and return it, the actual subscribe # completes in the background subscribePromise = rclient.subscribe channel - existingChannelSet.set(channel, subscribePromise) + clientChannelMap.set(channel, subscribePromise) logger.log {channel}, "subscribed to new channel" metrics.inc "subscribe.#{baseChannel}" return subscribePromise unsubscribe: (rclient, baseChannel, id) -> - existingChannelSet = @getClientMapEntry(rclient) + clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" - if !existingChannelSet.has(channel) + if !clientChannelMap.has(channel) logger.error {channel}, "not subscribed - shouldn't happen" else rclient.unsubscribe channel # completes in the background - existingChannelSet.delete(channel) + clientChannelMap.delete(channel) logger.log {channel}, "unsubscribed from channel" metrics.inc "unsubscribe.#{baseChannel}" diff --git a/services/real-time/test/unit/coffee/ChannelManagerTests.coffee b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee index 4ed852ddcf..e550e963d4 100644 --- a/services/real-time/test/unit/coffee/ChannelManagerTests.coffee +++ b/services/real-time/test/unit/coffee/ChannelManagerTests.coffee @@ -11,7 +11,7 @@ describe 'ChannelManager', -> @ChannelManager = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = {} "metrics-sharelatex": @metrics = {inc: sinon.stub()} - "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + "logger-sharelatex": @logger = { log: sinon.stub(), warn: sinon.stub(), error: sinon.stub() } describe "subscribe", -> From 273af3f3aad574b17e9602ba5efc6077c0bf21f6 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 14:30:48 +0100 Subject: [PATCH 17/20] refactor subscribe resolution --- .../real-time/app/coffee/DocumentUpdaterController.coffee | 5 +---- services/real-time/app/coffee/RoomManager.coffee | 5 +++++ services/real-time/app/coffee/WebsocketLoadBalancer.coffee | 5 +---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee index 6d6ec4b79d..2611d484ad 100644 --- a/services/real-time/app/coffee/DocumentUpdaterController.coffee +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -36,10 +36,7 @@ module.exports = DocumentUpdaterController = roomEvents.on 'doc-active', (doc_id) -> subscribePromises = for rclient in rclientSubList ChannelManager.subscribe rclient, "applied-ops", doc_id - subscribeResult = Promise.all(subscribePromises) - emitResult = (err) => this.emit("doc-subscribed-#{doc_id}", err) - subscribeResult.then () -> emitResult() - subscribeResult.catch (err) -> emitResult(err) + RoomManager.emitOnCompletion(subscribePromises, "doc-subscribed-#{doc_id}") roomEvents.on 'doc-empty', (doc_id) -> for rclient in rclientSubList ChannelManager.unsubscribe rclient, "applied-ops", doc_id diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index 08e65b5e52..152a290020 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -35,6 +35,11 @@ module.exports = RoomManager = entity = IdMap.get(id) @leaveEntity client, entity, id + emitOnCompletion: (promiseList, eventName) -> + result = Promise.all(promiseList) + result.then () -> RoomEvents.emit(eventName) + result.catch (err) -> RoomEvents.emit(eventName, err) + eventSource: () -> return RoomEvents diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee index e8ff88aa7b..865249c63e 100644 --- a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -42,10 +42,7 @@ module.exports = WebsocketLoadBalancer = roomEvents.on 'project-active', (project_id) -> subscribePromises = for rclient in rclientSubList ChannelManager.subscribe rclient, "editor-events", project_id - subscribeResult = Promise.all(subscribePromises) - emitResult = (err) => this.emit("project-subscribed-#{project_id}", err) - subscribeResult.then () -> emitResult() - subscribeResult.catch (err) -> emitResult(err) + RoomManager.emitOnCompletion(subscribePromises, "project-subscribed-#{project_id}") roomEvents.on 'project-empty', (project_id) -> for rclient in rclientSubList ChannelManager.unsubscribe rclient, "editor-events", project_id From 1c74cbbc4ef21d5531d2486a2a57829a7e3546d6 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 15:41:25 +0100 Subject: [PATCH 18/20] add comments --- services/real-time/app/coffee/ChannelManager.coffee | 6 ++++++ services/real-time/app/coffee/RoomManager.coffee | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 1749ab2d58..3ea5c2e71e 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -17,6 +17,9 @@ module.exports = ChannelManager = subscribe: (rclient, baseChannel, id) -> clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" + # we track pending subscribes because we want to be sure that the + # channel is active before letting the client join the doc or project, + # so that events are not lost. if clientChannelMap.has(channel) logger.warn {channel}, "subscribe already actioned" # return the existing subscribe promise, so we can wait for it to resolve @@ -33,6 +36,9 @@ module.exports = ChannelManager = unsubscribe: (rclient, baseChannel, id) -> clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" + # we don't need to track pending unsubscribes, because we there is no + # harm if events continue to arrive on the channel while the unsubscribe + # command in pending. if !clientChannelMap.has(channel) logger.error {channel}, "not subscribed - shouldn't happen" else diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index 152a290020..9f42083198 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -2,7 +2,7 @@ logger = require 'logger-sharelatex' {EventEmitter} = require 'events' IdMap = new Map() # keep track of whether ids are from projects or docs -RoomEvents = new EventEmitter() +RoomEvents = new EventEmitter() # emits {project,doc}-active and {project,doc}-empty events # Manage socket.io rooms for individual projects and docs # @@ -49,6 +49,7 @@ module.exports = RoomManager = if beforeCount == 0 logger.log {entity, id}, "room is now active" RoomEvents.once "#{entity}-subscribed-#{id}", (err) -> + # only allow the client to join when all the relevant channels have subscribed logger.log {client: client.id, entity, id, beforeCount}, "client joined room after subscribing channel" client.join id callback(err) From 277ec71a5b7db19259dee339897c07fdb6f7beec Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 15:43:48 +0100 Subject: [PATCH 19/20] subscribe to doc updates before requesting doc content --- .../app/coffee/WebsocketController.coffee | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 22ea7e7a0c..fce505c1bc 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -92,33 +92,36 @@ module.exports = WebsocketController = AuthorizationManager.assertClientCanViewProject client, (error) -> return callback(error) if error? - DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) -> + # ensure the per-doc applied-ops channel is subscribed before sending the + # doc to the client, so that no events are missed. + RoomManager.joinDoc client, doc_id, (error) -> return callback(error) if error? + DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) -> + return callback(error) if error? - # Encode any binary bits of data so it can go via WebSockets - # See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html - encodeForWebsockets = (text) -> unescape(encodeURIComponent(text)) - escapedLines = [] - for line in lines - try - line = encodeForWebsockets(line) - catch err - logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component" - return callback(err) - escapedLines.push line - if options.encodeRanges - try - for comment in ranges?.comments or [] - comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c? - for change in ranges?.changes or [] - change.op.i = encodeForWebsockets(change.op.i) if change.op.i? - change.op.d = encodeForWebsockets(change.op.d) if change.op.d? - catch err - logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component" - return callback(err) + # Encode any binary bits of data so it can go via WebSockets + # See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html + encodeForWebsockets = (text) -> unescape(encodeURIComponent(text)) + escapedLines = [] + for line in lines + try + line = encodeForWebsockets(line) + catch err + logger.err {err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component" + return callback(err) + escapedLines.push line + if options.encodeRanges + try + for comment in ranges?.comments or [] + comment.op.c = encodeForWebsockets(comment.op.c) if comment.op.c? + for change in ranges?.changes or [] + change.op.i = encodeForWebsockets(change.op.i) if change.op.i? + change.op.d = encodeForWebsockets(change.op.d) if change.op.d? + catch err + logger.err {err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component" + return callback(err) - AuthorizationManager.addAccessToDoc client, doc_id - RoomManager.joinDoc client, doc_id, (err) -> + AuthorizationManager.addAccessToDoc client, doc_id logger.log {user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc" callback null, escapedLines, version, ops, ranges From 22d722f3e8afca3ebdf773d9d57a1ef83f1ac706 Mon Sep 17 00:00:00 2001 From: Brian Gough Date: Wed, 24 Jul 2019 16:25:45 +0100 Subject: [PATCH 20/20] add metric for RoomEvents listeners --- services/real-time/app/coffee/RoomManager.coffee | 4 ++++ services/real-time/test/unit/coffee/RoomManagerTests.coffee | 1 + 2 files changed, 5 insertions(+) diff --git a/services/real-time/app/coffee/RoomManager.coffee b/services/real-time/app/coffee/RoomManager.coffee index 9f42083198..adf472e26c 100644 --- a/services/real-time/app/coffee/RoomManager.coffee +++ b/services/real-time/app/coffee/RoomManager.coffee @@ -1,4 +1,5 @@ logger = require 'logger-sharelatex' +metrics = require "metrics-sharelatex" {EventEmitter} = require 'events' IdMap = new Map() # keep track of whether ids are from projects or docs @@ -55,6 +56,8 @@ module.exports = RoomManager = callback(err) RoomEvents.emit "#{entity}-active", id IdMap.set(id, entity) + # keep track of the number of listeners + metrics.gauge "room-listeners", RoomEvents.eventNames().length else logger.log {client: client.id, entity, id, beforeCount}, "client joined existing room" client.join id @@ -72,6 +75,7 @@ module.exports = RoomManager = logger.log {entity, id}, "room is now empty" RoomEvents.emit "#{entity}-empty", id IdMap.delete(id) + metrics.gauge "room-listeners", RoomEvents.eventNames().length # internal functions below, these access socket.io rooms data directly and # will need updating for socket.io v2 diff --git a/services/real-time/test/unit/coffee/RoomManagerTests.coffee b/services/real-time/test/unit/coffee/RoomManagerTests.coffee index 294365dd61..ee46a3ef04 100644 --- a/services/real-time/test/unit/coffee/RoomManagerTests.coffee +++ b/services/real-time/test/unit/coffee/RoomManagerTests.coffee @@ -13,6 +13,7 @@ describe 'RoomManager', -> @RoomManager = SandboxedModule.require modulePath, requires: "settings-sharelatex": @settings = {} "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } + "metrics-sharelatex": @metrics = { gauge: sinon.stub() } @RoomManager._clientsInRoom = sinon.stub() @RoomEvents = @RoomManager.eventSource() sinon.spy(@RoomEvents, 'emit')