From d685aefab3d753489f0477df5cefd70d8f6906bc Mon Sep 17 00:00:00 2001 From: Deluan Date: Sat, 12 Dec 2020 23:04:50 -0500 Subject: [PATCH] Don't ever stop the `listen` go routine --- server/events/sse.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/server/events/sse.go b/server/events/sse.go index c6371ad5b..e18cb0379 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -53,9 +53,6 @@ type broker struct { // Closed client connections unsubscribing clientsChan - - // Client connections registry - clients map[client]struct{} } func NewBroker() Broker { @@ -64,7 +61,6 @@ func NewBroker() Broker { publish: make(messageChan, 100), subscribing: make(clientsChan, 1), unsubscribing: make(clientsChan, 1), - clients: make(map[client]struct{}), } // Set it running - listening and broadcasting events @@ -160,13 +156,15 @@ func (b *broker) listen() { keepAlive := time.NewTicker(keepAliveFrequency) defer keepAlive.Stop() + clients := map[client]struct{}{} + for { select { case c := <-b.subscribing: // A new client has connected. // Register their message channel - b.clients[c] = struct{}{} - log.Debug("Client added to event broker", "numClients", len(b.clients), "newClient", c.String()) + clients[c] = struct{}{} + log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) // Send a serverStart event to new client c.channel <- b.preparePackage(&ServerStart{serverStart}) @@ -175,20 +173,24 @@ func (b *broker) listen() { // A client has detached and we want to // stop sending them messages. close(c.channel) - delete(b.clients, c) - log.Debug("Removed client from event broker", "numClients", len(b.clients), "client", c.String()) + delete(clients, c) + log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String()) case event := <-b.publish: // We got a new event from the outside! // Send event to all connected clients - for c := range b.clients { + for c := range clients { log.Trace("Putting event on client's queue", "client", c.String(), "event", event) // Use non-blocking send. If cannot send, terminate the client's connection select { case c.channel <- event: default: - log.Warn("Could not send message to client", "client", c.String(), "event", event) - c.done <- struct{}{} + log.Warn("Could not send event to client", "client", c.String(), "event", event) + select { + case c.done <- struct{}{}: + default: + log.Warn("Could not ask client to end", "client", c.String()) + } } }