Added duplicate function

AnthorNet 2015-05-08 17:55:25 +02:00
parent 49915116a7
commit 58f47222f3
6 changed files with 111 additions and 11 deletions

View File

@@ -176,6 +176,12 @@
            <td class="inbound_5min stat">-</td>
            <td class="inbound_60min stat">-</td>
        </tr>
        <tr>
            <th>Duplicate messages</th>
            <td class="duplicate_1min stat">-</td>
            <td class="duplicate_5min stat">-</td>
            <td class="duplicate_60min stat">-</td>
        </tr>
        <tr>
            <th>Messages passed to subscribers</th>
            <td class="outbound_1min stat">-</td>

View File

@@ -134,6 +134,11 @@ var doUpdateUploaders = function()
    $('#uploaders .table tbody').empty();
    $.each(uploadersTotal, function(uploader, hits){
        if(uploader.length > 32)
            var truncateUploader = jQuery.trim(uploader).substring(0, 32)/*.split(" ").slice(0, -1).join(" ")*/ + "...";
        else
            var truncateUploader = uploader;
        $('#uploaders .table tbody').append(
            $('<tr>').attr('data-name', uploader).on('mouseover', function(){
                chart.get('uploader-' + makeSlug(uploader)).setState('hover');
@@ -142,7 +147,7 @@ var doUpdateUploaders = function()
                chart.get('uploader-' + makeSlug(uploader)).setState('');
                chart.tooltip.hide();
            }).append(
                $('<td>').html('<strong>' + uploader + '</strong>')
                $('<td>').html('<strong>' + truncateUploader + '</strong>')
            )
            .append(
                $('<td>').addClass('stat today').html(formatNumber(uploaders[today][uploader] || 0))
@@ -259,14 +264,20 @@ var doUpdates = function(type){
            shift = chart.get('inbound').data.length > 60;
            chart.get('inbound').addPoint([d.getTime(), (data['inbound'] || {})['1min'] || 0], true, shift);
            shift = chart.get('outbound').data.length > 60;
            chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
            if(type == 'gateways')
            {
                shift = chart.get('invalid').data.length > 60;
                chart.get('invalid').addPoint([d.getTime(), (data['invalid'] || {})['1min'] || 0], true, shift);
            }
            if(type == 'relays')
            {
                shift = chart.get('duplicate').data.length > 60;
                chart.get('duplicate').addPoint([d.getTime(), (data['duplicate'] || {})['1min'] || 0], true, shift);
            }
            shift = chart.get('outbound').data.length > 60;
            chart.get('outbound').addPoint([d.getTime(), (data['outbound'] || {})['1min'] || 0], true, shift);
        }
    });
});
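
The shift flag above keeps every series at a fixed 60-point window: once a series is full, each new point pushes the oldest one out. A minimal Python sketch of the same rolling-window idea (the names here are illustrative, not part of the EDDN code), using collections.deque:

    from collections import deque

    # Appending to a full deque silently drops the oldest entry,
    # mirroring addPoint(..., shift=true) on a 60-point series.
    window = deque(maxlen=60)
    for minute in range(100):
        window.append((minute, minute * 2))

    print(len(window))  # 60
    print(window[0])    # (40, 80) - oldest surviving sample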
@@ -287,6 +298,13 @@ var showStats = function(type, currentItem){
        el.find(".invalid_60min").html((currentItemStats["invalid"] || {})['60min'] || 0);
    }
    if(type == 'relays')
    {
        el.find(".duplicate_1min").html((currentItemStats["duplicate"] || {})['1min'] || 0);
        el.find(".duplicate_5min").html((currentItemStats["duplicate"] || {})['5min'] || 0);
        el.find(".duplicate_60min").html((currentItemStats["duplicate"] || {})['60min'] || 0);
    }
    el.find(".outbound_1min").html((currentItemStats["outbound"] || {})['1min'] || 0);
    el.find(".outbound_5min").html((currentItemStats["outbound"] || {})['5min'] || 0);
    el.find(".outbound_60min").html((currentItemStats["outbound"] || {})['60min'] || 0);
@@ -353,8 +371,8 @@ var start = function(){
        exporting: { enabled: false },
        series: [
            {id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
            {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200},
            {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1}
            {id: 'invalid', data: [], name: 'Invalid messages', zIndex: 1},
            {id: 'outbound', data: [], name: 'Messages passed to relay', zIndex: 200}
        ]
    }).hide();
@@ -403,6 +421,7 @@ var start = function(){
        exporting: { enabled: false },
        series: [
            {id: 'inbound', data: [], name: 'Messages received', zIndex: 300},
            {id: 'duplicate', data: [], name: 'Duplicate messages', zIndex: 1},
            {id: 'outbound', data: [], name: 'Messages passed to subscribers', zIndex: 200}
        ]
    }).hide();

View File

@@ -15,6 +15,11 @@ from eddn._Conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()

if Settings.RELAY_DUPLICATE_MAX_MINUTES:
    from eddn._Core.DuplicateMessages import DuplicateMessages
    duplicateMessages = DuplicateMessages()
    duplicateMessages.start()


def date(__format):
    d = datetime.datetime.utcnow()
    return d.strftime(__format)
@@ -183,6 +188,18 @@ class Monitor(Thread):
                    message = message[1]
                else:
                    message = message[0]

                if Settings.RELAY_DUPLICATE_MAX_MINUTES:
                    if duplicateMessages.isDuplicated(message):
                        schemaID = 'DUPLICATE MESSAGE'

                        c = db.cursor()
                        c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
                        c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
                        db.commit()

                        return

                if Settings.MONITOR_DECOMPRESS_MESSAGES:
                    message = zlib.decompress(message)
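
One caveat in the Monitor hunk above: the UPDATE runs before the INSERT OR IGNORE, so the first duplicate of a given day creates the per-day row but (assuming hits defaults to 0) is not itself counted. A self-contained sketch of the same daily-counter upsert in insert-then-update order, with the table layout assumed from the queries above; tallySchema is a stand-in name, the real code runs inline in the Monitor thread:

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE schemas (name TEXT, dateStats DATE, '
               'hits INT DEFAULT 0, UNIQUE(name, dateStats))')

    def tallySchema(schemaID):
        # Create the per-day row first, then bump it, so even the
        # first hit of the day is counted.
        c = db.cursor()
        c.execute("INSERT OR IGNORE INTO schemas (name, dateStats) "
                  "VALUES (?, DATE('now'))", (schemaID,))
        c.execute("UPDATE schemas SET hits = hits + 1 "
                  "WHERE name = ? AND dateStats = DATE('now')", (schemaID,))
        db.commit()

    tallySchema('DUPLICATE MESSAGE')
    tallySchema('DUPLICATE MESSAGE')
    print(db.execute('SELECT hits FROM schemas').fetchone())  # (2,)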

View File

@@ -23,6 +23,11 @@ from eddn._Core.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()

if Settings.RELAY_DUPLICATE_MAX_MINUTES:
    from eddn._Core.DuplicateMessages import DuplicateMessages
    duplicateMessages = DuplicateMessages()
    duplicateMessages.start()


@get('/stats/')
def stats():
@@ -78,9 +83,11 @@ class Relay(Thread):
                else:
                    message = message[0]

                # if is_message_duped(message):
                #     We've already seen this message recently. Discard it.
                #     return
                if Settings.RELAY_DUPLICATE_MAX_MINUTES:
                    if duplicateMessages.isDuplicated(message):
                        # We've already seen this message recently. Discard it.
                        statsCollector.tally("duplicate")
                        return

                if Settings.RELAY_DECOMPRESS_MESSAGES:
                    message = zlib.decompress(message)
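
The relay's order of operations is deliberate: deduplicate on the raw message first, tally the drop so it feeds the new "duplicate" chart series, and only then decompress and forward survivors. A minimal standalone sketch of that gate (is_duplicate, Stats, and relayed are hypothetical stand-ins for duplicateMessages.isDuplicated, statsCollector, and the outbound socket):

    import hashlib

    seen = set()

    def is_duplicate(message):
        key = hashlib.sha256(message).hexdigest()
        if key in seen:
            return True
        seen.add(key)
        return False

    class Stats(object):
        def __init__(self):
            self.counts = {}
        def tally(self, name):
            self.counts[name] = self.counts.get(name, 0) + 1

    stats = Stats()
    relayed = []

    def handle(message):
        # Discard repeats and count them; forward everything else.
        if is_duplicate(message):
            stats.tally('duplicate')
            return
        relayed.append(message)

    handle(b'hello')
    handle(b'hello')
    print(len(relayed), stats.counts)  # 1 {'duplicate': 1}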

View File

@@ -24,6 +24,9 @@ class _Settings(object):
    RELAY_DECOMPRESS_MESSAGES = False

    # If set to False, no deduplication is performed
    RELAY_DUPLICATE_MAX_MINUTES = 15

    # If set to False, don't subscribe to a topic; accept all incoming messages
    RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = False
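
Note that every call site guards on if Settings.RELAY_DUPLICATE_MAX_MINUTES:, so the setting does double duty: any falsy value (False, 0, None) switches deduplication off entirely, while a positive integer both enables it and sets the cache lifetime in minutes. For example:

    RELAY_DUPLICATE_MAX_MINUTES = False  # dedupe disabled; guards are skipped
    RELAY_DUPLICATE_MAX_MINUTES = 15     # dedupe enabled; hashes kept 15 minutes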
@@ -54,8 +57,7 @@ class _Settings(object):
    MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]

    MONITOR_DB = "/home/EDDN_Monitor.s3db"
    #MONITOR_DB = "D:\EDDN_Monitor.s3db"
    MONITOR_DB = "/home/EDDN_Monitor.s3db"

    MONITOR_DECOMPRESS_MESSAGES = True

View File

@@ -0,0 +1,49 @@
from datetime import datetime, timedelta
from threading import Lock, Thread
from time import sleep
import hashlib
import zlib

import simplejson

from eddn._Conf.Settings import Settings, loadConfig


class DuplicateMessages(Thread):
    max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES

    caches = {}

    lock = Lock()

    def __init__(self):
        super(DuplicateMessages, self).__init__()
        self.daemon = True

    def run(self):
        # Once a minute, expire cached hashes older than max_minutes.
        while True:
            sleep(60)
            with self.lock:
                maxTime = datetime.utcnow()
                for key in list(self.caches.keys()):
                    if self.caches[key] + timedelta(minutes=self.max_minutes) < maxTime:
                        del self.caches[key]

    def isDuplicated(self, message):
        with self.lock:
            message = zlib.decompress(message)
            message = simplejson.loads(message)

            # Strip the timestamps so a resend that differs only in them
            # still hashes to the same key.
            if 'gatewayTimestamp' in message['header']:
                del message['header']['gatewayTimestamp']
            if 'timestamp' in message['message']:
                del message['message']['timestamp']

            message = simplejson.dumps(message)
            key = hashlib.sha256(message).hexdigest()

            if key not in self.caches:
                self.caches[key] = datetime.utcnow()
                return False
            else:
                self.caches[key] = datetime.utcnow()
                return True
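
To see why isDuplicated strips the timestamps: two uploads that differ only in gatewayTimestamp or timestamp should hash to the same key. A self-contained Python 3 sketch of the same normalize-then-hash scheme (plain json plus an explicit sort_keys and encode for a canonical byte string, where the module above relies on simplejson under Python 2):

    import hashlib
    import json
    import zlib

    def fingerprint(compressed):
        # Decompress, drop the volatile timestamps, and hash the rest.
        msg = json.loads(zlib.decompress(compressed))
        msg['header'].pop('gatewayTimestamp', None)
        msg['message'].pop('timestamp', None)
        canonical = json.dumps(msg, sort_keys=True)
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()

    def upload(gateway_ts, message_ts):
        return zlib.compress(json.dumps({
            'header': {'uploaderID': 'x', 'gatewayTimestamp': gateway_ts},
            'message': {'systemName': 'Lave', 'timestamp': message_ts},
        }).encode('utf-8'))

    a = upload('2015-05-08T17:55:25Z', '2015-05-08T17:55:20Z')
    b = upload('2015-05-08T17:56:01Z', '2015-05-08T17:55:59Z')
    print(fingerprint(a) == fingerprint(b))  # True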