From 41debfae0fee378fae7ecbcb53d22f157ad8d241 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Fri, 15 May 2020 11:34:07 +0200 Subject: [PATCH] [ChannelManager] rework (un)subscribing to redis - send a subscribe request on every request - wait for a pending unsubscribe request before subscribing - wait for a pending subscribe request before unsubscribing Co-Authored-By: Brian Gough --- .../app/coffee/ChannelManager.coffee | 63 +++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/services/real-time/app/coffee/ChannelManager.coffee b/services/real-time/app/coffee/ChannelManager.coffee index 367d2059a2..e60a145bd5 100644 --- a/services/real-time/app/coffee/ChannelManager.coffee +++ b/services/real-time/app/coffee/ChannelManager.coffee @@ -17,35 +17,48 @@ module.exports = ChannelManager = subscribe: (rclient, baseChannel, id) -> clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" - # we track pending subscribes because we want to be sure that the - # channel is active before letting the client join the doc or project, - # so that events are not lost. - if clientChannelMap.has(channel) - logger.warn {channel}, "subscribe already actioned" - # return the existing subscribe promise, so we can wait for it to resolve - return clientChannelMap.get(channel) - else - # get the subscribe promise and return it, the actual subscribe - # completes in the background - subscribePromise = rclient.subscribe channel - clientChannelMap.set(channel, subscribePromise) - logger.log {channel}, "subscribed to new channel" - metrics.inc "subscribe.#{baseChannel}" - return subscribePromise + actualSubscribe = () -> + # subscribe is happening in the foreground and it should reject + p = rclient.subscribe(channel) + p.finally () -> + if clientChannelMap.get(channel) is subscribePromise + clientChannelMap.delete(channel) + .then () -> + logger.log {channel}, "subscribed to channel" + metrics.inc "subscribe.#{baseChannel}" + .catch (err) -> + logger.error {channel, err}, "failed to subscribe to channel" + metrics.inc "subscribe.failed.#{baseChannel}" + return p + + pendingActions = clientChannelMap.get(channel) || Promise.resolve() + subscribePromise = pendingActions.then(actualSubscribe, actualSubscribe) + clientChannelMap.set(channel, subscribePromise) + logger.log {channel}, "planned to subscribe to channel" + return subscribePromise unsubscribe: (rclient, baseChannel, id) -> clientChannelMap = @getClientMapEntry(rclient) channel = "#{baseChannel}:#{id}" - # we don't need to track pending unsubscribes, because we there is no - # harm if events continue to arrive on the channel while the unsubscribe - # command in pending. - if !clientChannelMap.has(channel) - logger.error {channel}, "not subscribed - shouldn't happen" - else - rclient.unsubscribe channel # completes in the background - clientChannelMap.delete(channel) - logger.log {channel}, "unsubscribed from channel" - metrics.inc "unsubscribe.#{baseChannel}" + actualUnsubscribe = () -> + # unsubscribe is happening in the background, it should not reject + p = rclient.unsubscribe(channel) + .finally () -> + if clientChannelMap.get(channel) is unsubscribePromise + clientChannelMap.delete(channel) + .then () -> + logger.log {channel}, "unsubscribed from channel" + metrics.inc "unsubscribe.#{baseChannel}" + .catch (err) -> + logger.error {channel, err}, "unsubscribed from channel" + metrics.inc "unsubscribe.failed.#{baseChannel}" + return p + + pendingActions = clientChannelMap.get(channel) || Promise.resolve() + unsubscribePromise = pendingActions.then(actualUnsubscribe, actualUnsubscribe) + clientChannelMap.set(channel, unsubscribePromise) + logger.log {channel}, "planned to unsubscribe from channel" + return unsubscribePromise publish: (rclient, baseChannel, id, data) -> metrics.summary "redis.publish.#{baseChannel}", data.length