diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index a31853f..5b88fe3 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -1,31 +1,29 @@ # coding: utf8 -""" -Monitor sit below gateways, or another relay, and simply parse what it receives over SUB. -""" -from threading import Thread import argparse -import zlib -import gevent -import simplejson -import mysql.connector as mariadb -import datetime import collections +import datetime +import zlib +from threading import Thread + +import gevent +import mysql.connector as mariadb +import simplejson import zmq.green as zmq -import re +from bottle import Bottle, request, response +from gevent import monkey from eddn.conf.Settings import Settings, loadConfig -from gevent import monkey monkey.patch_all() -from bottle import Bottle, get, request, response, run + app = Bottle() # This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages - duplicateMessages = DuplicateMessages() - duplicateMessages.start() + duplicate_messages = DuplicateMessages() + duplicate_messages.start() def parse_cl_args(): @@ -59,13 +57,17 @@ def ping(): @app.route('/getTotalSoftwares/', method=['OPTIONS', 'GET']) -def getTotalSoftwares(): +def get_total_softwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database']) + db = mariadb.connect( + user=Settings.MONITOR_DB['user'], + password=Settings.MONITOR_DB['password'], + database=Settings.MONITOR_DB['database'] + ) softwares = collections.OrderedDict() - maxDays = request.GET.get('maxDays', '31').strip() - maxDays = int(maxDays) - 1 + max_days = request.GET.get('maxDays', '31').strip() + max_days = int(max_days) - 1 query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate FROM softwares @@ -74,7 +76,7 @@ def getTotalSoftwares(): ORDER BY total DESC""" results = db.cursor() - results.execute(query, (maxDays, )) + results.execute(query, (max_days, )) for row in results: softwares[row[0].encode('utf8')] = str(row[1]) @@ -85,13 +87,17 @@ def getTotalSoftwares(): @app.route('/getSoftwares/', method=['OPTIONS', 'GET']) -def getSoftwares(): +def get_softwares(): response.set_header("Access-Control-Allow-Origin", "*") - db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database']) + db = mariadb.connect( + user=Settings.MONITOR_DB['user'], + password=Settings.MONITOR_DB['password'], + database=Settings.MONITOR_DB['database'] + ) softwares = collections.OrderedDict() - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() query = """SELECT * FROM `softwares` @@ -99,14 +105,14 @@ def getSoftwares(): ORDER BY `hits` DESC, `dateStats` ASC""" results = db.cursor() - results.execute(query, (dateStart, dateEnd)) + results.execute(query, (date_start, date_end)) for row in results: - currentDate = row[2].strftime('%Y-%m-%d') - if not currentDate in softwares.keys(): - softwares[currentDate] = collections.OrderedDict() + current_date = row[2].strftime('%Y-%m-%d') + if current_date not in softwares.keys(): + softwares[current_date] = collections.OrderedDict() - softwares[currentDate][str(row[0])] = str(row[1]) + softwares[current_date][str(row[0])] = str(row[1]) db.close() @@ -114,9 +120,13 @@ def getSoftwares(): @app.route('/getTotalSchemas/', method=['OPTIONS', 'GET']) -def getTotalSchemas(): +def get_total_schemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database']) + db = mariadb.connect( + user=Settings.MONITOR_DB['user'], + password=Settings.MONITOR_DB['password'], + database=Settings.MONITOR_DB['database'] + ) schemas = collections.OrderedDict() query = """SELECT `name`, SUM(`hits`) AS `total` @@ -136,14 +146,17 @@ def getTotalSchemas(): @app.route('/getSchemas/', method=['OPTIONS', 'GET']) -def getSchemas(): +def get_schemas(): response.set_header("Access-Control-Allow-Origin", "*") - db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database']) - #db.text_factory = lambda x: unicode(x, "utf-8", "ignore") + db = mariadb.connect( + user=Settings.MONITOR_DB['user'], + password=Settings.MONITOR_DB['password'], + database=Settings.MONITOR_DB['database'] + ) schemas = collections.OrderedDict() - dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() - dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() + date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() + date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() query = """SELECT * FROM `schemas` @@ -151,14 +164,14 @@ def getSchemas(): ORDER BY `hits` DESC, `dateStats` ASC""" results = db.cursor() - results.execute(query, (dateStart, dateEnd)) + results.execute(query, (date_start, date_end)) for row in results: - currentDate = row[2].strftime('%Y-%m-%d') - if not currentDate in schemas.keys(): - schemas[currentDate] = collections.OrderedDict() + current_date = row[2].strftime('%Y-%m-%d') + if current_date not in schemas.keys(): + schemas[current_date] = collections.OrderedDict() - schemas[currentDate][str(row[0])] = str(row[1]) + schemas[current_date][str(row[0])] = str(row[1]) db.close() @@ -177,7 +190,11 @@ class Monitor(Thread): receiver.connect(binding) def monitor_worker(message): - db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database']) + db = mariadb.connect( + user=Settings.MONITOR_DB['user'], + password=Settings.MONITOR_DB['password'], + database=Settings.MONITOR_DB['database'] + ) # Separate topic from message message = message.split(' |-| ') @@ -189,25 +206,27 @@ class Monitor(Thread): message = message[0] message = zlib.decompress(message) - json = simplejson.loads(message) + json = simplejson.loads(message) # Default variables - schemaID = json['$schemaRef'] - softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8') - - uploaderID = json['header']['uploaderID'].encode('utf8') - uploaderIP = None - if 'uploaderIP' in json['header']: - uploaderIP = json['header']['uploaderIP'].encode('utf8') + schema_id = json['$schemaRef'] + software_id = json['header']['softwareName'].encode('utf8') + ' | ' \ + + json['header']['softwareVersion'].encode('utf8') # Duplicates? if Settings.RELAY_DUPLICATE_MAX_MINUTES: - if duplicateMessages.isDuplicated(json): - schemaID = 'DUPLICATE MESSAGE' + if duplicate_messages.isDuplicated(json): + schema_id = 'DUPLICATE MESSAGE' c = db.cursor() - c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, )) - c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, )) + c.execute( + 'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', + (schema_id, ) + ) + c.execute( + 'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', + (schema_id, ) + ) db.commit() db.close() @@ -216,21 +235,33 @@ class Monitor(Thread): # Update software count c = db.cursor() - c.execute('UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (softwareID, )) - c.execute('INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (softwareID, )) + c.execute( + 'UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', + (software_id, ) + ) + c.execute( + 'INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', + (software_id, ) + ) db.commit() # Update schemas count c = db.cursor() - c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, )) - c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, )) + c.execute( + 'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', + (schema_id, ) + ) + c.execute( + 'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', + (schema_id, ) + ) db.commit() db.close() while True: - inboundMessage = receiver.recv() - gevent.spawn(monitor_worker, inboundMessage) + inbound_message = receiver.recv() + gevent.spawn(monitor_worker, inbound_message) class EnableCors(object): @@ -249,7 +280,8 @@ class EnableCors(object): """Set CORS Headers.""" response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS' - response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' + response.headers['Access-Control-Allow-Headers'] = \ + 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' if request.method != 'OPTIONS': # actual request; reply with the actual response @@ -257,7 +289,7 @@ class EnableCors(object): return _enable_cors -def main(): +def main() -> None: cl_args = parse_cl_args() loadConfig(cl_args)