Removed event.type from SSE, as it was causing the browser to hang.

Needs more investigation, but for now, back to the simple message format
This commit is contained in:
Deluan 2021-02-04 12:34:27 -05:00
parent 77fc4841e4
commit 7adacbac0d
4 changed files with 31 additions and 31 deletions

View File

@ -1,27 +1,41 @@
package events package events
import "time" import (
"encoding/json"
"reflect"
"strings"
"time"
"unicode"
)
type Event interface { type Event interface {
EventName() string Prepare(Event) string
}
type baseEvent struct {
Name string `json:"name"`
}
func (e *baseEvent) Prepare(evt Event) string {
str := strings.TrimPrefix(reflect.TypeOf(evt).String(), "*events.")
e.Name = str[:0] + string(unicode.ToLower(rune(str[0]))) + str[1:]
data, _ := json.Marshal(evt)
return string(data)
} }
type ScanStatus struct { type ScanStatus struct {
baseEvent
Scanning bool `json:"scanning"` Scanning bool `json:"scanning"`
Count int64 `json:"count"` Count int64 `json:"count"`
FolderCount int64 `json:"folderCount"` FolderCount int64 `json:"folderCount"`
} }
func (s ScanStatus) EventName() string { return "scanStatus" }
type KeepAlive struct { type KeepAlive struct {
baseEvent
TS int64 `json:"ts"` TS int64 `json:"ts"`
} }
func (s KeepAlive) EventName() string { return "keepAlive" }
type ServerStart struct { type ServerStart struct {
baseEvent
StartTime time.Time `json:"startTime"` StartTime time.Time `json:"startTime"`
} }
func (s ServerStart) EventName() string { return "serverStart" }

View File

@ -2,12 +2,10 @@
package events package events
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync/atomic"
"time" "time"
"code.cloudfoundry.org/go-diodes" "code.cloudfoundry.org/go-diodes"
@ -29,14 +27,11 @@ const (
var ( var (
errWriteTimeOut = errors.New("write timeout") errWriteTimeOut = errors.New("write timeout")
eventId uint32
) )
type ( type (
message struct { message struct {
ID uint32 Data string
Event string
Data string
} }
messageChan chan message messageChan chan message
clientsChan chan client clientsChan chan client
@ -84,16 +79,9 @@ func (b *broker) SendMessage(evt Event) {
b.publish <- msg b.publish <- msg
} }
func (b *broker) nextEventID() uint32 {
return atomic.AddUint32(&eventId, 1)
}
func (b *broker) prepareMessage(event Event) message { func (b *broker) prepareMessage(event Event) message {
msg := message{} msg := message{}
msg.ID = b.nextEventID() msg.Data = event.Prepare(event)
msg.Event = event.EventName()
data, _ := json.Marshal(event)
msg.Data = string(data)
return msg return msg
} }
@ -102,7 +90,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
flusher, _ := w.(http.Flusher) flusher, _ := w.(http.Flusher)
complete := make(chan struct{}, 1) complete := make(chan struct{}, 1)
go func() { go func() {
_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) _, err = fmt.Fprintf(w, "data: %s\n\n", event.Data)
// Flush the data immediately instead of buffering it for later. // Flush the data immediately instead of buffering it for later.
flusher.Flush() flusher.Flush()
complete <- struct{}{} complete <- struct{}{}
@ -188,7 +176,7 @@ func (b *broker) listen() {
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
// Send a serverStart event to new client // Send a serverStart event to new client
c.diode.set(b.prepareMessage(&ServerStart{consts.ServerStart})) c.diode.set(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart}))
case c := <-b.unsubscribing: case c := <-b.unsubscribing:
// A client has detached and we want to // A client has detached and we want to

View File

@ -1,8 +1,7 @@
export const EVENT_SCAN_STATUS = 'scanStatus' export const EVENT_SCAN_STATUS = 'scanStatus'
export const EVENT_SERVER_START = 'serverStart' export const EVENT_SERVER_START = 'serverStart'
export const processEvent = (type, event) => { export const processEvent = (type, data) => {
const data = JSON.parse(event)
return { return {
type, type,
data: data, data: data,

View File

@ -54,8 +54,9 @@ const setDispatch = (dispatchFunc) => {
const eventHandler = throttle( const eventHandler = throttle(
(event) => { (event) => {
if (event.type !== 'keepAlive') { const data = JSON.parse(event.data)
dispatch(processEvent(event.type, event.data)) if (data.name !== 'keepAlive') {
dispatch(processEvent(data.name, data))
} }
setTimeout(defaultIntervalCheck) // Reset timeout on every received message setTimeout(defaultIntervalCheck) // Reset timeout on every received message
}, },
@ -71,9 +72,7 @@ const startEventStream = async () => {
} }
return getEventStream() return getEventStream()
.then((newStream) => { .then((newStream) => {
newStream.addEventListener('serverStart', eventHandler) newStream.onmessage = eventHandler
newStream.addEventListener('scanStatus', eventHandler)
newStream.addEventListener('keepAlive', eventHandler)
newStream.onerror = (e) => { newStream.onerror = (e) => {
console.log('EventStream error', e) console.log('EventStream error', e)
setTimeout(reconnectIntervalCheck) setTimeout(reconnectIntervalCheck)