mirror of
https://github.com/EDCD/EDDN.git
synced 2025-04-28 22:22:13 +03:00
Monitor: Initial flake8 pass
This commit is contained in:
parent
fee5279c12
commit
5cb2dbed92
@ -1,31 +1,29 @@
|
|||||||
# coding: utf8
|
# coding: utf8
|
||||||
|
|
||||||
"""
|
|
||||||
Monitor sit below gateways, or another relay, and simply parse what it receives over SUB.
|
|
||||||
"""
|
|
||||||
from threading import Thread
|
|
||||||
import argparse
|
import argparse
|
||||||
import zlib
|
|
||||||
import gevent
|
|
||||||
import simplejson
|
|
||||||
import mysql.connector as mariadb
|
|
||||||
import datetime
|
|
||||||
import collections
|
import collections
|
||||||
|
import datetime
|
||||||
|
import zlib
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
import gevent
|
||||||
|
import mysql.connector as mariadb
|
||||||
|
import simplejson
|
||||||
import zmq.green as zmq
|
import zmq.green as zmq
|
||||||
import re
|
from bottle import Bottle, request, response
|
||||||
|
from gevent import monkey
|
||||||
|
|
||||||
from eddn.conf.Settings import Settings, loadConfig
|
from eddn.conf.Settings import Settings, loadConfig
|
||||||
|
|
||||||
from gevent import monkey
|
|
||||||
monkey.patch_all()
|
monkey.patch_all()
|
||||||
from bottle import Bottle, get, request, response, run
|
|
||||||
app = Bottle()
|
app = Bottle()
|
||||||
|
|
||||||
# This import must be done post-monkey-patching!
|
# This import must be done post-monkey-patching!
|
||||||
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
||||||
from eddn.core.DuplicateMessages import DuplicateMessages
|
from eddn.core.DuplicateMessages import DuplicateMessages
|
||||||
duplicateMessages = DuplicateMessages()
|
duplicate_messages = DuplicateMessages()
|
||||||
duplicateMessages.start()
|
duplicate_messages.start()
|
||||||
|
|
||||||
|
|
||||||
def parse_cl_args():
|
def parse_cl_args():
|
||||||
@ -59,13 +57,17 @@ def ping():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/getTotalSoftwares/', method=['OPTIONS', 'GET'])
|
@app.route('/getTotalSoftwares/', method=['OPTIONS', 'GET'])
|
||||||
def getTotalSoftwares():
|
def get_total_softwares():
|
||||||
response.set_header("Access-Control-Allow-Origin", "*")
|
response.set_header("Access-Control-Allow-Origin", "*")
|
||||||
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
|
db = mariadb.connect(
|
||||||
|
user=Settings.MONITOR_DB['user'],
|
||||||
|
password=Settings.MONITOR_DB['password'],
|
||||||
|
database=Settings.MONITOR_DB['database']
|
||||||
|
)
|
||||||
softwares = collections.OrderedDict()
|
softwares = collections.OrderedDict()
|
||||||
|
|
||||||
maxDays = request.GET.get('maxDays', '31').strip()
|
max_days = request.GET.get('maxDays', '31').strip()
|
||||||
maxDays = int(maxDays) - 1
|
max_days = int(max_days) - 1
|
||||||
|
|
||||||
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
|
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
|
||||||
FROM softwares
|
FROM softwares
|
||||||
@ -74,7 +76,7 @@ def getTotalSoftwares():
|
|||||||
ORDER BY total DESC"""
|
ORDER BY total DESC"""
|
||||||
|
|
||||||
results = db.cursor()
|
results = db.cursor()
|
||||||
results.execute(query, (maxDays, ))
|
results.execute(query, (max_days, ))
|
||||||
|
|
||||||
for row in results:
|
for row in results:
|
||||||
softwares[row[0].encode('utf8')] = str(row[1])
|
softwares[row[0].encode('utf8')] = str(row[1])
|
||||||
@ -85,13 +87,17 @@ def getTotalSoftwares():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/getSoftwares/', method=['OPTIONS', 'GET'])
|
@app.route('/getSoftwares/', method=['OPTIONS', 'GET'])
|
||||||
def getSoftwares():
|
def get_softwares():
|
||||||
response.set_header("Access-Control-Allow-Origin", "*")
|
response.set_header("Access-Control-Allow-Origin", "*")
|
||||||
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
|
db = mariadb.connect(
|
||||||
|
user=Settings.MONITOR_DB['user'],
|
||||||
|
password=Settings.MONITOR_DB['password'],
|
||||||
|
database=Settings.MONITOR_DB['database']
|
||||||
|
)
|
||||||
softwares = collections.OrderedDict()
|
softwares = collections.OrderedDict()
|
||||||
|
|
||||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||||
|
|
||||||
query = """SELECT *
|
query = """SELECT *
|
||||||
FROM `softwares`
|
FROM `softwares`
|
||||||
@ -99,14 +105,14 @@ def getSoftwares():
|
|||||||
ORDER BY `hits` DESC, `dateStats` ASC"""
|
ORDER BY `hits` DESC, `dateStats` ASC"""
|
||||||
|
|
||||||
results = db.cursor()
|
results = db.cursor()
|
||||||
results.execute(query, (dateStart, dateEnd))
|
results.execute(query, (date_start, date_end))
|
||||||
|
|
||||||
for row in results:
|
for row in results:
|
||||||
currentDate = row[2].strftime('%Y-%m-%d')
|
current_date = row[2].strftime('%Y-%m-%d')
|
||||||
if not currentDate in softwares.keys():
|
if current_date not in softwares.keys():
|
||||||
softwares[currentDate] = collections.OrderedDict()
|
softwares[current_date] = collections.OrderedDict()
|
||||||
|
|
||||||
softwares[currentDate][str(row[0])] = str(row[1])
|
softwares[current_date][str(row[0])] = str(row[1])
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
@ -114,9 +120,13 @@ def getSoftwares():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/getTotalSchemas/', method=['OPTIONS', 'GET'])
|
@app.route('/getTotalSchemas/', method=['OPTIONS', 'GET'])
|
||||||
def getTotalSchemas():
|
def get_total_schemas():
|
||||||
response.set_header("Access-Control-Allow-Origin", "*")
|
response.set_header("Access-Control-Allow-Origin", "*")
|
||||||
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
|
db = mariadb.connect(
|
||||||
|
user=Settings.MONITOR_DB['user'],
|
||||||
|
password=Settings.MONITOR_DB['password'],
|
||||||
|
database=Settings.MONITOR_DB['database']
|
||||||
|
)
|
||||||
schemas = collections.OrderedDict()
|
schemas = collections.OrderedDict()
|
||||||
|
|
||||||
query = """SELECT `name`, SUM(`hits`) AS `total`
|
query = """SELECT `name`, SUM(`hits`) AS `total`
|
||||||
@ -136,14 +146,17 @@ def getTotalSchemas():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/getSchemas/', method=['OPTIONS', 'GET'])
|
@app.route('/getSchemas/', method=['OPTIONS', 'GET'])
|
||||||
def getSchemas():
|
def get_schemas():
|
||||||
response.set_header("Access-Control-Allow-Origin", "*")
|
response.set_header("Access-Control-Allow-Origin", "*")
|
||||||
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
|
db = mariadb.connect(
|
||||||
#db.text_factory = lambda x: unicode(x, "utf-8", "ignore")
|
user=Settings.MONITOR_DB['user'],
|
||||||
|
password=Settings.MONITOR_DB['password'],
|
||||||
|
database=Settings.MONITOR_DB['database']
|
||||||
|
)
|
||||||
schemas = collections.OrderedDict()
|
schemas = collections.OrderedDict()
|
||||||
|
|
||||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
date_start = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
date_end = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||||
|
|
||||||
query = """SELECT *
|
query = """SELECT *
|
||||||
FROM `schemas`
|
FROM `schemas`
|
||||||
@ -151,14 +164,14 @@ def getSchemas():
|
|||||||
ORDER BY `hits` DESC, `dateStats` ASC"""
|
ORDER BY `hits` DESC, `dateStats` ASC"""
|
||||||
|
|
||||||
results = db.cursor()
|
results = db.cursor()
|
||||||
results.execute(query, (dateStart, dateEnd))
|
results.execute(query, (date_start, date_end))
|
||||||
|
|
||||||
for row in results:
|
for row in results:
|
||||||
currentDate = row[2].strftime('%Y-%m-%d')
|
current_date = row[2].strftime('%Y-%m-%d')
|
||||||
if not currentDate in schemas.keys():
|
if current_date not in schemas.keys():
|
||||||
schemas[currentDate] = collections.OrderedDict()
|
schemas[current_date] = collections.OrderedDict()
|
||||||
|
|
||||||
schemas[currentDate][str(row[0])] = str(row[1])
|
schemas[current_date][str(row[0])] = str(row[1])
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
@ -177,7 +190,11 @@ class Monitor(Thread):
|
|||||||
receiver.connect(binding)
|
receiver.connect(binding)
|
||||||
|
|
||||||
def monitor_worker(message):
|
def monitor_worker(message):
|
||||||
db = mariadb.connect(user=Settings.MONITOR_DB['user'], password=Settings.MONITOR_DB['password'], database=Settings.MONITOR_DB['database'])
|
db = mariadb.connect(
|
||||||
|
user=Settings.MONITOR_DB['user'],
|
||||||
|
password=Settings.MONITOR_DB['password'],
|
||||||
|
database=Settings.MONITOR_DB['database']
|
||||||
|
)
|
||||||
|
|
||||||
# Separate topic from message
|
# Separate topic from message
|
||||||
message = message.split(' |-| ')
|
message = message.split(' |-| ')
|
||||||
@ -189,25 +206,27 @@ class Monitor(Thread):
|
|||||||
message = message[0]
|
message = message[0]
|
||||||
|
|
||||||
message = zlib.decompress(message)
|
message = zlib.decompress(message)
|
||||||
json = simplejson.loads(message)
|
json = simplejson.loads(message)
|
||||||
|
|
||||||
# Default variables
|
# Default variables
|
||||||
schemaID = json['$schemaRef']
|
schema_id = json['$schemaRef']
|
||||||
softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8')
|
software_id = json['header']['softwareName'].encode('utf8') + ' | ' \
|
||||||
|
+ json['header']['softwareVersion'].encode('utf8')
|
||||||
uploaderID = json['header']['uploaderID'].encode('utf8')
|
|
||||||
uploaderIP = None
|
|
||||||
if 'uploaderIP' in json['header']:
|
|
||||||
uploaderIP = json['header']['uploaderIP'].encode('utf8')
|
|
||||||
|
|
||||||
# Duplicates?
|
# Duplicates?
|
||||||
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
||||||
if duplicateMessages.isDuplicated(json):
|
if duplicate_messages.isDuplicated(json):
|
||||||
schemaID = 'DUPLICATE MESSAGE'
|
schema_id = 'DUPLICATE MESSAGE'
|
||||||
|
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, ))
|
c.execute(
|
||||||
c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, ))
|
'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
|
||||||
|
(schema_id, )
|
||||||
|
)
|
||||||
|
c.execute(
|
||||||
|
'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
|
||||||
|
(schema_id, )
|
||||||
|
)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
@ -216,21 +235,33 @@ class Monitor(Thread):
|
|||||||
|
|
||||||
# Update software count
|
# Update software count
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
c.execute('UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (softwareID, ))
|
c.execute(
|
||||||
c.execute('INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (softwareID, ))
|
'UPDATE `softwares` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
|
||||||
|
(software_id, )
|
||||||
|
)
|
||||||
|
c.execute(
|
||||||
|
'INSERT IGNORE INTO `softwares` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
|
||||||
|
(software_id, )
|
||||||
|
)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
# Update schemas count
|
# Update schemas count
|
||||||
c = db.cursor()
|
c = db.cursor()
|
||||||
c.execute('UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()', (schemaID, ))
|
c.execute(
|
||||||
c.execute('INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())', (schemaID, ))
|
'UPDATE `schemas` SET `hits` = `hits` + 1 WHERE `name` = %s AND `dateStats` = UTC_DATE()',
|
||||||
|
(schema_id, )
|
||||||
|
)
|
||||||
|
c.execute(
|
||||||
|
'INSERT IGNORE INTO `schemas` (`name`, `dateStats`) VALUES (%s, UTC_DATE())',
|
||||||
|
(schema_id, )
|
||||||
|
)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
inboundMessage = receiver.recv()
|
inbound_message = receiver.recv()
|
||||||
gevent.spawn(monitor_worker, inboundMessage)
|
gevent.spawn(monitor_worker, inbound_message)
|
||||||
|
|
||||||
|
|
||||||
class EnableCors(object):
|
class EnableCors(object):
|
||||||
@ -249,7 +280,8 @@ class EnableCors(object):
|
|||||||
"""Set CORS Headers."""
|
"""Set CORS Headers."""
|
||||||
response.headers['Access-Control-Allow-Origin'] = '*'
|
response.headers['Access-Control-Allow-Origin'] = '*'
|
||||||
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
|
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
|
||||||
response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
|
response.headers['Access-Control-Allow-Headers'] = \
|
||||||
|
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
|
||||||
|
|
||||||
if request.method != 'OPTIONS':
|
if request.method != 'OPTIONS':
|
||||||
# actual request; reply with the actual response
|
# actual request; reply with the actual response
|
||||||
@ -257,7 +289,7 @@ class EnableCors(object):
|
|||||||
|
|
||||||
return _enable_cors
|
return _enable_cors
|
||||||
|
|
||||||
def main():
|
def main() -> None:
|
||||||
cl_args = parse_cl_args()
|
cl_args = parse_cl_args()
|
||||||
loadConfig(cl_args)
|
loadConfig(cl_args)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user