Only close connection if the write times out

This commit is contained in:
Deluan 2020-12-20 15:21:46 -05:00
parent 1804fb3e50
commit 14b060a42a

View File

@ -3,7 +3,9 @@ package events
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"sync/atomic" "sync/atomic"
"time" "time"
@ -18,7 +20,12 @@ type Broker interface {
SendMessage(event Event) SendMessage(event Event)
} }
const keepAliveFrequency = 15 * time.Second var errWriteTimeOut = errors.New("write timeout")
const (
keepAliveFrequency = 15 * time.Second
writeTimeOut = 5 * time.Second
)
var eventId uint32 var eventId uint32
@ -88,12 +95,30 @@ func (b *broker) preparePackage(event Event) message {
return pkg return pkg
} }
// writeEvent Write to the ResponseWriter, Server Sent Events compatible
func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err error) {
flusher, _ := w.(http.Flusher)
complete := make(chan struct{}, 1)
go func() {
n, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
complete <- struct{}{}
}()
select {
case <-complete:
return
case <-time.After(timeout):
return 0, errWriteTimeOut
}
}
func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
user, _ := request.UserFrom(ctx)
// Make sure that the writer supports flushing. // Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher) _, ok := w.(http.Flusher)
user, _ := request.UserFrom(ctx)
if !ok { if !ok {
log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr, log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr,
"userAgent", r.UserAgent(), "user", user.UserName) "userAgent", r.UserAgent(), "user", user.UserName)
@ -114,13 +139,11 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for { for {
select { select {
case event := <-c.channel: case event := <-c.channel:
// Write to the ResponseWriter
// Server Sent Events compatible
log.Trace(ctx, "Sending event to client", "event", event, "client", c.String()) log.Trace(ctx, "Sending event to client", "event", event, "client", c.String())
_, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) _, err := writeEvent(w, event, writeTimeOut)
if err == errWriteTimeOut {
// Flush the data immediately instead of buffering it for later. return
flusher.Flush() }
case <-c.done: case <-c.done:
log.Trace(ctx, "Closing event stream connection", "client", c.String()) log.Trace(ctx, "Closing event stream connection", "client", c.String())
return return
@ -186,11 +209,6 @@ func (b *broker) listen() {
case c.channel <- event: case c.channel <- event:
default: default:
log.Warn("Could not send event to client", "client", c.String(), "event", event) log.Warn("Could not send event to client", "client", c.String(), "event", event)
select {
case c.done <- struct{}{}:
default:
log.Warn("Could not ask client to end", "client", c.String())
}
} }
} }