From e2969aa34c06ff4b8ba453c8967154a0476caa44 Mon Sep 17 00:00:00 2001 From: Deluan <deluan@navidrome.org> Date: Sat, 12 Dec 2020 13:35:49 -0500 Subject: [PATCH] Use non-blocking event sending --- server/events/sse.go | 60 +++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/server/events/sse.go b/server/events/sse.go index 7e7aeefe3..19e688ee2 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -10,6 +10,7 @@ import ( "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model/request" + "github.com/google/uuid" ) type Broker interface { @@ -30,6 +31,7 @@ type ( messageChan chan message clientsChan chan client client struct { + id string address string username string userAgent string @@ -38,7 +40,7 @@ type ( ) func (c client) String() string { - return fmt.Sprintf("%s (%s - %s)", c.username, c.address, c.userAgent) + return fmt.Sprintf("%s (%s - %s - %s)", c.id, c.username, c.address, c.userAgent) } type broker struct { @@ -52,16 +54,16 @@ type broker struct { closingClients clientsChan // Client connections registry - clients map[client]bool + clients map[client]struct{} } func NewBroker() Broker { // Instantiate a broker broker := &broker{ notifier: make(messageChan, 100), - newClients: make(clientsChan), - closingClients: make(clientsChan), - clients: make(map[client]bool), + newClients: make(clientsChan, 1), + closingClients: make(clientsChan, 1), + clients: make(map[client]struct{}), } // Set it running - listening and broadcasting events @@ -104,24 +106,10 @@ func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") // Each connection registers its own message channel with the Broker's connections registry - client := client{ - username: user.UserName, - address: r.RemoteAddr, - userAgent: r.UserAgent(), - channel: make(messageChan), - } - - // Signal the broker that we have a new client - broker.newClients <- client - + client := broker.subscribe(r) + defer broker.unsubscribe(client) log.Debug(ctx, "New broker client", "client", client.String()) - // Remove this client from the map of connected clients - // when this handler exits. - defer func() { - broker.closingClients <- client - }() - for { select { case event := <-client.channel: @@ -139,6 +127,26 @@ func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +func (broker *broker) subscribe(r *http.Request) client { + user, _ := request.UserFrom(r.Context()) + id, _ := uuid.NewRandom() + client := client{ + id: id.String(), + username: user.UserName, + address: r.RemoteAddr, + userAgent: r.UserAgent(), + channel: make(messageChan, 5), + } + + // Signal the broker that we have a new client + broker.newClients <- client + return client +} + +func (broker *broker) unsubscribe(c client) { + broker.closingClients <- c +} + func (broker *broker) listen() { keepAlive := time.NewTicker(keepAliveFrequency) defer keepAlive.Stop() @@ -148,7 +156,7 @@ func (broker *broker) listen() { case s := <-broker.newClients: // A new client has connected. // Register their message channel - broker.clients[s] = true + broker.clients[s] = struct{}{} log.Debug("Client added to event broker", "numClients", len(broker.clients), "newClient", s.String()) // Send a serverStart event to new client @@ -157,6 +165,7 @@ func (broker *broker) listen() { case s := <-broker.closingClients: // A client has detached and we want to // stop sending them messages. + close(s.channel) delete(broker.clients, s) log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String()) @@ -165,7 +174,12 @@ func (broker *broker) listen() { // Send event to all connected clients for client := range broker.clients { log.Trace("Putting event on client's queue", "client", client.String(), "event", event) - client.channel <- event + // Use non-blocking send + select { + case client.channel <- event: + default: + log.Warn("Could not send message to client", "client", client.String(), "event", event) + } } case ts := <-keepAlive.C: