From fce10cdaa68f2f47a3d398d0a603d0996b6720d4 Mon Sep 17 00:00:00 2001 From: James Muscat Date: Thu, 9 Apr 2015 00:35:27 +0100 Subject: [PATCH] Simple stat collector, to better monitor gateway throughput and health. --- src/eddn/Gateway.py | 25 ++++++++++++++++++++ src/eddn/StatsCollector.py | 48 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/eddn/StatsCollector.py diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index 6637880..53c2138 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -33,6 +33,11 @@ validator = Validator() for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems(): validator.addSchemaResource(schemaRef, schemaFile) +# This import must be done post-monkey-patching! +from eddn.StatsCollector import StatsCollector +statsCollector = StatsCollector() +statsCollector.start() + def push_message(string_message): """ @@ -45,6 +50,7 @@ def push_message(string_message): # announcers. compressed_msg = zlib.compress(string_message) sender.send(compressed_msg) + statsCollector.tallyOutbound() def get_remote_address(): @@ -158,6 +164,7 @@ def upload(): logger.error("Error to %s: %s" % (get_remote_address(), exc.message)) return exc.message + statsCollector.tallyInbound() return parse_and_error_handle(message_body) @@ -171,6 +178,24 @@ def health_check(): return Settings.EDDN_VERSION +@get('/stats/') +def stats(): + return simplejson.dumps( + { + "inbound": { + "1min": statsCollector.getInboundCount(1), + "5min": statsCollector.getInboundCount(5), + "60min": statsCollector.getInboundCount(60) + }, + "outbound": { + "1min": statsCollector.getOutboundCount(1), + "5min": statsCollector.getOutboundCount(5), + "60min": statsCollector.getOutboundCount(60) + }, + } + ) + + class MalformedUploadError(Exception): """ Raise this when an upload is structurally incorrect. This isn't so much diff --git a/src/eddn/StatsCollector.py b/src/eddn/StatsCollector.py new file mode 100644 index 0000000..1876f32 --- /dev/null +++ b/src/eddn/StatsCollector.py @@ -0,0 +1,48 @@ +from collections import deque +from itertools import islice +from threading import Lock, Thread +from time import sleep + + +class StatsCollector(Thread): + ''' + Collects simple statistics - number of inbound vs. outbound messages - and + aggregates them over the number of minutes you choose, up to one hour. + ''' + + max_minutes = 60 + + inboundMessages = 0 + outboundMessages = 0 + + inboundHistory = deque(maxlen=max_minutes) + outboundHistory = deque(maxlen=max_minutes) + + lock = Lock() + + def __init__(self): + super(StatsCollector, self).__init__() + self.daemon = True + + def run(self): + while True: + sleep(60) + with self.lock: + self.inboundHistory.append(self.inboundMessages) + self.outboundHistory.append(self.outboundMessages) + self.inboundMessages = 0 + self.outboundMessages = 0 + + def tallyInbound(self): + with self.lock: + self.inboundMessages += 1 + + def tallyOutbound(self): + with self.lock: + self.outboundMessages += 1 + + def getInboundCount(self, minutes): + return sum(islice(self.inboundHistory, 0, max(minutes, self.max_minutes))) + + def getOutboundCount(self, minutes): + return sum(islice(self.outboundHistory, 0, max(minutes, self.max_minutes)))