From 905c685696089d5fb47e7757430d856f3f02d788 Mon Sep 17 00:00:00 2001
From: Deluan <deluan@navidrome.org>
Date: Mon, 1 Feb 2021 17:41:17 -0500
Subject: [PATCH] Use diodes instead of channels in SSE broker

---
 go.mod                 |  1 +
 go.sum                 |  2 ++
 server/events/diode.go | 26 ++++++++++++++++++++
 server/events/sse.go   | 56 ++++++++++++++++++++----------------------
 4 files changed, 55 insertions(+), 30 deletions(-)
 create mode 100644 server/events/diode.go

diff --git a/go.mod b/go.mod
index 7332ad26e..c2af561c1 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/deluan/navidrome
 go 1.15
 
 require (
+	code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee
 	github.com/ClickHouse/clickhouse-go v1.4.3 // indirect
 	github.com/Masterminds/squirrel v1.5.0
 	github.com/astaxie/beego v1.12.3
diff --git a/go.sum b/go.sum
index b93f8fd21..1966840fd 100644
--- a/go.sum
+++ b/go.sum
@@ -12,6 +12,8 @@ cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7
 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
+code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee h1:iAAPf9s7/+BIiGf+RjgcXLm3NoZaLIJsBXJuUa63Lx8=
+code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee/go.mod h1:Jzi+ccHgo/V/PLQUaQ6hnZcC1c4BS790gx21LRRui4g=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
diff --git a/server/events/diode.go b/server/events/diode.go
new file mode 100644
index 000000000..7c687f52c
--- /dev/null
+++ b/server/events/diode.go
@@ -0,0 +1,26 @@
+package events
+
+import (
+	"context"
+
+	"code.cloudfoundry.org/go-diodes"
+)
+
+type diode struct {
+	d *diodes.Poller
+}
+
+func newDiode(ctx context.Context, size int, alerter diodes.Alerter) *diode {
+	return &diode{
+		d: diodes.NewPoller(diodes.NewOneToOne(size, alerter), diodes.WithPollingContext(ctx)),
+	}
+}
+
+func (d *diode) set(data message) {
+	d.d.Set(diodes.GenericDataType(&data))
+}
+
+func (d *diode) next() *message {
+	data := d.d.Next()
+	return (*message)(data)
+}
diff --git a/server/events/sse.go b/server/events/sse.go
index 0cd8f9419..894ed2784 100644
--- a/server/events/sse.go
+++ b/server/events/sse.go
@@ -10,6 +10,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"code.cloudfoundry.org/go-diodes"
 	"github.com/deluan/navidrome/consts"
 	"github.com/deluan/navidrome/log"
 	"github.com/deluan/navidrome/model/request"
@@ -44,7 +45,7 @@ type (
 		address   string
 		username  string
 		userAgent string
-		channel   messageChan
+		diode     *diode
 	}
 )
 
@@ -78,30 +79,30 @@ func NewBroker() Broker {
 }
 
 func (b *broker) SendMessage(evt Event) {
-	msg := b.preparePackage(evt)
+	msg := b.prepareMessage(evt)
 	log.Trace("Broker received new event", "event", msg)
 	b.publish <- msg
 }
 
-func (b *broker) newEventID() uint32 {
+func (b *broker) nextEventID() uint32 {
 	return atomic.AddUint32(&eventId, 1)
 }
 
-func (b *broker) preparePackage(event Event) message {
-	pkg := message{}
-	pkg.ID = b.newEventID()
-	pkg.Event = event.EventName()
+func (b *broker) prepareMessage(event Event) message {
+	msg := message{}
+	msg.ID = b.nextEventID()
+	msg.Event = event.EventName()
 	data, _ := json.Marshal(event)
-	pkg.Data = string(data)
-	return pkg
+	msg.Data = string(data)
+	return msg
 }
 
 // writeEvent Write to the ResponseWriter, Server Sent Events compatible
-func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err error) {
+func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
 	flusher, _ := w.(http.Flusher)
 	complete := make(chan struct{}, 1)
 	go func() {
-		n, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
+		_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
 		// Flush the data immediately instead of buffering it for later.
 		flusher.Flush()
 		complete <- struct{}{}
@@ -110,7 +111,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err e
 	case <-complete:
 		return
 	case <-time.After(timeout):
-		return 0, errWriteTimeOut
+		return errWriteTimeOut
 	}
 }
 
@@ -138,15 +139,14 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	log.Debug(ctx, "New broker client", "client", c.String())
 
 	for {
-		select {
-		case event := <-c.channel:
-			log.Trace(ctx, "Sending event to client", "event", event, "client", c.String())
-			_, err := writeEvent(w, event, writeTimeOut)
-			if err == errWriteTimeOut {
-				return
-			}
-		case <-ctx.Done():
-			log.Trace(ctx, "Client closed the connection", "client", c.String())
+		event := c.diode.next()
+		if event == nil {
+			log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
+			return
+		}
+		log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
+		if err := writeEvent(w, *event, writeTimeOut); err == errWriteTimeOut {
+			log.Debug(ctx, "Timeout sending event to client", "event", *event, "client", c.String())
 			return
 		}
 	}
@@ -159,8 +159,10 @@ func (b *broker) subscribe(r *http.Request) client {
 		username:  user.UserName,
 		address:   r.RemoteAddr,
 		userAgent: r.UserAgent(),
-		channel:   make(messageChan, 5),
 	}
+	c.diode = newDiode(r.Context(), 1000, diodes.AlertFunc(func(missed int) {
+		log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
+	}))
 
 	// Signal the broker that we have a new client
 	b.subscribing <- c
@@ -186,12 +188,11 @@ func (b *broker) listen() {
 			log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
 
 			// Send a serverStart event to new client
-			c.channel <- b.preparePackage(&ServerStart{consts.ServerStart})
+			c.diode.set(b.prepareMessage(&ServerStart{consts.ServerStart}))
 
 		case c := <-b.unsubscribing:
 			// A client has detached and we want to
 			// stop sending them messages.
-			close(c.channel)
 			delete(clients, c)
 			log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
 
@@ -200,12 +201,7 @@ func (b *broker) listen() {
 			// Send event to all connected clients
 			for c := range clients {
 				log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
-				// Use non-blocking send. If cannot send, ignore the message
-				select {
-				case c.channel <- event:
-				default:
-					log.Warn("Could not send event to client", "client", c.String(), "event", event)
-				}
+				c.diode.set(event)
 			}
 
 		case ts := <-keepAlive.C: