Removed spaces on empty lines

AnthorNet committed 2015-06-05 15:23:46 +02:00
parent dc9a71df3a
commit 72f3f046d0
4 changed files with 74 additions and 81 deletions

View File

@@ -30,138 +30,138 @@ def getTotalSoftwares():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    softwares = collections.OrderedDict()
    maxDays = request.GET.get('maxDays', '31').strip()
    maxDays = int(maxDays) - 1
    query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
               FROM softwares
               GROUP BY name
               HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
               ORDER BY total DESC"""
    results = db.execute(query)
    for row in results:
        softwares[str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(softwares)
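
Note: getTotalSoftwares splices maxDays into the SQL text. Purely as an illustration (not part of this commit), the same aggregate can be expressed with the day span bound as a parameter; the sketch assumes a softwares(name, hits, dateStats) table like the one queried above.

import sqlite3

def total_softwares(db_path, max_days):
    # Same query as above, but the "-N day" modifier is passed to DATE()
    # as a bound parameter instead of being concatenated into the SQL.
    db = sqlite3.connect(db_path)
    query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
               FROM softwares
               GROUP BY name
               HAVING maxDate >= DATE('now', ?)
               ORDER BY total DESC"""
    rows = db.execute(query, ('-%d day' % (int(max_days) - 1),)).fetchall()
    db.close()
    return rows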

@get('/getSoftwares/')
def getSoftwares():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    softwares = collections.OrderedDict()
    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
    query = """SELECT *
               FROM softwares
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""
    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in softwares.keys():
            softwares[str(row[2])] = collections.OrderedDict()
        softwares[str(row[2])][str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(softwares)

@get('/getTotalUploaders/')
def getTotalUploaders():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    uploaders = collections.OrderedDict()
    limit = request.GET.get('limit', '20').strip()
    query = """SELECT name, SUM(hits) AS total
               FROM uploaders
               GROUP BY name
               ORDER BY total DESC
               LIMIT """ + limit
    results = db.execute(query)
    for row in results:
        uploaders[str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(uploaders)

@get('/getUploaders/')
def getUploaders():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    uploaders = collections.OrderedDict()
    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
    query = """SELECT *
               FROM uploaders
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""
    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in uploaders.keys():
            uploaders[str(row[2])] = collections.OrderedDict()
        uploaders[str(row[2])][str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(uploaders)

@get('/getTotalSchemas/')
def getTotalSchemas():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    schemas = collections.OrderedDict()
    query = """SELECT name, SUM(hits) AS total
               FROM schemas
               GROUP BY name
               ORDER BY total DESC"""
    results = db.execute(query)
    for row in results:
        schemas[str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(schemas)

@get('/getSchemas/')
def getSchemas():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    schemas = collections.OrderedDict()
    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
    query = """SELECT *
               FROM schemas
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""
    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in schemas.keys():
            schemas[str(row[2])] = collections.OrderedDict()
        schemas[str(row[2])][str(row[0])] = str(row[1])
    db.close()
    return simplejson.dumps(schemas)
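
For context, a minimal sketch of how a client might call these endpoints. The base URL is hypothetical, the requests package is an assumption, and the nesting of the /getSoftwares/ response (dateStats, then name, then hits) relies on the softwares table having (name, hits, dateStats) columns, which this diff does not show.

import requests

BASE = "http://localhost:8080"  # hypothetical monitor address

# Totals per software whose latest entry is within the last 31 days.
totals = requests.get(BASE + "/getTotalSoftwares/", params={"maxDays": "31"}).json()

# Per-day breakdown between two dates (assumed response shape:
# {dateStats: {name: hits, ...}, ...}).
daily = requests.get(BASE + "/getSoftwares/",
                     params={"dateStart": "2015-06-01", "dateEnd": "2015-06-05"}).json()
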
@@ -170,69 +170,66 @@ class Monitor(Thread):
    def run(self):
        context = zmq.Context()
        receiver = context.socket(zmq.SUB)
        receiver.setsockopt(zmq.SUBSCRIBE, '')
        for binding in Settings.MONITOR_RECEIVER_BINDINGS:
            receiver.connect(binding)

        def monitor_worker(message):
            db = sqlite3.connect(Settings.MONITOR_DB)

            # Separate topic from message
            message = message.split(' |-| ')

            # Handle gateway not sending topic
            if len(message) > 1:
                message = message[1]
            else:
                message = message[0]

            if Settings.RELAY_DUPLICATE_MAX_MINUTES:
                if duplicateMessages.isDuplicated(message):
                    schemaID = 'DUPLICATE MESSAGE'
                    c = db.cursor()
                    c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
                    c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
                    db.commit()
                    return

            if Settings.MONITOR_DECOMPRESS_MESSAGES:
                message = zlib.decompress(message)

            json = simplejson.loads(message)

            # Update software count
            softwareID = json['header']['softwareName'] + ' | ' + json['header']['softwareVersion']
            c = db.cursor()
            c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, ))
            c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, ))
            db.commit()

            # Update uploader count
            uploaderID = json['header']['uploaderID']
            if uploaderID:  # Don't get empty uploaderID
                c = db.cursor()
                c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, ))
                c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, ))
                db.commit()

            # Update schemas count
            schemaID = json['$schemaRef']
            c = db.cursor()
            c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
            c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
            db.commit()

            db.close()

        while True:
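
monitor_worker keeps one row per (name, day) and bumps it with an UPDATE followed by an INSERT OR IGNORE. A standalone sketch of that counter pattern; the schema here (UNIQUE constraint, hits defaulting to 1) is an assumption, since the table definitions are not part of this diff.

import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE schemas (
                  name TEXT,
                  dateStats DATE,
                  hits INTEGER DEFAULT 1,
                  UNIQUE (name, dateStats)
              )""")

def bump(db, name):
    c = db.cursor()
    # Increment today's row if it already exists; otherwise this is a no-op.
    c.execute("UPDATE schemas SET hits = hits + 1 WHERE name = ? AND dateStats = DATE('now', 'utc')", (name,))
    # Create today's row (hits falls back to its DEFAULT) when the UPDATE
    # matched nothing; the UNIQUE constraint makes this a no-op otherwise.
    c.execute("INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE('now', 'utc'))", (name,))
    db.commit()

bump(db, "http://example.invalid/schema/1")  # first hit of the day: row created with hits = 1
bump(db, "http://example.invalid/schema/1")  # second hit: row updated to hits = 2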

View File

@@ -47,23 +47,22 @@ class Relay(Thread):
        context = zmq.Context()
        receiver = context.socket(zmq.SUB)

        # Filters on topics or not...
        if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON == True:
        if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON is True:
            for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
                receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
            for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems():
                receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
        else:
            receiver.setsockopt(zmq.SUBSCRIBE, '')

        for binding in Settings.RELAY_RECEIVER_BINDINGS:
            # Relays bind upstream to an Announcer, or another Relay.
            receiver.connect(binding)

        sender = context.socket(zmq.PUB)
        for binding in Settings.RELAY_SENDER_BINDINGS:
            # End users, or other relays, may attach here.
            sender.bind(binding)
@@ -76,7 +75,7 @@ class Relay(Thread):
            """
            # Separate topic from message
            message = message.split(' |-| ')

            # Handle gateway not sending topic
            if len(message) > 1:
                message = message[1]
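
The SUBSCRIBE options set above rely on ZeroMQ prefix matching: a SUB socket only receives messages whose leading bytes match one of its subscriptions, which is why the schema reference is sent as a topic in front of the ' |-| ' separator. A self-contained sketch of that mechanism (addresses, schema URLs and payloads are made up):

import time
import zmq

context = zmq.Context()

pub = context.socket(zmq.PUB)
pub.bind("tcp://127.0.0.1:5556")

sub = context.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5556")
# Only messages starting with this schema reference are delivered.
sub.setsockopt(zmq.SUBSCRIBE, b"http://example.invalid/eddn/commodity/2")

time.sleep(0.5)  # give the freshly connected subscriber time to join

pub.send(b'http://example.invalid/eddn/commodity/2 |-| {"matches": true}')
pub.send(b'http://example.invalid/eddn/other/1 |-| {"filtered": true}')

topic, payload = sub.recv().split(b" |-| ", 1)  # only the first message arrives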

View File

@@ -23,16 +23,14 @@ class _Settings(object):
    RELAY_SENDER_BINDINGS = ["tcp://*:9500"]

    RELAY_DECOMPRESS_MESSAGES = False

    # If set to False, no deduplication is performed
    RELAY_DUPLICATE_MAX_MINUTES = 15

    # If set to False, don't filter on topics; accept all incoming messages
    RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = True

    RELAY_EXTRA_JSON_SCHEMAS = {
    }
    RELAY_EXTRA_JSON_SCHEMAS = {}

    ###############################################################################
    # Gateway settings
@@ -47,7 +45,7 @@ class _Settings(object):
    GATEWAY_JSON_SCHEMAS = {
        "http://schemas.elite-markets.net/eddn/commodity/1": "schemas/commodity-v0.1.json",
        "http://schemas.elite-markets.net/eddn/commodity/1/test": "schemas/commodity-v0.1.json",
        "http://schemas.elite-markets.net/eddn/commodity/2": "schemas/commodity-v2.0.json",
        "http://schemas.elite-markets.net/eddn/commodity/2/test": "schemas/commodity-v2.0.json"
    }
@@ -55,14 +53,13 @@ class _Settings(object):
    ###############################################################################
    # Monitor settings
    ###############################################################################
    MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]

    MONITOR_DB = "/home/EDDN_Monitor.s3db"

    MONITOR_DECOMPRESS_MESSAGES = True

    def loadFrom(self, fileName):
        f = open(fileName, 'r')
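
GATEWAY_JSON_SCHEMAS maps each $schemaRef to a schema file on disk. How the Gateway consumes that mapping is not shown in this diff; as a rough illustration only, an incoming message could be checked against the mapped schema along these lines (the jsonschema package and the file paths are assumptions):

import simplejson
import jsonschema

def validate_message(message_text, schema_map):
    message = simplejson.loads(message_text)
    schema_path = schema_map[message["$schemaRef"]]  # e.g. "schemas/commodity-v2.0.json"
    with open(schema_path) as f:
        schema = simplejson.load(f)
    # Raises jsonschema.ValidationError if the message does not conform.
    jsonschema.validate(message, schema)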

View File

@@ -23,27 +23,27 @@ class DuplicateMessages(Thread):
            sleep(60)
            with self.lock:
                maxTime = datetime.utcnow()

                for key in self.caches.keys():
                    if self.caches[key] + timedelta(minutes=self.max_minutes) < maxTime:
                        del self.caches[key]

    def isDuplicated(self, message):
        with self.lock:
            message = zlib.decompress(message)
            message = simplejson.loads(message)

            if message['header']['gatewayTimestamp']:
                del message['header']['gatewayTimestamp']  # Prevent dupe with new timestamp ^^

            if message['message']['timestamp']:
                del message['message']['timestamp']  # Prevent dupe with new timestamp ^^

            message = simplejson.dumps(message)
            key = hashlib.sha256(message).hexdigest()

            if key not in self.caches:
                self.caches[key] = datetime.utcnow()
                return False
            else:
                self.caches[key] = datetime.utcnow()
                return True
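
isDuplicated derives its cache key from the message with the variable timestamps stripped, so two relays of the same payload that differ only in gatewayTimestamp / timestamp map to the same key. A standalone sketch of that key derivation; sort_keys=True is added here so the key does not depend on key ordering, which the method above leaves implicit:

import hashlib
import simplejson

def dedup_key(message_text):
    message = simplejson.loads(message_text)
    message["header"].pop("gatewayTimestamp", None)  # varies on every gateway hop
    message["message"].pop("timestamp", None)        # varies on every upload
    canonical = simplejson.dumps(message, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = '{"header": {"gatewayTimestamp": "T1", "uploaderID": "x"}, "message": {"timestamp": "T2", "price": 42}}'
b = '{"header": {"gatewayTimestamp": "T3", "uploaderID": "x"}, "message": {"timestamp": "T4", "price": 42}}'
assert dedup_key(a) == dedup_key(b)  # same payload, different timestamps, same key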