From 2f72fe84fea717dba10d429797214275dbfda10a Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Fri, 5 Jun 2015 16:08:32 +0200 Subject: [PATCH 01/11] Moved _Core/_Conf to core/conf and some UTF8 fixes --- .gitignore | 2 +- setup.py | 2 +- src/eddn/Gateway.py | 8 +- src/eddn/Monitor.py | 133 +++++++++--------- src/eddn/Relay.py | 8 +- src/eddn/{_Conf => conf}/Settings.py | 8 +- src/eddn/{_Conf => conf}/Version.py | 2 + src/eddn/{_Conf => conf}/__init__.py | 0 .../{_Conf => conf}/tests/TestSettings.py | 0 src/eddn/{_Conf => conf}/tests/__init__.py | 0 .../tests/testLoadSettings.json | 0 src/eddn/{_Core => core}/DuplicateMessages.py | 4 +- src/eddn/{_Core => core}/StatsCollector.py | 2 + src/eddn/{_Core => core}/Validator.py | 2 + src/eddn/{_Core => core}/__init__.py | 0 15 files changed, 92 insertions(+), 79 deletions(-) rename src/eddn/{_Conf => conf}/Settings.py (96%) rename src/eddn/{_Conf => conf}/Version.py (55%) rename src/eddn/{_Conf => conf}/__init__.py (100%) rename src/eddn/{_Conf => conf}/tests/TestSettings.py (100%) rename src/eddn/{_Conf => conf}/tests/__init__.py (100%) rename src/eddn/{_Conf => conf}/tests/testLoadSettings.json (100%) rename src/eddn/{_Core => core}/DuplicateMessages.py (95%) rename src/eddn/{_Core => core}/StatsCollector.py (99%) rename src/eddn/{_Core => core}/Validator.py (99%) rename src/eddn/{_Core => core}/__init__.py (100%) diff --git a/.gitignore b/.gitignore index b4e601e..4dc28c8 100644 --- a/.gitignore +++ b/.gitignore @@ -52,5 +52,5 @@ docs/_build/ # PyBuilder target/ -*.prices + *.prices diff --git a/setup.py b/setup.py index d23f9ce..1b8a056 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ import re import glob -VERSIONFILE = "src/eddn/_Conf/Version.py" +VERSIONFILE = "src/eddn/conf/Version.py" verstr = "unknown" try: verstrline = open(VERSIONFILE, "rt").read() diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index bfcff97..19f3980 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -1,3 +1,5 @@ +# coding: utf8 + """ Contains the necessary ZeroMQ socket and a helper function to publish market data to the Announcer daemons. @@ -13,8 +15,8 @@ from datetime import datetime import os -from eddn._Conf.Settings import Settings, loadConfig -from eddn._Core.Validator import Validator, ValidationSeverity +from eddn.conf.Settings import Settings, loadConfig +from eddn.core.Validator import Validator, ValidationSeverity from gevent import monkey monkey.patch_all() @@ -29,7 +31,7 @@ sender = context.socket(zmq.PUB) validator = Validator() # This import must be done post-monkey-patching! -from eddn._Core.StatsCollector import StatsCollector +from eddn.core.StatsCollector import StatsCollector statsCollector = StatsCollector() statsCollector.start() diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index e668dfe..cf046a9 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -1,3 +1,5 @@ +# coding: utf8 + """ Monitor sit below gateways, or another relay, and simply parse what it receives over SUB. """ @@ -10,13 +12,13 @@ import datetime import collections import zmq.green as zmq from bottle import get, request, response, run as bottle_run -from eddn._Conf.Settings import Settings, loadConfig +from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() if Settings.RELAY_DUPLICATE_MAX_MINUTES: - from eddn._Core.DuplicateMessages import DuplicateMessages + from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() duplicateMessages.start() @@ -30,138 +32,138 @@ def getTotalSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) softwares = collections.OrderedDict() - + maxDays = request.GET.get('maxDays', '31').strip() maxDays = int(maxDays) -1; - + query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate FROM softwares GROUP BY name HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') ORDER BY total DESC""" results = db.execute(query) - + for row in results: - softwares[str(row[0])] = str(row[1]) - + softwares[row[0].encode('utf8')] = str(row[1]) + db.close() - + return simplejson.dumps(softwares) - + @get('/getSoftwares/') def getSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) softwares = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM softwares WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: - if not str(row[2]) in softwares.keys(): - softwares[str(row[2])] = collections.OrderedDict() - - softwares[str(row[2])][str(row[0])] = str(row[1]) - + if not str(row[2].encode('utf8')) in softwares.keys(): + softwares[row[2].encode('utf8')] = collections.OrderedDict() + + softwares[row[2].encode('utf8')][str(row[0])] = str(row[1]) + db.close() - + return simplejson.dumps(softwares) - + @get('/getTotalUploaders/') def getTotalUploaders(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) uploaders = collections.OrderedDict() - + limit = request.GET.get('limit', '20').strip() - + query = """SELECT name, SUM(hits) AS total FROM uploaders GROUP BY name ORDER BY total DESC LIMIT """ + limit results = db.execute(query) - + for row in results: - uploaders[str(row[0])] = str(row[1]) - + uploaders[row[0].encode('utf8')] = row[1] + db.close() - + return simplejson.dumps(uploaders) - + @get('/getUploaders/') def getUploaders(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) uploaders = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM uploaders WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: - if not str(row[2]) in uploaders.keys(): - uploaders[str(row[2])] = collections.OrderedDict() - - uploaders[str(row[2])][str(row[0])] = str(row[1]) - + if not row[2].encode('utf8') in uploaders.keys(): + uploaders[row[2].encode('utf8')] = collections.OrderedDict() + + uploaders[row[2]][row[0].encode('utf8')] = row[1] + db.close() - + return simplejson.dumps(uploaders) - + @get('/getTotalSchemas/') def getTotalSchemas(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) schemas = collections.OrderedDict() - + query = """SELECT name, SUM(hits) AS total FROM schemas GROUP BY name ORDER BY total DESC""" results = db.execute(query) - + for row in results: schemas[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) - + @get('/getSchemas/') def getSchemas(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) schemas = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM schemas WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: if not str(row[2]) in schemas.keys(): schemas[str(row[2])] = collections.OrderedDict() - + schemas[str(row[2])][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) @@ -170,66 +172,69 @@ class Monitor(Thread): def run(self): context = zmq.Context() - + receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SUBSCRIBE, '') - + for binding in Settings.MONITOR_RECEIVER_BINDINGS: receiver.connect(binding) def monitor_worker(message): db = sqlite3.connect(Settings.MONITOR_DB) - + # Separate topic from message message = message.split(' |-| ') - + # Handle gateway not sending topic if len(message) > 1: message = message[1] else: message = message[0] + if Settings.RELAY_DUPLICATE_MAX_MINUTES: if duplicateMessages.isDuplicated(message): schemaID = 'DUPLICATE MESSAGE' - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + return - + if Settings.MONITOR_DECOMPRESS_MESSAGES: message = zlib.decompress(message) - + json = simplejson.loads(message) - + # Update software count - softwareID = json['header']['softwareName'] + ' | ' + json['header']['softwareVersion'] - + softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8') + c = db.cursor() c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, )) c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, )) db.commit() - + + # Update uploader count - uploaderID = json['header']['uploaderID'] - + uploaderID = json['header']['uploaderID'].encode('utf8') + if uploaderID: # Don't get empty uploaderID c = db.cursor() c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, )) c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, )) db.commit() - + + # Update schemas count schemaID = json['$schemaRef'] - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + db.close() while True: diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 581c3e8..209649b 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -1,3 +1,5 @@ +# coding: utf8 + """ Relays sit below an announcer, or another relay, and simply repeat what they receive over PUB/SUB. @@ -13,18 +15,18 @@ import gevent import simplejson import zmq.green as zmq from bottle import get, response, run as bottle_run -from eddn._Conf.Settings import Settings, loadConfig +from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() -from eddn._Core.StatsCollector import StatsCollector +from eddn.core.StatsCollector import StatsCollector statsCollector = StatsCollector() statsCollector.start() if Settings.RELAY_DUPLICATE_MAX_MINUTES: - from eddn._Core.DuplicateMessages import DuplicateMessages + from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() duplicateMessages.start() diff --git a/src/eddn/_Conf/Settings.py b/src/eddn/conf/Settings.py similarity index 96% rename from src/eddn/_Conf/Settings.py rename to src/eddn/conf/Settings.py index 36673ca..7e08307 100644 --- a/src/eddn/_Conf/Settings.py +++ b/src/eddn/conf/Settings.py @@ -1,12 +1,8 @@ -''' -Created on 15 Nov 2014 - -@author: james -''' +# coding: utf8 import argparse import simplejson -from eddn._Conf.Version import __version__ as version +from eddn.conf.Version import __version__ as version class _Settings(object): diff --git a/src/eddn/_Conf/Version.py b/src/eddn/conf/Version.py similarity index 55% rename from src/eddn/_Conf/Version.py rename to src/eddn/conf/Version.py index 896a370..5b67b06 100644 --- a/src/eddn/_Conf/Version.py +++ b/src/eddn/conf/Version.py @@ -1 +1,3 @@ +# coding: utf8 + __version__ = "0.4" diff --git a/src/eddn/_Conf/__init__.py b/src/eddn/conf/__init__.py similarity index 100% rename from src/eddn/_Conf/__init__.py rename to src/eddn/conf/__init__.py diff --git a/src/eddn/_Conf/tests/TestSettings.py b/src/eddn/conf/tests/TestSettings.py similarity index 100% rename from src/eddn/_Conf/tests/TestSettings.py rename to src/eddn/conf/tests/TestSettings.py diff --git a/src/eddn/_Conf/tests/__init__.py b/src/eddn/conf/tests/__init__.py similarity index 100% rename from src/eddn/_Conf/tests/__init__.py rename to src/eddn/conf/tests/__init__.py diff --git a/src/eddn/_Conf/tests/testLoadSettings.json b/src/eddn/conf/tests/testLoadSettings.json similarity index 100% rename from src/eddn/_Conf/tests/testLoadSettings.json rename to src/eddn/conf/tests/testLoadSettings.json diff --git a/src/eddn/_Core/DuplicateMessages.py b/src/eddn/core/DuplicateMessages.py similarity index 95% rename from src/eddn/_Core/DuplicateMessages.py rename to src/eddn/core/DuplicateMessages.py index 5d79107..45f26b2 100644 --- a/src/eddn/_Core/DuplicateMessages.py +++ b/src/eddn/core/DuplicateMessages.py @@ -1,10 +1,12 @@ +# coding: utf8 + from datetime import datetime, timedelta from threading import Lock, Thread from time import sleep import hashlib import zlib import simplejson -from eddn._Conf.Settings import Settings, loadConfig +from eddn.conf.Settings import Settings, loadConfig class DuplicateMessages(Thread): diff --git a/src/eddn/_Core/StatsCollector.py b/src/eddn/core/StatsCollector.py similarity index 99% rename from src/eddn/_Core/StatsCollector.py rename to src/eddn/core/StatsCollector.py index 7e12376..fdf8ed7 100644 --- a/src/eddn/_Core/StatsCollector.py +++ b/src/eddn/core/StatsCollector.py @@ -1,3 +1,5 @@ +# coding: utf8 + from collections import deque from datetime import datetime from itertools import islice diff --git a/src/eddn/_Core/Validator.py b/src/eddn/core/Validator.py similarity index 99% rename from src/eddn/_Core/Validator.py rename to src/eddn/core/Validator.py index 330d9ff..18ea5b4 100644 --- a/src/eddn/_Core/Validator.py +++ b/src/eddn/core/Validator.py @@ -1,3 +1,5 @@ +# coding: utf8 + import simplejson from enum import IntEnum from jsonschema import validate as jsValidate, ValidationError diff --git a/src/eddn/_Core/__init__.py b/src/eddn/core/__init__.py similarity index 100% rename from src/eddn/_Core/__init__.py rename to src/eddn/core/__init__.py From ceaac2be74d6b826bae6faa4a782475002b4e108 Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Fri, 5 Jun 2015 16:43:59 +0200 Subject: [PATCH 02/11] Fix PEP8 --- src/eddn/Gateway.py | 2 +- src/eddn/Monitor.py | 235 +++++++++++++++-------------- src/eddn/Relay.py | 11 +- src/eddn/conf/Settings.py | 5 +- src/eddn/core/DuplicateMessages.py | 2 +- 5 files changed, 129 insertions(+), 126 deletions(-) diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index 19f3980..dc0af02 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -55,7 +55,7 @@ def push_message(string_message, topic): """ # Push a zlib compressed JSON representation of the message to - # announcers with schema as topic + # announcers with schema as topic compressed_msg = zlib.compress(string_message) sender.send("%s |-| %s" % (topic, compressed_msg)) statsCollector.tally("outbound") diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index cf046a9..2c3a9fa 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -17,11 +17,13 @@ from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() +# This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() duplicateMessages.start() + def date(__format): d = datetime.datetime.utcnow() return d.strftime(__format) @@ -30,211 +32,212 @@ def date(__format): @get('/getTotalSoftwares/') def getTotalSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - softwares = collections.OrderedDict() - - maxDays = request.GET.get('maxDays', '31').strip() - maxDays = int(maxDays) -1; - - query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate - FROM softwares - GROUP BY name - HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') - ORDER BY total DESC""" - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + softwares = collections.OrderedDict() + + maxDays = request.GET.get('maxDays', '31').strip() + maxDays = int(maxDays) - 1 + + query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate + FROM softwares + GROUP BY name + HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') + ORDER BY total DESC""" + results = db.execute(query) + for row in results: softwares[row[0].encode('utf8')] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + + @get('/getSoftwares/') def getSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - softwares = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM softwares - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + softwares = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM softwares + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not str(row[2].encode('utf8')) in softwares.keys(): softwares[row[2].encode('utf8')] = collections.OrderedDict() - + softwares[row[2].encode('utf8')][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + + @get('/getTotalUploaders/') def getTotalUploaders(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - uploaders = collections.OrderedDict() - - limit = request.GET.get('limit', '20').strip() - - query = """SELECT name, SUM(hits) AS total - FROM uploaders - GROUP BY name - ORDER BY total DESC - LIMIT """ + limit - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + uploaders = collections.OrderedDict() + + limit = request.GET.get('limit', '20').strip() + + query = """SELECT name, SUM(hits) AS total + FROM uploaders + GROUP BY name + ORDER BY total DESC + LIMIT """ + limit + results = db.execute(query) + for row in results: uploaders[row[0].encode('utf8')] = row[1] - + db.close() - + return simplejson.dumps(uploaders) - + + @get('/getUploaders/') def getUploaders(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - uploaders = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM uploaders - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + uploaders = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM uploaders + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not row[2].encode('utf8') in uploaders.keys(): uploaders[row[2].encode('utf8')] = collections.OrderedDict() - + uploaders[row[2]][row[0].encode('utf8')] = row[1] - + db.close() - + return simplejson.dumps(uploaders) - + + @get('/getTotalSchemas/') def getTotalSchemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - schemas = collections.OrderedDict() - - query = """SELECT name, SUM(hits) AS total - FROM schemas - GROUP BY name - ORDER BY total DESC""" - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + schemas = collections.OrderedDict() + + query = """SELECT name, SUM(hits) AS total + FROM schemas + GROUP BY name + ORDER BY total DESC""" + results = db.execute(query) + for row in results: schemas[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) - + + @get('/getSchemas/') def getSchemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - schemas = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM schemas - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + schemas = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM schemas + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not str(row[2]) in schemas.keys(): schemas[str(row[2])] = collections.OrderedDict() - - schemas[str(row[2])][str(row[0])] = str(row[1]) - - db.close() - - return simplejson.dumps(schemas) + schemas[str(row[2])][str(row[0])] = str(row[1]) + + db.close() + + return simplejson.dumps(schemas) class Monitor(Thread): def run(self): - context = zmq.Context() - + context = zmq.Context() + receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SUBSCRIBE, '') - + for binding in Settings.MONITOR_RECEIVER_BINDINGS: receiver.connect(binding) def monitor_worker(message): - db = sqlite3.connect(Settings.MONITOR_DB) - + db = sqlite3.connect(Settings.MONITOR_DB) + # Separate topic from message message = message.split(' |-| ') - + # Handle gateway not sending topic if len(message) > 1: message = message[1] else: message = message[0] - if Settings.RELAY_DUPLICATE_MAX_MINUTES: if duplicateMessages.isDuplicated(message): schemaID = 'DUPLICATE MESSAGE' - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + return - + if Settings.MONITOR_DECOMPRESS_MESSAGES: message = zlib.decompress(message) - - json = simplejson.loads(message) - + + json = simplejson.loads(message) + # Update software count softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8') - + c = db.cursor() c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, )) c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, )) db.commit() - - + # Update uploader count uploaderID = json['header']['uploaderID'].encode('utf8') - - if uploaderID: # Don't get empty uploaderID + + if uploaderID: # Don't get empty uploaderID c = db.cursor() c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, )) c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, )) db.commit() - - - # Update schemas count + + # Update schemas count schemaID = json['$schemaRef'] - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + db.close() while True: diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 209649b..58ea46b 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -20,11 +20,12 @@ from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() +# This import must be done post-monkey-patching! from eddn.core.StatsCollector import StatsCollector - statsCollector = StatsCollector() statsCollector.start() +# This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() @@ -71,11 +72,11 @@ class Relay(Thread): def relay_worker(message): """ - This is the worker function that re-sends the incoming messages out - to any subscribers. - :param str message: A JSON string to re-broadcast. + This is the worker function that re-sends the incoming messages out + to any subscribers. + :param str message: A JSON string to re-broadcast. """ - # Separate topic from message + # Separate topic from message message = message.split(' |-| ') # Handle gateway not sending topic diff --git a/src/eddn/conf/Settings.py b/src/eddn/conf/Settings.py index 7e08307..e5f211b 100644 --- a/src/eddn/conf/Settings.py +++ b/src/eddn/conf/Settings.py @@ -13,7 +13,7 @@ class _Settings(object): # Relay settings ############################################################################### - #RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"] + # RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"] RELAY_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] RELAY_SENDER_BINDINGS = ["tcp://*:9500"] @@ -52,11 +52,10 @@ class _Settings(object): MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] - MONITOR_DB = "/home/EDDN_Monitor.s3db" + MONITOR_DB = "/home/EDDN_Monitor.s3db" MONITOR_DECOMPRESS_MESSAGES = True - def loadFrom(self, fileName): f = open(fileName, 'r') conf = simplejson.load(f) diff --git a/src/eddn/core/DuplicateMessages.py b/src/eddn/core/DuplicateMessages.py index 45f26b2..3bb2e2b 100644 --- a/src/eddn/core/DuplicateMessages.py +++ b/src/eddn/core/DuplicateMessages.py @@ -12,7 +12,7 @@ from eddn.conf.Settings import Settings, loadConfig class DuplicateMessages(Thread): max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES - caches = {} + caches = {} lock = Lock() From e3bbe974060c4dba840cf78e5fab66db80720bd5 Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Mon, 8 Jun 2015 07:37:54 +0200 Subject: [PATCH 03/11] Fix missing dependencies in simple python examples --- examples/Python 2.7/Client_Simple.py | 1 + examples/Python 3.4/Client_Simple.py | 1 + 2 files changed, 2 insertions(+) diff --git a/examples/Python 2.7/Client_Simple.py b/examples/Python 2.7/Client_Simple.py index fde638d..020ca4c 100644 --- a/examples/Python 2.7/Client_Simple.py +++ b/examples/Python 2.7/Client_Simple.py @@ -2,6 +2,7 @@ import zlib import zmq import simplejson import sys +import time """ " Configuration diff --git a/examples/Python 3.4/Client_Simple.py b/examples/Python 3.4/Client_Simple.py index 69e02ea..acf668d 100644 --- a/examples/Python 3.4/Client_Simple.py +++ b/examples/Python 3.4/Client_Simple.py @@ -2,6 +2,7 @@ import zlib import zmq import simplejson import sys +import time """ " Configuration From cb9816aa911da2000abaa517ef68affbf339f7d5 Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Mon, 8 Jun 2015 07:43:03 +0200 Subject: [PATCH 04/11] Fix disconnect not working on python examples --- examples/Python 2.7/Client_Complete.py | 4 +++- examples/Python 2.7/Client_Simple.py | 3 ++- examples/Python 3.4/Client_Complete.py | 4 +++- examples/Python 3.4/Client_Simple.py | 3 ++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/examples/Python 2.7/Client_Complete.py b/examples/Python 2.7/Client_Complete.py index 5f6026e..dd13739 100644 --- a/examples/Python 2.7/Client_Complete.py +++ b/examples/Python 2.7/Client_Complete.py @@ -209,8 +209,10 @@ def main(): except zmq.ZMQError, e: echoLog('') echoLog('ZMQSocketException: ' + str(e)) + subscriber.disconnect(__relayEDDN) + echoLog('Disconnect from ' + __relayEDDN) echoLog('') - time.sleep(10) + time.sleep(5) diff --git a/examples/Python 2.7/Client_Simple.py b/examples/Python 2.7/Client_Simple.py index 020ca4c..e8ae7fd 100644 --- a/examples/Python 2.7/Client_Simple.py +++ b/examples/Python 2.7/Client_Simple.py @@ -43,7 +43,8 @@ def main(): except zmq.ZMQError, e: print 'ZMQSocketException: ' + str(e) sys.stdout.flush() - time.sleep(10) + subscriber.disconnect(__relayEDDN) + time.sleep(5) diff --git a/examples/Python 3.4/Client_Complete.py b/examples/Python 3.4/Client_Complete.py index e40816a..f794198 100644 --- a/examples/Python 3.4/Client_Complete.py +++ b/examples/Python 3.4/Client_Complete.py @@ -209,8 +209,10 @@ def main(): except zmq.ZMQError as e: echoLog('') echoLog('ZMQSocketException: ' + str(e)) + subscriber.disconnect(__relayEDDN) + echoLog('Disconnect from ' + __relayEDDN) echoLog('') - time.sleep(10) + time.sleep(5) diff --git a/examples/Python 3.4/Client_Simple.py b/examples/Python 3.4/Client_Simple.py index acf668d..526c44f 100644 --- a/examples/Python 3.4/Client_Simple.py +++ b/examples/Python 3.4/Client_Simple.py @@ -43,7 +43,8 @@ def main(): except zmq.ZMQError as e: print ('ZMQSocketException: ' + str(e)) sys.stdout.flush() - time.sleep(10) + subscriber.disconnect(__relayEDDN) + time.sleep(5) From 85ab051b9aff9122d02a0a34e28d3fd7a0847b3e Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Mon, 8 Jun 2015 08:20:05 +0200 Subject: [PATCH 05/11] Remove unnecessary stats collector in favor of monitor --- src/eddn/Gateway.py | 4 ---- src/eddn/conf/Settings.py | 1 - 2 files changed, 5 deletions(-) diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index dc0af02..cdab0a2 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -132,10 +132,6 @@ def parse_and_error_handle(data): validationResults = validator.validate(parsed_message) if validationResults.severity <= ValidationSeverity.WARN: - - statsCollector.tally(parsed_message["$schemaRef"]) - statsCollector.tally(parsed_message['header']['softwareName'] + " " + parsed_message['header']['softwareVersion']) - parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat() ip_hash_salt = Settings.GATEWAY_IP_KEY_SALT diff --git a/src/eddn/conf/Settings.py b/src/eddn/conf/Settings.py index e5f211b..1d436c7 100644 --- a/src/eddn/conf/Settings.py +++ b/src/eddn/conf/Settings.py @@ -13,7 +13,6 @@ class _Settings(object): # Relay settings ############################################################################### - # RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"] RELAY_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] RELAY_SENDER_BINDINGS = ["tcp://*:9500"] From 86cdb0d912efa0cd0eda64fc1b64269a6a53df77 Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Mon, 8 Jun 2015 08:22:40 +0200 Subject: [PATCH 06/11] Remove gateway/relay sorting So main gateway/relay stay at start in charts --- contrib/monitor/js/eddn.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/monitor/js/eddn.js b/contrib/monitor/js/eddn.js index 875a6b7..8b71ecf 100644 --- a/contrib/monitor/js/eddn.js +++ b/contrib/monitor/js/eddn.js @@ -357,7 +357,7 @@ var start = function(){ Highcharts.setOptions({global: {useUTC: false}}); // Grab gateways - gateways = gateways.sort(); + //gateways = gateways.sort(); $.each(gateways, function(k, gateway){ gateway = gateway.replace('tcp://', ''); gateway = gateway.replace(':8500', ''); @@ -406,7 +406,7 @@ var start = function(){ }, updateInterval); // Grab relays - relays = relays.sort(); + //relays = relays.sort(); $.each(relays, function(k, relay){ $("select[name=relays]").append($('