diff --git a/contrib/monitor/index.html b/contrib/monitor/index.html index 3a195a2..75f0f5d 100644 --- a/contrib/monitor/index.html +++ b/contrib/monitor/index.html @@ -43,6 +43,10 @@
  • Uploaders
  • Schemas
  • + + @@ -406,5 +410,15 @@ + + diff --git a/contrib/monitor/schemas.html b/contrib/monitor/schemas.html index c7c3df8..5870f0b 100644 --- a/contrib/monitor/schemas.html +++ b/contrib/monitor/schemas.html @@ -39,6 +39,10 @@ + + @@ -279,5 +283,15 @@ }); }); + + diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index f049f0b..6ead7df 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -143,17 +143,10 @@ def parse_and_error_handle(data): validationResults = validator.validate(parsed_message) if validationResults.severity <= ValidationSeverity.WARN: - parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat() + 'Z' + parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat() + 'Z' + parsed_message['header']['uploaderIP'] = get_remote_address() - ip_hash_salt = Settings.GATEWAY_IP_KEY_SALT - if ip_hash_salt: - # If an IP hash is set, salt+hash the uploader's IP address and set - # it as the EDDN upload key value. - ip_hash = hashlib.sha1(ip_hash_salt + get_remote_address()).hexdigest() - parsed_message['header']['uploaderKey'] = ip_hash - - # Sends the parsed MarketOrderList or MarketHistoryList to the Announcers - # as compressed JSON. + # Sends the parsed to the Relay/Monitor as compressed JSON. gevent.spawn(push_message, parsed_message, parsed_message['$schemaRef']) logger.info("Accepted %s upload from %s" % ( parsed_message, get_remote_address() diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index d701576..4a0bd0a 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -11,8 +11,10 @@ import sqlite3 import datetime import collections import zmq.green as zmq + from bottle import get, request, response, run as bottle_run from eddn.conf.Settings import Settings, loadConfig +from eddn.core.Analytics import Analytics from gevent import monkey monkey.patch_all() @@ -186,6 +188,8 @@ class Monitor(Thread): receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SUBSCRIBE, '') + + analytics = Analytics() for binding in Settings.MONITOR_RECEIVER_BINDINGS: receiver.connect(binding) @@ -210,6 +214,7 @@ class Monitor(Thread): c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() + db.close() return @@ -244,6 +249,12 @@ class Monitor(Thread): db.commit() db.close() + + uploaderIP = None + if 'uploaderIP' in json['header']: + uploaderIP = json['header']['uploaderIP'].encode('utf8') + + analytics.hit(schemaID, uploaderID, uploaderIP) while True: inboundMessage = receiver.recv() diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index f4cfeb9..cd5437e 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -85,20 +85,30 @@ class Relay(Thread): else: message = message[0] + message = zlib.decompress(message) + message = simplejson.loads(message) + + # Handle duplicate message if Settings.RELAY_DUPLICATE_MAX_MINUTES: if duplicateMessages.isDuplicated(message): # We've already seen this message recently. Discard it. statsCollector.tally("duplicate") return - - if Settings.RELAY_DECOMPRESS_MESSAGES: - message = zlib.decompress(message) - + + # Remove IP to end consumer + if 'uploaderIP' in message['header']: + del json['header']['uploaderIP'] + + # Convert messgae back to JSON + message = simplejson.dumps(message, sort_keys=True) + + # Recompress message when needed + if not Settings.RELAY_DECOMPRESS_MESSAGES: + message = zlib.compress(message) + sender.send(message) statsCollector.tally("outbound") - logger.info("Relay is now listening for order data.") - while True: # For each incoming message, spawn a greenlet using the relay_worker # function. diff --git a/src/eddn/conf/Settings.py b/src/eddn/conf/Settings.py index acec16d..e905983 100644 --- a/src/eddn/conf/Settings.py +++ b/src/eddn/conf/Settings.py @@ -46,8 +46,6 @@ class _Settings(object): GATEWAY_SENDER_BINDINGS = ["tcp://*:8500"] - GATEWAY_IP_KEY_SALT = None - GATEWAY_JSON_SCHEMAS = { "https://eddn.edcd.io/schemas/commodity/3" : "schemas/commodity-v3.0.json", "https://eddn.edcd.io/schemas/commodity/3/test" : "schemas/commodity-v3.0.json", @@ -95,8 +93,11 @@ class _Settings(object): ] MONITOR_DB = "/home/EDDN_Monitor.s3db" + MONITOR_DB = "D:/EDDN_Monitor.s3db" #DEBUG MONITOR_DECOMPRESS_MESSAGES = True + + MONITOR_UA = "UA-496332-23" diff --git a/src/eddn/conf/Version.py b/src/eddn/conf/Version.py index 6688800..d197bd7 100644 --- a/src/eddn/conf/Version.py +++ b/src/eddn/conf/Version.py @@ -1,2 +1,2 @@ # This should be a version number as understood by setuptools -__version__ = "1.0.1" +__version__ = "1.1" diff --git a/src/eddn/core/Analytics.py b/src/eddn/core/Analytics.py new file mode 100644 index 0000000..5c510f0 --- /dev/null +++ b/src/eddn/core/Analytics.py @@ -0,0 +1,33 @@ +# coding: utf8 +import requests + +from hashlib import sha1 +from random import randint + +from traceback import print_exc +from eddn.conf.Settings import Settings + +class Analytics(object): + mobileProperty = 'MO' + Settings.MONITOR_UA[2:] + utmUrl = 'http://www.google-analytics.com/__utm.gif' + + def hit(self, schema, uploaderId, uploaderIp): + try: + if(uploaderId): + uploaderId = str(int("0x%s" % sha1(uploaderId).hexdigest(), 0))[:10] + else: + uploaderId = 'DUPLICATE' + + payload = {} + payload['utmwv'] = "5.2.2d", + payload['utmn'] = str(randint(1, 9999999999)), + payload['utmp'] = schema, + payload['utmac'] = self.mobileProperty + payload['utmcc'] = "__utma=%s;" % ".".join(["1", uploaderId, "1", "1", "1", "1"]) + payload['utmip'] = uploaderIp + + r = requests.get(self.utmUrl, params=payload) + except: + print_exc() + + return \ No newline at end of file diff --git a/src/eddn/core/DuplicateMessages.py b/src/eddn/core/DuplicateMessages.py index d66bb4b..dece230 100644 --- a/src/eddn/core/DuplicateMessages.py +++ b/src/eddn/core/DuplicateMessages.py @@ -33,9 +33,6 @@ class DuplicateMessages(Thread): def isDuplicated(self, message): with self.lock: - message = zlib.decompress(message) - message = simplejson.loads(message) - # Test messages are not duplicate if re.search('test', message['$schemaRef'], re.I): return False