From b22d0366d5e3989bc327bad0ad29a0f0e9f177ad Mon Sep 17 00:00:00 2001 From: Deluan Date: Mon, 3 Apr 2023 10:51:24 -0400 Subject: [PATCH] Use channels for EventStream instead of diodes --- server/events/sse.go | 49 +++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/server/events/sse.go b/server/events/sse.go index 76a66db2c..b06f6f896 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -12,7 +12,7 @@ import ( "github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model/request" - "github.com/navidrome/navidrome/utils/diodes" + "github.com/navidrome/navidrome/utils/pl" "github.com/navidrome/navidrome/utils/singleton" ) @@ -24,6 +24,7 @@ type Broker interface { const ( keepAliveFrequency = 15 * time.Second writeTimeOut = 5 * time.Second + bufferSize = 1 ) type ( @@ -41,7 +42,7 @@ type ( username string userAgent string clientUniqueId string - diode *diodes.Diode[message] + msgC chan message } ) @@ -80,7 +81,7 @@ func GetBroker() Broker { func (b *broker) SendMessage(ctx context.Context, evt Event) { msg := b.prepareMessage(ctx, evt) - log.Trace("Broker received new event", "event", msg) + log.Trace("Broker received new event", "type", msg.event, "data", msg.data) b.publish <- msg } @@ -147,21 +148,18 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Each connection registers its own message channel with the Broker's connections registry c := b.subscribe(r) defer b.unsubscribe(c) - log.Debug(ctx, "New broker client", "client", c.String()) + log.Debug(ctx, "Started new EventStream connection", "client", c.String()) - for { - event := c.diode.Next() - if event == nil { - log.Trace(ctx, "Client closed the EventStream connection", "client", c.String()) - return - } - log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String()) - err := writeEvent(ctx, w, *event, writeTimeOut) + for event := range pl.ReadOrDone(ctx, c.msgC) { + log.Trace(ctx, "Sending event to client", "event", event, "client", c.String()) + err := writeEvent(ctx, w, event, writeTimeOut) if err != nil { - log.Debug(ctx, "Error sending event to client. Closing connection", "event", *event, "client", c.String(), err) + log.Debug(ctx, "Error sending event to client. Closing connection", "event", event, "client", c.String(), err) return } } + log.Trace(ctx, "Client EventStream connection closed", "client", c.String()) + return } func (b *broker) subscribe(r *http.Request) client { @@ -175,9 +173,7 @@ func (b *broker) subscribe(r *http.Request) client { userAgent: r.UserAgent(), clientUniqueId: clientUniqueId, } - c.diode = diodes.New[message](ctx, 1024, diodes.AlertFunc(func(missed int) { - log.Debug("Dropped SSE events", "client", c.String(), "missed", missed) - })) + c.msgC = make(chan message, bufferSize) // Signal the broker that we have a new client b.subscribing <- c @@ -220,18 +216,19 @@ func (b *broker) listen() { // A new client has connected. // Register their message channel clients[c] = struct{}{} - log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) + log.Debug("Client added to EventStream broker", "numActiveClients", len(clients), "newClient", c.String()) // Send a serverStart event to new client msg := b.prepareMessage(context.Background(), &ServerStart{StartTime: consts.ServerStart, Version: consts.Version}) - c.diode.Put(msg) + sendOrDrop(c, msg) case c := <-b.unsubscribing: // A client has detached, and we want to // stop sending them messages. + close(c.msgC) delete(clients, c) - log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String()) + log.Debug("Removed client from EventStream broker", "numActiveClients", len(clients), "client", c.String()) case msg := <-b.publish: msg.id = getNextEventId() @@ -241,7 +238,7 @@ func (b *broker) listen() { for c := range clients { if b.shouldSend(msg, c) { log.Trace("Putting event on client's queue", "client", c.String(), "event", msg) - c.diode.Put(msg) + sendOrDrop(c, msg) } } @@ -254,8 +251,18 @@ func (b *broker) listen() { msg.id = getNextEventId() for c := range clients { log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg) - c.diode.Put(msg) + sendOrDrop(c, msg) } } } } + +func sendOrDrop(client client, msg message) { + select { + case client.msgC <- msg: + default: + if log.CurrentLevel() >= log.LevelTrace { + log.Trace("Event dropped because client's channel is full", "event", msg, "client", client.String()) + } + } +}