From 58f47222f3474e3b9380186045871d0cb0bcdc95 Mon Sep 17 00:00:00 2001
From: AnthorNet
Date: Fri, 8 May 2015 17:55:25 +0200
Subject: [PATCH] Added duplicate function

---
 contrib/monitor/index.html          |  6 ++++
 contrib/monitor/js/eddn.js          | 31 ++++++++++++++----
 src/eddn/Monitor.py                 | 17 ++++++++++
 src/eddn/Relay.py                   | 13 ++++++--
 src/eddn/_Conf/Settings.py          |  6 ++--
 src/eddn/_Core/DuplicateMessages.py | 49 +++++++++++++++++++++++++++++
 6 files changed, 111 insertions(+), 11 deletions(-)
 create mode 100644 src/eddn/_Core/DuplicateMessages.py

diff --git a/contrib/monitor/index.html b/contrib/monitor/index.html
index e270618..3f676fc 100644
--- a/contrib/monitor/index.html
+++ b/contrib/monitor/index.html
@@ -176,6 +176,12 @@
                         <td class="stat inbound_5min">-</td>
                         <td class="stat inbound_60min">-</td>
                     </tr>
+                    <tr>
+                        <td>Messages duplicate</td>
+                        <td class="stat duplicate_1min">-</td>
+                        <td class="stat duplicate_5min">-</td>
+                        <td class="stat duplicate_60min">-</td>
+                    </tr>
                     <tr>
                         <td>Messages passed to subscribers</td>
                         <td class="stat outbound_1min">-</td>
diff --git a/contrib/monitor/js/eddn.js b/contrib/monitor/js/eddn.js
index b7db040..86ecd80 100644
--- a/contrib/monitor/js/eddn.js
+++ b/contrib/monitor/js/eddn.js
@@ -134,6 +134,11 @@ var doUpdateUploaders = function()
 
     $('#uploaders .table tbody').empty();
     $.each(uploadersTotal, function(uploader, hits){
+        if(uploader.length > 32)
+            truncateUploader = jQuery.trim(uploader).substring(0, 32)/*.split(" ").slice(0, -1).join(" ")*/ + "..."
+        else
+            truncateUploader = uploader
+
         $('#uploaders .table tbody').append(
             $('<tr>').attr('data-name', uploader).on('mouseover', function(){
                 chart.get('uploader-' + makeSlug(uploader)).setState('hover');
@@ -142,7 +147,7 @@ var doUpdateUploaders = function()
                 chart.get('uploader-' + makeSlug(uploader)).setState('');
                 chart.tooltip.hide();
             }).append(
-                $('<td>').html('<strong>' + uploader + '</strong>')
+                $('<td>').html('<strong>' + truncateUploader + '</strong>')
             )
             .append(
                 $('<td>').addClass('stat today').html(formatNumber(uploaders[today][uploader] || 0))
@@ -259,14 +264,20 @@ var doUpdates = function(type){
                 shift = chart.get('inbound').data.length > 60;
                 chart.get('inbound').addPoint([d.getTime(), (data['inbound'] || {})['1min'] || 0], true, shift);
 
-                shift = chart.get('outbound').data.length > 60;
-                chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
-
                 if(type == 'gateways')
                 {
                     shift = chart.get('invalid').data.length > 60;
                     chart.get('invalid').addPoint([d.getTime(), (data['invalid'] || {})['1min'] || 0], true, shift);
                 }
+
+                if(type == 'relays')
+                {
+                    shift = chart.get('duplicate').data.length > 60;
+                    chart.get('duplicate').addPoint([d.getTime(), (data['duplicate'] || {})['1min'] || 0], true, shift);
+                }
+
+                shift = chart.get('outbound').data.length > 60;
+                chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
             }
         });
     });
@@ -287,6 +298,13 @@ var showStats = function(type, currentItem){
         el.find(".invalid_60min").html((currentItemStats["invalid"] || {})['60min'] || 0);
     }
 
+    if(type == 'relays')
+    {
+        el.find(".duplicate_1min").html((currentItemStats["duplicate"] || {})['1min'] || 0);
+        el.find(".duplicate_5min").html((currentItemStats["duplicate"] || {})['5min'] || 0);
+        el.find(".duplicate_60min").html((currentItemStats["duplicate"] || {})['60min'] || 0);
+    }
+
     el.find(".outbound_1min").html((currentItemStats["outbound"] || {})['1min'] || 0);
     el.find(".outbound_5min").html((currentItemStats["outbound"] || {})['5min'] || 0);
     el.find(".outbound_60min").html((currentItemStats["outbound"] || {})['60min'] || 0);
@@ -353,8 +371,8 @@ var start = function(){
         exporting: { enabled: false },
         series: [
             {id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
-            {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200},
-            {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1}
+            {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1},
+            {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200}
         ]
     }).hide();
 
@@ -403,6 +421,7 @@ var start = function(){
         exporting: { enabled: false },
         series: [
             {id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
+            {id: 'duplicate', data: [], name: 'Messages duplicate', zIndex: 1},
             {id: 'outbound', data: [], name: 'Messages passed to subscribers', zIndex: 200}
         ]
     }).hide();
diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py
index cbf40d1..d3675d3 100644
--- a/src/eddn/Monitor.py
+++ b/src/eddn/Monitor.py
@@ -15,6 +15,11 @@ from eddn._Conf.Settings import Settings, loadConfig
 from gevent import monkey
 monkey.patch_all()
 
+if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+    from eddn._Core.DuplicateMessages import DuplicateMessages
+    duplicateMessages = DuplicateMessages()
+    duplicateMessages.start()
+
 def date(__format):
     d = datetime.datetime.utcnow()
     return d.strftime(__format)
@@ -183,6 +188,18 @@ class Monitor(Thread):
                 message = message[1]
             else:
                 message = message[0]
+
+
+            if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+                if duplicateMessages.isDuplicated(message):
+                    schemaID = 'DUPLICATE MESSAGE'
+
+                    c = db.cursor()
+                    c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
+                    c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
+                    db.commit()
+
+                    return
 
             if Settings.MONITOR_DECOMPRESS_MESSAGES:
                 message = zlib.decompress(message)
diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py
index 42a0285..9d2f903 100644
--- a/src/eddn/Relay.py
+++ b/src/eddn/Relay.py
@@ -23,6 +23,11 @@ from eddn._Core.StatsCollector import StatsCollector
 statsCollector = StatsCollector()
 statsCollector.start()
 
+if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+    from eddn._Core.DuplicateMessages import DuplicateMessages
+    duplicateMessages = DuplicateMessages()
+    duplicateMessages.start()
+
 
 @get('/stats/')
 def stats():
@@ -78,9 +83,11 @@ class Relay(Thread):
             else:
                 message = message[0]
 
-            # if is_message_duped(message):
-                # We've already seen this message recently. Discard it.
-                # return
+            if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+                if duplicateMessages.isDuplicated(message):
+                    # We've already seen this message recently. Discard it.
+                    statsCollector.tally("duplicate")
+                    return
 
             if Settings.RELAY_DECOMPRESS_MESSAGES:
                 message = zlib.decompress(message)
diff --git a/src/eddn/_Conf/Settings.py b/src/eddn/_Conf/Settings.py
index 0ba4905..1020736 100644
--- a/src/eddn/_Conf/Settings.py
+++ b/src/eddn/_Conf/Settings.py
@@ -24,6 +24,9 @@ class _Settings(object):
 
     RELAY_DECOMPRESS_MESSAGES = False
 
+    # Duplicate messages are dropped for this many minutes; if set to False, no deduplication is performed
+    RELAY_DUPLICATE_MAX_MINUTES = 15
+
     # If set to false, don't listen to topic and accept all incoming messages
     RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = False
 
@@ -54,8 +57,7 @@ class _Settings(object):
     MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
 
-    MONITOR_DB = "/home/EDDN_Monitor.s3db"
-    #MONITOR_DB = "D:\EDDN_Monitor.s3db"
+    MONITOR_DB = "/home/EDDN_Monitor.s3db"
 
     MONITOR_DECOMPRESS_MESSAGES = True
 
diff --git a/src/eddn/_Core/DuplicateMessages.py b/src/eddn/_Core/DuplicateMessages.py
new file mode 100644
index 0000000..48a6055
--- /dev/null
+++ b/src/eddn/_Core/DuplicateMessages.py
@@ -0,0 +1,49 @@
+from datetime import datetime, timedelta
+from threading import Lock, Thread
+from time import sleep
+import hashlib
+import zlib
+import simplejson
+from eddn._Conf.Settings import Settings, loadConfig
+
+
+class DuplicateMessages(Thread):
+    max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES
+
+    caches = {}
+
+    lock = Lock()
+
+    def __init__(self):
+        super(DuplicateMessages, self).__init__()
+        self.daemon = True
+
+    def run(self):
+        while True:
+            sleep(60)
+            with self.lock:
+                maxTime = datetime.utcnow()
+
+                for key in self.caches.keys():
+                    if self.caches[key] + timedelta(minutes=self.max_minutes) < maxTime:
+                        del self.caches[key]
+
+    def isDuplicated(self, message):
+        with self.lock:
+            message = zlib.decompress(message)
+            message = simplejson.loads(message)
+
+            if message['header']['gatewayTimestamp']:
+                del message['header']['gatewayTimestamp']  # Prevent dupe with new timestamp ^^
+            if message['message']['timestamp']:
+                del message['message']['timestamp']  # Prevent dupe with new timestamp ^^
+
+            message = simplejson.dumps(message)
+            key = hashlib.sha256(message).hexdigest()
+
+            if key not in self.caches:
+                self.caches[key] = datetime.utcnow()
+                return False
+            else:
+                self.caches[key] = datetime.utcnow()
+                return True
\ No newline at end of file
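
A quick way to exercise the new deduplication path outside the relay is to hand DuplicateMessages.isDuplicated() two zlib-compressed JSON payloads that differ only in their timestamps; the second call should come back as a duplicate. The following is a minimal sketch against the class added above (Python 2, like the rest of the codebase); the compress() helper and the uploader/system/timestamp values are placeholders for illustration, not real EDDN traffic:

    import zlib
    import simplejson

    from eddn._Core.DuplicateMessages import DuplicateMessages

    dedup = DuplicateMessages()
    dedup.start()  # daemon thread that expires cached hashes after RELAY_DUPLICATE_MAX_MINUTES

    def compress(payload):
        # isDuplicated() expects the zlib-compressed JSON the relay receives, so mimic that here.
        return zlib.compress(simplejson.dumps(payload))

    first = {
        "header": {"gatewayTimestamp": "2015-05-08T17:55:25Z", "uploaderID": "ExampleUploader"},
        "message": {"timestamp": "2015-05-08T17:55:20Z", "systemName": "Example System"},
    }

    # Same content, re-gatewayed later with fresh timestamps.
    second = {
        "header": {"gatewayTimestamp": "2015-05-08T17:55:30Z", "uploaderID": "ExampleUploader"},
        "message": {"timestamp": "2015-05-08T17:55:26Z", "systemName": "Example System"},
    }

    print(dedup.isDuplicated(compress(first)))   # False: first sighting, hash is cached
    print(dedup.isDuplicated(compress(second)))  # True: identical once both timestamps are stripped

Hashing the normalised document rather than storing whole messages keeps the cache small, and because isDuplicated() refreshes the stored time on every hit, a message only drops out of the cache once it has not been seen for RELAY_DUPLICATE_MAX_MINUTES.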