diff --git a/server/events/sse.go b/server/events/sse.go index 0ca841b31..482a71269 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -3,7 +3,9 @@ package events import ( "encoding/json" + "errors" "fmt" + "io" "net/http" "sync/atomic" "time" @@ -18,7 +20,12 @@ type Broker interface { SendMessage(event Event) } -const keepAliveFrequency = 15 * time.Second +var errWriteTimeOut = errors.New("write timeout") + +const ( + keepAliveFrequency = 15 * time.Second + writeTimeOut = 5 * time.Second +) var eventId uint32 @@ -88,12 +95,30 @@ func (b *broker) preparePackage(event Event) message { return pkg } +// writeEvent Write to the ResponseWriter, Server Sent Events compatible +func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err error) { + flusher, _ := w.(http.Flusher) + complete := make(chan struct{}, 1) + go func() { + n, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + complete <- struct{}{} + }() + select { + case <-complete: + return + case <-time.After(timeout): + return 0, errWriteTimeOut + } +} + func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + user, _ := request.UserFrom(ctx) // Make sure that the writer supports flushing. - flusher, ok := w.(http.Flusher) - user, _ := request.UserFrom(ctx) + _, ok := w.(http.Flusher) if !ok { log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr, "userAgent", r.UserAgent(), "user", user.UserName) @@ -114,13 +139,11 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { for { select { case event := <-c.channel: - // Write to the ResponseWriter - // Server Sent Events compatible log.Trace(ctx, "Sending event to client", "event", event, "client", c.String()) - _, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) - - // Flush the data immediately instead of buffering it for later. - flusher.Flush() + _, err := writeEvent(w, event, writeTimeOut) + if err == errWriteTimeOut { + return + } case <-c.done: log.Trace(ctx, "Closing event stream connection", "client", c.String()) return @@ -186,11 +209,6 @@ func (b *broker) listen() { case c.channel <- event: default: log.Warn("Could not send event to client", "client", c.String(), "event", event) - select { - case c.done <- struct{}{}: - default: - log.Warn("Could not ask client to end", "client", c.String()) - } } }