diff --git a/server/events/events.go b/server/events/events.go index edeaf9e1a..52c1cc641 100644 --- a/server/events/events.go +++ b/server/events/events.go @@ -1,27 +1,41 @@ package events -import "time" +import ( + "encoding/json" + "reflect" + "strings" + "time" + "unicode" +) type Event interface { - EventName() string + Prepare(Event) string +} + +type baseEvent struct { + Name string `json:"name"` +} + +func (e *baseEvent) Prepare(evt Event) string { + str := strings.TrimPrefix(reflect.TypeOf(evt).String(), "*events.") + e.Name = str[:0] + string(unicode.ToLower(rune(str[0]))) + str[1:] + data, _ := json.Marshal(evt) + return string(data) } type ScanStatus struct { + baseEvent Scanning bool `json:"scanning"` Count int64 `json:"count"` FolderCount int64 `json:"folderCount"` } -func (s ScanStatus) EventName() string { return "scanStatus" } - type KeepAlive struct { + baseEvent TS int64 `json:"ts"` } -func (s KeepAlive) EventName() string { return "keepAlive" } - type ServerStart struct { + baseEvent StartTime time.Time `json:"startTime"` } - -func (s ServerStart) EventName() string { return "serverStart" } diff --git a/server/events/sse.go b/server/events/sse.go index 894ed2784..73a98544d 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -2,12 +2,10 @@ package events import ( - "encoding/json" "errors" "fmt" "io" "net/http" - "sync/atomic" "time" "code.cloudfoundry.org/go-diodes" @@ -29,14 +27,11 @@ const ( var ( errWriteTimeOut = errors.New("write timeout") - eventId uint32 ) type ( message struct { - ID uint32 - Event string - Data string + Data string } messageChan chan message clientsChan chan client @@ -84,16 +79,9 @@ func (b *broker) SendMessage(evt Event) { b.publish <- msg } -func (b *broker) nextEventID() uint32 { - return atomic.AddUint32(&eventId, 1) -} - func (b *broker) prepareMessage(event Event) message { msg := message{} - msg.ID = b.nextEventID() - msg.Event = event.EventName() - data, _ := json.Marshal(event) - msg.Data = string(data) + msg.Data = event.Prepare(event) return msg } @@ -102,7 +90,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) { flusher, _ := w.(http.Flusher) complete := make(chan struct{}, 1) go func() { - _, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) + _, err = fmt.Fprintf(w, "data: %s\n\n", event.Data) // Flush the data immediately instead of buffering it for later. flusher.Flush() complete <- struct{}{} @@ -188,7 +176,7 @@ func (b *broker) listen() { log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) // Send a serverStart event to new client - c.diode.set(b.prepareMessage(&ServerStart{consts.ServerStart})) + c.diode.set(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart})) case c := <-b.unsubscribing: // A client has detached and we want to diff --git a/ui/src/actions/serverEvents.js b/ui/src/actions/serverEvents.js index 78f6af9a2..a94c2478a 100644 --- a/ui/src/actions/serverEvents.js +++ b/ui/src/actions/serverEvents.js @@ -1,8 +1,7 @@ export const EVENT_SCAN_STATUS = 'scanStatus' export const EVENT_SERVER_START = 'serverStart' -export const processEvent = (type, event) => { - const data = JSON.parse(event) +export const processEvent = (type, data) => { return { type, data: data, diff --git a/ui/src/eventStream.js b/ui/src/eventStream.js index 197b5a438..6faed3573 100644 --- a/ui/src/eventStream.js +++ b/ui/src/eventStream.js @@ -54,8 +54,9 @@ const setDispatch = (dispatchFunc) => { const eventHandler = throttle( (event) => { - if (event.type !== 'keepAlive') { - dispatch(processEvent(event.type, event.data)) + const data = JSON.parse(event.data) + if (data.name !== 'keepAlive') { + dispatch(processEvent(data.name, data)) } setTimeout(defaultIntervalCheck) // Reset timeout on every received message }, @@ -71,9 +72,7 @@ const startEventStream = async () => { } return getEventStream() .then((newStream) => { - newStream.addEventListener('serverStart', eventHandler) - newStream.addEventListener('scanStatus', eventHandler) - newStream.addEventListener('keepAlive', eventHandler) + newStream.onmessage = eventHandler newStream.onerror = (e) => { console.log('EventStream error', e) setTimeout(reconnectIntervalCheck)