Monitor: Initial flake8 pass

This commit is contained in:
Athanasius 2022-03-12 10:51:17 +00:00
parent 3b3e1a91c5
commit b37307e6a2
No known key found for this signature in database
GPG Key ID: 8C392035DD80FD62

View File

@ -1,31 +1,29 @@
# coding: utf8
"""
Monitor sit below gateways, or another relay, and simply parse what it receives over SUB.
"""
from threading import Thread
import argparse
import zlib
import gevent
import simplejson
import mysql.connector as mariadb
import datetime
import collections
import datetime
import zlib
from threading import Thread
import gevent
import mysql.connector as mariadb
import simplejson
import zmq.green as zmq
import re
from bottle import Bottle, request, response
from gevent import monkey
from eddn.conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()
from bottle import Bottle, get, request, response, run
app = Bottle()
# This import must be done post-monkey-patching!
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
from eddn.core.DuplicateMessages import DuplicateMessages
duplicateMessages = DuplicateMessages()
duplicateMessages.start()
duplicate_messages = DuplicateMessages()
duplicate_messages.start()
def parse_cl_args():
@ -59,13 +57,17 @@ def ping():
@app.route('/getTotalSoftwares/', method=['OPTIONS', 'GET'])
def getTotalSoftwares():
def get_total_softwares():
response.set_header("Access-Control-Allow-Origin", "*")
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
db = mariadb.connect(
user=Settings.MONITOR_DB['user'],
password=Settings.MONITOR_DB['password'],
database=Settings.MONITOR_DB['database']
)
softwares = collections.OrderedDict()
maxDays = request.GET.get('maxDays', '31').strip()
maxDays = int(maxDays) - 1
max_days = request.GET.get('maxDays', '31').strip()
max_days = int(max_days) - 1
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
FROM softwares
@ -74,7 +76,7 @@ def getTotalSoftwares():
ORDER BY total DESC"""
results = db.cursor()
results.execute(query, (maxDays, ))
results.execute(query, (max_days, ))
for row in results:
softwares[row[0].encode('utf8')] = str(row[1])
@ -85,13 +87,17 @@ def getTotalSoftwares():
@app.route('/getSoftwares/', method=['OPTIONS', 'GET'])
def getSoftwares():
def get_softwares():
response.set_header("Access-Control-Allow-Origin", "*")
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
db = mariadb.connect(
user=Settings.MONITOR_DB['user'],
password=Settings.MONITOR_DB['password'],
database=Settings.MONITOR_DB['database']
)
softwares = collections.OrderedDict()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
query = """SELECT *
FROM `softwares`
@ -99,14 +105,14 @@ def getSoftwares():
ORDER BY `hits` DESC, `dateStats` ASC"""
results = db.cursor()
results.execute(query, (dateStart, dateEnd))
results.execute(query, (date_start, date_end))
for row in results:
currentDate = row[2].strftime('%Y-%m-%d')
if not currentDate in softwares.keys():
softwares[currentDate] = collections.OrderedDict()
current_date = row[2].strftime('%Y-%m-%d')
if current_date not in softwares.keys():
softwares[current_date] = collections.OrderedDict()
softwares[currentDate][str(row[0])] = str(row[1])
softwares[current_date][str(row[0])] = str(row[1])
db.close()
@ -114,9 +120,13 @@ def getSoftwares():
@app.route('/getTotalSchemas/', method=['OPTIONS', 'GET'])
def getTotalSchemas():
def get_total_schemas():
response.set_header("Access-Control-Allow-Origin", "*")
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
db = mariadb.connect(
user=Settings.MONITOR_DB['user'],
password=Settings.MONITOR_DB['password'],
database=Settings.MONITOR_DB['database']
)
schemas = collections.OrderedDict()
query = """SELECT `name`, SUM(`hits`) AS `total`
@ -136,14 +146,17 @@ def getTotalSchemas():
@app.route('/getSchemas/', method=['OPTIONS', 'GET'])
def getSchemas():
def get_schemas():
response.set_header("Access-Control-Allow-Origin", "*")
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
#db.text_factory = lambda x: unicode(x, "utf-8", "ignore")
db = mariadb.connect(
user=Settings.MONITOR_DB['user'],
password=Settings.MONITOR_DB['password'],
database=Settings.MONITOR_DB['database']
)
schemas = collections.OrderedDict()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
query = """SELECT *
FROM `schemas`
@ -151,14 +164,14 @@ def getSchemas():
ORDER BY `hits` DESC, `dateStats` ASC"""
results = db.cursor()
results.execute(query, (dateStart, dateEnd))
results.execute(query, (date_start, date_end))
for row in results:
currentDate = row[2].strftime('%Y-%m-%d')
if not currentDate in schemas.keys():
schemas[currentDate] = collections.OrderedDict()
current_date = row[2].strftime('%Y-%m-%d')
if current_date not in schemas.keys():
schemas[current_date] = collections.OrderedDict()
schemas[currentDate][str(row[0])] = str(row[1])
schemas[current_date][str(row[0])] = str(row[1])
db.close()
@ -177,7 +190,11 @@ class Monitor(Thread):
receiver.connect(binding)
def monitor_worker(message):
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
db = mariadb.connect(
user=Settings.MONITOR_DB['user'],
password=Settings.MONITOR_DB['password'],
database=Settings.MONITOR_DB['database']
)
# Separate topic from message
message = message.split(' |-| ')
@ -189,25 +206,27 @@ class Monitor(Thread):
message = message[0]
message = zlib.decompress(message)
json = simplejson.loads(message)
json = simplejson.loads(message)
# Default variables
schemaID = json['$schemaRef']
softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8')
uploaderID = json['header']['uploaderID'].encode('utf8')
uploaderIP = None
if 'uploaderIP' in json['header']:
uploaderIP = json['header']['uploaderIP'].encode('utf8')
schema_id = json['$schemaRef']
software_id = json['header']['softwareName'].encode('utf8') + ' | ' \
+ json['header']['softwareVersion'].encode('utf8')
# Duplicates?
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
if duplicateMessages.isDuplicated(json):
schemaID = 'DUPLICATE MESSAGE'
if duplicate_messages.isDuplicated(json):
schema_id = 'DUPLICATE MESSAGE'
c = db.cursor()
c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, ))
c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, ))
c.execute(
'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
(schema_id, )
)
c.execute(
'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
(schema_id, )
)
db.commit()
db.close()
@ -216,21 +235,33 @@ class Monitor(Thread):
# Update software count
c = db.cursor()
c.execute('UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (softwareID, ))
c.execute('INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (softwareID, ))
c.execute(
'UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
(software_id, )
)
c.execute(
'INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
(software_id, )
)
db.commit()
# Update schemas count
c = db.cursor()
c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, ))
c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, ))
c.execute(
'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
(schema_id, )
)
c.execute(
'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
(schema_id, )
)
db.commit()
db.close()
while True:
inboundMessage = receiver.recv()
gevent.spawn(monitor_worker, inboundMessage)
inbound_message = receiver.recv()
gevent.spawn(monitor_worker, inbound_message)
class EnableCors(object):
@ -249,7 +280,8 @@ class EnableCors(object):
"""Set CORS Headers."""
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
response.headers['Access-Control-Allow-Headers'] = \
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
if request.method != 'OPTIONS':
# actual request; reply with the actual response
@ -257,7 +289,7 @@ class EnableCors(object):
return _enable_cors
def main():
def main() -> None:
cl_args = parse_cl_args()
loadConfig(cl_args)