From 17833cd9d289a89a5f065e43c1cbbd00977fd9fb Mon Sep 17 00:00:00 2001 From: Deluan Date: Sat, 12 Dec 2020 13:46:36 -0500 Subject: [PATCH] Make names more consistent --- server/events/sse.go | 78 +++++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/server/events/sse.go b/server/events/sse.go index 19e688ee2..c9ff875dc 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -45,13 +45,13 @@ func (c client) String() string { type broker struct { // Events are pushed to this channel by the main events-gathering routine - notifier messageChan + publish messageChan // New client connections - newClients clientsChan + subscribing clientsChan // Closed client connections - closingClients clientsChan + unsubscribing clientsChan // Client connections registry clients map[client]struct{} @@ -60,10 +60,10 @@ type broker struct { func NewBroker() Broker { // Instantiate a broker broker := &broker{ - notifier: make(messageChan, 100), - newClients: make(clientsChan, 1), - closingClients: make(clientsChan, 1), - clients: make(map[client]struct{}), + publish: make(messageChan, 100), + subscribing: make(clientsChan, 1), + unsubscribing: make(clientsChan, 1), + clients: make(map[client]struct{}), } // Set it running - listening and broadcasting events @@ -72,22 +72,26 @@ func NewBroker() Broker { return broker } -func (broker *broker) SendMessage(evt Event) { - msg := broker.preparePackage(evt) +func (b *broker) SendMessage(evt Event) { + msg := b.preparePackage(evt) log.Trace("Broker received new event", "event", msg) - broker.notifier <- msg + b.publish <- msg } -func (broker *broker) preparePackage(event Event) message { +func (b *broker) newEventID() uint32 { + return atomic.AddUint32(&eventId, 1) +} + +func (b *broker) preparePackage(event Event) message { pkg := message{} - pkg.ID = atomic.AddUint32(&eventId, 1) + pkg.ID = b.newEventID() pkg.Event = event.EventName() data, _ := json.Marshal(event) pkg.Data = string(data) return pkg } -func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Make sure that the writer supports flushing. @@ -106,31 +110,31 @@ func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") // Each connection registers its own message channel with the Broker's connections registry - client := broker.subscribe(r) - defer broker.unsubscribe(client) - log.Debug(ctx, "New broker client", "client", client.String()) + c := b.subscribe(r) + defer b.unsubscribe(c) + log.Debug(ctx, "New broker client", "client", c.String()) for { select { - case event := <-client.channel: + case event := <-c.channel: // Write to the ResponseWriter // Server Sent Events compatible - log.Trace(ctx, "Sending event to client", "event", event, "client", client.String()) + log.Trace(ctx, "Sending event to client", "event", event, "client", c.String()) _, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) // Flush the data immediately instead of buffering it for later. flusher.Flush() case <-ctx.Done(): - log.Trace(ctx, "Closing event stream connection", "client", client.String()) + log.Trace(ctx, "Closing event stream connection", "client", c.String()) return } } } -func (broker *broker) subscribe(r *http.Request) client { +func (b *broker) subscribe(r *http.Request) client { user, _ := request.UserFrom(r.Context()) id, _ := uuid.NewRandom() - client := client{ + c := client{ id: id.String(), username: user.UserName, address: r.RemoteAddr, @@ -139,40 +143,40 @@ func (broker *broker) subscribe(r *http.Request) client { } // Signal the broker that we have a new client - broker.newClients <- client - return client + b.subscribing <- c + return c } -func (broker *broker) unsubscribe(c client) { - broker.closingClients <- c +func (b *broker) unsubscribe(c client) { + b.unsubscribing <- c } -func (broker *broker) listen() { +func (b *broker) listen() { keepAlive := time.NewTicker(keepAliveFrequency) defer keepAlive.Stop() for { select { - case s := <-broker.newClients: + case c := <-b.subscribing: // A new client has connected. // Register their message channel - broker.clients[s] = struct{}{} - log.Debug("Client added to event broker", "numClients", len(broker.clients), "newClient", s.String()) + b.clients[c] = struct{}{} + log.Debug("Client added to event broker", "numClients", len(b.clients), "newClient", c.String()) // Send a serverStart event to new client - s.channel <- broker.preparePackage(&ServerStart{serverStart}) + c.channel <- b.preparePackage(&ServerStart{serverStart}) - case s := <-broker.closingClients: + case c := <-b.unsubscribing: // A client has detached and we want to // stop sending them messages. - close(s.channel) - delete(broker.clients, s) - log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String()) + close(c.channel) + delete(b.clients, c) + log.Debug("Removed client from event broker", "numClients", len(b.clients), "client", c.String()) - case event := <-broker.notifier: + case event := <-b.publish: // We got a new event from the outside! // Send event to all connected clients - for client := range broker.clients { + for client := range b.clients { log.Trace("Putting event on client's queue", "client", client.String(), "event", event) // Use non-blocking send select { @@ -184,7 +188,7 @@ func (broker *broker) listen() { case ts := <-keepAlive.C: // Send a keep alive message every 15 seconds - broker.SendMessage(&KeepAlive{TS: ts.Unix()}) + b.SendMessage(&KeepAlive{TS: ts.Unix()}) } } }