From 817844515dcc0432755c50a01153ce129cdcb5f8 Mon Sep 17 00:00:00 2001 From: decaffeinate Date: Tue, 23 Jun 2020 18:29:44 +0100 Subject: [PATCH] prettier: convert app/js decaffeinated files to Prettier format --- .../real-time/app/js/AuthorizationManager.js | 144 +-- services/real-time/app/js/ChannelManager.js | 158 ++-- .../real-time/app/js/ConnectedUsersManager.js | 269 ++++-- .../app/js/DocumentUpdaterController.js | 308 ++++--- .../app/js/DocumentUpdaterManager.js | 243 +++-- services/real-time/app/js/DrainManager.js | 97 +- services/real-time/app/js/Errors.js | 21 +- services/real-time/app/js/EventLogger.js | 147 +-- .../real-time/app/js/HealthCheckManager.js | 130 +-- .../real-time/app/js/HttpApiController.js | 88 +- services/real-time/app/js/HttpController.js | 118 ++- .../real-time/app/js/RedisClientManager.js | 54 +- services/real-time/app/js/RoomManager.js | 291 +++--- services/real-time/app/js/Router.js | 583 +++++++----- services/real-time/app/js/SafeJsonParse.js | 39 +- services/real-time/app/js/SessionSockets.js | 51 +- services/real-time/app/js/WebApiManager.js | 119 ++- .../real-time/app/js/WebsocketController.js | 866 +++++++++++------- .../real-time/app/js/WebsocketLoadBalancer.js | 323 ++++--- 19 files changed, 2425 insertions(+), 1624 deletions(-) diff --git a/services/real-time/app/js/AuthorizationManager.js b/services/real-time/app/js/AuthorizationManager.js index 41caee9ef2..15607a898a 100644 --- a/services/real-time/app/js/AuthorizationManager.js +++ b/services/real-time/app/js/AuthorizationManager.js @@ -11,61 +11,101 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let AuthorizationManager; -module.exports = (AuthorizationManager = { - assertClientCanViewProject(client, callback) { - if (callback == null) { callback = function(error) {}; } - return AuthorizationManager._assertClientHasPrivilegeLevel(client, ["readOnly", "readAndWrite", "owner"], callback); - }, +let AuthorizationManager +module.exports = AuthorizationManager = { + assertClientCanViewProject(client, callback) { + if (callback == null) { + callback = function (error) {} + } + return AuthorizationManager._assertClientHasPrivilegeLevel( + client, + ['readOnly', 'readAndWrite', 'owner'], + callback + ) + }, - assertClientCanEditProject(client, callback) { - if (callback == null) { callback = function(error) {}; } - return AuthorizationManager._assertClientHasPrivilegeLevel(client, ["readAndWrite", "owner"], callback); - }, - - _assertClientHasPrivilegeLevel(client, allowedLevels, callback) { - if (callback == null) { callback = function(error) {}; } - if (Array.from(allowedLevels).includes(client.ol_context.privilege_level)) { - return callback(null); - } else { - return callback(new Error("not authorized")); - } - }, + assertClientCanEditProject(client, callback) { + if (callback == null) { + callback = function (error) {} + } + return AuthorizationManager._assertClientHasPrivilegeLevel( + client, + ['readAndWrite', 'owner'], + callback + ) + }, - assertClientCanViewProjectAndDoc(client, doc_id, callback) { - if (callback == null) { callback = function(error) {}; } - return AuthorizationManager.assertClientCanViewProject(client, function(error) { - if (error != null) { return callback(error); } - return AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback); - }); - }, + _assertClientHasPrivilegeLevel(client, allowedLevels, callback) { + if (callback == null) { + callback = function (error) {} + } + if (Array.from(allowedLevels).includes(client.ol_context.privilege_level)) { + return callback(null) + } else { + return callback(new Error('not authorized')) + } + }, - assertClientCanEditProjectAndDoc(client, doc_id, callback) { - if (callback == null) { callback = function(error) {}; } - return AuthorizationManager.assertClientCanEditProject(client, function(error) { - if (error != null) { return callback(error); } - return AuthorizationManager._assertClientCanAccessDoc(client, doc_id, callback); - }); - }, + assertClientCanViewProjectAndDoc(client, doc_id, callback) { + if (callback == null) { + callback = function (error) {} + } + return AuthorizationManager.assertClientCanViewProject(client, function ( + error + ) { + if (error != null) { + return callback(error) + } + return AuthorizationManager._assertClientCanAccessDoc( + client, + doc_id, + callback + ) + }) + }, - _assertClientCanAccessDoc(client, doc_id, callback) { - if (callback == null) { callback = function(error) {}; } - if (client.ol_context[`doc:${doc_id}`] === "allowed") { - return callback(null); - } else { - return callback(new Error("not authorized")); - } - }, + assertClientCanEditProjectAndDoc(client, doc_id, callback) { + if (callback == null) { + callback = function (error) {} + } + return AuthorizationManager.assertClientCanEditProject(client, function ( + error + ) { + if (error != null) { + return callback(error) + } + return AuthorizationManager._assertClientCanAccessDoc( + client, + doc_id, + callback + ) + }) + }, - addAccessToDoc(client, doc_id, callback) { - if (callback == null) { callback = function(error) {}; } - client.ol_context[`doc:${doc_id}`] = "allowed"; - return callback(null); - }, + _assertClientCanAccessDoc(client, doc_id, callback) { + if (callback == null) { + callback = function (error) {} + } + if (client.ol_context[`doc:${doc_id}`] === 'allowed') { + return callback(null) + } else { + return callback(new Error('not authorized')) + } + }, - removeAccessToDoc(client, doc_id, callback) { - if (callback == null) { callback = function(error) {}; } - delete client.ol_context[`doc:${doc_id}`]; - return callback(null); - } -}); + addAccessToDoc(client, doc_id, callback) { + if (callback == null) { + callback = function (error) {} + } + client.ol_context[`doc:${doc_id}`] = 'allowed' + return callback(null) + }, + + removeAccessToDoc(client, doc_id, callback) { + if (callback == null) { + callback = function (error) {} + } + delete client.ol_context[`doc:${doc_id}`] + return callback(null) + } +} diff --git a/services/real-time/app/js/ChannelManager.js b/services/real-time/app/js/ChannelManager.js index 60e7c5c635..09e81cebf5 100644 --- a/services/real-time/app/js/ChannelManager.js +++ b/services/real-time/app/js/ChannelManager.js @@ -8,84 +8,98 @@ * DS102: Remove unnecessary code created because of implicit returns * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let ChannelManager; -const logger = require('logger-sharelatex'); -const metrics = require("metrics-sharelatex"); -const settings = require("settings-sharelatex"); +let ChannelManager +const logger = require('logger-sharelatex') +const metrics = require('metrics-sharelatex') +const settings = require('settings-sharelatex') -const ClientMap = new Map(); // for each redis client, store a Map of subscribed channels (channelname -> subscribe promise) +const ClientMap = new Map() // for each redis client, store a Map of subscribed channels (channelname -> subscribe promise) // Manage redis pubsub subscriptions for individual projects and docs, ensuring // that we never subscribe to a channel multiple times. The socket.io side is // handled by RoomManager. -module.exports = (ChannelManager = { - getClientMapEntry(rclient) { - // return the per-client channel map if it exists, otherwise create and - // return an empty map for the client. - return ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient); - }, +module.exports = ChannelManager = { + getClientMapEntry(rclient) { + // return the per-client channel map if it exists, otherwise create and + // return an empty map for the client. + return ( + ClientMap.get(rclient) || ClientMap.set(rclient, new Map()).get(rclient) + ) + }, - subscribe(rclient, baseChannel, id) { - const clientChannelMap = this.getClientMapEntry(rclient); - const channel = `${baseChannel}:${id}`; - const actualSubscribe = function() { - // subscribe is happening in the foreground and it should reject - const p = rclient.subscribe(channel); - p.finally(function() { - if (clientChannelMap.get(channel) === subscribePromise) { - return clientChannelMap.delete(channel); - }}).then(function() { - logger.log({channel}, "subscribed to channel"); - return metrics.inc(`subscribe.${baseChannel}`);}).catch(function(err) { - logger.error({channel, err}, "failed to subscribe to channel"); - return metrics.inc(`subscribe.failed.${baseChannel}`); - }); - return p; - }; - - const pendingActions = clientChannelMap.get(channel) || Promise.resolve(); - var subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe); - clientChannelMap.set(channel, subscribePromise); - logger.log({channel}, "planned to subscribe to channel"); - return subscribePromise; - }, - - unsubscribe(rclient, baseChannel, id) { - const clientChannelMap = this.getClientMapEntry(rclient); - const channel = `${baseChannel}:${id}`; - const actualUnsubscribe = function() { - // unsubscribe is happening in the background, it should not reject - const p = rclient.unsubscribe(channel) - .finally(function() { - if (clientChannelMap.get(channel) === unsubscribePromise) { - return clientChannelMap.delete(channel); - }}).then(function() { - logger.log({channel}, "unsubscribed from channel"); - return metrics.inc(`unsubscribe.${baseChannel}`);}).catch(function(err) { - logger.error({channel, err}, "unsubscribed from channel"); - return metrics.inc(`unsubscribe.failed.${baseChannel}`); - }); - return p; - }; - - const pendingActions = clientChannelMap.get(channel) || Promise.resolve(); - var unsubscribePromise = pendingActions.then(actualUnsubscribe, actualUnsubscribe); - clientChannelMap.set(channel, unsubscribePromise); - logger.log({channel}, "planned to unsubscribe from channel"); - return unsubscribePromise; - }, - - publish(rclient, baseChannel, id, data) { - let channel; - metrics.summary(`redis.publish.${baseChannel}`, data.length); - if ((id === 'all') || !settings.publishOnIndividualChannels) { - channel = baseChannel; - } else { - channel = `${baseChannel}:${id}`; + subscribe(rclient, baseChannel, id) { + const clientChannelMap = this.getClientMapEntry(rclient) + const channel = `${baseChannel}:${id}` + const actualSubscribe = function () { + // subscribe is happening in the foreground and it should reject + const p = rclient.subscribe(channel) + p.finally(function () { + if (clientChannelMap.get(channel) === subscribePromise) { + return clientChannelMap.delete(channel) } - // we publish on a different client to the subscribe, so we can't - // check for the channel existing here - return rclient.publish(channel, data); + }) + .then(function () { + logger.log({ channel }, 'subscribed to channel') + return metrics.inc(`subscribe.${baseChannel}`) + }) + .catch(function (err) { + logger.error({ channel, err }, 'failed to subscribe to channel') + return metrics.inc(`subscribe.failed.${baseChannel}`) + }) + return p } -}); + + const pendingActions = clientChannelMap.get(channel) || Promise.resolve() + var subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe) + clientChannelMap.set(channel, subscribePromise) + logger.log({ channel }, 'planned to subscribe to channel') + return subscribePromise + }, + + unsubscribe(rclient, baseChannel, id) { + const clientChannelMap = this.getClientMapEntry(rclient) + const channel = `${baseChannel}:${id}` + const actualUnsubscribe = function () { + // unsubscribe is happening in the background, it should not reject + const p = rclient + .unsubscribe(channel) + .finally(function () { + if (clientChannelMap.get(channel) === unsubscribePromise) { + return clientChannelMap.delete(channel) + } + }) + .then(function () { + logger.log({ channel }, 'unsubscribed from channel') + return metrics.inc(`unsubscribe.${baseChannel}`) + }) + .catch(function (err) { + logger.error({ channel, err }, 'unsubscribed from channel') + return metrics.inc(`unsubscribe.failed.${baseChannel}`) + }) + return p + } + + const pendingActions = clientChannelMap.get(channel) || Promise.resolve() + var unsubscribePromise = pendingActions.then( + actualUnsubscribe, + actualUnsubscribe + ) + clientChannelMap.set(channel, unsubscribePromise) + logger.log({ channel }, 'planned to unsubscribe from channel') + return unsubscribePromise + }, + + publish(rclient, baseChannel, id, data) { + let channel + metrics.summary(`redis.publish.${baseChannel}`, data.length) + if (id === 'all' || !settings.publishOnIndividualChannels) { + channel = baseChannel + } else { + channel = `${baseChannel}:${id}` + } + // we publish on a different client to the subscribe, so we can't + // check for the channel existing here + return rclient.publish(channel, data) + } +} diff --git a/services/real-time/app/js/ConnectedUsersManager.js b/services/real-time/app/js/ConnectedUsersManager.js index b2a0fc2eee..6770dd5421 100644 --- a/services/real-time/app/js/ConnectedUsersManager.js +++ b/services/real-time/app/js/ConnectedUsersManager.js @@ -10,112 +10,185 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -const async = require("async"); -const Settings = require('settings-sharelatex'); -const logger = require("logger-sharelatex"); -const redis = require("redis-sharelatex"); -const rclient = redis.createClient(Settings.redis.realtime); -const Keys = Settings.redis.realtime.key_schema; +const async = require('async') +const Settings = require('settings-sharelatex') +const logger = require('logger-sharelatex') +const redis = require('redis-sharelatex') +const rclient = redis.createClient(Settings.redis.realtime) +const Keys = Settings.redis.realtime.key_schema -const ONE_HOUR_IN_S = 60 * 60; -const ONE_DAY_IN_S = ONE_HOUR_IN_S * 24; -const FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4; +const ONE_HOUR_IN_S = 60 * 60 +const ONE_DAY_IN_S = ONE_HOUR_IN_S * 24 +const FOUR_DAYS_IN_S = ONE_DAY_IN_S * 4 -const USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4; -const REFRESH_TIMEOUT_IN_S = 10; // only show clients which have responded to a refresh request in the last 10 seconds +const USER_TIMEOUT_IN_S = ONE_HOUR_IN_S / 4 +const REFRESH_TIMEOUT_IN_S = 10 // only show clients which have responded to a refresh request in the last 10 seconds module.exports = { - // Use the same method for when a user connects, and when a user sends a cursor - // update. This way we don't care if the connected_user key has expired when - // we receive a cursor update. - updateUserPosition(project_id, client_id, user, cursorData, callback){ - if (callback == null) { callback = function(err){}; } - logger.log({project_id, client_id}, "marking user as joined or connected"); + // Use the same method for when a user connects, and when a user sends a cursor + // update. This way we don't care if the connected_user key has expired when + // we receive a cursor update. + updateUserPosition(project_id, client_id, user, cursorData, callback) { + if (callback == null) { + callback = function (err) {} + } + logger.log({ project_id, client_id }, 'marking user as joined or connected') - const multi = rclient.multi(); - - multi.sadd(Keys.clientsInProject({project_id}), client_id); - multi.expire(Keys.clientsInProject({project_id}), FOUR_DAYS_IN_S); - - multi.hset(Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now()); - multi.hset(Keys.connectedUser({project_id, client_id}), "user_id", user._id); - multi.hset(Keys.connectedUser({project_id, client_id}), "first_name", user.first_name || ""); - multi.hset(Keys.connectedUser({project_id, client_id}), "last_name", user.last_name || ""); - multi.hset(Keys.connectedUser({project_id, client_id}), "email", user.email || ""); - - if (cursorData != null) { - multi.hset(Keys.connectedUser({project_id, client_id}), "cursorData", JSON.stringify(cursorData)); - } - multi.expire(Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S); - - return multi.exec(function(err){ - if (err != null) { - logger.err({err, project_id, client_id}, "problem marking user as connected"); - } - return callback(err); - }); - }, + const multi = rclient.multi() - refreshClient(project_id, client_id, callback) { - if (callback == null) { callback = function(err) {}; } - logger.log({project_id, client_id}, "refreshing connected client"); - const multi = rclient.multi(); - multi.hset(Keys.connectedUser({project_id, client_id}), "last_updated_at", Date.now()); - multi.expire(Keys.connectedUser({project_id, client_id}), USER_TIMEOUT_IN_S); - return multi.exec(function(err){ - if (err != null) { - logger.err({err, project_id, client_id}, "problem refreshing connected client"); - } - return callback(err); - }); - }, + multi.sadd(Keys.clientsInProject({ project_id }), client_id) + multi.expire(Keys.clientsInProject({ project_id }), FOUR_DAYS_IN_S) - markUserAsDisconnected(project_id, client_id, callback){ - logger.log({project_id, client_id}, "marking user as disconnected"); - const multi = rclient.multi(); - multi.srem(Keys.clientsInProject({project_id}), client_id); - multi.expire(Keys.clientsInProject({project_id}), FOUR_DAYS_IN_S); - multi.del(Keys.connectedUser({project_id, client_id})); - return multi.exec(callback); - }, + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'last_updated_at', + Date.now() + ) + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'user_id', + user._id + ) + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'first_name', + user.first_name || '' + ) + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'last_name', + user.last_name || '' + ) + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'email', + user.email || '' + ) + if (cursorData != null) { + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'cursorData', + JSON.stringify(cursorData) + ) + } + multi.expire( + Keys.connectedUser({ project_id, client_id }), + USER_TIMEOUT_IN_S + ) - _getConnectedUser(project_id, client_id, callback){ - return rclient.hgetall(Keys.connectedUser({project_id, client_id}), function(err, result){ - if ((result == null) || (Object.keys(result).length === 0) || !result.user_id) { - result = { - connected : false, - client_id - }; - } else { - result.connected = true; - result.client_id = client_id; - result.client_age = (Date.now() - parseInt(result.last_updated_at,10)) / 1000; - if (result.cursorData != null) { - try { - result.cursorData = JSON.parse(result.cursorData); - } catch (e) { - logger.error({err: e, project_id, client_id, cursorData: result.cursorData}, "error parsing cursorData JSON"); - return callback(e); - } - } - } - return callback(err, result); - }); - }, + return multi.exec(function (err) { + if (err != null) { + logger.err( + { err, project_id, client_id }, + 'problem marking user as connected' + ) + } + return callback(err) + }) + }, - getConnectedUsers(project_id, callback){ - const self = this; - return rclient.smembers(Keys.clientsInProject({project_id}), function(err, results){ - if (err != null) { return callback(err); } - const jobs = results.map(client_id => cb => self._getConnectedUser(project_id, client_id, cb)); - return async.series(jobs, function(err, users){ - if (users == null) { users = []; } - if (err != null) { return callback(err); } - users = users.filter(user => (user != null ? user.connected : undefined) && ((user != null ? user.client_age : undefined) < REFRESH_TIMEOUT_IN_S)); - return callback(null, users); - }); - }); - } -}; + refreshClient(project_id, client_id, callback) { + if (callback == null) { + callback = function (err) {} + } + logger.log({ project_id, client_id }, 'refreshing connected client') + const multi = rclient.multi() + multi.hset( + Keys.connectedUser({ project_id, client_id }), + 'last_updated_at', + Date.now() + ) + multi.expire( + Keys.connectedUser({ project_id, client_id }), + USER_TIMEOUT_IN_S + ) + return multi.exec(function (err) { + if (err != null) { + logger.err( + { err, project_id, client_id }, + 'problem refreshing connected client' + ) + } + return callback(err) + }) + }, + markUserAsDisconnected(project_id, client_id, callback) { + logger.log({ project_id, client_id }, 'marking user as disconnected') + const multi = rclient.multi() + multi.srem(Keys.clientsInProject({ project_id }), client_id) + multi.expire(Keys.clientsInProject({ project_id }), FOUR_DAYS_IN_S) + multi.del(Keys.connectedUser({ project_id, client_id })) + return multi.exec(callback) + }, + + _getConnectedUser(project_id, client_id, callback) { + return rclient.hgetall( + Keys.connectedUser({ project_id, client_id }), + function (err, result) { + if ( + result == null || + Object.keys(result).length === 0 || + !result.user_id + ) { + result = { + connected: false, + client_id + } + } else { + result.connected = true + result.client_id = client_id + result.client_age = + (Date.now() - parseInt(result.last_updated_at, 10)) / 1000 + if (result.cursorData != null) { + try { + result.cursorData = JSON.parse(result.cursorData) + } catch (e) { + logger.error( + { + err: e, + project_id, + client_id, + cursorData: result.cursorData + }, + 'error parsing cursorData JSON' + ) + return callback(e) + } + } + } + return callback(err, result) + } + ) + }, + + getConnectedUsers(project_id, callback) { + const self = this + return rclient.smembers(Keys.clientsInProject({ project_id }), function ( + err, + results + ) { + if (err != null) { + return callback(err) + } + const jobs = results.map((client_id) => (cb) => + self._getConnectedUser(project_id, client_id, cb) + ) + return async.series(jobs, function (err, users) { + if (users == null) { + users = [] + } + if (err != null) { + return callback(err) + } + users = users.filter( + (user) => + (user != null ? user.connected : undefined) && + (user != null ? user.client_age : undefined) < REFRESH_TIMEOUT_IN_S + ) + return callback(null, users) + }) + }) + } +} diff --git a/services/real-time/app/js/DocumentUpdaterController.js b/services/real-time/app/js/DocumentUpdaterController.js index cbf5c600fd..b8dde3b426 100644 --- a/services/real-time/app/js/DocumentUpdaterController.js +++ b/services/real-time/app/js/DocumentUpdaterController.js @@ -12,131 +12,197 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let DocumentUpdaterController; -const logger = require("logger-sharelatex"); -const settings = require('settings-sharelatex'); -const RedisClientManager = require("./RedisClientManager"); -const SafeJsonParse = require("./SafeJsonParse"); -const EventLogger = require("./EventLogger"); -const HealthCheckManager = require("./HealthCheckManager"); -const RoomManager = require("./RoomManager"); -const ChannelManager = require("./ChannelManager"); -const metrics = require("metrics-sharelatex"); +let DocumentUpdaterController +const logger = require('logger-sharelatex') +const settings = require('settings-sharelatex') +const RedisClientManager = require('./RedisClientManager') +const SafeJsonParse = require('./SafeJsonParse') +const EventLogger = require('./EventLogger') +const HealthCheckManager = require('./HealthCheckManager') +const RoomManager = require('./RoomManager') +const ChannelManager = require('./ChannelManager') +const metrics = require('metrics-sharelatex') -const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024; // 1Mb +const MESSAGE_SIZE_LOG_LIMIT = 1024 * 1024 // 1Mb -module.exports = (DocumentUpdaterController = { - // DocumentUpdaterController is responsible for updates that come via Redis - // Pub/Sub from the document updater. - rclientList: RedisClientManager.createClientList(settings.redis.pubsub), +module.exports = DocumentUpdaterController = { + // DocumentUpdaterController is responsible for updates that come via Redis + // Pub/Sub from the document updater. + rclientList: RedisClientManager.createClientList(settings.redis.pubsub), - listenForUpdatesFromDocumentUpdater(io) { - let i, rclient; - logger.log({rclients: this.rclientList.length}, "listening for applied-ops events"); - for (i = 0; i < this.rclientList.length; i++) { - rclient = this.rclientList[i]; - rclient.subscribe("applied-ops"); - rclient.on("message", function(channel, message) { - metrics.inc("rclient", 0.001); // global event rate metric - if (settings.debugEvents > 0) { EventLogger.debugEvent(channel, message); } - return DocumentUpdaterController._processMessageFromDocumentUpdater(io, channel, message); - }); - } - // create metrics for each redis instance only when we have multiple redis clients - if (this.rclientList.length > 1) { - for (i = 0; i < this.rclientList.length; i++) { - rclient = this.rclientList[i]; - ((i => // per client event rate metric - rclient.on("message", () => metrics.inc(`rclient-${i}`, 0.001))))(i); - } - } - return this.handleRoomUpdates(this.rclientList); - }, + listenForUpdatesFromDocumentUpdater(io) { + let i, rclient + logger.log( + { rclients: this.rclientList.length }, + 'listening for applied-ops events' + ) + for (i = 0; i < this.rclientList.length; i++) { + rclient = this.rclientList[i] + rclient.subscribe('applied-ops') + rclient.on('message', function (channel, message) { + metrics.inc('rclient', 0.001) // global event rate metric + if (settings.debugEvents > 0) { + EventLogger.debugEvent(channel, message) + } + return DocumentUpdaterController._processMessageFromDocumentUpdater( + io, + channel, + message + ) + }) + } + // create metrics for each redis instance only when we have multiple redis clients + if (this.rclientList.length > 1) { + for (i = 0; i < this.rclientList.length; i++) { + rclient = this.rclientList[i] + ;(( + i // per client event rate metric + ) => rclient.on('message', () => metrics.inc(`rclient-${i}`, 0.001)))(i) + } + } + return this.handleRoomUpdates(this.rclientList) + }, - handleRoomUpdates(rclientSubList) { - const roomEvents = RoomManager.eventSource(); - roomEvents.on('doc-active', function(doc_id) { - const subscribePromises = Array.from(rclientSubList).map((rclient) => - ChannelManager.subscribe(rclient, "applied-ops", doc_id)); - return RoomManager.emitOnCompletion(subscribePromises, `doc-subscribed-${doc_id}`); - }); - return roomEvents.on('doc-empty', doc_id => Array.from(rclientSubList).map((rclient) => - ChannelManager.unsubscribe(rclient, "applied-ops", doc_id))); - }, + handleRoomUpdates(rclientSubList) { + const roomEvents = RoomManager.eventSource() + roomEvents.on('doc-active', function (doc_id) { + const subscribePromises = Array.from(rclientSubList).map((rclient) => + ChannelManager.subscribe(rclient, 'applied-ops', doc_id) + ) + return RoomManager.emitOnCompletion( + subscribePromises, + `doc-subscribed-${doc_id}` + ) + }) + return roomEvents.on('doc-empty', (doc_id) => + Array.from(rclientSubList).map((rclient) => + ChannelManager.unsubscribe(rclient, 'applied-ops', doc_id) + ) + ) + }, - _processMessageFromDocumentUpdater(io, channel, message) { - return SafeJsonParse.parse(message, function(error, message) { - if (error != null) { - logger.error({err: error, channel}, "error parsing JSON"); - return; - } - if (message.op != null) { - if ((message._id != null) && settings.checkEventOrder) { - const status = EventLogger.checkEventOrder("applied-ops", message._id, message); - if (status === 'duplicate') { - return; // skip duplicate events - } - } - return DocumentUpdaterController._applyUpdateFromDocumentUpdater(io, message.doc_id, message.op); - } else if (message.error != null) { - return DocumentUpdaterController._processErrorFromDocumentUpdater(io, message.doc_id, message.error, message); - } else if (message.health_check != null) { - logger.debug({message}, "got health check message in applied ops channel"); - return HealthCheckManager.check(channel, message.key); - } - }); - }, - - _applyUpdateFromDocumentUpdater(io, doc_id, update) { - let client; - const clientList = io.sockets.clients(doc_id); - // avoid unnecessary work if no clients are connected - if (clientList.length === 0) { - return; - } - // send updates to clients - logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), socketIoClients: (((() => { - const result = []; - for (client of Array.from(clientList)) { result.push(client.id); - } - return result; - })()))}, "distributing updates to clients"); - const seen = {}; - // send messages only to unique clients (due to duplicate entries in io.sockets.clients) - for (client of Array.from(clientList)) { - if (!seen[client.id]) { - seen[client.id] = true; - if (client.publicId === update.meta.source) { - logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined)}, "distributing update to sender"); - client.emit("otUpdateApplied", {v: update.v, doc: update.doc}); - } else if (!update.dup) { // Duplicate ops should just be sent back to sending client for acknowledgement - logger.log({doc_id, version: update.v, source: (update.meta != null ? update.meta.source : undefined), client_id: client.id}, "distributing update to collaborator"); - client.emit("otUpdateApplied", update); - } - } - } - if (Object.keys(seen).length < clientList.length) { - metrics.inc("socket-io.duplicate-clients", 0.1); - return logger.log({doc_id, socketIoClients: (((() => { - const result1 = []; - for (client of Array.from(clientList)) { result1.push(client.id); - } - return result1; - })()))}, "discarded duplicate clients"); - } - }, - - _processErrorFromDocumentUpdater(io, doc_id, error, message) { - return (() => { - const result = []; - for (const client of Array.from(io.sockets.clients(doc_id))) { - logger.warn({err: error, doc_id, client_id: client.id}, "error from document updater, disconnecting client"); - client.emit("otUpdateError", error, message); - result.push(client.disconnect()); - } - return result; - })(); - } -}); + _processMessageFromDocumentUpdater(io, channel, message) { + return SafeJsonParse.parse(message, function (error, message) { + if (error != null) { + logger.error({ err: error, channel }, 'error parsing JSON') + return + } + if (message.op != null) { + if (message._id != null && settings.checkEventOrder) { + const status = EventLogger.checkEventOrder( + 'applied-ops', + message._id, + message + ) + if (status === 'duplicate') { + return // skip duplicate events + } + } + return DocumentUpdaterController._applyUpdateFromDocumentUpdater( + io, + message.doc_id, + message.op + ) + } else if (message.error != null) { + return DocumentUpdaterController._processErrorFromDocumentUpdater( + io, + message.doc_id, + message.error, + message + ) + } else if (message.health_check != null) { + logger.debug( + { message }, + 'got health check message in applied ops channel' + ) + return HealthCheckManager.check(channel, message.key) + } + }) + }, + _applyUpdateFromDocumentUpdater(io, doc_id, update) { + let client + const clientList = io.sockets.clients(doc_id) + // avoid unnecessary work if no clients are connected + if (clientList.length === 0) { + return + } + // send updates to clients + logger.log( + { + doc_id, + version: update.v, + source: update.meta != null ? update.meta.source : undefined, + socketIoClients: (() => { + const result = [] + for (client of Array.from(clientList)) { + result.push(client.id) + } + return result + })() + }, + 'distributing updates to clients' + ) + const seen = {} + // send messages only to unique clients (due to duplicate entries in io.sockets.clients) + for (client of Array.from(clientList)) { + if (!seen[client.id]) { + seen[client.id] = true + if (client.publicId === update.meta.source) { + logger.log( + { + doc_id, + version: update.v, + source: update.meta != null ? update.meta.source : undefined + }, + 'distributing update to sender' + ) + client.emit('otUpdateApplied', { v: update.v, doc: update.doc }) + } else if (!update.dup) { + // Duplicate ops should just be sent back to sending client for acknowledgement + logger.log( + { + doc_id, + version: update.v, + source: update.meta != null ? update.meta.source : undefined, + client_id: client.id + }, + 'distributing update to collaborator' + ) + client.emit('otUpdateApplied', update) + } + } + } + if (Object.keys(seen).length < clientList.length) { + metrics.inc('socket-io.duplicate-clients', 0.1) + return logger.log( + { + doc_id, + socketIoClients: (() => { + const result1 = [] + for (client of Array.from(clientList)) { + result1.push(client.id) + } + return result1 + })() + }, + 'discarded duplicate clients' + ) + } + }, + _processErrorFromDocumentUpdater(io, doc_id, error, message) { + return (() => { + const result = [] + for (const client of Array.from(io.sockets.clients(doc_id))) { + logger.warn( + { err: error, doc_id, client_id: client.id }, + 'error from document updater, disconnecting client' + ) + client.emit('otUpdateError', error, message) + result.push(client.disconnect()) + } + return result + })() + } +} diff --git a/services/real-time/app/js/DocumentUpdaterManager.js b/services/real-time/app/js/DocumentUpdaterManager.js index dc5865db62..bafc81ed14 100644 --- a/services/real-time/app/js/DocumentUpdaterManager.js +++ b/services/real-time/app/js/DocumentUpdaterManager.js @@ -11,104 +11,159 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let DocumentUpdaterManager; -const request = require("request"); -const _ = require("underscore"); -const logger = require("logger-sharelatex"); -const settings = require("settings-sharelatex"); -const metrics = require("metrics-sharelatex"); +let DocumentUpdaterManager +const request = require('request') +const _ = require('underscore') +const logger = require('logger-sharelatex') +const settings = require('settings-sharelatex') +const metrics = require('metrics-sharelatex') -const rclient = require("redis-sharelatex").createClient(settings.redis.documentupdater); -const Keys = settings.redis.documentupdater.key_schema; +const rclient = require('redis-sharelatex').createClient( + settings.redis.documentupdater +) +const Keys = settings.redis.documentupdater.key_schema -module.exports = (DocumentUpdaterManager = { - getDocument(project_id, doc_id, fromVersion, callback) { - if (callback == null) { callback = function(error, exists, doclines, version) {}; } - const timer = new metrics.Timer("get-document"); - const url = `${settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}?fromVersion=${fromVersion}`; - logger.log({project_id, doc_id, fromVersion}, "getting doc from document updater"); - return request.get(url, function(err, res, body) { - timer.done(); - if (err != null) { - logger.error({err, url, project_id, doc_id}, "error getting doc from doc updater"); - return callback(err); - } - if (res.statusCode >= 200 && res.statusCode < 300) { - logger.log({project_id, doc_id}, "got doc from document document updater"); - try { - body = JSON.parse(body); - } catch (error) { - return callback(error); - } - return callback(null, body != null ? body.lines : undefined, body != null ? body.version : undefined, body != null ? body.ranges : undefined, body != null ? body.ops : undefined); - } else if ([404, 422].includes(res.statusCode)) { - err = new Error("doc updater could not load requested ops"); - err.statusCode = res.statusCode; - logger.warn({err, project_id, doc_id, url, fromVersion}, "doc updater could not load requested ops"); - return callback(err); - } else { - err = new Error(`doc updater returned a non-success status code: ${res.statusCode}`); - err.statusCode = res.statusCode; - logger.error({err, project_id, doc_id, url}, `doc updater returned a non-success status code: ${res.statusCode}`); - return callback(err); - } - }); - }, +module.exports = DocumentUpdaterManager = { + getDocument(project_id, doc_id, fromVersion, callback) { + if (callback == null) { + callback = function (error, exists, doclines, version) {} + } + const timer = new metrics.Timer('get-document') + const url = `${settings.apis.documentupdater.url}/project/${project_id}/doc/${doc_id}?fromVersion=${fromVersion}` + logger.log( + { project_id, doc_id, fromVersion }, + 'getting doc from document updater' + ) + return request.get(url, function (err, res, body) { + timer.done() + if (err != null) { + logger.error( + { err, url, project_id, doc_id }, + 'error getting doc from doc updater' + ) + return callback(err) + } + if (res.statusCode >= 200 && res.statusCode < 300) { + logger.log( + { project_id, doc_id }, + 'got doc from document document updater' + ) + try { + body = JSON.parse(body) + } catch (error) { + return callback(error) + } + return callback( + null, + body != null ? body.lines : undefined, + body != null ? body.version : undefined, + body != null ? body.ranges : undefined, + body != null ? body.ops : undefined + ) + } else if ([404, 422].includes(res.statusCode)) { + err = new Error('doc updater could not load requested ops') + err.statusCode = res.statusCode + logger.warn( + { err, project_id, doc_id, url, fromVersion }, + 'doc updater could not load requested ops' + ) + return callback(err) + } else { + err = new Error( + `doc updater returned a non-success status code: ${res.statusCode}` + ) + err.statusCode = res.statusCode + logger.error( + { err, project_id, doc_id, url }, + `doc updater returned a non-success status code: ${res.statusCode}` + ) + return callback(err) + } + }) + }, - flushProjectToMongoAndDelete(project_id, callback) { - // this method is called when the last connected user leaves the project - if (callback == null) { callback = function(){}; } - logger.log({project_id}, "deleting project from document updater"); - const timer = new metrics.Timer("delete.mongo.project"); - // flush the project in the background when all users have left - const url = `${settings.apis.documentupdater.url}/project/${project_id}?background=true` + - (settings.shutDownInProgress ? "&shutdown=true" : ""); - return request.del(url, function(err, res, body){ - timer.done(); - if (err != null) { - logger.error({err, project_id}, "error deleting project from document updater"); - return callback(err); - } else if (res.statusCode >= 200 && res.statusCode < 300) { - logger.log({project_id}, "deleted project from document updater"); - return callback(null); - } else { - err = new Error(`document updater returned a failure status code: ${res.statusCode}`); - err.statusCode = res.statusCode; - logger.error({err, project_id}, `document updater returned failure status code: ${res.statusCode}`); - return callback(err); - } - }); - }, + flushProjectToMongoAndDelete(project_id, callback) { + // this method is called when the last connected user leaves the project + if (callback == null) { + callback = function () {} + } + logger.log({ project_id }, 'deleting project from document updater') + const timer = new metrics.Timer('delete.mongo.project') + // flush the project in the background when all users have left + const url = + `${settings.apis.documentupdater.url}/project/${project_id}?background=true` + + (settings.shutDownInProgress ? '&shutdown=true' : '') + return request.del(url, function (err, res, body) { + timer.done() + if (err != null) { + logger.error( + { err, project_id }, + 'error deleting project from document updater' + ) + return callback(err) + } else if (res.statusCode >= 200 && res.statusCode < 300) { + logger.log({ project_id }, 'deleted project from document updater') + return callback(null) + } else { + err = new Error( + `document updater returned a failure status code: ${res.statusCode}` + ) + err.statusCode = res.statusCode + logger.error( + { err, project_id }, + `document updater returned failure status code: ${res.statusCode}` + ) + return callback(err) + } + }) + }, - queueChange(project_id, doc_id, change, callback){ - let error; - if (callback == null) { callback = function(){}; } - const allowedKeys = [ 'doc', 'op', 'v', 'dupIfSource', 'meta', 'lastV', 'hash']; - change = _.pick(change, allowedKeys); - const jsonChange = JSON.stringify(change); - if (jsonChange.indexOf("\u0000") !== -1) { - // memory corruption check - error = new Error("null bytes found in op"); - logger.error({err: error, project_id, doc_id, jsonChange}, error.message); - return callback(error); - } + queueChange(project_id, doc_id, change, callback) { + let error + if (callback == null) { + callback = function () {} + } + const allowedKeys = [ + 'doc', + 'op', + 'v', + 'dupIfSource', + 'meta', + 'lastV', + 'hash' + ] + change = _.pick(change, allowedKeys) + const jsonChange = JSON.stringify(change) + if (jsonChange.indexOf('\u0000') !== -1) { + // memory corruption check + error = new Error('null bytes found in op') + logger.error( + { err: error, project_id, doc_id, jsonChange }, + error.message + ) + return callback(error) + } - const updateSize = jsonChange.length; - if (updateSize > settings.maxUpdateSize) { - error = new Error("update is too large"); - error.updateSize = updateSize; - return callback(error); - } + const updateSize = jsonChange.length + if (updateSize > settings.maxUpdateSize) { + error = new Error('update is too large') + error.updateSize = updateSize + return callback(error) + } - // record metric for each update added to queue - metrics.summary('redis.pendingUpdates', updateSize, {status: 'push'}); + // record metric for each update added to queue + metrics.summary('redis.pendingUpdates', updateSize, { status: 'push' }) - const doc_key = `${project_id}:${doc_id}`; - // Push onto pendingUpdates for doc_id first, because once the doc updater - // gets an entry on pending-updates-list, it starts processing. - return rclient.rpush(Keys.pendingUpdates({doc_id}), jsonChange, function(error) { - if (error != null) { return callback(error); } - return rclient.rpush("pending-updates-list", doc_key, callback); - }); - } -}); + const doc_key = `${project_id}:${doc_id}` + // Push onto pendingUpdates for doc_id first, because once the doc updater + // gets an entry on pending-updates-list, it starts processing. + return rclient.rpush(Keys.pendingUpdates({ doc_id }), jsonChange, function ( + error + ) { + if (error != null) { + return callback(error) + } + return rclient.rpush('pending-updates-list', doc_key, callback) + }) + } +} diff --git a/services/real-time/app/js/DrainManager.js b/services/real-time/app/js/DrainManager.js index 466c80fd0c..b8c08356bb 100644 --- a/services/real-time/app/js/DrainManager.js +++ b/services/real-time/app/js/DrainManager.js @@ -9,54 +9,55 @@ * DS102: Remove unnecessary code created because of implicit returns * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let DrainManager; -const logger = require("logger-sharelatex"); +let DrainManager +const logger = require('logger-sharelatex') -module.exports = (DrainManager = { +module.exports = DrainManager = { + startDrainTimeWindow(io, minsToDrain) { + const drainPerMin = io.sockets.clients().length / minsToDrain + return DrainManager.startDrain(io, Math.max(drainPerMin / 60, 4)) + }, // enforce minimum drain rate - startDrainTimeWindow(io, minsToDrain){ - const drainPerMin = io.sockets.clients().length / minsToDrain; - return DrainManager.startDrain(io, Math.max(drainPerMin / 60, 4)); - }, // enforce minimum drain rate + startDrain(io, rate) { + // Clear out any old interval + let pollingInterval + clearInterval(this.interval) + logger.log({ rate }, 'starting drain') + if (rate === 0) { + return + } else if (rate < 1) { + // allow lower drain rates + // e.g. rate=0.1 will drain one client every 10 seconds + pollingInterval = 1000 / rate + rate = 1 + } else { + pollingInterval = 1000 + } + return (this.interval = setInterval(() => { + return this.reconnectNClients(io, rate) + }, pollingInterval)) + }, - startDrain(io, rate) { - // Clear out any old interval - let pollingInterval; - clearInterval(this.interval); - logger.log({rate}, "starting drain"); - if (rate === 0) { - return; - } else if (rate < 1) { - // allow lower drain rates - // e.g. rate=0.1 will drain one client every 10 seconds - pollingInterval = 1000 / rate; - rate = 1; - } else { - pollingInterval = 1000; - } - return this.interval = setInterval(() => { - return this.reconnectNClients(io, rate); - } - , pollingInterval); - }, - - RECONNECTED_CLIENTS: {}, - reconnectNClients(io, N) { - let drainedCount = 0; - for (const client of Array.from(io.sockets.clients())) { - if (!this.RECONNECTED_CLIENTS[client.id]) { - this.RECONNECTED_CLIENTS[client.id] = true; - logger.log({client_id: client.id}, "Asking client to reconnect gracefully"); - client.emit("reconnectGracefully"); - drainedCount++; - } - const haveDrainedNClients = (drainedCount === N); - if (haveDrainedNClients) { - break; - } - } - if (drainedCount < N) { - return logger.log("All clients have been told to reconnectGracefully"); - } - } -}); + RECONNECTED_CLIENTS: {}, + reconnectNClients(io, N) { + let drainedCount = 0 + for (const client of Array.from(io.sockets.clients())) { + if (!this.RECONNECTED_CLIENTS[client.id]) { + this.RECONNECTED_CLIENTS[client.id] = true + logger.log( + { client_id: client.id }, + 'Asking client to reconnect gracefully' + ) + client.emit('reconnectGracefully') + drainedCount++ + } + const haveDrainedNClients = drainedCount === N + if (haveDrainedNClients) { + break + } + } + if (drainedCount < N) { + return logger.log('All clients have been told to reconnectGracefully') + } + } +} diff --git a/services/real-time/app/js/Errors.js b/services/real-time/app/js/Errors.js index 04437742fb..8bfe3763b0 100644 --- a/services/real-time/app/js/Errors.js +++ b/services/real-time/app/js/Errors.js @@ -4,15 +4,14 @@ */ // TODO: This file was created by bulk-decaffeinate. // Fix any style issues and re-enable lint. -let Errors; -var CodedError = function(message, code) { - const error = new Error(message); - error.name = "CodedError"; - error.code = code; - error.__proto__ = CodedError.prototype; - return error; -}; -CodedError.prototype.__proto__ = Error.prototype; +let Errors +var CodedError = function (message, code) { + const error = new Error(message) + error.name = 'CodedError' + error.code = code + error.__proto__ = CodedError.prototype + return error +} +CodedError.prototype.__proto__ = Error.prototype -module.exports = (Errors = - {CodedError}); +module.exports = Errors = { CodedError } diff --git a/services/real-time/app/js/EventLogger.js b/services/real-time/app/js/EventLogger.js index 8a700326b5..1133ebdaf8 100644 --- a/services/real-time/app/js/EventLogger.js +++ b/services/real-time/app/js/EventLogger.js @@ -10,84 +10,91 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let EventLogger; -const logger = require('logger-sharelatex'); -const metrics = require('metrics-sharelatex'); -const settings = require('settings-sharelatex'); +let EventLogger +const logger = require('logger-sharelatex') +const metrics = require('metrics-sharelatex') +const settings = require('settings-sharelatex') // keep track of message counters to detect duplicate and out of order events // messsage ids have the format "UNIQUEHOSTKEY-COUNTER" -const EVENT_LOG_COUNTER = {}; -const EVENT_LOG_TIMESTAMP = {}; -let EVENT_LAST_CLEAN_TIMESTAMP = 0; +const EVENT_LOG_COUNTER = {} +const EVENT_LOG_TIMESTAMP = {} +let EVENT_LAST_CLEAN_TIMESTAMP = 0 // counter for debug logs -let COUNTER = 0; +let COUNTER = 0 -module.exports = (EventLogger = { +module.exports = EventLogger = { + MAX_STALE_TIME_IN_MS: 3600 * 1000, - MAX_STALE_TIME_IN_MS: 3600 * 1000, + debugEvent(channel, message) { + if (settings.debugEvents > 0) { + logger.log({ channel, message, counter: COUNTER++ }, 'logging event') + return settings.debugEvents-- + } + }, - debugEvent(channel, message) { - if (settings.debugEvents > 0) { - logger.log({channel, message, counter: COUNTER++}, "logging event"); - return settings.debugEvents--; - } - }, + checkEventOrder(channel, message_id, message) { + let result + if (typeof message_id !== 'string') { + return + } + if (!(result = message_id.match(/^(.*)-(\d+)$/))) { + return + } + const key = result[1] + const count = parseInt(result[2], 0) + if (!(count >= 0)) { + // ignore checks if counter is not present + return + } + // store the last count in a hash for each host + const previous = EventLogger._storeEventCount(key, count) + if (previous == null || count === previous + 1) { + metrics.inc(`event.${channel}.valid`, 0.001) // downsample high rate docupdater events + return // order is ok + } + if (count === previous) { + metrics.inc(`event.${channel}.duplicate`) + logger.warn({ channel, message_id }, 'duplicate event') + return 'duplicate' + } else { + metrics.inc(`event.${channel}.out-of-order`) + logger.warn( + { channel, message_id, key, previous, count }, + 'out of order event' + ) + return 'out-of-order' + } + }, - checkEventOrder(channel, message_id, message) { - let result; - if (typeof(message_id) !== 'string') { return; } - if (!(result = message_id.match(/^(.*)-(\d+)$/))) { return; } - const key = result[1]; - const count = parseInt(result[2], 0); - if (!(count >= 0)) {// ignore checks if counter is not present - return; - } - // store the last count in a hash for each host - const previous = EventLogger._storeEventCount(key, count); - if ((previous == null) || (count === (previous + 1))) { - metrics.inc(`event.${channel}.valid`, 0.001); // downsample high rate docupdater events - return; // order is ok - } - if (count === previous) { - metrics.inc(`event.${channel}.duplicate`); - logger.warn({channel, message_id}, "duplicate event"); - return "duplicate"; - } else { - metrics.inc(`event.${channel}.out-of-order`); - logger.warn({channel, message_id, key, previous, count}, "out of order event"); - return "out-of-order"; - } - }, + _storeEventCount(key, count) { + const previous = EVENT_LOG_COUNTER[key] + const now = Date.now() + EVENT_LOG_COUNTER[key] = count + EVENT_LOG_TIMESTAMP[key] = now + // periodically remove old counts + if (now - EVENT_LAST_CLEAN_TIMESTAMP > EventLogger.MAX_STALE_TIME_IN_MS) { + EventLogger._cleanEventStream(now) + EVENT_LAST_CLEAN_TIMESTAMP = now + } + return previous + }, - _storeEventCount(key, count) { - const previous = EVENT_LOG_COUNTER[key]; - const now = Date.now(); - EVENT_LOG_COUNTER[key] = count; - EVENT_LOG_TIMESTAMP[key] = now; - // periodically remove old counts - if ((now - EVENT_LAST_CLEAN_TIMESTAMP) > EventLogger.MAX_STALE_TIME_IN_MS) { - EventLogger._cleanEventStream(now); - EVENT_LAST_CLEAN_TIMESTAMP = now; - } - return previous; - }, - - _cleanEventStream(now) { - return (() => { - const result = []; - for (const key in EVENT_LOG_TIMESTAMP) { - const timestamp = EVENT_LOG_TIMESTAMP[key]; - if ((now - timestamp) > EventLogger.MAX_STALE_TIME_IN_MS) { - delete EVENT_LOG_COUNTER[key]; - result.push(delete EVENT_LOG_TIMESTAMP[key]); - } else { - result.push(undefined); - } - } - return result; - })(); - } -}); \ No newline at end of file + _cleanEventStream(now) { + return (() => { + const result = [] + for (const key in EVENT_LOG_TIMESTAMP) { + const timestamp = EVENT_LOG_TIMESTAMP[key] + if (now - timestamp > EventLogger.MAX_STALE_TIME_IN_MS) { + delete EVENT_LOG_COUNTER[key] + result.push(delete EVENT_LOG_TIMESTAMP[key]) + } else { + result.push(undefined) + } + } + return result + })() + } +} diff --git a/services/real-time/app/js/HealthCheckManager.js b/services/real-time/app/js/HealthCheckManager.js index f8a9aa672e..4704aa5e88 100644 --- a/services/real-time/app/js/HealthCheckManager.js +++ b/services/real-time/app/js/HealthCheckManager.js @@ -10,76 +10,84 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let HealthCheckManager; -const metrics = require("metrics-sharelatex"); -const logger = require("logger-sharelatex"); +let HealthCheckManager +const metrics = require('metrics-sharelatex') +const logger = require('logger-sharelatex') -const os = require("os"); -const HOST = os.hostname(); -const PID = process.pid; -let COUNT = 0; +const os = require('os') +const HOST = os.hostname() +const PID = process.pid +let COUNT = 0 -const CHANNEL_MANAGER = {}; // hash of event checkers by channel name -const CHANNEL_ERROR = {}; // error status by channel name +const CHANNEL_MANAGER = {} // hash of event checkers by channel name +const CHANNEL_ERROR = {} // error status by channel name -module.exports = (HealthCheckManager = class HealthCheckManager { - // create an instance of this class which checks that an event with a unique - // id is received only once within a timeout - constructor(channel, timeout) { - // unique event string - this.channel = channel; - if (timeout == null) { timeout = 1000; } - this.id = `host=${HOST}:pid=${PID}:count=${COUNT++}`; - // count of number of times the event is received - this.count = 0; - // after a timeout check the status of the count - this.handler = setTimeout(() => { - return this.setStatus(); - } - , timeout); - // use a timer to record the latency of the channel - this.timer = new metrics.Timer(`event.${this.channel}.latency`); - // keep a record of these objects to dispatch on - CHANNEL_MANAGER[this.channel] = this; +module.exports = HealthCheckManager = class HealthCheckManager { + // create an instance of this class which checks that an event with a unique + // id is received only once within a timeout + constructor(channel, timeout) { + // unique event string + this.channel = channel + if (timeout == null) { + timeout = 1000 } + this.id = `host=${HOST}:pid=${PID}:count=${COUNT++}` + // count of number of times the event is received + this.count = 0 + // after a timeout check the status of the count + this.handler = setTimeout(() => { + return this.setStatus() + }, timeout) + // use a timer to record the latency of the channel + this.timer = new metrics.Timer(`event.${this.channel}.latency`) + // keep a record of these objects to dispatch on + CHANNEL_MANAGER[this.channel] = this + } - processEvent(id) { - // if this is our event record it - if (id === this.id) { - this.count++; - if (this.timer != null) { - this.timer.done(); - } - return this.timer = null; // only time the latency of the first event - } + processEvent(id) { + // if this is our event record it + if (id === this.id) { + this.count++ + if (this.timer != null) { + this.timer.done() + } + return (this.timer = null) // only time the latency of the first event } + } - setStatus() { - // if we saw the event anything other than a single time that is an error - if (this.count !== 1) { - logger.err({channel:this.channel, count:this.count, id:this.id}, "redis channel health check error"); - } - const error = (this.count !== 1); - return CHANNEL_ERROR[this.channel] = error; + setStatus() { + // if we saw the event anything other than a single time that is an error + if (this.count !== 1) { + logger.err( + { channel: this.channel, count: this.count, id: this.id }, + 'redis channel health check error' + ) } + const error = this.count !== 1 + return (CHANNEL_ERROR[this.channel] = error) + } - // class methods - static check(channel, id) { - // dispatch event to manager for channel - return (CHANNEL_MANAGER[channel] != null ? CHANNEL_MANAGER[channel].processEvent(id) : undefined); - } + // class methods + static check(channel, id) { + // dispatch event to manager for channel + return CHANNEL_MANAGER[channel] != null + ? CHANNEL_MANAGER[channel].processEvent(id) + : undefined + } - static status() { - // return status of all channels for logging - return CHANNEL_ERROR; - } + static status() { + // return status of all channels for logging + return CHANNEL_ERROR + } - static isFailing() { - // check if any channel status is bad - for (const channel in CHANNEL_ERROR) { - const error = CHANNEL_ERROR[channel]; - if (error === true) { return true; } - } - return false; + static isFailing() { + // check if any channel status is bad + for (const channel in CHANNEL_ERROR) { + const error = CHANNEL_ERROR[channel] + if (error === true) { + return true + } } -}); + return false + } +} diff --git a/services/real-time/app/js/HttpApiController.js b/services/real-time/app/js/HttpApiController.js index 88bbc1a5e3..a512961797 100644 --- a/services/real-time/app/js/HttpApiController.js +++ b/services/real-time/app/js/HttpApiController.js @@ -10,47 +10,53 @@ * DS102: Remove unnecessary code created because of implicit returns * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let HttpApiController; -const WebsocketLoadBalancer = require("./WebsocketLoadBalancer"); -const DrainManager = require("./DrainManager"); -const logger = require("logger-sharelatex"); +let HttpApiController +const WebsocketLoadBalancer = require('./WebsocketLoadBalancer') +const DrainManager = require('./DrainManager') +const logger = require('logger-sharelatex') -module.exports = (HttpApiController = { - sendMessage(req, res, next) { - logger.log({message: req.params.message}, "sending message"); - if (Array.isArray(req.body)) { - for (const payload of Array.from(req.body)) { - WebsocketLoadBalancer.emitToRoom(req.params.project_id, req.params.message, payload); - } - } else { - WebsocketLoadBalancer.emitToRoom(req.params.project_id, req.params.message, req.body); - } - return res.send(204); - }, // No content - - startDrain(req, res, next) { - const io = req.app.get("io"); - let rate = req.query.rate || "4"; - rate = parseFloat(rate) || 0; - logger.log({rate}, "setting client drain rate"); - DrainManager.startDrain(io, rate); - return res.send(204); - }, +module.exports = HttpApiController = { + sendMessage(req, res, next) { + logger.log({ message: req.params.message }, 'sending message') + if (Array.isArray(req.body)) { + for (const payload of Array.from(req.body)) { + WebsocketLoadBalancer.emitToRoom( + req.params.project_id, + req.params.message, + payload + ) + } + } else { + WebsocketLoadBalancer.emitToRoom( + req.params.project_id, + req.params.message, + req.body + ) + } + return res.send(204) + }, // No content - disconnectClient(req, res, next) { - const io = req.app.get("io"); - const { - client_id - } = req.params; - const client = io.sockets.sockets[client_id]; + startDrain(req, res, next) { + const io = req.app.get('io') + let rate = req.query.rate || '4' + rate = parseFloat(rate) || 0 + logger.log({ rate }, 'setting client drain rate') + DrainManager.startDrain(io, rate) + return res.send(204) + }, - if (!client) { - logger.info({client_id}, "api: client already disconnected"); - res.sendStatus(404); - return; - } - logger.warn({client_id}, "api: requesting client disconnect"); - client.on("disconnect", () => res.sendStatus(204)); - return client.disconnect(); - } -}); + disconnectClient(req, res, next) { + const io = req.app.get('io') + const { client_id } = req.params + const client = io.sockets.sockets[client_id] + + if (!client) { + logger.info({ client_id }, 'api: client already disconnected') + res.sendStatus(404) + return + } + logger.warn({ client_id }, 'api: requesting client disconnect') + client.on('disconnect', () => res.sendStatus(204)) + return client.disconnect() + } +} diff --git a/services/real-time/app/js/HttpController.js b/services/real-time/app/js/HttpController.js index 4d33af44b3..deabf5876d 100644 --- a/services/real-time/app/js/HttpController.js +++ b/services/real-time/app/js/HttpController.js @@ -10,50 +10,78 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let HttpController; -const async = require("async"); +let HttpController +const async = require('async') -module.exports = (HttpController = { - // The code in this controller is hard to unit test because of a lot of - // dependencies on internal socket.io methods. It is not critical to the running - // of ShareLaTeX, and is only used for getting stats about connected clients, - // and for checking internal state in acceptance tests. The acceptances tests - // should provide appropriate coverage. - _getConnectedClientView(ioClient, callback) { - if (callback == null) { callback = function(error, client) {}; } - const client_id = ioClient.id; - const {project_id, user_id, first_name, last_name, email, connected_time} = ioClient.ol_context; - const client = {client_id, project_id, user_id, first_name, last_name, email, connected_time}; - client.rooms = []; - for (const name in ioClient.manager.roomClients[client_id]) { - const joined = ioClient.manager.roomClients[client_id][name]; - if (joined && (name !== "")) { - client.rooms.push(name.replace(/^\//, "")); // Remove leading / - } - } - return callback(null, client); - }, +module.exports = HttpController = { + // The code in this controller is hard to unit test because of a lot of + // dependencies on internal socket.io methods. It is not critical to the running + // of ShareLaTeX, and is only used for getting stats about connected clients, + // and for checking internal state in acceptance tests. The acceptances tests + // should provide appropriate coverage. + _getConnectedClientView(ioClient, callback) { + if (callback == null) { + callback = function (error, client) {} + } + const client_id = ioClient.id + const { + project_id, + user_id, + first_name, + last_name, + email, + connected_time + } = ioClient.ol_context + const client = { + client_id, + project_id, + user_id, + first_name, + last_name, + email, + connected_time + } + client.rooms = [] + for (const name in ioClient.manager.roomClients[client_id]) { + const joined = ioClient.manager.roomClients[client_id][name] + if (joined && name !== '') { + client.rooms.push(name.replace(/^\//, '')) // Remove leading / + } + } + return callback(null, client) + }, - getConnectedClients(req, res, next) { - const io = req.app.get("io"); - const ioClients = io.sockets.clients(); - return async.map(ioClients, HttpController._getConnectedClientView, function(error, clients) { - if (error != null) { return next(error); } - return res.json(clients); - }); - }, - - getConnectedClient(req, res, next) { - const {client_id} = req.params; - const io = req.app.get("io"); - const ioClient = io.sockets.sockets[client_id]; - if (!ioClient) { - res.sendStatus(404); - return; - } - return HttpController._getConnectedClientView(ioClient, function(error, client) { - if (error != null) { return next(error); } - return res.json(client); - }); - } -}); + getConnectedClients(req, res, next) { + const io = req.app.get('io') + const ioClients = io.sockets.clients() + return async.map( + ioClients, + HttpController._getConnectedClientView, + function (error, clients) { + if (error != null) { + return next(error) + } + return res.json(clients) + } + ) + }, + + getConnectedClient(req, res, next) { + const { client_id } = req.params + const io = req.app.get('io') + const ioClient = io.sockets.sockets[client_id] + if (!ioClient) { + res.sendStatus(404) + return + } + return HttpController._getConnectedClientView(ioClient, function ( + error, + client + ) { + if (error != null) { + return next(error) + } + return res.json(client) + }) + } +} diff --git a/services/real-time/app/js/RedisClientManager.js b/services/real-time/app/js/RedisClientManager.js index 3da2136b46..b43262aeda 100644 --- a/services/real-time/app/js/RedisClientManager.js +++ b/services/real-time/app/js/RedisClientManager.js @@ -10,31 +10,31 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let RedisClientManager; -const redis = require("redis-sharelatex"); -const logger = require('logger-sharelatex'); +let RedisClientManager +const redis = require('redis-sharelatex') +const logger = require('logger-sharelatex') -module.exports = (RedisClientManager = { - createClientList(...configs) { - // create a dynamic list of redis clients, excluding any configurations which are not defined - const clientList = (() => { - const result = []; - for (const x of Array.from(configs)) { - if (x != null) { - const redisType = (x.cluster != null) ? - "cluster" - : (x.sentinels != null) ? - "sentinel" - : (x.host != null) ? - "single" - : - "unknown"; - logger.log({redis: redisType}, "creating redis client"); - result.push(redis.createClient(x)); - } - } - return result; - })(); - return clientList; - } -}); \ No newline at end of file +module.exports = RedisClientManager = { + createClientList(...configs) { + // create a dynamic list of redis clients, excluding any configurations which are not defined + const clientList = (() => { + const result = [] + for (const x of Array.from(configs)) { + if (x != null) { + const redisType = + x.cluster != null + ? 'cluster' + : x.sentinels != null + ? 'sentinel' + : x.host != null + ? 'single' + : 'unknown' + logger.log({ redis: redisType }, 'creating redis client') + result.push(redis.createClient(x)) + } + } + return result + })() + return clientList + } +} diff --git a/services/real-time/app/js/RoomManager.js b/services/real-time/app/js/RoomManager.js index c75cc68626..8dd34e9340 100644 --- a/services/real-time/app/js/RoomManager.js +++ b/services/real-time/app/js/RoomManager.js @@ -13,13 +13,13 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let RoomManager; -const logger = require('logger-sharelatex'); -const metrics = require("metrics-sharelatex"); -const {EventEmitter} = require('events'); +let RoomManager +const logger = require('logger-sharelatex') +const metrics = require('metrics-sharelatex') +const { EventEmitter } = require('events') -const IdMap = new Map(); // keep track of whether ids are from projects or docs -const RoomEvents = new EventEmitter(); // emits {project,doc}-active and {project,doc}-empty events +const IdMap = new Map() // keep track of whether ids are from projects or docs +const RoomEvents = new EventEmitter() // emits {project,doc}-active and {project,doc}-empty events // Manage socket.io rooms for individual projects and docs // @@ -31,130 +31,159 @@ const RoomEvents = new EventEmitter(); // emits {project,doc}-active and {projec // // The pubsub side is handled by ChannelManager -module.exports = (RoomManager = { - - joinProject(client, project_id, callback) { - if (callback == null) { callback = function() {}; } - return this.joinEntity(client, "project", project_id, callback); - }, - - joinDoc(client, doc_id, callback) { - if (callback == null) { callback = function() {}; } - return this.joinEntity(client, "doc", doc_id, callback); - }, - - leaveDoc(client, doc_id) { - return this.leaveEntity(client, "doc", doc_id); - }, - - leaveProjectAndDocs(client) { - // what rooms is this client in? we need to leave them all. socket.io - // will cause us to leave the rooms, so we only need to manage our - // channel subscriptions... but it will be safer if we leave them - // explicitly, and then socket.io will just regard this as a client that - // has not joined any rooms and do a final disconnection. - const roomsToLeave = this._roomsClientIsIn(client); - logger.log({client: client.id, roomsToLeave}, "client leaving project"); - return (() => { - const result = []; - for (const id of Array.from(roomsToLeave)) { - const entity = IdMap.get(id); - result.push(this.leaveEntity(client, entity, id)); - } - return result; - })(); - }, - - emitOnCompletion(promiseList, eventName) { - return Promise.all(promiseList) - .then(() => RoomEvents.emit(eventName)) - .catch(err => RoomEvents.emit(eventName, err)); - }, - - eventSource() { - return RoomEvents; - }, - - joinEntity(client, entity, id, callback) { - const beforeCount = this._clientsInRoom(client, id); - // client joins room immediately but joinDoc request does not complete - // until room is subscribed - client.join(id); - // is this a new room? if so, subscribe - if (beforeCount === 0) { - logger.log({entity, id}, "room is now active"); - RoomEvents.once(`${entity}-subscribed-${id}`, function(err) { - // only allow the client to join when all the relevant channels have subscribed - logger.log({client: client.id, entity, id, beforeCount}, "client joined new room and subscribed to channel"); - return callback(err); - }); - RoomEvents.emit(`${entity}-active`, id); - IdMap.set(id, entity); - // keep track of the number of listeners - return metrics.gauge("room-listeners", RoomEvents.eventNames().length); - } else { - logger.log({client: client.id, entity, id, beforeCount}, "client joined existing room"); - client.join(id); - return callback(); - } - }, - - leaveEntity(client, entity, id) { - // Ignore any requests to leave when the client is not actually in the - // room. This can happen if the client sends spurious leaveDoc requests - // for old docs after a reconnection. - // This can now happen all the time, as we skip the join for clients that - // disconnect before joinProject/joinDoc completed. - if (!this._clientAlreadyInRoom(client, id)) { - logger.log({client: client.id, entity, id}, "ignoring request from client to leave room it is not in"); - return; - } - client.leave(id); - const afterCount = this._clientsInRoom(client, id); - logger.log({client: client.id, entity, id, afterCount}, "client left room"); - // is the room now empty? if so, unsubscribe - if ((entity == null)) { - logger.error({entity: id}, "unknown entity when leaving with id"); - return; - } - if (afterCount === 0) { - logger.log({entity, id}, "room is now empty"); - RoomEvents.emit(`${entity}-empty`, id); - IdMap.delete(id); - return metrics.gauge("room-listeners", RoomEvents.eventNames().length); - } - }, - - // internal functions below, these access socket.io rooms data directly and - // will need updating for socket.io v2 - - _clientsInRoom(client, room) { - const nsp = client.namespace.name; - const name = (nsp + '/') + room; - return (__guard__(client.manager != null ? client.manager.rooms : undefined, x => x[name]) || []).length; - }, - - _roomsClientIsIn(client) { - const roomList = (() => { - const result = []; - for (const fullRoomPath in (client.manager.roomClients != null ? client.manager.roomClients[client.id] : undefined)) { - // strip socket.io prefix from room to get original id - if (fullRoomPath !== '') { - const [prefix, room] = Array.from(fullRoomPath.split('/', 2)); - result.push(room); - } - } - return result; - })(); - return roomList; - }, - - _clientAlreadyInRoom(client, room) { - const nsp = client.namespace.name; - const name = (nsp + '/') + room; - return __guard__(client.manager.roomClients != null ? client.manager.roomClients[client.id] : undefined, x => x[name]); +module.exports = RoomManager = { + joinProject(client, project_id, callback) { + if (callback == null) { + callback = function () {} } -}); + return this.joinEntity(client, 'project', project_id, callback) + }, + + joinDoc(client, doc_id, callback) { + if (callback == null) { + callback = function () {} + } + return this.joinEntity(client, 'doc', doc_id, callback) + }, + + leaveDoc(client, doc_id) { + return this.leaveEntity(client, 'doc', doc_id) + }, + + leaveProjectAndDocs(client) { + // what rooms is this client in? we need to leave them all. socket.io + // will cause us to leave the rooms, so we only need to manage our + // channel subscriptions... but it will be safer if we leave them + // explicitly, and then socket.io will just regard this as a client that + // has not joined any rooms and do a final disconnection. + const roomsToLeave = this._roomsClientIsIn(client) + logger.log({ client: client.id, roomsToLeave }, 'client leaving project') + return (() => { + const result = [] + for (const id of Array.from(roomsToLeave)) { + const entity = IdMap.get(id) + result.push(this.leaveEntity(client, entity, id)) + } + return result + })() + }, + + emitOnCompletion(promiseList, eventName) { + return Promise.all(promiseList) + .then(() => RoomEvents.emit(eventName)) + .catch((err) => RoomEvents.emit(eventName, err)) + }, + + eventSource() { + return RoomEvents + }, + + joinEntity(client, entity, id, callback) { + const beforeCount = this._clientsInRoom(client, id) + // client joins room immediately but joinDoc request does not complete + // until room is subscribed + client.join(id) + // is this a new room? if so, subscribe + if (beforeCount === 0) { + logger.log({ entity, id }, 'room is now active') + RoomEvents.once(`${entity}-subscribed-${id}`, function (err) { + // only allow the client to join when all the relevant channels have subscribed + logger.log( + { client: client.id, entity, id, beforeCount }, + 'client joined new room and subscribed to channel' + ) + return callback(err) + }) + RoomEvents.emit(`${entity}-active`, id) + IdMap.set(id, entity) + // keep track of the number of listeners + return metrics.gauge('room-listeners', RoomEvents.eventNames().length) + } else { + logger.log( + { client: client.id, entity, id, beforeCount }, + 'client joined existing room' + ) + client.join(id) + return callback() + } + }, + + leaveEntity(client, entity, id) { + // Ignore any requests to leave when the client is not actually in the + // room. This can happen if the client sends spurious leaveDoc requests + // for old docs after a reconnection. + // This can now happen all the time, as we skip the join for clients that + // disconnect before joinProject/joinDoc completed. + if (!this._clientAlreadyInRoom(client, id)) { + logger.log( + { client: client.id, entity, id }, + 'ignoring request from client to leave room it is not in' + ) + return + } + client.leave(id) + const afterCount = this._clientsInRoom(client, id) + logger.log( + { client: client.id, entity, id, afterCount }, + 'client left room' + ) + // is the room now empty? if so, unsubscribe + if (entity == null) { + logger.error({ entity: id }, 'unknown entity when leaving with id') + return + } + if (afterCount === 0) { + logger.log({ entity, id }, 'room is now empty') + RoomEvents.emit(`${entity}-empty`, id) + IdMap.delete(id) + return metrics.gauge('room-listeners', RoomEvents.eventNames().length) + } + }, + + // internal functions below, these access socket.io rooms data directly and + // will need updating for socket.io v2 + + _clientsInRoom(client, room) { + const nsp = client.namespace.name + const name = nsp + '/' + room + return ( + __guard__( + client.manager != null ? client.manager.rooms : undefined, + (x) => x[name] + ) || [] + ).length + }, + + _roomsClientIsIn(client) { + const roomList = (() => { + const result = [] + for (const fullRoomPath in client.manager.roomClients != null + ? client.manager.roomClients[client.id] + : undefined) { + // strip socket.io prefix from room to get original id + if (fullRoomPath !== '') { + const [prefix, room] = Array.from(fullRoomPath.split('/', 2)) + result.push(room) + } + } + return result + })() + return roomList + }, + + _clientAlreadyInRoom(client, room) { + const nsp = client.namespace.name + const name = nsp + '/' + room + return __guard__( + client.manager.roomClients != null + ? client.manager.roomClients[client.id] + : undefined, + (x) => x[name] + ) + } +} function __guard__(value, transform) { - return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; -} \ No newline at end of file + return typeof value !== 'undefined' && value !== null + ? transform(value) + : undefined +} diff --git a/services/real-time/app/js/Router.js b/services/real-time/app/js/Router.js index f475596036..0e19c46bc0 100644 --- a/services/real-time/app/js/Router.js +++ b/services/real-time/app/js/Router.js @@ -13,259 +13,390 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let Router; -const metrics = require("metrics-sharelatex"); -const logger = require("logger-sharelatex"); -const settings = require("settings-sharelatex"); -const WebsocketController = require("./WebsocketController"); -const HttpController = require("./HttpController"); -const HttpApiController = require("./HttpApiController"); -const bodyParser = require("body-parser"); -const base64id = require("base64id"); +let Router +const metrics = require('metrics-sharelatex') +const logger = require('logger-sharelatex') +const settings = require('settings-sharelatex') +const WebsocketController = require('./WebsocketController') +const HttpController = require('./HttpController') +const HttpApiController = require('./HttpApiController') +const bodyParser = require('body-parser') +const base64id = require('base64id') -const basicAuth = require('basic-auth-connect'); -const httpAuth = basicAuth(function(user, pass){ - const isValid = (user === settings.internal.realTime.user) && (pass === settings.internal.realTime.pass); - if (!isValid) { - logger.err({user, pass}, "invalid login details"); - } - return isValid; -}); +const basicAuth = require('basic-auth-connect') +const httpAuth = basicAuth(function (user, pass) { + const isValid = + user === settings.internal.realTime.user && + pass === settings.internal.realTime.pass + if (!isValid) { + logger.err({ user, pass }, 'invalid login details') + } + return isValid +}) -module.exports = (Router = { - _handleError(callback, error, client, method, attrs) { - if (callback == null) { callback = function(error) {}; } - if (attrs == null) { attrs = {}; } - for (const key of ["project_id", "doc_id", "user_id"]) { - attrs[key] = client.ol_context[key]; - } - attrs.client_id = client.id; - attrs.err = error; - if (error.name === "CodedError") { - logger.warn(attrs, error.message, {code: error.code}); - return callback({message: error.message, code: error.code}); - } - if (error.message === 'unexpected arguments') { - // the payload might be very large, put it on level info - logger.log(attrs, 'unexpected arguments'); - metrics.inc('unexpected-arguments', 1, { status: method }); - return callback({ message: error.message }); - } - if (["not authorized", "doc updater could not load requested ops", "no project_id found on client"].includes(error.message)) { - logger.warn(attrs, error.message); - return callback({message: error.message}); - } else { - logger.error(attrs, `server side error in ${method}`); - // Don't return raw error to prevent leaking server side info - return callback({message: "Something went wrong in real-time service"}); - } - }, +module.exports = Router = { + _handleError(callback, error, client, method, attrs) { + if (callback == null) { + callback = function (error) {} + } + if (attrs == null) { + attrs = {} + } + for (const key of ['project_id', 'doc_id', 'user_id']) { + attrs[key] = client.ol_context[key] + } + attrs.client_id = client.id + attrs.err = error + if (error.name === 'CodedError') { + logger.warn(attrs, error.message, { code: error.code }) + return callback({ message: error.message, code: error.code }) + } + if (error.message === 'unexpected arguments') { + // the payload might be very large, put it on level info + logger.log(attrs, 'unexpected arguments') + metrics.inc('unexpected-arguments', 1, { status: method }) + return callback({ message: error.message }) + } + if ( + [ + 'not authorized', + 'doc updater could not load requested ops', + 'no project_id found on client' + ].includes(error.message) + ) { + logger.warn(attrs, error.message) + return callback({ message: error.message }) + } else { + logger.error(attrs, `server side error in ${method}`) + // Don't return raw error to prevent leaking server side info + return callback({ message: 'Something went wrong in real-time service' }) + } + }, - _handleInvalidArguments(client, method, args) { - const error = new Error("unexpected arguments"); - let callback = args[args.length - 1]; - if (typeof callback !== 'function') { - callback = (function() {}); - } - const attrs = {arguments: args}; - return Router._handleError(callback, error, client, method, attrs); - }, + _handleInvalidArguments(client, method, args) { + const error = new Error('unexpected arguments') + let callback = args[args.length - 1] + if (typeof callback !== 'function') { + callback = function () {} + } + const attrs = { arguments: args } + return Router._handleError(callback, error, client, method, attrs) + }, - configure(app, io, session) { - app.set("io", io); - app.get("/clients", HttpController.getConnectedClients); - app.get("/clients/:client_id", HttpController.getConnectedClient); + configure(app, io, session) { + app.set('io', io) + app.get('/clients', HttpController.getConnectedClients) + app.get('/clients/:client_id', HttpController.getConnectedClient) - app.post("/project/:project_id/message/:message", httpAuth, bodyParser.json({limit: "5mb"}), HttpApiController.sendMessage); - - app.post("/drain", httpAuth, HttpApiController.startDrain); - app.post("/client/:client_id/disconnect", httpAuth, HttpApiController.disconnectClient); + app.post( + '/project/:project_id/message/:message', + httpAuth, + bodyParser.json({ limit: '5mb' }), + HttpApiController.sendMessage + ) - return session.on('connection', function(error, client, session) { - // init client context, we may access it in Router._handleError before - // setting any values - let user; - client.ol_context = {}; + app.post('/drain', httpAuth, HttpApiController.startDrain) + app.post( + '/client/:client_id/disconnect', + httpAuth, + HttpApiController.disconnectClient + ) - if (client != null) { - client.on("error", function(err) { - logger.err({ clientErr: err }, "socket.io client error"); - if (client.connected) { - client.emit("reconnectGracefully"); - return client.disconnect(); - } - }); - } + return session.on('connection', function (error, client, session) { + // init client context, we may access it in Router._handleError before + // setting any values + let user + client.ol_context = {} - if (settings.shutDownInProgress) { - client.emit("connectionRejected", {message: "retry"}); - client.disconnect(); - return; - } + if (client != null) { + client.on('error', function (err) { + logger.err({ clientErr: err }, 'socket.io client error') + if (client.connected) { + client.emit('reconnectGracefully') + return client.disconnect() + } + }) + } - if ((client != null) && __guard__(error != null ? error.message : undefined, x => x.match(/could not look up session by key/))) { - logger.warn({err: error, client: (client != null), session: (session != null)}, "invalid session"); - // tell the client to reauthenticate if it has an invalid session key - client.emit("connectionRejected", {message: "invalid session"}); - client.disconnect(); - return; - } + if (settings.shutDownInProgress) { + client.emit('connectionRejected', { message: 'retry' }) + client.disconnect() + return + } - if (error != null) { - logger.err({err: error, client: (client != null), session: (session != null)}, "error when client connected"); - if (client != null) { - client.emit("connectionRejected", {message: "error"}); - } - if (client != null) { - client.disconnect(); - } - return; - } + if ( + client != null && + __guard__(error != null ? error.message : undefined, (x) => + x.match(/could not look up session by key/) + ) + ) { + logger.warn( + { err: error, client: client != null, session: session != null }, + 'invalid session' + ) + // tell the client to reauthenticate if it has an invalid session key + client.emit('connectionRejected', { message: 'invalid session' }) + client.disconnect() + return + } - // send positive confirmation that the client has a valid connection - client.publicId = 'P.' + base64id.generateId(); - client.emit("connectionAccepted", null, client.publicId); + if (error != null) { + logger.err( + { err: error, client: client != null, session: session != null }, + 'error when client connected' + ) + if (client != null) { + client.emit('connectionRejected', { message: 'error' }) + } + if (client != null) { + client.disconnect() + } + return + } - metrics.inc('socket-io.connection'); - metrics.gauge('socket-io.clients', __guard__(io.sockets.clients(), x1 => x1.length)); + // send positive confirmation that the client has a valid connection + client.publicId = 'P.' + base64id.generateId() + client.emit('connectionAccepted', null, client.publicId) - logger.log({session, client_id: client.id}, "client connected"); + metrics.inc('socket-io.connection') + metrics.gauge( + 'socket-io.clients', + __guard__(io.sockets.clients(), (x1) => x1.length) + ) - if (__guard__(session != null ? session.passport : undefined, x2 => x2.user) != null) { - ({ - user - } = session.passport); - } else if ((session != null ? session.user : undefined) != null) { - ({ - user - } = session); - } else { - user = {_id: "anonymous-user"}; - } + logger.log({ session, client_id: client.id }, 'client connected') - client.on("joinProject", function(data, callback) { - if (data == null) { data = {}; } - if (typeof callback !== 'function') { - return Router._handleInvalidArguments(client, 'joinProject', arguments); - } + if ( + __guard__( + session != null ? session.passport : undefined, + (x2) => x2.user + ) != null + ) { + ;({ user } = session.passport) + } else if ((session != null ? session.user : undefined) != null) { + ;({ user } = session) + } else { + user = { _id: 'anonymous-user' } + } - if (data.anonymousAccessToken) { - user.anonymousAccessToken = data.anonymousAccessToken; - } - return WebsocketController.joinProject(client, user, data.project_id, function(err, ...args) { - if (err != null) { - return Router._handleError(callback, err, client, "joinProject", {project_id: data.project_id, user_id: (user != null ? user.id : undefined)}); - } else { - return callback(null, ...Array.from(args)); - } - }); - }); + client.on('joinProject', function (data, callback) { + if (data == null) { + data = {} + } + if (typeof callback !== 'function') { + return Router._handleInvalidArguments( + client, + 'joinProject', + arguments + ) + } - client.on("disconnect", function() { - metrics.inc('socket-io.disconnect'); - metrics.gauge('socket-io.clients', __guard__(io.sockets.clients(), x3 => x3.length) - 1); + if (data.anonymousAccessToken) { + user.anonymousAccessToken = data.anonymousAccessToken + } + return WebsocketController.joinProject( + client, + user, + data.project_id, + function (err, ...args) { + if (err != null) { + return Router._handleError(callback, err, client, 'joinProject', { + project_id: data.project_id, + user_id: user != null ? user.id : undefined + }) + } else { + return callback(null, ...Array.from(args)) + } + } + ) + }) - return WebsocketController.leaveProject(io, client, function(err) { - if (err != null) { - return Router._handleError((function() {}), err, client, "leaveProject"); - } - }); - }); + client.on('disconnect', function () { + metrics.inc('socket-io.disconnect') + metrics.gauge( + 'socket-io.clients', + __guard__(io.sockets.clients(), (x3) => x3.length) - 1 + ) - // Variadic. The possible arguments: - // doc_id, callback - // doc_id, fromVersion, callback - // doc_id, options, callback - // doc_id, fromVersion, options, callback - client.on("joinDoc", function(doc_id, fromVersion, options, callback) { - if ((typeof fromVersion === "function") && !options) { - callback = fromVersion; - fromVersion = -1; - options = {}; - } else if ((typeof fromVersion === "number") && (typeof options === "function")) { - callback = options; - options = {}; - } else if ((typeof fromVersion === "object") && (typeof options === "function")) { - callback = options; - options = fromVersion; - fromVersion = -1; - } else if ((typeof fromVersion === "number") && (typeof options === "object") && (typeof callback === 'function')) { - // Called with 4 args, things are as expected - } else { - return Router._handleInvalidArguments(client, 'joinDoc', arguments); - } + return WebsocketController.leaveProject(io, client, function (err) { + if (err != null) { + return Router._handleError( + function () {}, + err, + client, + 'leaveProject' + ) + } + }) + }) - return WebsocketController.joinDoc(client, doc_id, fromVersion, options, function(err, ...args) { - if (err != null) { - return Router._handleError(callback, err, client, "joinDoc", {doc_id, fromVersion}); - } else { - return callback(null, ...Array.from(args)); - } - }); - }); + // Variadic. The possible arguments: + // doc_id, callback + // doc_id, fromVersion, callback + // doc_id, options, callback + // doc_id, fromVersion, options, callback + client.on('joinDoc', function (doc_id, fromVersion, options, callback) { + if (typeof fromVersion === 'function' && !options) { + callback = fromVersion + fromVersion = -1 + options = {} + } else if ( + typeof fromVersion === 'number' && + typeof options === 'function' + ) { + callback = options + options = {} + } else if ( + typeof fromVersion === 'object' && + typeof options === 'function' + ) { + callback = options + options = fromVersion + fromVersion = -1 + } else if ( + typeof fromVersion === 'number' && + typeof options === 'object' && + typeof callback === 'function' + ) { + // Called with 4 args, things are as expected + } else { + return Router._handleInvalidArguments(client, 'joinDoc', arguments) + } - client.on("leaveDoc", function(doc_id, callback) { - if (typeof callback !== 'function') { - return Router._handleInvalidArguments(client, 'leaveDoc', arguments); - } + return WebsocketController.joinDoc( + client, + doc_id, + fromVersion, + options, + function (err, ...args) { + if (err != null) { + return Router._handleError(callback, err, client, 'joinDoc', { + doc_id, + fromVersion + }) + } else { + return callback(null, ...Array.from(args)) + } + } + ) + }) - return WebsocketController.leaveDoc(client, doc_id, function(err, ...args) { - if (err != null) { - return Router._handleError(callback, err, client, "leaveDoc"); - } else { - return callback(null, ...Array.from(args)); - } - }); - }); + client.on('leaveDoc', function (doc_id, callback) { + if (typeof callback !== 'function') { + return Router._handleInvalidArguments(client, 'leaveDoc', arguments) + } - client.on("clientTracking.getConnectedUsers", function(callback) { - if (callback == null) { callback = function(error, users) {}; } - if (typeof callback !== 'function') { - return Router._handleInvalidArguments(client, 'clientTracking.getConnectedUsers', arguments); - } + return WebsocketController.leaveDoc(client, doc_id, function ( + err, + ...args + ) { + if (err != null) { + return Router._handleError(callback, err, client, 'leaveDoc') + } else { + return callback(null, ...Array.from(args)) + } + }) + }) - return WebsocketController.getConnectedUsers(client, function(err, users) { - if (err != null) { - return Router._handleError(callback, err, client, "clientTracking.getConnectedUsers"); - } else { - return callback(null, users); - } - }); - }); + client.on('clientTracking.getConnectedUsers', function (callback) { + if (callback == null) { + callback = function (error, users) {} + } + if (typeof callback !== 'function') { + return Router._handleInvalidArguments( + client, + 'clientTracking.getConnectedUsers', + arguments + ) + } - client.on("clientTracking.updatePosition", function(cursorData, callback) { - if (callback == null) { callback = function(error) {}; } - if (typeof callback !== 'function') { - return Router._handleInvalidArguments(client, 'clientTracking.updatePosition', arguments); - } + return WebsocketController.getConnectedUsers(client, function ( + err, + users + ) { + if (err != null) { + return Router._handleError( + callback, + err, + client, + 'clientTracking.getConnectedUsers' + ) + } else { + return callback(null, users) + } + }) + }) - return WebsocketController.updateClientPosition(client, cursorData, function(err) { - if (err != null) { - return Router._handleError(callback, err, client, "clientTracking.updatePosition"); - } else { - return callback(); - } - }); - }); + client.on('clientTracking.updatePosition', function ( + cursorData, + callback + ) { + if (callback == null) { + callback = function (error) {} + } + if (typeof callback !== 'function') { + return Router._handleInvalidArguments( + client, + 'clientTracking.updatePosition', + arguments + ) + } - return client.on("applyOtUpdate", function(doc_id, update, callback) { - if (callback == null) { callback = function(error) {}; } - if (typeof callback !== 'function') { - return Router._handleInvalidArguments(client, 'applyOtUpdate', arguments); - } + return WebsocketController.updateClientPosition( + client, + cursorData, + function (err) { + if (err != null) { + return Router._handleError( + callback, + err, + client, + 'clientTracking.updatePosition' + ) + } else { + return callback() + } + } + ) + }) - return WebsocketController.applyOtUpdate(client, doc_id, update, function(err) { - if (err != null) { - return Router._handleError(callback, err, client, "applyOtUpdate", {doc_id, update}); - } else { - return callback(); - } - }); - }); - }); - } -}); + return client.on('applyOtUpdate', function (doc_id, update, callback) { + if (callback == null) { + callback = function (error) {} + } + if (typeof callback !== 'function') { + return Router._handleInvalidArguments( + client, + 'applyOtUpdate', + arguments + ) + } + + return WebsocketController.applyOtUpdate( + client, + doc_id, + update, + function (err) { + if (err != null) { + return Router._handleError( + callback, + err, + client, + 'applyOtUpdate', + { doc_id, update } + ) + } else { + return callback() + } + } + ) + }) + }) + } +} function __guard__(value, transform) { - return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; -} \ No newline at end of file + return typeof value !== 'undefined' && value !== null + ? transform(value) + : undefined +} diff --git a/services/real-time/app/js/SafeJsonParse.js b/services/real-time/app/js/SafeJsonParse.js index f5e8dd3797..6e2e287853 100644 --- a/services/real-time/app/js/SafeJsonParse.js +++ b/services/real-time/app/js/SafeJsonParse.js @@ -9,22 +9,27 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -const Settings = require("settings-sharelatex"); -const logger = require("logger-sharelatex"); +const Settings = require('settings-sharelatex') +const logger = require('logger-sharelatex') module.exports = { - parse(data, callback) { - let parsed; - if (callback == null) { callback = function(error, parsed) {}; } - if (data.length > Settings.maxUpdateSize) { - logger.error({head: data.slice(0,1024), length: data.length}, "data too large to parse"); - return callback(new Error("data too large to parse")); - } - try { - parsed = JSON.parse(data); - } catch (e) { - return callback(e); - } - return callback(null, parsed); - } -}; \ No newline at end of file + parse(data, callback) { + let parsed + if (callback == null) { + callback = function (error, parsed) {} + } + if (data.length > Settings.maxUpdateSize) { + logger.error( + { head: data.slice(0, 1024), length: data.length }, + 'data too large to parse' + ) + return callback(new Error('data too large to parse')) + } + try { + parsed = JSON.parse(data) + } catch (e) { + return callback(e) + } + return callback(null, parsed) + } +} diff --git a/services/real-time/app/js/SessionSockets.js b/services/real-time/app/js/SessionSockets.js index 894c7b53d5..b01920dfa7 100644 --- a/services/real-time/app/js/SessionSockets.js +++ b/services/real-time/app/js/SessionSockets.js @@ -5,32 +5,33 @@ * DS102: Remove unnecessary code created because of implicit returns * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -const {EventEmitter} = require('events'); +const { EventEmitter } = require('events') -module.exports = function(io, sessionStore, cookieParser, cookieName) { - const missingSessionError = new Error('could not look up session by key'); +module.exports = function (io, sessionStore, cookieParser, cookieName) { + const missingSessionError = new Error('could not look up session by key') - const sessionSockets = new EventEmitter(); - const next = (error, socket, session) => sessionSockets.emit('connection', error, socket, session); + const sessionSockets = new EventEmitter() + const next = (error, socket, session) => + sessionSockets.emit('connection', error, socket, session) - io.on('connection', function(socket) { - const req = socket.handshake; - return cookieParser(req, {}, function() { - const sessionId = req.signedCookies && req.signedCookies[cookieName]; - if (!sessionId) { - return next(missingSessionError, socket); - } - return sessionStore.get(sessionId, function(error, session) { - if (error) { - return next(error, socket); - } - if (!session) { - return next(missingSessionError, socket); - } - return next(null, socket, session); - }); - }); - }); + io.on('connection', function (socket) { + const req = socket.handshake + return cookieParser(req, {}, function () { + const sessionId = req.signedCookies && req.signedCookies[cookieName] + if (!sessionId) { + return next(missingSessionError, socket) + } + return sessionStore.get(sessionId, function (error, session) { + if (error) { + return next(error, socket) + } + if (!session) { + return next(missingSessionError, socket) + } + return next(null, socket, session) + }) + }) + }) - return sessionSockets; -}; + return sessionSockets +} diff --git a/services/real-time/app/js/WebApiManager.js b/services/real-time/app/js/WebApiManager.js index 9598d83106..266135333a 100644 --- a/services/real-time/app/js/WebApiManager.js +++ b/services/real-time/app/js/WebApiManager.js @@ -11,51 +11,76 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let WebApiManager; -const request = require("request"); -const settings = require("settings-sharelatex"); -const logger = require("logger-sharelatex"); -const { CodedError } = require("./Errors"); +let WebApiManager +const request = require('request') +const settings = require('settings-sharelatex') +const logger = require('logger-sharelatex') +const { CodedError } = require('./Errors') -module.exports = (WebApiManager = { - joinProject(project_id, user, callback) { - if (callback == null) { callback = function(error, project, privilegeLevel, isRestrictedUser) {}; } - const user_id = user._id; - logger.log({project_id, user_id}, "sending join project request to web"); - const url = `${settings.apis.web.url}/project/${project_id}/join`; - const headers = {}; - if (user.anonymousAccessToken != null) { - headers['x-sl-anonymous-access-token'] = user.anonymousAccessToken; - } - return request.post({ - url, - qs: {user_id}, - auth: { - user: settings.apis.web.user, - pass: settings.apis.web.pass, - sendImmediately: true - }, - json: true, - jar: false, - headers - }, function(error, response, data) { - let err; - if (error != null) { return callback(error); } - if (response.statusCode >= 200 && response.statusCode < 300) { - if ((data == null) || ((data != null ? data.project : undefined) == null)) { - err = new Error('no data returned from joinProject request'); - logger.error({err, project_id, user_id}, "error accessing web api"); - return callback(err); - } - return callback(null, data.project, data.privilegeLevel, data.isRestrictedUser); - } else if (response.statusCode === 429) { - logger.log(project_id, user_id, "rate-limit hit when joining project"); - return callback(new CodedError("rate-limit hit when joining project", "TooManyRequests")); - } else { - err = new Error(`non-success status code from web: ${response.statusCode}`); - logger.error({err, project_id, user_id}, "error accessing web api"); - return callback(err); - } - }); - } -}); +module.exports = WebApiManager = { + joinProject(project_id, user, callback) { + if (callback == null) { + callback = function (error, project, privilegeLevel, isRestrictedUser) {} + } + const user_id = user._id + logger.log({ project_id, user_id }, 'sending join project request to web') + const url = `${settings.apis.web.url}/project/${project_id}/join` + const headers = {} + if (user.anonymousAccessToken != null) { + headers['x-sl-anonymous-access-token'] = user.anonymousAccessToken + } + return request.post( + { + url, + qs: { user_id }, + auth: { + user: settings.apis.web.user, + pass: settings.apis.web.pass, + sendImmediately: true + }, + json: true, + jar: false, + headers + }, + function (error, response, data) { + let err + if (error != null) { + return callback(error) + } + if (response.statusCode >= 200 && response.statusCode < 300) { + if ( + data == null || + (data != null ? data.project : undefined) == null + ) { + err = new Error('no data returned from joinProject request') + logger.error( + { err, project_id, user_id }, + 'error accessing web api' + ) + return callback(err) + } + return callback( + null, + data.project, + data.privilegeLevel, + data.isRestrictedUser + ) + } else if (response.statusCode === 429) { + logger.log(project_id, user_id, 'rate-limit hit when joining project') + return callback( + new CodedError( + 'rate-limit hit when joining project', + 'TooManyRequests' + ) + ) + } else { + err = new Error( + `non-success status code from web: ${response.statusCode}` + ) + logger.error({ err, project_id, user_id }, 'error accessing web api') + return callback(err) + } + } + ) + } +} diff --git a/services/real-time/app/js/WebsocketController.js b/services/real-time/app/js/WebsocketController.js index aa51bbb372..92632cc154 100644 --- a/services/real-time/app/js/WebsocketController.js +++ b/services/real-time/app/js/WebsocketController.js @@ -13,344 +13,596 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let WebsocketController; -const logger = require("logger-sharelatex"); -const metrics = require("metrics-sharelatex"); -const settings = require("settings-sharelatex"); -const WebApiManager = require("./WebApiManager"); -const AuthorizationManager = require("./AuthorizationManager"); -const DocumentUpdaterManager = require("./DocumentUpdaterManager"); -const ConnectedUsersManager = require("./ConnectedUsersManager"); -const WebsocketLoadBalancer = require("./WebsocketLoadBalancer"); -const RoomManager = require("./RoomManager"); +let WebsocketController +const logger = require('logger-sharelatex') +const metrics = require('metrics-sharelatex') +const settings = require('settings-sharelatex') +const WebApiManager = require('./WebApiManager') +const AuthorizationManager = require('./AuthorizationManager') +const DocumentUpdaterManager = require('./DocumentUpdaterManager') +const ConnectedUsersManager = require('./ConnectedUsersManager') +const WebsocketLoadBalancer = require('./WebsocketLoadBalancer') +const RoomManager = require('./RoomManager') -module.exports = (WebsocketController = { - // If the protocol version changes when the client reconnects, - // it will force a full refresh of the page. Useful for non-backwards - // compatible protocol changes. Use only in extreme need. - PROTOCOL_VERSION: 2, +module.exports = WebsocketController = { + // If the protocol version changes when the client reconnects, + // it will force a full refresh of the page. Useful for non-backwards + // compatible protocol changes. Use only in extreme need. + PROTOCOL_VERSION: 2, - joinProject(client, user, project_id, callback) { - if (callback == null) { callback = function(error, project, privilegeLevel, protocolVersion) {}; } - if (client.disconnected) { - metrics.inc('editor.join-project.disconnected', 1, {status: 'immediately'}); - return callback(); - } + joinProject(client, user, project_id, callback) { + if (callback == null) { + callback = function (error, project, privilegeLevel, protocolVersion) {} + } + if (client.disconnected) { + metrics.inc('editor.join-project.disconnected', 1, { + status: 'immediately' + }) + return callback() + } - const user_id = user != null ? user._id : undefined; - logger.log({user_id, project_id, client_id: client.id}, "user joining project"); - metrics.inc("editor.join-project"); - return WebApiManager.joinProject(project_id, user, function(error, project, privilegeLevel, isRestrictedUser) { - if (error != null) { return callback(error); } - if (client.disconnected) { - metrics.inc('editor.join-project.disconnected', 1, {status: 'after-web-api-call'}); - return callback(); - } + const user_id = user != null ? user._id : undefined + logger.log( + { user_id, project_id, client_id: client.id }, + 'user joining project' + ) + metrics.inc('editor.join-project') + return WebApiManager.joinProject(project_id, user, function ( + error, + project, + privilegeLevel, + isRestrictedUser + ) { + if (error != null) { + return callback(error) + } + if (client.disconnected) { + metrics.inc('editor.join-project.disconnected', 1, { + status: 'after-web-api-call' + }) + return callback() + } - if (!privilegeLevel || (privilegeLevel === "")) { - const err = new Error("not authorized"); - logger.warn({err, project_id, user_id, client_id: client.id}, "user is not authorized to join project"); - return callback(err); - } + if (!privilegeLevel || privilegeLevel === '') { + const err = new Error('not authorized') + logger.warn( + { err, project_id, user_id, client_id: client.id }, + 'user is not authorized to join project' + ) + return callback(err) + } - client.ol_context = {}; - client.ol_context.privilege_level = privilegeLevel; - client.ol_context.user_id = user_id; - client.ol_context.project_id = project_id; - client.ol_context.owner_id = __guard__(project != null ? project.owner : undefined, x => x._id); - client.ol_context.first_name = user != null ? user.first_name : undefined; - client.ol_context.last_name = user != null ? user.last_name : undefined; - client.ol_context.email = user != null ? user.email : undefined; - client.ol_context.connected_time = new Date(); - client.ol_context.signup_date = user != null ? user.signUpDate : undefined; - client.ol_context.login_count = user != null ? user.loginCount : undefined; - client.ol_context.is_restricted_user = !!(isRestrictedUser); + client.ol_context = {} + client.ol_context.privilege_level = privilegeLevel + client.ol_context.user_id = user_id + client.ol_context.project_id = project_id + client.ol_context.owner_id = __guard__( + project != null ? project.owner : undefined, + (x) => x._id + ) + client.ol_context.first_name = user != null ? user.first_name : undefined + client.ol_context.last_name = user != null ? user.last_name : undefined + client.ol_context.email = user != null ? user.email : undefined + client.ol_context.connected_time = new Date() + client.ol_context.signup_date = user != null ? user.signUpDate : undefined + client.ol_context.login_count = user != null ? user.loginCount : undefined + client.ol_context.is_restricted_user = !!isRestrictedUser - RoomManager.joinProject(client, project_id, function(err) { - if (err) { return callback(err); } - logger.log({user_id, project_id, client_id: client.id}, "user joined project"); - return callback(null, project, privilegeLevel, WebsocketController.PROTOCOL_VERSION); - }); + RoomManager.joinProject(client, project_id, function (err) { + if (err) { + return callback(err) + } + logger.log( + { user_id, project_id, client_id: client.id }, + 'user joined project' + ) + return callback( + null, + project, + privilegeLevel, + WebsocketController.PROTOCOL_VERSION + ) + }) - // No need to block for setting the user as connected in the cursor tracking - return ConnectedUsersManager.updateUserPosition(project_id, client.publicId, user, null, function() {}); - }); - }, + // No need to block for setting the user as connected in the cursor tracking + return ConnectedUsersManager.updateUserPosition( + project_id, + client.publicId, + user, + null, + function () {} + ) + }) + }, - // We want to flush a project if there are no more (local) connected clients - // but we need to wait for the triggering client to disconnect. How long we wait - // is determined by FLUSH_IF_EMPTY_DELAY. - FLUSH_IF_EMPTY_DELAY: 500, // ms - leaveProject(io, client, callback) { - if (callback == null) { callback = function(error) {}; } - const {project_id, user_id} = client.ol_context; - if (!project_id) { return callback(); } // client did not join project + // We want to flush a project if there are no more (local) connected clients + // but we need to wait for the triggering client to disconnect. How long we wait + // is determined by FLUSH_IF_EMPTY_DELAY. + FLUSH_IF_EMPTY_DELAY: 500, // ms + leaveProject(io, client, callback) { + if (callback == null) { + callback = function (error) {} + } + const { project_id, user_id } = client.ol_context + if (!project_id) { + return callback() + } // client did not join project - metrics.inc("editor.leave-project"); - logger.log({project_id, user_id, client_id: client.id}, "client leaving project"); - WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientDisconnected", client.publicId); + metrics.inc('editor.leave-project') + logger.log( + { project_id, user_id, client_id: client.id }, + 'client leaving project' + ) + WebsocketLoadBalancer.emitToRoom( + project_id, + 'clientTracking.clientDisconnected', + client.publicId + ) - // We can do this in the background - ConnectedUsersManager.markUserAsDisconnected(project_id, client.publicId, function(err) { - if (err != null) { - return logger.error({err, project_id, user_id, client_id: client.id}, "error marking client as disconnected"); - } - }); + // We can do this in the background + ConnectedUsersManager.markUserAsDisconnected( + project_id, + client.publicId, + function (err) { + if (err != null) { + return logger.error( + { err, project_id, user_id, client_id: client.id }, + 'error marking client as disconnected' + ) + } + } + ) - RoomManager.leaveProjectAndDocs(client); - return setTimeout(function() { - const remainingClients = io.sockets.clients(project_id); - if (remainingClients.length === 0) { - // Flush project in the background - DocumentUpdaterManager.flushProjectToMongoAndDelete(project_id, function(err) { - if (err != null) { - return logger.error({err, project_id, user_id, client_id: client.id}, "error flushing to doc updater after leaving project"); - } - }); - } - return callback(); - } - , WebsocketController.FLUSH_IF_EMPTY_DELAY); - }, + RoomManager.leaveProjectAndDocs(client) + return setTimeout(function () { + const remainingClients = io.sockets.clients(project_id) + if (remainingClients.length === 0) { + // Flush project in the background + DocumentUpdaterManager.flushProjectToMongoAndDelete( + project_id, + function (err) { + if (err != null) { + return logger.error( + { err, project_id, user_id, client_id: client.id }, + 'error flushing to doc updater after leaving project' + ) + } + } + ) + } + return callback() + }, WebsocketController.FLUSH_IF_EMPTY_DELAY) + }, - joinDoc(client, doc_id, fromVersion, options, callback) { - if (fromVersion == null) { fromVersion = -1; } - if (callback == null) { callback = function(error, doclines, version, ops, ranges) {}; } - if (client.disconnected) { - metrics.inc('editor.join-doc.disconnected', 1, {status: 'immediately'}); - return callback(); - } + joinDoc(client, doc_id, fromVersion, options, callback) { + if (fromVersion == null) { + fromVersion = -1 + } + if (callback == null) { + callback = function (error, doclines, version, ops, ranges) {} + } + if (client.disconnected) { + metrics.inc('editor.join-doc.disconnected', 1, { status: 'immediately' }) + return callback() + } - metrics.inc("editor.join-doc"); - const {project_id, user_id, is_restricted_user} = client.ol_context; - if ((project_id == null)) { return callback(new Error("no project_id found on client")); } - logger.log({user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joining doc"); + metrics.inc('editor.join-doc') + const { project_id, user_id, is_restricted_user } = client.ol_context + if (project_id == null) { + return callback(new Error('no project_id found on client')) + } + logger.log( + { user_id, project_id, doc_id, fromVersion, client_id: client.id }, + 'client joining doc' + ) - return AuthorizationManager.assertClientCanViewProject(client, function(error) { - if (error != null) { return callback(error); } - // ensure the per-doc applied-ops channel is subscribed before sending the - // doc to the client, so that no events are missed. - return RoomManager.joinDoc(client, doc_id, function(error) { - if (error != null) { return callback(error); } - if (client.disconnected) { - metrics.inc('editor.join-doc.disconnected', 1, {status: 'after-joining-room'}); - // the client will not read the response anyways - return callback(); - } + return AuthorizationManager.assertClientCanViewProject(client, function ( + error + ) { + if (error != null) { + return callback(error) + } + // ensure the per-doc applied-ops channel is subscribed before sending the + // doc to the client, so that no events are missed. + return RoomManager.joinDoc(client, doc_id, function (error) { + if (error != null) { + return callback(error) + } + if (client.disconnected) { + metrics.inc('editor.join-doc.disconnected', 1, { + status: 'after-joining-room' + }) + // the client will not read the response anyways + return callback() + } - return DocumentUpdaterManager.getDocument(project_id, doc_id, fromVersion, function(error, lines, version, ranges, ops) { - let err; - if (error != null) { return callback(error); } - if (client.disconnected) { - metrics.inc('editor.join-doc.disconnected', 1, {status: 'after-doc-updater-call'}); - // the client will not read the response anyways - return callback(); - } + return DocumentUpdaterManager.getDocument( + project_id, + doc_id, + fromVersion, + function (error, lines, version, ranges, ops) { + let err + if (error != null) { + return callback(error) + } + if (client.disconnected) { + metrics.inc('editor.join-doc.disconnected', 1, { + status: 'after-doc-updater-call' + }) + // the client will not read the response anyways + return callback() + } - if (is_restricted_user && ((ranges != null ? ranges.comments : undefined) != null)) { - ranges.comments = []; - } + if ( + is_restricted_user && + (ranges != null ? ranges.comments : undefined) != null + ) { + ranges.comments = [] + } - // Encode any binary bits of data so it can go via WebSockets - // See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html - const encodeForWebsockets = text => unescape(encodeURIComponent(text)); - const escapedLines = []; - for (let line of Array.from(lines)) { - try { - line = encodeForWebsockets(line); - } catch (error1) { - err = error1; - logger.err({err, project_id, doc_id, fromVersion, line, client_id: client.id}, "error encoding line uri component"); - return callback(err); - } - escapedLines.push(line); - } - if (options.encodeRanges) { - try { - for (const comment of Array.from((ranges != null ? ranges.comments : undefined) || [])) { - if (comment.op.c != null) { comment.op.c = encodeForWebsockets(comment.op.c); } - } - for (const change of Array.from((ranges != null ? ranges.changes : undefined) || [])) { - if (change.op.i != null) { change.op.i = encodeForWebsockets(change.op.i); } - if (change.op.d != null) { change.op.d = encodeForWebsockets(change.op.d); } - } - } catch (error2) { - err = error2; - logger.err({err, project_id, doc_id, fromVersion, ranges, client_id: client.id}, "error encoding range uri component"); - return callback(err); - } - } + // Encode any binary bits of data so it can go via WebSockets + // See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html + const encodeForWebsockets = (text) => + unescape(encodeURIComponent(text)) + const escapedLines = [] + for (let line of Array.from(lines)) { + try { + line = encodeForWebsockets(line) + } catch (error1) { + err = error1 + logger.err( + { + err, + project_id, + doc_id, + fromVersion, + line, + client_id: client.id + }, + 'error encoding line uri component' + ) + return callback(err) + } + escapedLines.push(line) + } + if (options.encodeRanges) { + try { + for (const comment of Array.from( + (ranges != null ? ranges.comments : undefined) || [] + )) { + if (comment.op.c != null) { + comment.op.c = encodeForWebsockets(comment.op.c) + } + } + for (const change of Array.from( + (ranges != null ? ranges.changes : undefined) || [] + )) { + if (change.op.i != null) { + change.op.i = encodeForWebsockets(change.op.i) + } + if (change.op.d != null) { + change.op.d = encodeForWebsockets(change.op.d) + } + } + } catch (error2) { + err = error2 + logger.err( + { + err, + project_id, + doc_id, + fromVersion, + ranges, + client_id: client.id + }, + 'error encoding range uri component' + ) + return callback(err) + } + } - AuthorizationManager.addAccessToDoc(client, doc_id); - logger.log({user_id, project_id, doc_id, fromVersion, client_id: client.id}, "client joined doc"); - return callback(null, escapedLines, version, ops, ranges); - }); - }); - }); - }, + AuthorizationManager.addAccessToDoc(client, doc_id) + logger.log( + { + user_id, + project_id, + doc_id, + fromVersion, + client_id: client.id + }, + 'client joined doc' + ) + return callback(null, escapedLines, version, ops, ranges) + } + ) + }) + }) + }, - leaveDoc(client, doc_id, callback) { - // client may have disconnected, but we have to cleanup internal state. - if (callback == null) { callback = function(error) {}; } - metrics.inc("editor.leave-doc"); - const {project_id, user_id} = client.ol_context; - logger.log({user_id, project_id, doc_id, client_id: client.id}, "client leaving doc"); - RoomManager.leaveDoc(client, doc_id); - // we could remove permission when user leaves a doc, but because - // the connection is per-project, we continue to allow access - // after the initial joinDoc since we know they are already authorised. - // # AuthorizationManager.removeAccessToDoc client, doc_id - return callback(); - }, - updateClientPosition(client, cursorData, callback) { - if (callback == null) { callback = function(error) {}; } - if (client.disconnected) { - // do not create a ghost entry in redis - return callback(); - } + leaveDoc(client, doc_id, callback) { + // client may have disconnected, but we have to cleanup internal state. + if (callback == null) { + callback = function (error) {} + } + metrics.inc('editor.leave-doc') + const { project_id, user_id } = client.ol_context + logger.log( + { user_id, project_id, doc_id, client_id: client.id }, + 'client leaving doc' + ) + RoomManager.leaveDoc(client, doc_id) + // we could remove permission when user leaves a doc, but because + // the connection is per-project, we continue to allow access + // after the initial joinDoc since we know they are already authorised. + // # AuthorizationManager.removeAccessToDoc client, doc_id + return callback() + }, + updateClientPosition(client, cursorData, callback) { + if (callback == null) { + callback = function (error) {} + } + if (client.disconnected) { + // do not create a ghost entry in redis + return callback() + } - metrics.inc("editor.update-client-position", 0.1); - const {project_id, first_name, last_name, email, user_id} = client.ol_context; - logger.log({user_id, project_id, client_id: client.id, cursorData}, "updating client position"); + metrics.inc('editor.update-client-position', 0.1) + const { + project_id, + first_name, + last_name, + email, + user_id + } = client.ol_context + logger.log( + { user_id, project_id, client_id: client.id, cursorData }, + 'updating client position' + ) - return AuthorizationManager.assertClientCanViewProjectAndDoc(client, cursorData.doc_id, function(error) { - if (error != null) { - logger.warn({err: error, client_id: client.id, project_id, user_id}, "silently ignoring unauthorized updateClientPosition. Client likely hasn't called joinProject yet."); - return callback(); - } - cursorData.id = client.publicId; - if (user_id != null) { cursorData.user_id = user_id; } - if (email != null) { cursorData.email = email; } - // Don't store anonymous users in redis to avoid influx - if (!user_id || (user_id === 'anonymous-user')) { - cursorData.name = ""; - callback(); - } else { - cursorData.name = first_name && last_name ? - `${first_name} ${last_name}` - : first_name || (last_name || ""); - ConnectedUsersManager.updateUserPosition(project_id, client.publicId, { - first_name, - last_name, - email, - _id: user_id - }, { - row: cursorData.row, - column: cursorData.column, - doc_id: cursorData.doc_id - }, callback); - } - return WebsocketLoadBalancer.emitToRoom(project_id, "clientTracking.clientUpdated", cursorData); - }); - }, + return AuthorizationManager.assertClientCanViewProjectAndDoc( + client, + cursorData.doc_id, + function (error) { + if (error != null) { + logger.warn( + { err: error, client_id: client.id, project_id, user_id }, + "silently ignoring unauthorized updateClientPosition. Client likely hasn't called joinProject yet." + ) + return callback() + } + cursorData.id = client.publicId + if (user_id != null) { + cursorData.user_id = user_id + } + if (email != null) { + cursorData.email = email + } + // Don't store anonymous users in redis to avoid influx + if (!user_id || user_id === 'anonymous-user') { + cursorData.name = '' + callback() + } else { + cursorData.name = + first_name && last_name + ? `${first_name} ${last_name}` + : first_name || last_name || '' + ConnectedUsersManager.updateUserPosition( + project_id, + client.publicId, + { + first_name, + last_name, + email, + _id: user_id + }, + { + row: cursorData.row, + column: cursorData.column, + doc_id: cursorData.doc_id + }, + callback + ) + } + return WebsocketLoadBalancer.emitToRoom( + project_id, + 'clientTracking.clientUpdated', + cursorData + ) + } + ) + }, - CLIENT_REFRESH_DELAY: 1000, - getConnectedUsers(client, callback) { - if (callback == null) { callback = function(error, users) {}; } - if (client.disconnected) { - // they are not interested anymore, skip the redis lookups - return callback(); - } + CLIENT_REFRESH_DELAY: 1000, + getConnectedUsers(client, callback) { + if (callback == null) { + callback = function (error, users) {} + } + if (client.disconnected) { + // they are not interested anymore, skip the redis lookups + return callback() + } - metrics.inc("editor.get-connected-users"); - const {project_id, user_id, is_restricted_user} = client.ol_context; - if (is_restricted_user) { - return callback(null, []); - } - if ((project_id == null)) { return callback(new Error("no project_id found on client")); } - logger.log({user_id, project_id, client_id: client.id}, "getting connected users"); - return AuthorizationManager.assertClientCanViewProject(client, function(error) { - if (error != null) { return callback(error); } - WebsocketLoadBalancer.emitToRoom(project_id, 'clientTracking.refresh'); - return setTimeout(() => ConnectedUsersManager.getConnectedUsers(project_id, function(error, users) { - if (error != null) { return callback(error); } - callback(null, users); - return logger.log({user_id, project_id, client_id: client.id}, "got connected users"); - }) - , WebsocketController.CLIENT_REFRESH_DELAY); - }); - }, + metrics.inc('editor.get-connected-users') + const { project_id, user_id, is_restricted_user } = client.ol_context + if (is_restricted_user) { + return callback(null, []) + } + if (project_id == null) { + return callback(new Error('no project_id found on client')) + } + logger.log( + { user_id, project_id, client_id: client.id }, + 'getting connected users' + ) + return AuthorizationManager.assertClientCanViewProject(client, function ( + error + ) { + if (error != null) { + return callback(error) + } + WebsocketLoadBalancer.emitToRoom(project_id, 'clientTracking.refresh') + return setTimeout( + () => + ConnectedUsersManager.getConnectedUsers(project_id, function ( + error, + users + ) { + if (error != null) { + return callback(error) + } + callback(null, users) + return logger.log( + { user_id, project_id, client_id: client.id }, + 'got connected users' + ) + }), + WebsocketController.CLIENT_REFRESH_DELAY + ) + }) + }, - applyOtUpdate(client, doc_id, update, callback) { - // client may have disconnected, but we can submit their update to doc-updater anyways. - if (callback == null) { callback = function(error) {}; } - const {user_id, project_id} = client.ol_context; - if ((project_id == null)) { return callback(new Error("no project_id found on client")); } + applyOtUpdate(client, doc_id, update, callback) { + // client may have disconnected, but we can submit their update to doc-updater anyways. + if (callback == null) { + callback = function (error) {} + } + const { user_id, project_id } = client.ol_context + if (project_id == null) { + return callback(new Error('no project_id found on client')) + } - return WebsocketController._assertClientCanApplyUpdate(client, doc_id, update, function(error) { - if (error != null) { - logger.warn({err: error, doc_id, client_id: client.id, version: update.v}, "client is not authorized to make update"); - setTimeout(() => // Disconnect, but give the client the chance to receive the error - client.disconnect() - , 100); - return callback(error); - } - if (!update.meta) { update.meta = {}; } - update.meta.source = client.publicId; - update.meta.user_id = user_id; - metrics.inc("editor.doc-update", 0.3); + return WebsocketController._assertClientCanApplyUpdate( + client, + doc_id, + update, + function (error) { + if (error != null) { + logger.warn( + { err: error, doc_id, client_id: client.id, version: update.v }, + 'client is not authorized to make update' + ) + setTimeout( + () => + // Disconnect, but give the client the chance to receive the error + client.disconnect(), + 100 + ) + return callback(error) + } + if (!update.meta) { + update.meta = {} + } + update.meta.source = client.publicId + update.meta.user_id = user_id + metrics.inc('editor.doc-update', 0.3) - logger.log({user_id, doc_id, project_id, client_id: client.id, version: update.v}, "sending update to doc updater"); + logger.log( + { + user_id, + doc_id, + project_id, + client_id: client.id, + version: update.v + }, + 'sending update to doc updater' + ) - return DocumentUpdaterManager.queueChange(project_id, doc_id, update, function(error) { - if ((error != null ? error.message : undefined) === "update is too large") { - metrics.inc("update_too_large"); - const { - updateSize - } = error; - logger.warn({user_id, project_id, doc_id, updateSize}, "update is too large"); + return DocumentUpdaterManager.queueChange( + project_id, + doc_id, + update, + function (error) { + if ( + (error != null ? error.message : undefined) === + 'update is too large' + ) { + metrics.inc('update_too_large') + const { updateSize } = error + logger.warn( + { user_id, project_id, doc_id, updateSize }, + 'update is too large' + ) - // mark the update as received -- the client should not send it again! - callback(); + // mark the update as received -- the client should not send it again! + callback() - // trigger an out-of-sync error - const message = {project_id, doc_id, error: "update is too large"}; - setTimeout(function() { - if (client.disconnected) { - // skip the message broadcast, the client has moved on - return metrics.inc('editor.doc-update.disconnected', 1, {status:'at-otUpdateError'}); - } - client.emit("otUpdateError", message.error, message); - return client.disconnect(); - } - , 100); - return; - } + // trigger an out-of-sync error + const message = { + project_id, + doc_id, + error: 'update is too large' + } + setTimeout(function () { + if (client.disconnected) { + // skip the message broadcast, the client has moved on + return metrics.inc('editor.doc-update.disconnected', 1, { + status: 'at-otUpdateError' + }) + } + client.emit('otUpdateError', message.error, message) + return client.disconnect() + }, 100) + return + } - if (error != null) { - logger.error({err: error, project_id, doc_id, client_id: client.id, version: update.v}, "document was not available for update"); - client.disconnect(); - } - return callback(error); - }); - }); - }, + if (error != null) { + logger.error( + { + err: error, + project_id, + doc_id, + client_id: client.id, + version: update.v + }, + 'document was not available for update' + ) + client.disconnect() + } + return callback(error) + } + ) + } + ) + }, - _assertClientCanApplyUpdate(client, doc_id, update, callback) { - return AuthorizationManager.assertClientCanEditProjectAndDoc(client, doc_id, function(error) { - if (error != null) { - if ((error.message === "not authorized") && WebsocketController._isCommentUpdate(update)) { - // This might be a comment op, which we only need read-only priveleges for - return AuthorizationManager.assertClientCanViewProjectAndDoc(client, doc_id, callback); - } else { - return callback(error); - } - } else { - return callback(null); - } - }); - }, + _assertClientCanApplyUpdate(client, doc_id, update, callback) { + return AuthorizationManager.assertClientCanEditProjectAndDoc( + client, + doc_id, + function (error) { + if (error != null) { + if ( + error.message === 'not authorized' && + WebsocketController._isCommentUpdate(update) + ) { + // This might be a comment op, which we only need read-only priveleges for + return AuthorizationManager.assertClientCanViewProjectAndDoc( + client, + doc_id, + callback + ) + } else { + return callback(error) + } + } else { + return callback(null) + } + } + ) + }, - _isCommentUpdate(update) { - for (const op of Array.from(update.op)) { - if ((op.c == null)) { - return false; - } - } - return true; - } -}); + _isCommentUpdate(update) { + for (const op of Array.from(update.op)) { + if (op.c == null) { + return false + } + } + return true + } +} function __guard__(value, transform) { - return (typeof value !== 'undefined' && value !== null) ? transform(value) : undefined; -} \ No newline at end of file + return typeof value !== 'undefined' && value !== null + ? transform(value) + : undefined +} diff --git a/services/real-time/app/js/WebsocketLoadBalancer.js b/services/real-time/app/js/WebsocketLoadBalancer.js index dc2617742a..2719921f10 100644 --- a/services/real-time/app/js/WebsocketLoadBalancer.js +++ b/services/real-time/app/js/WebsocketLoadBalancer.js @@ -11,146 +11,207 @@ * DS207: Consider shorter variations of null checks * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md */ -let WebsocketLoadBalancer; -const Settings = require('settings-sharelatex'); -const logger = require('logger-sharelatex'); -const RedisClientManager = require("./RedisClientManager"); -const SafeJsonParse = require("./SafeJsonParse"); -const EventLogger = require("./EventLogger"); -const HealthCheckManager = require("./HealthCheckManager"); -const RoomManager = require("./RoomManager"); -const ChannelManager = require("./ChannelManager"); -const ConnectedUsersManager = require("./ConnectedUsersManager"); +let WebsocketLoadBalancer +const Settings = require('settings-sharelatex') +const logger = require('logger-sharelatex') +const RedisClientManager = require('./RedisClientManager') +const SafeJsonParse = require('./SafeJsonParse') +const EventLogger = require('./EventLogger') +const HealthCheckManager = require('./HealthCheckManager') +const RoomManager = require('./RoomManager') +const ChannelManager = require('./ChannelManager') +const ConnectedUsersManager = require('./ConnectedUsersManager') const RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST = [ - 'connectionAccepted', - 'otUpdateApplied', - 'otUpdateError', - 'joinDoc', - 'reciveNewDoc', - 'reciveNewFile', - 'reciveNewFolder', - 'removeEntity' -]; + 'connectionAccepted', + 'otUpdateApplied', + 'otUpdateError', + 'joinDoc', + 'reciveNewDoc', + 'reciveNewFile', + 'reciveNewFolder', + 'removeEntity' +] -module.exports = (WebsocketLoadBalancer = { - rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub), - rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub), +module.exports = WebsocketLoadBalancer = { + rclientPubList: RedisClientManager.createClientList(Settings.redis.pubsub), + rclientSubList: RedisClientManager.createClientList(Settings.redis.pubsub), - emitToRoom(room_id, message, ...payload) { - if ((room_id == null)) { - logger.warn({message, payload}, "no room_id provided, ignoring emitToRoom"); - return; - } - const data = JSON.stringify({ - room_id, - message, - payload - }); - logger.log({room_id, message, payload, length: data.length}, "emitting to room"); + emitToRoom(room_id, message, ...payload) { + if (room_id == null) { + logger.warn( + { message, payload }, + 'no room_id provided, ignoring emitToRoom' + ) + return + } + const data = JSON.stringify({ + room_id, + message, + payload + }) + logger.log( + { room_id, message, payload, length: data.length }, + 'emitting to room' + ) - return Array.from(this.rclientPubList).map((rclientPub) => - ChannelManager.publish(rclientPub, "editor-events", room_id, data)); - }, + return Array.from(this.rclientPubList).map((rclientPub) => + ChannelManager.publish(rclientPub, 'editor-events', room_id, data) + ) + }, - emitToAll(message, ...payload) { - return this.emitToRoom("all", message, ...Array.from(payload)); - }, + emitToAll(message, ...payload) { + return this.emitToRoom('all', message, ...Array.from(payload)) + }, - listenForEditorEvents(io) { - logger.log({rclients: this.rclientPubList.length}, "publishing editor events"); - logger.log({rclients: this.rclientSubList.length}, "listening for editor events"); - for (const rclientSub of Array.from(this.rclientSubList)) { - rclientSub.subscribe("editor-events"); - rclientSub.on("message", function(channel, message) { - if (Settings.debugEvents > 0) { EventLogger.debugEvent(channel, message); } - return WebsocketLoadBalancer._processEditorEvent(io, channel, message); - }); - } - return this.handleRoomUpdates(this.rclientSubList); - }, + listenForEditorEvents(io) { + logger.log( + { rclients: this.rclientPubList.length }, + 'publishing editor events' + ) + logger.log( + { rclients: this.rclientSubList.length }, + 'listening for editor events' + ) + for (const rclientSub of Array.from(this.rclientSubList)) { + rclientSub.subscribe('editor-events') + rclientSub.on('message', function (channel, message) { + if (Settings.debugEvents > 0) { + EventLogger.debugEvent(channel, message) + } + return WebsocketLoadBalancer._processEditorEvent(io, channel, message) + }) + } + return this.handleRoomUpdates(this.rclientSubList) + }, - handleRoomUpdates(rclientSubList) { - const roomEvents = RoomManager.eventSource(); - roomEvents.on('project-active', function(project_id) { - const subscribePromises = Array.from(rclientSubList).map((rclient) => - ChannelManager.subscribe(rclient, "editor-events", project_id)); - return RoomManager.emitOnCompletion(subscribePromises, `project-subscribed-${project_id}`); - }); - return roomEvents.on('project-empty', project_id => Array.from(rclientSubList).map((rclient) => - ChannelManager.unsubscribe(rclient, "editor-events", project_id))); - }, + handleRoomUpdates(rclientSubList) { + const roomEvents = RoomManager.eventSource() + roomEvents.on('project-active', function (project_id) { + const subscribePromises = Array.from(rclientSubList).map((rclient) => + ChannelManager.subscribe(rclient, 'editor-events', project_id) + ) + return RoomManager.emitOnCompletion( + subscribePromises, + `project-subscribed-${project_id}` + ) + }) + return roomEvents.on('project-empty', (project_id) => + Array.from(rclientSubList).map((rclient) => + ChannelManager.unsubscribe(rclient, 'editor-events', project_id) + ) + ) + }, - _processEditorEvent(io, channel, message) { - return SafeJsonParse.parse(message, function(error, message) { - let clientList; - let client; - if (error != null) { - logger.error({err: error, channel}, "error parsing JSON"); - return; - } - if (message.room_id === "all") { - return io.sockets.emit(message.message, ...Array.from(message.payload)); - } else if ((message.message === 'clientTracking.refresh') && (message.room_id != null)) { - clientList = io.sockets.clients(message.room_id); - logger.log({channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: ((() => { - const result = []; - for (client of Array.from(clientList)) { result.push(client.id); - } - return result; - })())}, "refreshing client list"); - return (() => { - const result1 = []; - for (client of Array.from(clientList)) { - result1.push(ConnectedUsersManager.refreshClient(message.room_id, client.publicId)); - } - return result1; - })(); - } else if (message.room_id != null) { - if ((message._id != null) && Settings.checkEventOrder) { - const status = EventLogger.checkEventOrder("editor-events", message._id, message); - if (status === "duplicate") { - return; // skip duplicate events - } - } + _processEditorEvent(io, channel, message) { + return SafeJsonParse.parse(message, function (error, message) { + let clientList + let client + if (error != null) { + logger.error({ err: error, channel }, 'error parsing JSON') + return + } + if (message.room_id === 'all') { + return io.sockets.emit(message.message, ...Array.from(message.payload)) + } else if ( + message.message === 'clientTracking.refresh' && + message.room_id != null + ) { + clientList = io.sockets.clients(message.room_id) + logger.log( + { + channel, + message: message.message, + room_id: message.room_id, + message_id: message._id, + socketIoClients: (() => { + const result = [] + for (client of Array.from(clientList)) { + result.push(client.id) + } + return result + })() + }, + 'refreshing client list' + ) + return (() => { + const result1 = [] + for (client of Array.from(clientList)) { + result1.push( + ConnectedUsersManager.refreshClient( + message.room_id, + client.publicId + ) + ) + } + return result1 + })() + } else if (message.room_id != null) { + if (message._id != null && Settings.checkEventOrder) { + const status = EventLogger.checkEventOrder( + 'editor-events', + message._id, + message + ) + if (status === 'duplicate') { + return // skip duplicate events + } + } - const is_restricted_message = !Array.from(RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST).includes(message.message); + const is_restricted_message = !Array.from( + RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST + ).includes(message.message) - // send messages only to unique clients (due to duplicate entries in io.sockets.clients) - clientList = io.sockets.clients(message.room_id) - .filter(client => !(is_restricted_message && client.ol_context.is_restricted_user)); + // send messages only to unique clients (due to duplicate entries in io.sockets.clients) + clientList = io.sockets + .clients(message.room_id) + .filter( + (client) => + !(is_restricted_message && client.ol_context.is_restricted_user) + ) - // avoid unnecessary work if no clients are connected - if (clientList.length === 0) { return; } - logger.log({ - channel, - message: message.message, - room_id: message.room_id, - message_id: message._id, - socketIoClients: ((() => { - const result2 = []; - for (client of Array.from(clientList)) { result2.push(client.id); - } - return result2; - })()) - }, "distributing event to clients"); - const seen = {}; - return (() => { - const result3 = []; - for (client of Array.from(clientList)) { - if (!seen[client.id]) { - seen[client.id] = true; - result3.push(client.emit(message.message, ...Array.from(message.payload))); - } else { - result3.push(undefined); - } - } - return result3; - })(); - } else if (message.health_check != null) { - logger.debug({message}, "got health check message in editor events channel"); - return HealthCheckManager.check(channel, message.key); - } - }); - } -}); + // avoid unnecessary work if no clients are connected + if (clientList.length === 0) { + return + } + logger.log( + { + channel, + message: message.message, + room_id: message.room_id, + message_id: message._id, + socketIoClients: (() => { + const result2 = [] + for (client of Array.from(clientList)) { + result2.push(client.id) + } + return result2 + })() + }, + 'distributing event to clients' + ) + const seen = {} + return (() => { + const result3 = [] + for (client of Array.from(clientList)) { + if (!seen[client.id]) { + seen[client.id] = true + result3.push( + client.emit(message.message, ...Array.from(message.payload)) + ) + } else { + result3.push(undefined) + } + } + return result3 + })() + } else if (message.health_check != null) { + logger.debug( + { message }, + 'got health check message in editor events channel' + ) + return HealthCheckManager.check(channel, message.key) + } + }) + } +}