diff --git a/services/real-time/app.coffee b/services/real-time/app.coffee index 15097f51b8..016aa73b5b 100644 --- a/services/real-time/app.coffee +++ b/services/real-time/app.coffee @@ -41,6 +41,9 @@ io.configure -> Router = require "./app/js/Router" Router.configure(app, io, sessionSockets) + +WebsocketLoadBalancer = require "./app/js/WebsocketLoadBalancer" +WebsocketLoadBalancer.listenForEditorEvents(io) port = Settings.internal.realTime.port host = Settings.internal.realTime.host diff --git a/services/real-time/app/coffee/ConnectedUsersManager.coffee b/services/real-time/app/coffee/ConnectedUsersManager.coffee index 5a77660f55..fa7e366ad8 100644 --- a/services/real-time/app/coffee/ConnectedUsersManager.coffee +++ b/services/real-time/app/coffee/ConnectedUsersManager.coffee @@ -61,7 +61,6 @@ module.exports = else result.connected = true result.client_id = client_id - console.log "RESULT", result if result.cursorData? try result.cursorData = JSON.parse(result.cursorData) diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 1e7860daf0..a115023ea5 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -3,6 +3,7 @@ WebApiManager = require "./WebApiManager" AuthorizationManager = require "./AuthorizationManager" DocumentUpdaterManager = require "./DocumentUpdaterManager" ConnectedUsersManager = require "./ConnectedUsersManager" +WebsocketLoadBalancer = require "./WebsocketLoadBalancer" Utils = require "./Utils" module.exports = WebsocketController = @@ -94,8 +95,7 @@ module.exports = WebsocketController = else cursorData.name = "Anonymous" callback() - #EditorRealTimeController.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData) - #callback() + WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData) getConnectedUsers: (client, callback = (error, users) ->) -> Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> diff --git a/services/real-time/app/coffee/WebsocketLoadBalancer.coffee b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee new file mode 100644 index 0000000000..966955f117 --- /dev/null +++ b/services/real-time/app/coffee/WebsocketLoadBalancer.coffee @@ -0,0 +1,30 @@ +Settings = require 'settings-sharelatex' +redis = require("redis-sharelatex") +rclientPub = redis.createClient(Settings.redis.web) +rclientSub = redis.createClient(Settings.redis.web) + +module.exports = WebsocketLoadBalancer = + rclientPub: rclientPub + rclientSub: rclientSub + + emitToRoom: (room_id, message, payload...) -> + @rclientPub.publish "editor-events", JSON.stringify + room_id: room_id + message: message + payload: payload + + emitToAll: (message, payload...) -> + @emitToRoom "all", message, payload... + + listenForEditorEvents: (io) -> + @rclientSub.subscribe "editor-events" + @rclientSub.on "message", (channel, message) -> + WebsocketLoadBalancer._processEditorEvent io, channel, message + + _processEditorEvent: (io, channel, message) -> + message = JSON.parse(message) + if message.room_id == "all" + io.sockets.emit(message.message, message.payload...) + else + io.sockets.in(message.room_id).emit(message.message, message.payload...) + diff --git a/services/real-time/test/acceptance/coffee/ClientTrackingTests.coffee b/services/real-time/test/acceptance/coffee/ClientTrackingTests.coffee index 391030a1c5..f83659c5f7 100644 --- a/services/real-time/test/acceptance/coffee/ClientTrackingTests.coffee +++ b/services/real-time/test/acceptance/coffee/ClientTrackingTests.coffee @@ -31,7 +31,7 @@ describe "clientTracking", -> describe "when a client updates its cursor location", -> before (done) -> @updates = [] - @clientB.on "clientTracking.clientUpdated", (data) -> + @clientB.on "clientTracking.clientUpdated", (data) => @updates.push data @clientA.emit "clientTracking.updatePosition", { @@ -42,7 +42,17 @@ describe "clientTracking", -> throw error if error? done() - it "should tell other clients about the update" + it "should tell other clients about the update", -> + @updates.should.deep.equal [ + { + row: @row + column: @column + doc_id: @doc_id + id: @clientA.socket.sessionid + user_id: @user_id + name: "Joe Bloggs" + } + ] it "should record the update in getConnectedUsers", (done) -> @clientB.emit "clientTracking.getConnectedUsers", (error, users) => diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index 8aedca2366..e0be68fc50 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -29,6 +29,7 @@ describe 'WebsocketController', -> "./AuthorizationManager": @AuthorizationManager = {} "./DocumentUpdaterManager": @DocumentUpdaterManager = {} "./ConnectedUsersManager": @ConnectedUsersManager = {} + "./WebsocketLoadBalancer": @WebsocketLoadBalancer = {} "logger-sharelatex": @logger = { log: sinon.stub(), error: sinon.stub() } afterEach -> @@ -219,7 +220,7 @@ describe 'WebsocketController', -> describe "updateClientPosition", -> beforeEach -> - #@EditorRealTimeController.emitToRoom = sinon.stub() + @WebsocketLoadBalancer.emitToRoom = sinon.stub() @ConnectedUsersManager.updateUserPosition = sinon.stub().callsArgWith(4) @update = { doc_id: @doc_id = "doc-id-123" @@ -248,8 +249,8 @@ describe 'WebsocketController', -> email: @email user_id: @user_id - # it "should send the update to the project room with the user's name", -> - # @EditorRealTimeController.emitToRoom.calledWith(@project_id, "clientTracking.clientUpdated", @populatedCursorData).should.equal true + it "should send the update to the project room with the user's name", -> + @WebsocketLoadBalancer.emitToRoom.calledWith(@project_id, "clientTracking.clientUpdated", @populatedCursorData).should.equal true it "should send the cursor data to the connected user manager", (done)-> @ConnectedUsersManager.updateUserPosition.calledWith(@project_id, @client.id, { @@ -272,16 +273,16 @@ describe 'WebsocketController', -> @client.get = (param, callback) => callback null, @clientParams[param] @WebsocketController.updateClientPosition @client, @update - # it "should send the update to the project room with an anonymous name", -> - # @EditorRealTimeController.emitToRoom - # .calledWith(@project_id, "clientTracking.clientUpdated", { - # doc_id: @doc_id, - # id: @client.id - # name: "Anonymous" - # row: @row - # column: @column - # }) - # .should.equal true + it "should send the update to the project room with an anonymous name", -> + @WebsocketLoadBalancer.emitToRoom + .calledWith(@project_id, "clientTracking.clientUpdated", { + doc_id: @doc_id, + id: @client.id + name: "Anonymous" + row: @row + column: @column + }) + .should.equal true it "should not send cursor data to the connected user manager", (done)-> @ConnectedUsersManager.updateUserPosition.called.should.equal false diff --git a/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee new file mode 100644 index 0000000000..547d0aff58 --- /dev/null +++ b/services/real-time/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -0,0 +1,89 @@ +SandboxedModule = require('sandboxed-module') +sinon = require('sinon') +require('chai').should() +modulePath = require('path').join __dirname, '../../../app/js/WebsocketLoadBalancer' + +describe "WebsocketLoadBalancer", -> + beforeEach -> + @WebsocketLoadBalancer = SandboxedModule.require modulePath, requires: + "redis-sharelatex": + createClient: () -> + auth:-> + @io = {} + @WebsocketLoadBalancer.rclientPub = publish: sinon.stub() + @WebsocketLoadBalancer.rclientSub = + subscribe: sinon.stub() + on: sinon.stub() + + @room_id = "room-id" + @message = "message-to-editor" + @payload = ["argument one", 42] + + describe "emitToRoom", -> + beforeEach -> + @WebsocketLoadBalancer.emitToRoom(@room_id, @message, @payload...) + + it "should publish the message to redis", -> + @WebsocketLoadBalancer.rclientPub.publish + .calledWith("editor-events", JSON.stringify( + room_id: @room_id, + message: @message + payload: @payload + )) + .should.equal true + + describe "emitToAll", -> + beforeEach -> + @WebsocketLoadBalancer.emitToRoom = sinon.stub() + @WebsocketLoadBalancer.emitToAll @message, @payload... + + it "should emit to the room 'all'", -> + @WebsocketLoadBalancer.emitToRoom + .calledWith("all", @message, @payload...) + .should.equal true + + describe "listenForEditorEvents", -> + beforeEach -> + @WebsocketLoadBalancer._processEditorEvent = sinon.stub() + @WebsocketLoadBalancer.listenForEditorEvents() + + it "should subscribe to the editor-events channel", -> + @WebsocketLoadBalancer.rclientSub.subscribe + .calledWith("editor-events") + .should.equal true + + it "should process the events with _processEditorEvent", -> + @WebsocketLoadBalancer.rclientSub.on + .calledWith("message", sinon.match.func) + .should.equal true + + describe "_processEditorEvent", -> + describe "with a designated room", -> + beforeEach -> + @io.sockets = + in: sinon.stub().returns(emit: @emit = sinon.stub()) + data = JSON.stringify + room_id: @room_id + message: @message + payload: @payload + @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) + + it "should send the message to all clients in the room", -> + @io.sockets.in + .calledWith(@room_id) + .should.equal true + @emit.calledWith(@message, @payload...).should.equal true + + describe "when emitting to all", -> + beforeEach -> + @io.sockets = + emit: @emit = sinon.stub() + data = JSON.stringify + room_id: "all" + message: @message + payload: @payload + @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) + + it "should send the message to all clients", -> + @emit.calledWith(@message, @payload...).should.equal true +