diff --git a/server/events/sse.go b/server/events/sse.go index f111ee487..45c4b0120 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -19,8 +19,15 @@ type Broker interface { const keepAliveFrequency = 15 * time.Second +var eventId uint32 + type ( - messageChan chan []byte + message struct { + ID uint32 + Event string + Data string + } + messageChan chan message clientsChan chan client client struct { address string @@ -63,26 +70,19 @@ func NewBroker() Broker { return broker } -func (broker *broker) SendMessage(event Event) { - pkg := broker.preparePackage(event) - - log.Trace("Broker received new event", "name", event.EventName(), "event", string(pkg)) - broker.notifier <- pkg +func (broker *broker) SendMessage(evt Event) { + msg := broker.preparePackage(evt) + log.Trace("Broker received new event", "event", msg) + broker.notifier <- msg } -var eventId uint32 - -func (broker *broker) preparePackage(event Event) []byte { - pkg := struct { - Event `json:"data"` - Id uint32 `json:"id"` - Name string `json:"name"` - }{} - pkg.Id = atomic.AddUint32(&eventId, 1) - pkg.Name = event.EventName() - pkg.Event = event - data, _ := json.Marshal(pkg) - return data +func (broker *broker) preparePackage(event Event) message { + pkg := message{} + pkg.ID = atomic.AddUint32(&eventId, 1) + pkg.Event = event.EventName() + data, _ := json.Marshal(event) + pkg.Data = string(data) + return pkg } func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { @@ -133,8 +133,8 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { // Write to the ResponseWriter // Server Sent Events compatible event := <-client.channel - log.Trace(ctx, "Sending event to client", "event", string(event), "client", client.String()) - _, _ = fmt.Fprintf(rw, "data: %s\n\n", event) + log.Trace(ctx, "Sending event to client", "event", event, "client", client.String()) + _, _ = fmt.Fprintf(rw, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) // Flush the data immediately instead of buffering it for later. flusher.Flush() @@ -157,7 +157,7 @@ func (broker *broker) listen() { s.channel <- broker.preparePackage(&ServerStart{serverStart}) case s := <-broker.closingClients: - // A client has dettached and we want to + // A client has detached and we want to // stop sending them messages. delete(broker.clients, s) log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String()) @@ -166,7 +166,7 @@ func (broker *broker) listen() { // We got a new event from the outside! // Send event to all connected clients for client := range broker.clients { - log.Trace("Putting event on client's queue", "client", client.String(), "event", string(event)) + log.Trace("Putting event on client's queue", "client", client.String(), "event", event) client.channel <- event } diff --git a/ui/src/actions/serverEvents.js b/ui/src/actions/serverEvents.js index 0ce45f59d..78f6af9a2 100644 --- a/ui/src/actions/serverEvents.js +++ b/ui/src/actions/serverEvents.js @@ -1,17 +1,11 @@ -export const EVENT_SCAN_STATUS = 'EVENT_SCAN_STATUS' -export const EVENT_SERVER_START = 'EVENT_SERVER_START' +export const EVENT_SCAN_STATUS = 'scanStatus' +export const EVENT_SERVER_START = 'serverStart' -const actionsMap = { - scanStatus: EVENT_SCAN_STATUS, - serverStart: EVENT_SERVER_START, -} - -export const processEvent = (data) => { - let type = actionsMap[data.name] - if (!type) type = data.name +export const processEvent = (type, event) => { + const data = JSON.parse(event) return { type, - data: data.data, + data: data, } } diff --git a/ui/src/eventStream.js b/ui/src/eventStream.js index 1a5835302..f0b5b52fa 100644 --- a/ui/src/eventStream.js +++ b/ui/src/eventStream.js @@ -13,7 +13,7 @@ let timeout = null const getEventStream = async () => { if (es === null) { - return httpClient(`${REST_URL}/keepalive/`).then(() => { + return httpClient(`${REST_URL}/keepalive/eventSource`).then(() => { es = new EventSource( baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`) ) @@ -53,29 +53,34 @@ const setDispatch = (dispatchFunc) => { dispatch = dispatchFunc } +const eventHandler = throttle( + (event) => { + if (event.type !== 'keepAlive') { + dispatch(processEvent(event.type, event.data)) + } + setTimeout(defaultIntervalCheck) // Reset timeout on every received message + }, + 100, + { trailing: true } +) + const startEventStream = async () => { setTimeout(currentIntervalCheck) if (!localStorage.getItem('token')) { console.log('Cannot create a unauthenticated EventSource connection') - return + return Promise.reject() } getEventStream().then((newStream) => { - newStream.onmessage = throttle( - (msg) => { - const data = JSON.parse(msg.data) - if (data.name !== 'keepAlive') { - dispatch(processEvent(data)) - } - setTimeout(defaultIntervalCheck) // Reset timeout on every received message - }, - 100, - { trailing: true } - ) + newStream.addEventListener('serverStart', eventHandler) + newStream.addEventListener('scanStatus', eventHandler) + newStream.addEventListener('keepAlive', eventHandler) newStream.onerror = (e) => { + console.log('EventStream error', e) setTimeout(reconnectIntervalCheck) dispatch(serverDown()) } es = newStream + return es }) }