From e6bfa2bb0b51c5436f3e2d7c4a75ab86c0417dfa Mon Sep 17 00:00:00 2001
From: Deluan <deluan@navidrome.org>
Date: Sat, 1 Apr 2023 21:53:45 -0400
Subject: [PATCH] Convert our usage of go-diodes into a simplified, generic
 version

---
 server/events/diode.go      | 34 --------------------
 server/events/diode_test.go | 51 ------------------------------
 server/events/sse.go        | 16 +++++-----
 utils/diodes/diodes.go      | 38 +++++++++++++++++++++++
 utils/diodes/diodes_test.go | 62 +++++++++++++++++++++++++++++++++++++
 5 files changed, 108 insertions(+), 93 deletions(-)
 delete mode 100644 server/events/diode.go
 delete mode 100644 server/events/diode_test.go
 create mode 100644 utils/diodes/diodes.go
 create mode 100644 utils/diodes/diodes_test.go

diff --git a/server/events/diode.go b/server/events/diode.go
deleted file mode 100644
index 8ac5cd330..000000000
--- a/server/events/diode.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package events
-
-import (
-	"context"
-
-	"code.cloudfoundry.org/go-diodes"
-)
-
-type diode struct {
-	d *diodes.Waiter
-}
-
-func newDiode(ctx context.Context, size int, alerter diodes.Alerter) *diode {
-	return &diode{
-		d: diodes.NewWaiter(diodes.NewOneToOne(size, alerter), diodes.WithWaiterContext(ctx)),
-	}
-}
-
-func (d *diode) put(data message) {
-	d.d.Set(diodes.GenericDataType(&data))
-}
-
-func (d *diode) tryNext() (*message, bool) {
-	data, ok := d.d.TryNext()
-	if !ok {
-		return nil, ok
-	}
-	return (*message)(data), true
-}
-
-func (d *diode) next() *message {
-	data := d.d.Next()
-	return (*message)(data)
-}
diff --git a/server/events/diode_test.go b/server/events/diode_test.go
deleted file mode 100644
index 50e3dd1a2..000000000
--- a/server/events/diode_test.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package events
-
-import (
-	"context"
-
-	"code.cloudfoundry.org/go-diodes"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-)
-
-var _ = Describe("diode", func() {
-	var diode *diode
-	var ctx context.Context
-	var ctxCancel context.CancelFunc
-	var missed int
-
-	BeforeEach(func() {
-		missed = 0
-		ctx, ctxCancel = context.WithCancel(context.Background())
-		diode = newDiode(ctx, 2, diodes.AlertFunc(func(m int) { missed = m }))
-	})
-
-	It("enqueues the data correctly", func() {
-		diode.put(message{data: "1"})
-		diode.put(message{data: "2"})
-		Expect(diode.next()).To(Equal(&message{data: "1"}))
-		Expect(diode.next()).To(Equal(&message{data: "2"}))
-		Expect(missed).To(BeZero())
-	})
-
-	It("drops messages when diode is full", func() {
-		diode.put(message{data: "1"})
-		diode.put(message{data: "2"})
-		diode.put(message{data: "3"})
-		next, ok := diode.tryNext()
-		Expect(ok).To(BeTrue())
-		Expect(next).To(Equal(&message{data: "3"}))
-
-		_, ok = diode.tryNext()
-		Expect(ok).To(BeFalse())
-
-		Expect(missed).To(Equal(2))
-	})
-
-	It("returns nil when diode is empty and the context is canceled", func() {
-		diode.put(message{data: "1"})
-		ctxCancel()
-		Expect(diode.next()).To(Equal(&message{data: "1"}))
-		Expect(diode.next()).To(BeNil())
-	})
-})
diff --git a/server/events/sse.go b/server/events/sse.go
index 9b9200336..f82ff117a 100644
--- a/server/events/sse.go
+++ b/server/events/sse.go
@@ -1,4 +1,4 @@
-// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
+// Package events based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
 package events
 
 import (
@@ -8,11 +8,11 @@ import (
 	"net/http"
 	"time"
 
-	"code.cloudfoundry.org/go-diodes"
 	"github.com/google/uuid"
 	"github.com/navidrome/navidrome/consts"
 	"github.com/navidrome/navidrome/log"
 	"github.com/navidrome/navidrome/model/request"
+	"github.com/navidrome/navidrome/utils/diodes"
 	"github.com/navidrome/navidrome/utils/singleton"
 )
 
@@ -41,7 +41,7 @@ type (
 		username       string
 		userAgent      string
 		clientUniqueId string
-		diode          *diode
+		diode          *diodes.Diode[message]
 	}
 )
 
@@ -150,7 +150,7 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	log.Debug(ctx, "New broker client", "client", c.String())
 
 	for {
-		event := c.diode.next()
+		event := c.diode.Next()
 		if event == nil {
 			log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
 			return
@@ -174,7 +174,7 @@ func (b *broker) subscribe(r *http.Request) client {
 		userAgent:      r.UserAgent(),
 		clientUniqueId: clientUniqueId,
 	}
-	c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) {
+	c.diode = diodes.New[message](ctx, 1024, diodes.AlertFunc(func(missed int) {
 		log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
 	}))
 
@@ -224,7 +224,7 @@ func (b *broker) listen() {
 			// Send a serverStart event to new client
 			msg := b.prepareMessage(context.Background(),
 				&ServerStart{StartTime: consts.ServerStart, Version: consts.Version})
-			c.diode.put(msg)
+			c.diode.Put(msg)
 
 		case c := <-b.unsubscribing:
 			// A client has detached, and we want to
@@ -240,7 +240,7 @@ func (b *broker) listen() {
 			for c := range clients {
 				if b.shouldSend(msg, c) {
 					log.Trace("Putting event on client's queue", "client", c.String(), "event", msg)
-					c.diode.put(msg)
+					c.diode.Put(msg)
 				}
 			}
 
@@ -253,7 +253,7 @@ func (b *broker) listen() {
 			msg.id = getNextEventId()
 			for c := range clients {
 				log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg)
-				c.diode.put(msg)
+				c.diode.Put(msg)
 			}
 		}
 	}
diff --git a/utils/diodes/diodes.go b/utils/diodes/diodes.go
new file mode 100644
index 000000000..64e2e436f
--- /dev/null
+++ b/utils/diodes/diodes.go
@@ -0,0 +1,38 @@
+package diodes
+
+import (
+	"context"
+
+	"code.cloudfoundry.org/go-diodes"
+)
+
+type Diode[T any] struct {
+	d *diodes.Waiter
+}
+
+type Alerter = diodes.Alerter
+
+type AlertFunc = diodes.AlertFunc
+
+func New[T any](ctx context.Context, size int, alerter Alerter) *Diode[T] {
+	return &Diode[T]{
+		d: diodes.NewWaiter(diodes.NewOneToOne(size, alerter), diodes.WithWaiterContext(ctx)),
+	}
+}
+
+func (d *Diode[T]) Put(data T) {
+	d.d.Set(diodes.GenericDataType(&data))
+}
+
+func (d *Diode[T]) TryNext() (*T, bool) {
+	data, ok := d.d.TryNext()
+	if !ok {
+		return nil, ok
+	}
+	return (*T)(data), true
+}
+
+func (d *Diode[T]) Next() *T {
+	data := d.d.Next()
+	return (*T)(data)
+}
diff --git a/utils/diodes/diodes_test.go b/utils/diodes/diodes_test.go
new file mode 100644
index 000000000..fda9746e2
--- /dev/null
+++ b/utils/diodes/diodes_test.go
@@ -0,0 +1,62 @@
+package diodes_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/navidrome/navidrome/tests"
+	. "github.com/navidrome/navidrome/utils/diodes"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestDiodes(t *testing.T) {
+	tests.Init(t, false)
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Diodes Suite")
+}
+
+var _ = Describe("Diode", func() {
+	type message struct {
+		data string
+	}
+	var diode *Diode[message]
+	var ctx context.Context
+	var ctxCancel context.CancelFunc
+	var missed int
+
+	BeforeEach(func() {
+		missed = 0
+		ctx, ctxCancel = context.WithCancel(context.Background())
+		diode = New[message](ctx, 2, AlertFunc(func(m int) { missed = m }))
+	})
+
+	It("enqueues the data correctly", func() {
+		diode.Put(message{data: "1"})
+		diode.Put(message{data: "2"})
+		Expect(diode.Next()).To(Equal(&message{data: "1"}))
+		Expect(diode.Next()).To(Equal(&message{data: "2"}))
+		Expect(missed).To(BeZero())
+	})
+
+	It("drops messages when Diode is full", func() {
+		diode.Put(message{data: "1"})
+		diode.Put(message{data: "2"})
+		diode.Put(message{data: "3"})
+		next, ok := diode.TryNext()
+		Expect(ok).To(BeTrue())
+		Expect(next).To(Equal(&message{data: "3"}))
+
+		_, ok = diode.TryNext()
+		Expect(ok).To(BeFalse())
+
+		Expect(missed).To(Equal(2))
+	})
+
+	It("returns nil when Diode is empty and the context is canceled", func() {
+		diode.Put(message{data: "1"})
+		ctxCancel()
+		Expect(diode.Next()).To(Equal(&message{data: "1"}))
+		Expect(diode.Next()).To(BeNil())
+	})
+})