From a8c5fa6d49adb8f352841d3f93a4abc5e59a3e79 Mon Sep 17 00:00:00 2001
From: Deluan <deluan@navidrome.org>
Date: Tue, 1 Dec 2020 09:24:44 -0500
Subject: [PATCH] Fix file descriptor leak in SSE implementation.master

See https://github.com/deluan/navidrome/issues/446#issuecomment-736296465
---
 server/events/sse.go | 50 +++++++++++++++++++++-----------------------
 1 file changed, 24 insertions(+), 26 deletions(-)

diff --git a/server/events/sse.go b/server/events/sse.go
index 45c4b0120..7e7aeefe3 100644
--- a/server/events/sse.go
+++ b/server/events/sse.go
@@ -85,29 +85,29 @@ func (broker *broker) preparePackage(event Event) message {
 	return pkg
 }
 
-func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
-	ctx := req.Context()
+func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
 
 	// Make sure that the writer supports flushing.
-	flusher, ok := rw.(http.Flusher)
+	flusher, ok := w.(http.Flusher)
 	user, _ := request.UserFrom(ctx)
 	if !ok {
-		log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr,
-			"userAgent", req.UserAgent(), "user", user.UserName)
-		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
+		log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr,
+			"userAgent", r.UserAgent(), "user", user.UserName)
+		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
 		return
 	}
 
-	rw.Header().Set("Content-Type", "text/event-stream")
-	rw.Header().Set("Cache-Control", "no-cache, no-transform")
-	rw.Header().Set("Connection", "keep-alive")
-	rw.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache, no-transform")
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	// Each connection registers its own message channel with the Broker's connections registry
 	client := client{
 		username:  user.UserName,
-		address:   req.RemoteAddr,
-		userAgent: req.UserAgent(),
+		address:   r.RemoteAddr,
+		userAgent: r.UserAgent(),
 		channel:   make(messageChan),
 	}
 
@@ -122,22 +122,20 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
 		broker.closingClients <- client
 	}()
 
-	// Listen to client close and un-register messageChan
-	notify := ctx.Done()
-	go func() {
-		<-notify
-		broker.closingClients <- client
-	}()
-
 	for {
-		// Write to the ResponseWriter
-		// Server Sent Events compatible
-		event := <-client.channel
-		log.Trace(ctx, "Sending event to client", "event", event, "client", client.String())
-		_, _ = fmt.Fprintf(rw, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
+		select {
+		case event := <-client.channel:
+			// Write to the ResponseWriter
+			// Server Sent Events compatible
+			log.Trace(ctx, "Sending event to client", "event", event, "client", client.String())
+			_, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
 
-		// Flush the data immediately instead of buffering it for later.
-		flusher.Flush()
+			// Flush the data immediately instead of buffering it for later.
+			flusher.Flush()
+		case <-ctx.Done():
+			log.Trace(ctx, "Closing event stream connection", "client", client.String())
+			return
+		}
 	}
 }