Simple stat collector, to better monitor gateway throughput and health.

This commit is contained in:
James Muscat 2015-04-09 00:35:27 +01:00
parent 5d3551d705
commit fce10cdaa6
2 changed files with 73 additions and 0 deletions

View File

@ -33,6 +33,11 @@ validator = Validator()
for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
validator.addSchemaResource(schemaRef, schemaFile)
# This import must be done post-monkey-patching!
from eddn.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()
def push_message(string_message):
"""
@ -45,6 +50,7 @@ def push_message(string_message):
# announcers.
compressed_msg = zlib.compress(string_message)
sender.send(compressed_msg)
statsCollector.tallyOutbound()
def get_remote_address():
@ -158,6 +164,7 @@ def upload():
logger.error("Error to %s: %s" % (get_remote_address(), exc.message))
return exc.message
statsCollector.tallyInbound()
return parse_and_error_handle(message_body)
@ -171,6 +178,24 @@ def health_check():
return Settings.EDDN_VERSION
@get('/stats/')
def stats():
return simplejson.dumps(
{
"inbound": {
"1min": statsCollector.getInboundCount(1),
"5min": statsCollector.getInboundCount(5),
"60min": statsCollector.getInboundCount(60)
},
"outbound": {
"1min": statsCollector.getOutboundCount(1),
"5min": statsCollector.getOutboundCount(5),
"60min": statsCollector.getOutboundCount(60)
},
}
)
class MalformedUploadError(Exception):
"""
Raise this when an upload is structurally incorrect. This isn't so much

View File

@ -0,0 +1,48 @@
from collections import deque
from itertools import islice
from threading import Lock, Thread
from time import sleep
class StatsCollector(Thread):
'''
Collects simple statistics - number of inbound vs. outbound messages - and
aggregates them over the number of minutes you choose, up to one hour.
'''
max_minutes = 60
inboundMessages = 0
outboundMessages = 0
inboundHistory = deque(maxlen=max_minutes)
outboundHistory = deque(maxlen=max_minutes)
lock = Lock()
def __init__(self):
super(StatsCollector, self).__init__()
self.daemon = True
def run(self):
while True:
sleep(60)
with self.lock:
self.inboundHistory.append(self.inboundMessages)
self.outboundHistory.append(self.outboundMessages)
self.inboundMessages = 0
self.outboundMessages = 0
def tallyInbound(self):
with self.lock:
self.inboundMessages += 1
def tallyOutbound(self):
with self.lock:
self.outboundMessages += 1
def getInboundCount(self, minutes):
return sum(islice(self.inboundHistory, 0, max(minutes, self.max_minutes)))
def getOutboundCount(self, minutes):
return sum(islice(self.outboundHistory, 0, max(minutes, self.max_minutes)))