diff --git a/contrib/monitor/index.html b/contrib/monitor/index.html
index e270618..3f676fc 100644
--- a/contrib/monitor/index.html
+++ b/contrib/monitor/index.html
@@ -142,7 +147,7 @@ var doUpdateUploaders = function()
$('<tr>').attr('data-name', uploader).on('mouseover', function(){
chart.get('uploader-' + makeSlug(uploader)).setState('hover');
}).on('mouseout', function(){
chart.get('uploader-' + makeSlug(uploader)).setState('');
chart.tooltip.hide();
}).append(
- $('<td>').html('' + uploader + '')
+ $('<td>').html('' + truncateUploader + '')
)
.append(
$('<td>').addClass('stat today').html(formatNumber(uploaders[today][uploader] || 0))
@@ -259,14 +264,20 @@ var doUpdates = function(type){
shift = chart.get('inbound').data.length > 60;
chart.get('inbound').addPoint([d.getTime(), (data['inbound'] || {})['1min'] || 0], true, shift);
- shift = chart.get('outbound').data.length > 60;
- chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
-
if(type == 'gateways')
{
shift = chart.get('invalid').data.length > 60;
chart.get('invalid').addPoint([d.getTime(), (data['invalid'] || {})['1min'] || 0], true, shift);
}
+
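+ // Relays additionally chart messages that were discarded as duplicates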
+ if(type == 'relays')
+ {
+ shift = chart.get('duplicate').data.length > 60;
+ chart.get('duplicate').addPoint([d.getTime(), (data['duplicate'] || {})['1min'] || 0], true, shift);
+ }
+
+ shift = chart.get('outbound').data.length > 60;
+ chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
}
});
});
@@ -287,6 +298,13 @@ var showStats = function(type, currentItem){
el.find(".invalid_60min").html((currentItemStats["invalid"] || {})['60min'] || 0);
}
+ if(type == 'relays')
+ {
+ el.find(".duplicate_1min").html((currentItemStats["duplicate"] || {})['1min'] || 0);
+ el.find(".duplicate_5min").html((currentItemStats["duplicate"] || {})['5min'] || 0);
+ el.find(".duplicate_60min").html((currentItemStats["duplicate"] || {})['60min'] || 0);
+ }
+
el.find(".outbound_1min").html((currentItemStats["outbound"] || {})['1min'] || 0);
el.find(".outbound_5min").html((currentItemStats["outbound"] || {})['5min'] || 0);
el.find(".outbound_60min").html((currentItemStats["outbound"] || {})['60min'] || 0);
@@ -353,8 +371,8 @@ var start = function(){
exporting: { enabled: false },
series: [
{id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
- {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200},
- {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1}
+ {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1},
+ {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200}
]
}).hide();
@@ -403,6 +421,7 @@ var start = function(){
exporting: { enabled: false },
series: [
{id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
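+ // zIndex 1 draws the duplicate series behind the inbound/outbound series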
+ {id: 'duplicate', data: [], name: 'Duplicate messages', zIndex: 1},
{id: 'outbound', data: [], name: 'Messages passed to subscribers', zIndex: 200}
]
}).hide();
diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py
index cbf40d1..d3675d3 100644
--- a/src/eddn/Monitor.py
+++ b/src/eddn/Monitor.py
@@ -15,6 +15,11 @@ from eddn._Conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()
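+# Deduplication is optional: setting RELAY_DUPLICATE_MAX_MINUTES to False disables it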
+if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+ from eddn._Core.DuplicateMessages import DuplicateMessages
+ duplicateMessages = DuplicateMessages()
+ duplicateMessages.start()
+
def date(__format):
d = datetime.datetime.utcnow()
return d.strftime(__format)
@@ -183,6 +188,18 @@ class Monitor(Thread):
message = message[1]
else:
message = message[0]
+
+ if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+ if duplicateMessages.isDuplicated(message):
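+ # Record the hit against a synthetic 'DUPLICATE MESSAGE' schema row for today's stats, then drop the message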
+ schemaID = 'DUPLICATE MESSAGE'
+
+ c = db.cursor()
+ c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
+ c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
+ db.commit()
+
+ return
if Settings.MONITOR_DECOMPRESS_MESSAGES:
message = zlib.decompress(message)
diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py
index 42a0285..9d2f903 100644
--- a/src/eddn/Relay.py
+++ b/src/eddn/Relay.py
@@ -23,6 +23,11 @@ from eddn._Core.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()
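+# Start the shared duplicate-message cache only when deduplication is enabled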
+if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+ from eddn._Core.DuplicateMessages import DuplicateMessages
+ duplicateMessages = DuplicateMessages()
+ duplicateMessages.start()
+
@get('/stats/')
def stats():
@@ -78,9 +83,11 @@ class Relay(Thread):
else:
message = message[0]
- # if is_message_duped(message):
- # We've already seen this message recently. Discard it.
- # return
+ if Settings.RELAY_DUPLICATE_MAX_MINUTES:
+ if duplicateMessages.isDuplicated(message):
+ # We've already seen this message recently. Discard it.
+ statsCollector.tally("duplicate")
+ return
if Settings.RELAY_DECOMPRESS_MESSAGES:
message = zlib.decompress(message)
diff --git a/src/eddn/_Conf/Settings.py b/src/eddn/_Conf/Settings.py
index 0ba4905..1020736 100644
--- a/src/eddn/_Conf/Settings.py
+++ b/src/eddn/_Conf/Settings.py
@@ -24,6 +24,9 @@ class _Settings(object):
RELAY_DECOMPRESS_MESSAGES = False
+ # Minutes for which message hashes are remembered for deduplication; if set to False, no deduplication is performed
+ RELAY_DUPLICATE_MAX_MINUTES = 15
+
# If set to false, don't listen to topic and accept all incoming messages
RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = False
@@ -54,8 +57,7 @@ class _Settings(object):
MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
- MONITOR_DB = "/home/EDDN_Monitor.s3db"
- #MONITOR_DB = "D:\EDDN_Monitor.s3db"
+ MONITOR_DB = "/home/EDDN_Monitor.s3db"
MONITOR_DECOMPRESS_MESSAGES = True
diff --git a/src/eddn/_Core/DuplicateMessages.py b/src/eddn/_Core/DuplicateMessages.py
new file mode 100644
index 0000000..48a6055
--- /dev/null
+++ b/src/eddn/_Core/DuplicateMessages.py
@@ -0,0 +1,49 @@
+from datetime import datetime, timedelta
+from threading import Lock, Thread
+from time import sleep
+import hashlib
+import zlib
+import simplejson
+from eddn._Conf.Settings import Settings
+
+
+class DuplicateMessages(Thread):
+    max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES
+
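+    # Recently seen messages: SHA-256 of the normalised JSON -> datetime last seen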
+    caches = {}
+
+    lock = Lock()
+
+    def __init__(self):
+        super(DuplicateMessages, self).__init__()
+        self.daemon = True
+
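+    # Background sweeper: once a minute, drop cache entries older than max_minutes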
+    def run(self):
+        while True:
+            sleep(60)
+            with self.lock:
+                now = datetime.utcnow()
+
+                # Snapshot the keys so entries can be deleted while iterating
+                for key in list(self.caches.keys()):
+                    if self.caches[key] + timedelta(minutes=self.max_minutes) < now:
+                        del self.caches[key]
+
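+    # A message is a duplicate if its timestamp-stripped JSON was already seen within max_minutes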
+    def isDuplicated(self, message):
+        with self.lock:
+            message = zlib.decompress(message)
+            message = simplejson.loads(message)
+
+            if 'gatewayTimestamp' in message['header']:
+                del message['header']['gatewayTimestamp']  # Otherwise a resent message never matches
+            if 'timestamp' in message['message']:
+                del message['message']['timestamp']  # Otherwise a resent message never matches
+
+            message = simplejson.dumps(message, sort_keys=True)  # sort_keys gives a stable hash across key orders
+            key = hashlib.sha256(message).hexdigest()
+
+            if key not in self.caches:
+                self.caches[key] = datetime.utcnow()
+                return False
+            else:
+                self.caches[key] = datetime.utcnow()
+                return True
\ No newline at end of file