From 7fa906101598e2d53a608bc60adeda691a7e82a4 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 28 Apr 2020 17:03:38 +0100 Subject: [PATCH] [misc] stop processing requests as we detect a client disconnect v2 expose `client.connected`; v0 exposes `client.disconnected` (cherry-picked from commit a9d70484343ca9be367d45bf7bb949e4be449647) --- .../app/coffee/WebsocketController.coffee | 33 ++++ .../acceptance/coffee/EarlyDisconnect.coffee | 160 ++++++++++++++++++ .../coffee/WebsocketControllerTests.coffee | 127 +++++++++++++- 3 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 services/real-time/test/acceptance/coffee/EarlyDisconnect.coffee diff --git a/services/real-time/app/coffee/WebsocketController.coffee b/services/real-time/app/coffee/WebsocketController.coffee index 17d27b15b6..bd83b0c19a 100644 --- a/services/real-time/app/coffee/WebsocketController.coffee +++ b/services/real-time/app/coffee/WebsocketController.coffee @@ -16,11 +16,18 @@ module.exports = WebsocketController = PROTOCOL_VERSION: 2 joinProject: (client, user, project_id, callback = (error, project, privilegeLevel, protocolVersion) ->) -> + if client.disconnected + metrics.inc('disconnected_join_project') + return callback() + user_id = user?._id logger.log {user_id, project_id, client_id: client.id}, "user joining project" metrics.inc "editor.join-project" WebApiManager.joinProject project_id, user, (error, project, privilegeLevel, isRestrictedUser) -> return callback(error) if error? + if client.disconnected + metrics.inc('disconnected_join_project') + return callback() if !privilegeLevel or privilegeLevel == "" err = new Error("not authorized") @@ -77,6 +84,10 @@ module.exports = WebsocketController = , WebsocketController.FLUSH_IF_EMPTY_DELAY joinDoc: (client, doc_id, fromVersion = -1, options, callback = (error, doclines, version, ops, ranges) ->) -> + if client.disconnected + metrics.inc('disconnected_join_doc') + return callback() + metrics.inc "editor.join-doc" Utils.getClientAttributes client, ["project_id", "user_id", "is_restricted_user"], (error, {project_id, user_id, is_restricted_user}) -> return callback(error) if error? @@ -89,8 +100,17 @@ module.exports = WebsocketController = # doc to the client, so that no events are missed. RoomManager.joinDoc client, doc_id, (error) -> return callback(error) if error? + if client.disconnected + metrics.inc('disconnected_join_doc') + # the client will not read the response anyways + return callback() + DocumentUpdaterManager.getDocument project_id, doc_id, fromVersion, (error, lines, version, ranges, ops) -> return callback(error) if error? + if client.disconnected + metrics.inc('disconnected_join_doc') + # the client will not read the response anyways + return callback() if is_restricted_user and ranges?.comments? ranges.comments = [] @@ -122,6 +142,7 @@ module.exports = WebsocketController = callback null, escapedLines, version, ops, ranges leaveDoc: (client, doc_id, callback = (error) ->) -> + # client may have disconnected, but we have to cleanup internal state. metrics.inc "editor.leave-doc" Utils.getClientAttributes client, ["project_id", "user_id"], (error, {project_id, user_id}) -> logger.log {user_id, project_id, doc_id, client_id: client.id}, "client leaving doc" @@ -132,6 +153,10 @@ module.exports = WebsocketController = ## AuthorizationManager.removeAccessToDoc client, doc_id callback() updateClientPosition: (client, cursorData, callback = (error) ->) -> + if client.disconnected + # do not create a ghost entry in redis + return callback() + metrics.inc "editor.update-client-position", 0.1 Utils.getClientAttributes client, [ "project_id", "first_name", "last_name", "email", "user_id" @@ -173,6 +198,10 @@ module.exports = WebsocketController = CLIENT_REFRESH_DELAY: 1000 getConnectedUsers: (client, callback = (error, users) ->) -> + if client.disconnected + # they are not interested anymore, skip the redis lookups + return callback() + metrics.inc "editor.get-connected-users" Utils.getClientAttributes client, ["project_id", "user_id", "is_restricted_user"], (error, clientAttributes) -> return callback(error) if error? @@ -192,6 +221,7 @@ module.exports = WebsocketController = , WebsocketController.CLIENT_REFRESH_DELAY applyOtUpdate: (client, doc_id, update, callback = (error) ->) -> + # client may have disconnected, but we can submit their update to doc-updater anyways. Utils.getClientAttributes client, ["user_id", "project_id"], (error, {user_id, project_id}) -> return callback(error) if error? return callback(new Error("no project_id found on client")) if !project_id? @@ -223,6 +253,9 @@ module.exports = WebsocketController = # trigger an out-of-sync error message = {project_id, doc_id, error: "update is too large"} setTimeout () -> + if client.disconnected + # skip the message broadcast, the client has moved on + return metrics.inc('disconnected_otUpdateError') client.emit "otUpdateError", message.error, message client.disconnect() , 100 diff --git a/services/real-time/test/acceptance/coffee/EarlyDisconnect.coffee b/services/real-time/test/acceptance/coffee/EarlyDisconnect.coffee new file mode 100644 index 0000000000..d90c36b430 --- /dev/null +++ b/services/real-time/test/acceptance/coffee/EarlyDisconnect.coffee @@ -0,0 +1,160 @@ +async = require "async" +{expect} = require("chai") + +RealTimeClient = require "./helpers/RealTimeClient" +MockDocUpdaterServer = require "./helpers/MockDocUpdaterServer" +MockWebServer = require "./helpers/MockWebServer" +FixturesManager = require "./helpers/FixturesManager" + +settings = require "settings-sharelatex" +redis = require "redis-sharelatex" +rclient = redis.createClient(settings.redis.pubsub) +rclientRT = redis.createClient(settings.redis.realtime) +KeysRT = settings.redis.realtime.key_schema + +describe "EarlyDisconnect", -> + before (done) -> + MockDocUpdaterServer.run done + + describe "when the client disconnects before joinProject completes", -> + before () -> + # slow down web-api requests to force the race condition + @actualWebAPIjoinProject = joinProject = MockWebServer.joinProject + MockWebServer.joinProject = (project_id, user_id, cb) -> + setTimeout () -> + joinProject(project_id, user_id, cb) + , 300 + + after () -> + MockWebServer.joinProject = @actualWebAPIjoinProject + + beforeEach (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connectionAccepted", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (() ->) + # disconnect before joinProject completes + @clientA.on "disconnect", () -> cb() + @clientA.disconnect() + + (cb) => + # wait for joinDoc and subscribe + setTimeout cb, 500 + ], done + + # we can force the race condition, there is no need to repeat too often + for attempt in Array.from(length: 5).map((_, i) -> i+1) + it "should not subscribe to the pub/sub channel anymore (race #{attempt})", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + expect(resp).to.not.include "editor-events:#{@project_id}" + done() + return null + + describe "when the client disconnects before joinDoc completes", -> + beforeEach (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connectionAccepted", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, (() ->) + # disconnect before joinDoc completes + @clientA.on "disconnect", () -> cb() + @clientA.disconnect() + + (cb) => + # wait for subscribe and unsubscribe + setTimeout cb, 100 + ], done + + # we can not force the race condition, so we have to try many times + for attempt in Array.from(length: 20).map((_, i) -> i+1) + it "should not subscribe to the pub/sub channels anymore (race #{attempt})", (done) -> + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + expect(resp).to.not.include "editor-events:#{@project_id}" + + rclient.pubsub 'CHANNELS', (err, resp) => + return done(err) if err + expect(resp).to.not.include "applied-ops:#{@doc_id}" + done() + return null + + describe "when the client disconnects before clientTracking.updatePosition starts", -> + beforeEach (done) -> + async.series [ + (cb) => + FixturesManager.setUpProject { + privilegeLevel: "owner" + project: { + name: "Test Project" + } + }, (e, {@project_id, @user_id}) => cb() + + (cb) => + @clientA = RealTimeClient.connect() + @clientA.on "connectionAccepted", cb + + (cb) => + @clientA.emit "joinProject", project_id: @project_id, (error, @project, @privilegeLevel, @protocolVersion) => + cb(error) + + (cb) => + FixturesManager.setUpDoc @project_id, {@lines, @version, @ops}, (e, {@doc_id}) => + cb(e) + + (cb) => + @clientA.emit "joinDoc", @doc_id, cb + + (cb) => + @clientA.emit "clientTracking.updatePosition", { + row: 42 + column: 36 + doc_id: @doc_id + }, (() ->) + # disconnect before updateClientPosition completes + @clientA.on "disconnect", () -> cb() + @clientA.disconnect() + + (cb) => + # wait for updateClientPosition + setTimeout cb, 100 + ], done + + # we can not force the race condition, so we have to try many times + for attempt in Array.from(length: 20).map((_, i) -> i+1) + it "should not show the client as connected (race #{attempt})", (done) -> + rclientRT.smembers KeysRT.clientsInProject({project_id: @project_id}), (err, results) -> + return done(err) if err + expect(results).to.deep.equal([]) + done() + return null diff --git a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee index 498b425281..3819b04d7b 100644 --- a/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee +++ b/services/real-time/test/unit/coffee/WebsocketControllerTests.coffee @@ -20,6 +20,7 @@ describe 'WebsocketController', -> } @callback = sinon.stub() @client = + disconnected: false id: @client_id = "mock-client-id-123" params: {} set: sinon.stub() @@ -147,6 +148,35 @@ describe 'WebsocketController', -> .should.equal true @callback.args[0][0].message.should.equal "subscribe failed" + describe "when the client has disconnected", -> + beforeEach -> + @client.disconnected = true + @WebApiManager.joinProject = sinon.stub().callsArg(2) + @WebsocketController.joinProject @client, @user, @project_id, @callback + + it "should not call WebApiManager.joinProject", -> + expect(@WebApiManager.joinProject.called).to.equal(false) + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal [] + + it "should increment the disconnected_join_project metric", -> + expect(@metrics.inc.calledWith("disconnected_join_project")).to.equal(true) + + describe "when the client disconnects while WebApiManager.joinProject is running", -> + beforeEach -> + @WebApiManager.joinProject = (project, user, cb) => + @client.disconnected = true + cb(null, @project, @privilegeLevel, @isRestrictedUser) + + @WebsocketController.joinProject @client, @user, @project_id, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal [] + + it "should increment the disconnected_join_project metric", -> + expect(@metrics.inc.calledWith("disconnected_join_project")).to.equal(true) + describe "leaveProject", -> beforeEach -> @DocumentUpdaterManager.flushProjectToMongoAndDelete = sinon.stub().callsArg(1) @@ -384,6 +414,51 @@ describe 'WebsocketController', -> ranges = @callback.args[0][4] expect(ranges.comments).to.deep.equal [] + describe "when the client has disconnected", -> + beforeEach -> + @client.disconnected = true + @WebsocketController.joinDoc @client, @doc_id, -1, @options, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal([]) + + it "should increment the disconnected_join_doc metric", -> + expect(@metrics.inc.calledWith("disconnected_join_doc")).to.equal(true) + + it "should not get the document", -> + expect(@DocumentUpdaterManager.getDocument.called).to.equal(false) + + describe "when the client disconnects while RoomManager.joinDoc is running", -> + beforeEach -> + @RoomManager.joinDoc = (client, doc_id, cb) => + @client.disconnected = true + cb() + + @WebsocketController.joinDoc @client, @doc_id, -1, @options, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal([]) + + it "should increment the disconnected_join_doc metric", -> + expect(@metrics.inc.calledWith("disconnected_join_doc")).to.equal(true) + + it "should not get the document", -> + expect(@DocumentUpdaterManager.getDocument.called).to.equal(false) + + describe "when the client disconnects while DocumentUpdaterManager.getDocument is running", -> + beforeEach -> + @DocumentUpdaterManager.getDocument = (project_id, doc_id, fromVersion, callback) => + @client.disconnected = true + callback(null, @doc_lines, @version, @ranges, @ops) + + @WebsocketController.joinDoc @client, @doc_id, -1, @options, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal [] + + it "should increment the disconnected_join_doc metric", -> + expect(@metrics.inc.calledWith("disconnected_join_doc")).to.equal(true) + describe "leaveDoc", -> beforeEach -> @doc_id = "doc-id-123" @@ -463,6 +538,18 @@ describe 'WebsocketController', -> .called .should.equal false + describe "when the client has disconnected", -> + beforeEach -> + @client.disconnected = true + @AuthorizationManager.assertClientCanViewProject = sinon.stub() + @WebsocketController.getConnectedUsers @client, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal([]) + + it "should not check permissions", -> + expect(@AuthorizationManager.assertClientCanViewProject.called).to.equal(false) + describe "updateClientPosition", -> beforeEach -> @WebsocketLoadBalancer.emitToRoom = sinon.stub() @@ -642,6 +729,18 @@ describe 'WebsocketController', -> @ConnectedUsersManager.updateUserPosition.called.should.equal false done() + describe "when the client has disconnected", -> + beforeEach -> + @client.disconnected = true + @AuthorizationManager.assertClientCanViewProjectAndDoc = sinon.stub() + @WebsocketController.updateClientPosition @client, @update, @callback + + it "should call the callback with no details", -> + expect(@callback.args[0]).to.deep.equal([]) + + it "should not check permissions", -> + expect(@AuthorizationManager.assertClientCanViewProjectAndDoc.called).to.equal(false) + describe "applyOtUpdate", -> beforeEach -> @update = {op: {p: 12, t: "foo"}} @@ -715,7 +814,7 @@ describe 'WebsocketController', -> @WebsocketController.applyOtUpdate @client, @doc_id, @update, @callback setTimeout -> done() - , 201 + , 1 it "should call the callback with no error", -> @callback.called.should.equal true @@ -727,11 +826,29 @@ describe 'WebsocketController', -> @user_id, @project_id, @doc_id, updateSize: 7372835 }, 'update is too large'] - it "should send an otUpdateError the client", -> - @client.emit.calledWith('otUpdateError').should.equal true + describe "after 100ms", -> + beforeEach (done) -> + setTimeout done, 100 - it "should disconnect the client", -> - @client.disconnect.called.should.equal true + it "should send an otUpdateError the client", -> + @client.emit.calledWith('otUpdateError').should.equal true + + it "should disconnect the client", -> + @client.disconnect.called.should.equal true + + describe "when the client disconnects during the next 100ms", -> + beforeEach (done) -> + @client.disconnected = true + setTimeout done, 100 + + it "should not send an otUpdateError the client", -> + @client.emit.calledWith('otUpdateError').should.equal false + + it "should not disconnect the client", -> + @client.disconnect.called.should.equal false + + it "should increment the disconnected_otUpdateError metric", -> + expect(@metrics.inc.calledWith("disconnected_otUpdateError")).to.equal(true) describe "_assertClientCanApplyUpdate", -> beforeEach ->