mirror of
https://github.com/EDCD/EDDN.git
synced 2025-04-21 02:57:38 +03:00
Fix PEP8
This commit is contained in:
parent
2f72fe84fe
commit
ceaac2be74
@ -55,7 +55,7 @@ def push_message(string_message, topic):
|
||||
"""
|
||||
|
||||
# Push a zlib compressed JSON representation of the message to
|
||||
# announcers with schema as topic
|
||||
# announcers with schema as topic
|
||||
compressed_msg = zlib.compress(string_message)
|
||||
sender.send("%s |-| %s" % (topic, compressed_msg))
|
||||
statsCollector.tally("outbound")
|
||||
|
@ -17,11 +17,13 @@ from eddn.conf.Settings import Settings, loadConfig
|
||||
from gevent import monkey
|
||||
monkey.patch_all()
|
||||
|
||||
# This import must be done post-monkey-patching!
|
||||
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
||||
from eddn.core.DuplicateMessages import DuplicateMessages
|
||||
duplicateMessages = DuplicateMessages()
|
||||
duplicateMessages.start()
|
||||
|
||||
|
||||
def date(__format):
|
||||
d = datetime.datetime.utcnow()
|
||||
return d.strftime(__format)
|
||||
@ -30,211 +32,212 @@ def date(__format):
|
||||
@get('/getTotalSoftwares/')
|
||||
def getTotalSoftwares():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
softwares = collections.OrderedDict()
|
||||
|
||||
maxDays = request.GET.get('maxDays', '31').strip()
|
||||
maxDays = int(maxDays) -1;
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
|
||||
FROM softwares
|
||||
GROUP BY name
|
||||
HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
|
||||
ORDER BY total DESC"""
|
||||
results = db.execute(query)
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
softwares = collections.OrderedDict()
|
||||
|
||||
maxDays = request.GET.get('maxDays', '31').strip()
|
||||
maxDays = int(maxDays) - 1
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
|
||||
FROM softwares
|
||||
GROUP BY name
|
||||
HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
|
||||
ORDER BY total DESC"""
|
||||
results = db.execute(query)
|
||||
|
||||
for row in results:
|
||||
softwares[row[0].encode('utf8')] = str(row[1])
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
return simplejson.dumps(softwares)
|
||||
|
||||
|
||||
|
||||
@get('/getSoftwares/')
|
||||
def getSoftwares():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
softwares = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM softwares
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
softwares = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM softwares
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
for row in results:
|
||||
if not str(row[2].encode('utf8')) in softwares.keys():
|
||||
softwares[row[2].encode('utf8')] = collections.OrderedDict()
|
||||
|
||||
|
||||
softwares[row[2].encode('utf8')][str(row[0])] = str(row[1])
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
return simplejson.dumps(softwares)
|
||||
|
||||
|
||||
|
||||
@get('/getTotalUploaders/')
|
||||
def getTotalUploaders():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
uploaders = collections.OrderedDict()
|
||||
|
||||
limit = request.GET.get('limit', '20').strip()
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total
|
||||
FROM uploaders
|
||||
GROUP BY name
|
||||
ORDER BY total DESC
|
||||
LIMIT """ + limit
|
||||
results = db.execute(query)
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
uploaders = collections.OrderedDict()
|
||||
|
||||
limit = request.GET.get('limit', '20').strip()
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total
|
||||
FROM uploaders
|
||||
GROUP BY name
|
||||
ORDER BY total DESC
|
||||
LIMIT """ + limit
|
||||
results = db.execute(query)
|
||||
|
||||
for row in results:
|
||||
uploaders[row[0].encode('utf8')] = row[1]
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
return simplejson.dumps(uploaders)
|
||||
|
||||
|
||||
|
||||
@get('/getUploaders/')
|
||||
def getUploaders():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
uploaders = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM uploaders
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
uploaders = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM uploaders
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
for row in results:
|
||||
if not row[2].encode('utf8') in uploaders.keys():
|
||||
uploaders[row[2].encode('utf8')] = collections.OrderedDict()
|
||||
|
||||
|
||||
uploaders[row[2]][row[0].encode('utf8')] = row[1]
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
return simplejson.dumps(uploaders)
|
||||
|
||||
|
||||
|
||||
@get('/getTotalSchemas/')
|
||||
def getTotalSchemas():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
schemas = collections.OrderedDict()
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total
|
||||
FROM schemas
|
||||
GROUP BY name
|
||||
ORDER BY total DESC"""
|
||||
results = db.execute(query)
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
schemas = collections.OrderedDict()
|
||||
|
||||
query = """SELECT name, SUM(hits) AS total
|
||||
FROM schemas
|
||||
GROUP BY name
|
||||
ORDER BY total DESC"""
|
||||
results = db.execute(query)
|
||||
|
||||
for row in results:
|
||||
schemas[str(row[0])] = str(row[1])
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
|
||||
return simplejson.dumps(schemas)
|
||||
|
||||
|
||||
|
||||
@get('/getSchemas/')
|
||||
def getSchemas():
|
||||
response.set_header("Access-Control-Allow-Origin", "*")
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
schemas = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM schemas
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
schemas = collections.OrderedDict()
|
||||
|
||||
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
|
||||
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
|
||||
|
||||
query = """SELECT *
|
||||
FROM schemas
|
||||
WHERE dateStats BETWEEN ? AND ?
|
||||
ORDER BY hits DESC, dateStats ASC"""
|
||||
results = db.execute(query, (dateStart, dateEnd))
|
||||
|
||||
for row in results:
|
||||
if not str(row[2]) in schemas.keys():
|
||||
schemas[str(row[2])] = collections.OrderedDict()
|
||||
|
||||
schemas[str(row[2])][str(row[0])] = str(row[1])
|
||||
|
||||
db.close()
|
||||
|
||||
return simplejson.dumps(schemas)
|
||||
|
||||
schemas[str(row[2])][str(row[0])] = str(row[1])
|
||||
|
||||
db.close()
|
||||
|
||||
return simplejson.dumps(schemas)
|
||||
|
||||
|
||||
class Monitor(Thread):
|
||||
|
||||
def run(self):
|
||||
context = zmq.Context()
|
||||
|
||||
context = zmq.Context()
|
||||
|
||||
receiver = context.socket(zmq.SUB)
|
||||
receiver.setsockopt(zmq.SUBSCRIBE, '')
|
||||
|
||||
|
||||
for binding in Settings.MONITOR_RECEIVER_BINDINGS:
|
||||
receiver.connect(binding)
|
||||
|
||||
def monitor_worker(message):
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
|
||||
db = sqlite3.connect(Settings.MONITOR_DB)
|
||||
|
||||
# Separate topic from message
|
||||
message = message.split(' |-| ')
|
||||
|
||||
|
||||
# Handle gateway not sending topic
|
||||
if len(message) > 1:
|
||||
message = message[1]
|
||||
else:
|
||||
message = message[0]
|
||||
|
||||
|
||||
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
||||
if duplicateMessages.isDuplicated(message):
|
||||
schemaID = 'DUPLICATE MESSAGE'
|
||||
|
||||
|
||||
c = db.cursor()
|
||||
c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
|
||||
c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
|
||||
db.commit()
|
||||
|
||||
|
||||
return
|
||||
|
||||
|
||||
if Settings.MONITOR_DECOMPRESS_MESSAGES:
|
||||
message = zlib.decompress(message)
|
||||
|
||||
json = simplejson.loads(message)
|
||||
|
||||
|
||||
json = simplejson.loads(message)
|
||||
|
||||
# Update software count
|
||||
softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8')
|
||||
|
||||
|
||||
c = db.cursor()
|
||||
c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, ))
|
||||
c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, ))
|
||||
db.commit()
|
||||
|
||||
|
||||
|
||||
# Update uploader count
|
||||
uploaderID = json['header']['uploaderID'].encode('utf8')
|
||||
|
||||
if uploaderID: # Don't get empty uploaderID
|
||||
|
||||
if uploaderID: # Don't get empty uploaderID
|
||||
c = db.cursor()
|
||||
c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, ))
|
||||
c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, ))
|
||||
db.commit()
|
||||
|
||||
|
||||
# Update schemas count
|
||||
|
||||
# Update schemas count
|
||||
schemaID = json['$schemaRef']
|
||||
|
||||
|
||||
c = db.cursor()
|
||||
c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
|
||||
c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
|
||||
db.commit()
|
||||
|
||||
|
||||
db.close()
|
||||
|
||||
while True:
|
||||
|
@ -20,11 +20,12 @@ from eddn.conf.Settings import Settings, loadConfig
|
||||
from gevent import monkey
|
||||
monkey.patch_all()
|
||||
|
||||
# This import must be done post-monkey-patching!
|
||||
from eddn.core.StatsCollector import StatsCollector
|
||||
|
||||
statsCollector = StatsCollector()
|
||||
statsCollector.start()
|
||||
|
||||
# This import must be done post-monkey-patching!
|
||||
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
|
||||
from eddn.core.DuplicateMessages import DuplicateMessages
|
||||
duplicateMessages = DuplicateMessages()
|
||||
@ -71,11 +72,11 @@ class Relay(Thread):
|
||||
|
||||
def relay_worker(message):
|
||||
"""
|
||||
This is the worker function that re-sends the incoming messages out
|
||||
to any subscribers.
|
||||
:param str message: A JSON string to re-broadcast.
|
||||
This is the worker function that re-sends the incoming messages out
|
||||
to any subscribers.
|
||||
:param str message: A JSON string to re-broadcast.
|
||||
"""
|
||||
# Separate topic from message
|
||||
# Separate topic from message
|
||||
message = message.split(' |-| ')
|
||||
|
||||
# Handle gateway not sending topic
|
||||
|
@ -13,7 +13,7 @@ class _Settings(object):
|
||||
# Relay settings
|
||||
###############################################################################
|
||||
|
||||
#RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"]
|
||||
# RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"]
|
||||
RELAY_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
|
||||
|
||||
RELAY_SENDER_BINDINGS = ["tcp://*:9500"]
|
||||
@ -52,11 +52,10 @@ class _Settings(object):
|
||||
|
||||
MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
|
||||
|
||||
MONITOR_DB = "/home/EDDN_Monitor.s3db"
|
||||
MONITOR_DB = "/home/EDDN_Monitor.s3db"
|
||||
|
||||
MONITOR_DECOMPRESS_MESSAGES = True
|
||||
|
||||
|
||||
def loadFrom(self, fileName):
|
||||
f = open(fileName, 'r')
|
||||
conf = simplejson.load(f)
|
||||
|
@ -12,7 +12,7 @@ from eddn.conf.Settings import Settings, loadConfig
|
||||
class DuplicateMessages(Thread):
|
||||
max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES
|
||||
|
||||
caches = {}
|
||||
caches = {}
|
||||
|
||||
lock = Lock()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user