From 347ceaaf03728b3c8e5584eb99697e1e75e1183f Mon Sep 17 00:00:00 2001 From: James Allen Date: Fri, 14 Nov 2014 15:30:18 +0000 Subject: [PATCH] Listen for updates from doc updater and send them to clients --- services/real-time/app.coffee | 3 + .../coffee/DocumentUpdaterController.coffee | 39 +++++++ .../coffee/ReceiveUpdateTests.coffee | 101 +++++++++++++++++ .../DocumentUpdaterControllerTests.coffee | 103 ++++++++++++++++++ .../coffee/WebsocketControllerTests.coffee | 6 +- .../unit/coffee/helpers/MockClient.coffee | 17 +++ 6 files changed, 267 insertions(+), 2 deletions(-) create mode 100644 services/real-time/app/coffee/DocumentUpdaterController.coffee create mode 100644 services/real-time/test/acceptance/coffee/ReceiveUpdateTests.coffee create mode 100644 services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee create mode 100644 services/real-time/test/unit/coffee/helpers/MockClient.coffee diff --git a/services/real-time/app.coffee b/services/real-time/app.coffee index 016aa73b5b..b6d8c2e23c 100644 --- a/services/real-time/app.coffee +++ b/services/real-time/app.coffee @@ -44,6 +44,9 @@ Router.configure(app, io, sessionSockets) WebsocketLoadBalancer = require "./app/js/WebsocketLoadBalancer" WebsocketLoadBalancer.listenForEditorEvents(io) + +DocumentUpdaterController = require "./app/js/DocumentUpdaterController" +DocumentUpdaterController.listenForUpdatesFromDocumentUpdater(io) port = Settings.internal.realTime.port host = Settings.internal.realTime.host diff --git a/services/real-time/app/coffee/DocumentUpdaterController.coffee b/services/real-time/app/coffee/DocumentUpdaterController.coffee new file mode 100644 index 0000000000..3de0becfc9 --- /dev/null +++ b/services/real-time/app/coffee/DocumentUpdaterController.coffee @@ -0,0 +1,39 @@ +logger = require "logger-sharelatex" +settings = require 'settings-sharelatex' +redis = require("redis-sharelatex") +rclient = redis.createClient(settings.redis.web) + +module.exports = DocumentUpdaterController = + # DocumentUpdaterController is responsible for updates that come via Redis + # Pub/Sub from the document updater. + + listenForUpdatesFromDocumentUpdater: (io) -> + rclient.subscribe "applied-ops" + rclient.on "message", (channel, message) -> + DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message) + + _processMessageFromDocumentUpdater: (io, channel, message) -> + message = JSON.parse message + if message.op? + DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op) + else if message.error? + DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message) + + _applyUpdateFromDocumentUpdater: (io, doc_id, update) -> + for client in io.sockets.clients(doc_id) + if client.id == update.meta.source + logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, "distributing update to sender" + client.emit "otUpdateApplied", v: update.v, doc: update.doc + else + logger.log doc_id: doc_id, version: update.v, source: update.meta?.source, client_id: client.id, "distributing update to collaborator" + client.emit "otUpdateApplied", update + + _processErrorFromDocumentUpdater: (io, doc_id, error, message) -> + logger.error err: error, doc_id: doc_id, "error from document updater" + for client in io.sockets.clients(doc_id) + client.emit "otUpdateError", error, message + client.disconnect() + + + + diff --git a/services/real-time/test/acceptance/coffee/ReceiveUpdateTests.coffee b/services/real-time/test/acceptance/coffee/ReceiveUpdateTests.coffee new file mode 100644 index 0000000000..9c95567d93 --- /dev/null +++ b/services/real-time/test/acceptance/coffee/ReceiveUpdateTests.coffee @@ -0,0 +1,101 @@ +chai = require("chai") +expect = chai.expect +chai.should() + +RealTimeClient = require "./helpers/RealTimeClient" +MockWebServer = require "./helpers/MockWebServer" +FixturesManager = require "./helpers/FixturesManager" + +async = require "async" + +settings = require "settings-sharelatex" +redis = require "redis-sharelatex" +rclient = redis.createClient(settings.redis.web) + +describe "receiveUpdate", -> + before (done) -> + @lines = ["test", "doc", "lines"] + @version = 42 + @ops = ["mock", "doc", "ops"] + + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { name: "Test Project" } + }, (error, {@user_id, @project_id}) => cb() + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connect", cb + + (cb) => + @clientB = RealTimeClient.connect() + @clientB.on "connect", cb + + (cb) => + @clientA.emit "joinProject", { + project_id: @project_id + }, cb + + (cb) => + @clientA.emit "joinDoc", @doc_id, cb + + (cb) => + @clientB.emit "joinProject", { + project_id: @project_id + }, cb + + (cb) => + @clientB.emit "joinDoc", @doc_id, cb + ], done + + describe "with an update from clientA", -> + before (done) -> + @clientAUpdates = [] + @clientA.on "otUpdateApplied", (update) => @clientAUpdates.push(update) + @clientBUpdates = [] + @clientB.on "otUpdateApplied", (update) => @clientBUpdates.push(update) + + @update = { + doc_id: @doc_id + op: + meta: + source: @clientA.socket.sessionid + v: @version + doc: @doc_id + op: [{i: "foo", p: 50}] + } + rclient.publish "applied-ops", JSON.stringify(@update) + setTimeout done, 200 # Give clients time to get message + + it "should send the full op to clientB", -> + @clientBUpdates.should.deep.equal [@update.op] + + it "should send an ack to clientA", -> + @clientAUpdates.should.deep.equal [{ + v: @version, doc: @doc_id + }] + + describe "with an error", -> + + before (done) -> + @clientAErrors = [] + @clientA.on "otUpdateError", (error) => @clientAErrors.push(error) + @clientBErrors = [] + @clientB.on "otUpdateError", (error) => @clientBErrors.push(error) + + rclient.publish "applied-ops", JSON.stringify({doc_id: @doc_id, error: @error = "something went wrong"}) + setTimeout done, 200 # Give clients time to get message + + it "should send the error to both clients", -> + @clientAErrors.should.deep.equal [@error] + @clientBErrors.should.deep.equal [@error] + + it "should disconnect the clients", -> + @clientA.socket.connected.should.equal false + @clientB.socket.connected.should.equal false \ No newline at end of file diff --git a/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee new file mode 100644 index 0000000000..479231743c --- /dev/null +++ b/services/real-time/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -0,0 +1,103 @@ +SandboxedModule = require('sandboxed-module') +sinon = require('sinon') +require('chai').should() +modulePath = require('path').join __dirname, '../../../app/js/DocumentUpdaterController' +MockClient = require "./helpers/MockClient" + +describe "DocumentUpdaterController", -> + beforeEach -> + @project_id = "project-id-123" + @doc_id = "doc-id-123" + @callback = sinon.stub() + @io = { "mock": "socket.io" } + @EditorUpdatesController = SandboxedModule.require modulePath, requires: + "logger-sharelatex": @logger = { error: sinon.stub(), log: sinon.stub() } + "settings-sharelatex": @settings = + redis: web: {} + "redis-sharelatex" : + createClient: ()=> + @rclient = {auth:->} + + describe "listenForUpdatesFromDocumentUpdater", -> + beforeEach -> + @rclient.subscribe = sinon.stub() + @rclient.on = sinon.stub() + @EditorUpdatesController.listenForUpdatesFromDocumentUpdater() + + it "should subscribe to the doc-updater stream", -> + @rclient.subscribe.calledWith("applied-ops").should.equal true + + it "should register a callback to handle updates", -> + @rclient.on.calledWith("message").should.equal true + + describe "_processMessageFromDocumentUpdater", -> + describe "with update", -> + beforeEach -> + @message = + doc_id: @doc_id + op: {t: "foo", p: 12} + @EditorUpdatesController._applyUpdateFromDocumentUpdater = sinon.stub() + @EditorUpdatesController._processMessageFromDocumentUpdater @io, "applied-ops", JSON.stringify(@message) + + it "should apply the update", -> + @EditorUpdatesController._applyUpdateFromDocumentUpdater + .calledWith(@io, @doc_id, @message.op) + .should.equal true + + describe "with error", -> + beforeEach -> + @message = + doc_id: @doc_id + error: "Something went wrong" + @EditorUpdatesController._processErrorFromDocumentUpdater = sinon.stub() + @EditorUpdatesController._processMessageFromDocumentUpdater @io, "applied-ops", JSON.stringify(@message) + + it "should process the error", -> + @EditorUpdatesController._processErrorFromDocumentUpdater + .calledWith(@io, @doc_id, @message.error) + .should.equal true + + describe "_applyUpdateFromDocumentUpdater", -> + beforeEach -> + @sourceClient = new MockClient() + @otherClients = [new MockClient(), new MockClient()] + @update = + op: [ t: "foo", p: 12 ] + meta: source: @sourceClient.id + v: @version = 42 + doc: @doc_id + @io.sockets = + clients: sinon.stub().returns([@sourceClient, @otherClients...]) + @EditorUpdatesController._applyUpdateFromDocumentUpdater @io, @doc_id, @update + + it "should send a version bump to the source client", -> + @sourceClient.emit + .calledWith("otUpdateApplied", v: @version, doc: @doc_id) + .should.equal true + + it "should get the clients connected to the document", -> + @io.sockets.clients + .calledWith(@doc_id) + .should.equal true + + it "should send the full update to the other clients", -> + for client in @otherClients + client.emit + .calledWith("otUpdateApplied", @update) + .should.equal true + + describe "_processErrorFromDocumentUpdater", -> + beforeEach -> + @clients = [new MockClient(), new MockClient()] + @io.sockets = + clients: sinon.stub().returns(@clients) + @EditorUpdatesController._processErrorFromDocumentUpdater @io, @doc_id, "Something went wrong" + + it "should log out an error", -> + @logger.error.called.should.equal true + + it "should disconnect all clients in that document", -> + @io.sockets.clients.calledWith(@doc_id).should.equal true + for client in @clients + client.disconnect.called.should.equal true + diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index 8ea62f466b..db7832cf05 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -345,8 +345,10 @@ describe 'WebsocketController', -> @AuthorizationManager.assertClientCanEditProject = sinon.stub().callsArgWith(1, @error = new Error("not authorized")) @WebsocketController.applyOtUpdate @client, @doc_id, @update, @callback - it "should disconnect the client", -> - @client.disconnect.called.should.equal true + # This happens in a setTimeout to allow the client a chance to receive the error first. + # I'm not sure how to unit test, but it is acceptance tested. + # it "should disconnect the client", -> + # @client.disconnect.called.should.equal true it "should log an error", -> @logger.error.called.should.equal true diff --git a/services/real-time/test/unit/coffee/helpers/MockClient.coffee b/services/real-time/test/unit/coffee/helpers/MockClient.coffee new file mode 100644 index 0000000000..82c3c02b19 --- /dev/null +++ b/services/real-time/test/unit/coffee/helpers/MockClient.coffee @@ -0,0 +1,17 @@ +sinon = require('sinon') + +idCounter = 0 + +module.exports = class MockClient + constructor: () -> + @attributes = {} + @join = sinon.stub() + @emit = sinon.stub() + @disconnect = sinon.stub() + @id = idCounter++ + set : (key, value, callback) -> + @attributes[key] = value + callback() if callback? + get : (key, callback) -> + callback null, @attributes[key] + disconnect: () ->