From 8b92796a5ca8890aed029ccac4742a36744558b8 Mon Sep 17 00:00:00 2001 From: Deluan Date: Sat, 12 Dec 2020 18:26:30 -0500 Subject: [PATCH] Disconnect the client if the output buffer fills up --- scanner/scanner.go | 2 +- server/events/sse.go | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/scanner/scanner.go b/scanner/scanner.go index ae1dede3b..db6f7616d 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -161,7 +161,7 @@ func (s *scanner) RescanAll(ctx context.Context, fullRescan bool) error { return ErrAlreadyScanning } isScanning.Set(true) - defer func() { isScanning.Set(false) }() + defer isScanning.Set(false) defer s.cacheWarmer.Flush(context.Background()) var hasError bool diff --git a/server/events/sse.go b/server/events/sse.go index c9ff875dc..24c32a121 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -36,6 +36,7 @@ type ( username string userAgent string channel messageChan + done chan struct{} } ) @@ -124,9 +125,12 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Flush the data immediately instead of buffering it for later. flusher.Flush() - case <-ctx.Done(): + case <-c.done: log.Trace(ctx, "Closing event stream connection", "client", c.String()) return + case <-ctx.Done(): + log.Trace(ctx, "Client closed the connection", "client", c.String()) + return } } } @@ -176,13 +180,14 @@ func (b *broker) listen() { case event := <-b.publish: // We got a new event from the outside! // Send event to all connected clients - for client := range b.clients { - log.Trace("Putting event on client's queue", "client", client.String(), "event", event) - // Use non-blocking send + for c := range b.clients { + log.Trace("Putting event on client's queue", "client", c.String(), "event", event) + // Use non-blocking send. If cannot send, terminate the client's connection select { - case client.channel <- event: + case c.channel <- event: default: - log.Warn("Could not send message to client", "client", client.String(), "event", event) + log.Warn("Could not send message to client", "client", c.String(), "event", event) + c.done <- struct{}{} } }