From 95e83333d7b2a42fba701e45e0d3013631330e0b Mon Sep 17 00:00:00 2001 From: James Muscat Date: Mon, 13 Apr 2015 18:00:24 +0100 Subject: [PATCH] Add a StatsCollector to the Relay too. --- setup.py | 2 +- src/eddn/Client.py | 2 +- src/eddn/Relay.py | 102 +++++++++++++++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 33 deletions(-) diff --git a/setup.py b/setup.py index 8355566..ea94d0f 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ setup( entry_points={ 'console_scripts': [ 'eddn-gateway = eddn.Gateway:main', - 'eddn-relay = eddn.Relay:run', + 'eddn-relay = eddn.Relay:main', ], } ) diff --git a/src/eddn/Client.py b/src/eddn/Client.py index b52f47c..3e709db 100644 --- a/src/eddn/Client.py +++ b/src/eddn/Client.py @@ -9,7 +9,7 @@ def main(): subscriber = context.socket(zmq.SUB) subscriber.setsockopt(zmq.SUBSCRIBE, "") - subscriber.connect('tcp://eddn-relay.elite-markets.net:9500') + subscriber.connect('tcp://localhost:9500') while True: market_json = zlib.decompress(subscriber.recv()) diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index b597cd2..37a6770 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -4,55 +4,95 @@ they receive over PUB/SUB. """ # Logging has to be configured first before we do anything. import logging +from threading import Thread logger = logging.getLogger(__name__) import zlib import gevent +import simplejson import zmq.green as zmq +from bottle import get, run as bottle_run from eddn.conf import Settings +from gevent import monkey +monkey.patch_all() -def run(): - """ - Fires up the relay process. - """ - # These form the connection to the Gateway daemon(s) upstream. - context = zmq.Context() +from eddn.StatsCollector import StatsCollector - receiver = context.socket(zmq.SUB) - receiver.setsockopt(zmq.SUBSCRIBE, '') - for binding in Settings.RELAY_RECEIVER_BINDINGS: - # Relays bind upstream to an Announcer, or another Relay. - receiver.connect(binding) +statsCollector = StatsCollector() +statsCollector.start() - sender = context.socket(zmq.PUB) - for binding in Settings.RELAY_SENDER_BINDINGS: - # End users, or other relays, may attach here. - sender.bind(binding) - def relay_worker(message): +@get('/stats/') +def stats(): + return simplejson.dumps( + { + "inbound": { + "1min": statsCollector.getInboundCount(1), + "5min": statsCollector.getInboundCount(5), + "60min": statsCollector.getInboundCount(60) + }, + "outbound": { + "1min": statsCollector.getOutboundCount(1), + "5min": statsCollector.getOutboundCount(5), + "60min": statsCollector.getOutboundCount(60) + }, + } + ) + + +class Relay(Thread): + + def run(self): """ - This is the worker function that re-sends the incoming messages out - to any subscribers. - :param str message: A JSON string to re-broadcast. + Fires up the relay process. """ - # if is_message_duped(message): - # We've already seen this message recently. Discard it. - # return + # These form the connection to the Gateway daemon(s) upstream. + context = zmq.Context() - if Settings.RELAY_DECOMPRESS_MESSAGES: - message = zlib.decompress(message) + receiver = context.socket(zmq.SUB) + receiver.setsockopt(zmq.SUBSCRIBE, '') + for binding in Settings.RELAY_RECEIVER_BINDINGS: + # Relays bind upstream to an Announcer, or another Relay. + receiver.connect(binding) - sender.send(message) + sender = context.socket(zmq.PUB) + for binding in Settings.RELAY_SENDER_BINDINGS: + # End users, or other relays, may attach here. + sender.bind(binding) - logger.info("Relay is now listening for order data.") + def relay_worker(message): + """ + This is the worker function that re-sends the incoming messages out + to any subscribers. + :param str message: A JSON string to re-broadcast. + """ + # if is_message_duped(message): + # We've already seen this message recently. Discard it. + # return - while True: - # For each incoming message, spawn a greenlet using the relay_worker - # function. - gevent.spawn(relay_worker, receiver.recv()) + if Settings.RELAY_DECOMPRESS_MESSAGES: + message = zlib.decompress(message) + + sender.send(message) + statsCollector.tallyOutbound() + + logger.info("Relay is now listening for order data.") + + while True: + # For each incoming message, spawn a greenlet using the relay_worker + # function. + inboundMessage = receiver.recv() + statsCollector.tallyInbound() + gevent.spawn(relay_worker, inboundMessage) + + +def main(): + r = Relay() + r.start() + bottle_run(host='0.0.0.0', port=9090, server='gevent') if __name__ == '__main__': - run() + main()