Add a StatsCollector to the Relay too.

This commit is contained in:
James Muscat 2015-04-13 18:00:24 +01:00
parent fce10cdaa6
commit 95e83333d7
3 changed files with 73 additions and 33 deletions

View File

@ -31,7 +31,7 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'eddn-gateway = eddn.Gateway:main', 'eddn-gateway = eddn.Gateway:main',
'eddn-relay = eddn.Relay:run', 'eddn-relay = eddn.Relay:main',
], ],
} }
) )

View File

@ -9,7 +9,7 @@ def main():
subscriber = context.socket(zmq.SUB) subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, "") subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect('tcp://eddn-relay.elite-markets.net:9500') subscriber.connect('tcp://localhost:9500')
while True: while True:
market_json = zlib.decompress(subscriber.recv()) market_json = zlib.decompress(subscriber.recv())

View File

@ -4,55 +4,95 @@ they receive over PUB/SUB.
""" """
# Logging has to be configured first before we do anything. # Logging has to be configured first before we do anything.
import logging import logging
from threading import Thread
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
import zlib import zlib
import gevent import gevent
import simplejson
import zmq.green as zmq import zmq.green as zmq
from bottle import get, run as bottle_run
from eddn.conf import Settings from eddn.conf import Settings
from gevent import monkey
monkey.patch_all()
def run(): from eddn.StatsCollector import StatsCollector
"""
Fires up the relay process.
"""
# These form the connection to the Gateway daemon(s) upstream.
context = zmq.Context()
receiver = context.socket(zmq.SUB) statsCollector = StatsCollector()
receiver.setsockopt(zmq.SUBSCRIBE, '') statsCollector.start()
for binding in Settings.RELAY_RECEIVER_BINDINGS:
# Relays bind upstream to an Announcer, or another Relay.
receiver.connect(binding)
sender = context.socket(zmq.PUB)
for binding in Settings.RELAY_SENDER_BINDINGS:
# End users, or other relays, may attach here.
sender.bind(binding)
def relay_worker(message): @get('/stats/')
def stats():
return simplejson.dumps(
{
"inbound": {
"1min": statsCollector.getInboundCount(1),
"5min": statsCollector.getInboundCount(5),
"60min": statsCollector.getInboundCount(60)
},
"outbound": {
"1min": statsCollector.getOutboundCount(1),
"5min": statsCollector.getOutboundCount(5),
"60min": statsCollector.getOutboundCount(60)
},
}
)
class Relay(Thread):
def run(self):
""" """
This is the worker function that re-sends the incoming messages out Fires up the relay process.
to any subscribers.
:param str message: A JSON string to re-broadcast.
""" """
# if is_message_duped(message): # These form the connection to the Gateway daemon(s) upstream.
# We've already seen this message recently. Discard it. context = zmq.Context()
# return
if Settings.RELAY_DECOMPRESS_MESSAGES: receiver = context.socket(zmq.SUB)
message = zlib.decompress(message) receiver.setsockopt(zmq.SUBSCRIBE, '')
for binding in Settings.RELAY_RECEIVER_BINDINGS:
# Relays bind upstream to an Announcer, or another Relay.
receiver.connect(binding)
sender.send(message) sender = context.socket(zmq.PUB)
for binding in Settings.RELAY_SENDER_BINDINGS:
# End users, or other relays, may attach here.
sender.bind(binding)
logger.info("Relay is now listening for order data.") def relay_worker(message):
"""
This is the worker function that re-sends the incoming messages out
to any subscribers.
:param str message: A JSON string to re-broadcast.
"""
# if is_message_duped(message):
# We've already seen this message recently. Discard it.
# return
while True: if Settings.RELAY_DECOMPRESS_MESSAGES:
# For each incoming message, spawn a greenlet using the relay_worker message = zlib.decompress(message)
# function.
gevent.spawn(relay_worker, receiver.recv()) sender.send(message)
statsCollector.tallyOutbound()
logger.info("Relay is now listening for order data.")
while True:
# For each incoming message, spawn a greenlet using the relay_worker
# function.
inboundMessage = receiver.recv()
statsCollector.tallyInbound()
gevent.spawn(relay_worker, inboundMessage)
def main():
r = Relay()
r.start()
bottle_run(host='0.0.0.0', port=9090, server='gevent')
if __name__ == '__main__': if __name__ == '__main__':
run() main()