diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index 19f3980..dc0af02 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -55,7 +55,7 @@ def push_message(string_message, topic): """ # Push a zlib compressed JSON representation of the message to - # announcers with schema as topic + # announcers with schema as topic compressed_msg = zlib.compress(string_message) sender.send("%s |-| %s" % (topic, compressed_msg)) statsCollector.tally("outbound") diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index cf046a9..2c3a9fa 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -17,11 +17,13 @@ from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() +# This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() duplicateMessages.start() + def date(__format): d = datetime.datetime.utcnow() return d.strftime(__format) @@ -30,211 +32,212 @@ def date(__format): @get('/getTotalSoftwares/') def getTotalSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - softwares = collections.OrderedDict() - - maxDays = request.GET.get('maxDays', '31').strip() - maxDays = int(maxDays) -1; - - query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate - FROM softwares - GROUP BY name - HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') - ORDER BY total DESC""" - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + softwares = collections.OrderedDict() + + maxDays = request.GET.get('maxDays', '31').strip() + maxDays = int(maxDays) - 1 + + query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate + FROM softwares + GROUP BY name + HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') + ORDER BY total DESC""" + results = db.execute(query) + for row in results: softwares[row[0].encode('utf8')] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + + @get('/getSoftwares/') def getSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - softwares = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM softwares - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + softwares = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM softwares + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not str(row[2].encode('utf8')) in softwares.keys(): softwares[row[2].encode('utf8')] = collections.OrderedDict() - + softwares[row[2].encode('utf8')][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + + @get('/getTotalUploaders/') def getTotalUploaders(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - uploaders = collections.OrderedDict() - - limit = request.GET.get('limit', '20').strip() - - query = """SELECT name, SUM(hits) AS total - FROM uploaders - GROUP BY name - ORDER BY total DESC - LIMIT """ + limit - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + uploaders = collections.OrderedDict() + + limit = request.GET.get('limit', '20').strip() + + query = """SELECT name, SUM(hits) AS total + FROM uploaders + GROUP BY name + ORDER BY total DESC + LIMIT """ + limit + results = db.execute(query) + for row in results: uploaders[row[0].encode('utf8')] = row[1] - + db.close() - + return simplejson.dumps(uploaders) - + + @get('/getUploaders/') def getUploaders(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - uploaders = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM uploaders - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + uploaders = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM uploaders + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not row[2].encode('utf8') in uploaders.keys(): uploaders[row[2].encode('utf8')] = collections.OrderedDict() - + uploaders[row[2]][row[0].encode('utf8')] = row[1] - + db.close() - + return simplejson.dumps(uploaders) - + + @get('/getTotalSchemas/') def getTotalSchemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - schemas = collections.OrderedDict() - - query = """SELECT name, SUM(hits) AS total - FROM schemas - GROUP BY name - ORDER BY total DESC""" - results = db.execute(query) - + db = sqlite3.connect(Settings.MONITOR_DB) + schemas = collections.OrderedDict() + + query = """SELECT name, SUM(hits) AS total + FROM schemas + GROUP BY name + ORDER BY total DESC""" + results = db.execute(query) + for row in results: schemas[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) - + + @get('/getSchemas/') def getSchemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = sqlite3.connect(Settings.MONITOR_DB) - schemas = collections.OrderedDict() - - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - - query = """SELECT * - FROM schemas - WHERE dateStats BETWEEN ? AND ? - ORDER BY hits DESC, dateStats ASC""" - results = db.execute(query, (dateStart, dateEnd)) - + db = sqlite3.connect(Settings.MONITOR_DB) + schemas = collections.OrderedDict() + + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + + query = """SELECT * + FROM schemas + WHERE dateStats BETWEEN ? AND ? + ORDER BY hits DESC, dateStats ASC""" + results = db.execute(query, (dateStart, dateEnd)) + for row in results: if not str(row[2]) in schemas.keys(): schemas[str(row[2])] = collections.OrderedDict() - - schemas[str(row[2])][str(row[0])] = str(row[1]) - - db.close() - - return simplejson.dumps(schemas) + schemas[str(row[2])][str(row[0])] = str(row[1]) + + db.close() + + return simplejson.dumps(schemas) class Monitor(Thread): def run(self): - context = zmq.Context() - + context = zmq.Context() + receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SUBSCRIBE, '') - + for binding in Settings.MONITOR_RECEIVER_BINDINGS: receiver.connect(binding) def monitor_worker(message): - db = sqlite3.connect(Settings.MONITOR_DB) - + db = sqlite3.connect(Settings.MONITOR_DB) + # Separate topic from message message = message.split(' |-| ') - + # Handle gateway not sending topic if len(message) > 1: message = message[1] else: message = message[0] - if Settings.RELAY_DUPLICATE_MAX_MINUTES: if duplicateMessages.isDuplicated(message): schemaID = 'DUPLICATE MESSAGE' - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + return - + if Settings.MONITOR_DECOMPRESS_MESSAGES: message = zlib.decompress(message) - - json = simplejson.loads(message) - + + json = simplejson.loads(message) + # Update software count softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8') - + c = db.cursor() c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, )) c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, )) db.commit() - - + # Update uploader count uploaderID = json['header']['uploaderID'].encode('utf8') - - if uploaderID: # Don't get empty uploaderID + + if uploaderID: # Don't get empty uploaderID c = db.cursor() c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, )) c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, )) db.commit() - - - # Update schemas count + + # Update schemas count schemaID = json['$schemaRef'] - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + db.close() while True: diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 209649b..58ea46b 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -20,11 +20,12 @@ from eddn.conf.Settings import Settings, loadConfig from gevent import monkey monkey.patch_all() +# This import must be done post-monkey-patching! from eddn.core.StatsCollector import StatsCollector - statsCollector = StatsCollector() statsCollector.start() +# This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages duplicateMessages = DuplicateMessages() @@ -71,11 +72,11 @@ class Relay(Thread): def relay_worker(message): """ - This is the worker function that re-sends the incoming messages out - to any subscribers. - :param str message: A JSON string to re-broadcast. + This is the worker function that re-sends the incoming messages out + to any subscribers. + :param str message: A JSON string to re-broadcast. """ - # Separate topic from message + # Separate topic from message message = message.split(' |-| ') # Handle gateway not sending topic diff --git a/src/eddn/conf/Settings.py b/src/eddn/conf/Settings.py index 7e08307..e5f211b 100644 --- a/src/eddn/conf/Settings.py +++ b/src/eddn/conf/Settings.py @@ -13,7 +13,7 @@ class _Settings(object): # Relay settings ############################################################################### - #RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"] + # RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"] RELAY_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] RELAY_SENDER_BINDINGS = ["tcp://*:9500"] @@ -52,11 +52,10 @@ class _Settings(object): MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] - MONITOR_DB = "/home/EDDN_Monitor.s3db" + MONITOR_DB = "/home/EDDN_Monitor.s3db" MONITOR_DECOMPRESS_MESSAGES = True - def loadFrom(self, fileName): f = open(fileName, 'r') conf = simplejson.load(f) diff --git a/src/eddn/core/DuplicateMessages.py b/src/eddn/core/DuplicateMessages.py index 45f26b2..3bb2e2b 100644 --- a/src/eddn/core/DuplicateMessages.py +++ b/src/eddn/core/DuplicateMessages.py @@ -12,7 +12,7 @@ from eddn.conf.Settings import Settings, loadConfig class DuplicateMessages(Thread): max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES - caches = {} + caches = {} lock = Lock()