Merge pull request #14 from AnthorNet/Pull-for-James---Release-0.4

Fix Jenkins build part 1
AnthorNet committed 2015-06-05 15:28:20 +02:00
commit f9252e41af
5 changed files with 75 additions and 81 deletions

.gitignore

@@ -53,3 +53,4 @@ docs/_build/
 # PyBuilder
 target/
 *.prices
+*.prices


@@ -30,138 +30,138 @@ def getTotalSoftwares():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    softwares = collections.OrderedDict()

    maxDays = request.GET.get('maxDays', '31').strip()
    maxDays = int(maxDays) - 1

    query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
               FROM softwares
               GROUP BY name
               HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
               ORDER BY total DESC"""

    results = db.execute(query)
    for row in results:
        softwares[str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(softwares)


@get('/getSoftwares/')
def getSoftwares():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    softwares = collections.OrderedDict()

    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()

    query = """SELECT *
               FROM softwares
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""

    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in softwares.keys():
            softwares[str(row[2])] = collections.OrderedDict()
        softwares[str(row[2])][str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(softwares)


@get('/getTotalUploaders/')
def getTotalUploaders():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    uploaders = collections.OrderedDict()

    limit = request.GET.get('limit', '20').strip()

    query = """SELECT name, SUM(hits) AS total
               FROM uploaders
               GROUP BY name
               ORDER BY total DESC
               LIMIT """ + limit

    results = db.execute(query)
    for row in results:
        uploaders[str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(uploaders)


@get('/getUploaders/')
def getUploaders():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    uploaders = collections.OrderedDict()

    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()

    query = """SELECT *
               FROM uploaders
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""

    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in uploaders.keys():
            uploaders[str(row[2])] = collections.OrderedDict()
        uploaders[str(row[2])][str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(uploaders)


@get('/getTotalSchemas/')
def getTotalSchemas():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    schemas = collections.OrderedDict()

    query = """SELECT name, SUM(hits) AS total
               FROM schemas
               GROUP BY name
               ORDER BY total DESC"""

    results = db.execute(query)
    for row in results:
        schemas[str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(schemas)


@get('/getSchemas/')
def getSchemas():
    response.set_header("Access-Control-Allow-Origin", "*")
    db = sqlite3.connect(Settings.MONITOR_DB)
    schemas = collections.OrderedDict()

    dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
    dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()

    query = """SELECT *
               FROM schemas
               WHERE dateStats BETWEEN ? AND ?
               ORDER BY hits DESC, dateStats ASC"""

    results = db.execute(query, (dateStart, dateEnd))
    for row in results:
        if not str(row[2]) in schemas.keys():
            schemas[str(row[2])] = collections.OrderedDict()
        schemas[str(row[2])][str(row[0])] = str(row[1])
    db.close()

    return simplejson.dumps(schemas)
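
Of the endpoints above, getTotalSoftwares and getTotalUploaders concatenate request input (maxDays, limit) straight into the SQL text, while the date-range endpoints bind ? placeholders. A hedged sketch of the LIMIT query with the value bound as a parameter instead; the helper name is illustrative and not part of this commit:

# Hedged sketch, not part of this commit: bind the LIMIT value as a
# parameter rather than concatenating request input into the SQL text.
import sqlite3

def getTotalUploadersBound(dbPath, limit):
    db = sqlite3.connect(dbPath)
    query = """SELECT name, SUM(hits) AS total
               FROM uploaders
               GROUP BY name
               ORDER BY total DESC
               LIMIT ?"""
    rows = db.execute(query, (int(limit), )).fetchall()
    db.close()
    return rows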
@@ -170,69 +170,66 @@ class Monitor(Thread):
    def run(self):
        context = zmq.Context()
        receiver = context.socket(zmq.SUB)
        receiver.setsockopt(zmq.SUBSCRIBE, '')

        for binding in Settings.MONITOR_RECEIVER_BINDINGS:
            receiver.connect(binding)

        def monitor_worker(message):
            db = sqlite3.connect(Settings.MONITOR_DB)

            # Separate topic from message
            message = message.split(' |-| ')

            # Handle gateway not sending topic
            if len(message) > 1:
                message = message[1]
            else:
                message = message[0]

            if Settings.RELAY_DUPLICATE_MAX_MINUTES:
                if duplicateMessages.isDuplicated(message):
                    schemaID = 'DUPLICATE MESSAGE'

                    c = db.cursor()
                    c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
                    c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
                    db.commit()

                    return

            if Settings.MONITOR_DECOMPRESS_MESSAGES:
                message = zlib.decompress(message)

            json = simplejson.loads(message)

            # Update software count
            softwareID = json['header']['softwareName'] + ' | ' + json['header']['softwareVersion']

            c = db.cursor()
            c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, ))
            c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, ))
            db.commit()

            # Update uploader count
            uploaderID = json['header']['uploaderID']

            if uploaderID:  # Don't get empty uploaderID
                c = db.cursor()
                c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, ))
                c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, ))
                db.commit()

            # Update schemas count
            schemaID = json['$schemaRef']

            c = db.cursor()
            c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, ))
            c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, ))
            db.commit()

            db.close()

        while True:
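
Each counter in monitor_worker above is maintained with the same two statements: UPDATE bumps today's row if it exists, then INSERT OR IGNORE creates it on first sight, which assumes a unique constraint over (name, dateStats) and a default hits value of 1 that this diff does not show. A hedged sketch of that pattern as a shared helper; the name is illustrative:

# Hedged sketch, not part of this commit: the UPDATE / INSERT OR IGNORE
# pair factored into one helper. The table name is interpolated, so it
# must come from code, never from user input.
def bumpDailyCounter(db, table, name):
    c = db.cursor()
    c.execute('UPDATE ' + table + ' SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (name, ))
    c.execute('INSERT OR IGNORE INTO ' + table + ' (name, dateStats) VALUES (?, DATE("now", "utc"))', (name, ))
    db.commit()

With such a helper, the three blocks above would reduce to bumpDailyCounter(db, 'softwares', softwareID) and so on.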


@@ -47,23 +47,22 @@ class Relay(Thread):
        context = zmq.Context()
        receiver = context.socket(zmq.SUB)

        # Filters on topics or not...
-        if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON == True:
+        if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON is True:
            for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
                receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
            for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems():
                receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
        else:
            receiver.setsockopt(zmq.SUBSCRIBE, '')

        for binding in Settings.RELAY_RECEIVER_BINDINGS:
            # Relays bind upstream to an Announcer, or another Relay.
            receiver.connect(binding)

        sender = context.socket(zmq.PUB)

        for binding in Settings.RELAY_SENDER_BINDINGS:
            # End users, or other relays, may attach here.
            sender.bind(binding)

@@ -76,7 +75,7 @@ class Relay(Thread):
        """
        # Separate topic from message
        message = message.split(' |-| ')

        # Handle gateway not sending topic
        if len(message) > 1:
            message = message[1]
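
The subscription setup above leans on ZeroMQ's SUB-side filtering, which is a byte-prefix match on the whole message; framing messages as schemaRef |-| payload is what lets a subscription on a schemaRef select exactly the messages that carry it. A minimal downstream sketch under the same framing; the endpoint and schemaRef are illustrative:

# Minimal subscriber sketch assuming the 'topic |-| payload' framing
# used above; the endpoint and schemaRef are illustrative.
import zmq

context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.connect("tcp://localhost:9500")
sub.setsockopt(zmq.SUBSCRIBE, "http://schemas.elite-markets.net/eddn/commodity/2")

while True:
    message = sub.recv()
    topic, sep, payload = message.partition(' |-| ')
    if not sep:
        payload = message  # no topic frame was sent, as handled above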


@@ -23,16 +23,14 @@ class _Settings(object):
    RELAY_SENDER_BINDINGS = ["tcp://*:9500"]

    RELAY_DECOMPRESS_MESSAGES = False

    # If set to False, no deduplication is performed
    RELAY_DUPLICATE_MAX_MINUTES = 15

    # If set to False, don't filter on topics; accept all incoming messages
    RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = True

-    RELAY_EXTRA_JSON_SCHEMAS = {
-    }
+    RELAY_EXTRA_JSON_SCHEMAS = {}

    ###########################################################################
    # Gateway settings

@@ -47,7 +45,7 @@ class _Settings(object):
    GATEWAY_JSON_SCHEMAS = {
        "http://schemas.elite-markets.net/eddn/commodity/1": "schemas/commodity-v0.1.json",
        "http://schemas.elite-markets.net/eddn/commodity/1/test": "schemas/commodity-v0.1.json",
        "http://schemas.elite-markets.net/eddn/commodity/2": "schemas/commodity-v2.0.json",
        "http://schemas.elite-markets.net/eddn/commodity/2/test": "schemas/commodity-v2.0.json"
    }

@@ -55,14 +53,13 @@ class _Settings(object):
    ###########################################################################
    # Monitor settings
    ###########################################################################

    MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]

    MONITOR_DB = "/home/EDDN_Monitor.s3db"

    MONITOR_DECOMPRESS_MESSAGES = True

    def loadFrom(self, fileName):
        f = open(fileName, 'r')
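
loadFrom is cut off at the hunk boundary above and its body is not part of this diff. Purely for orientation, a hedged sketch of what a settings loader of this shape might do, assuming a JSON file of attribute overrides:

# Hedged sketch, not the code from this commit: read a JSON file and
# override any matching attribute on the settings object.
def loadFrom(self, fileName):
    f = open(fileName, 'r')
    try:
        overrides = simplejson.load(f)
    finally:
        f.close()
    for key, value in overrides.iteritems():
        if hasattr(self, key):
            setattr(self, key, value)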


@@ -23,27 +23,27 @@ class DuplicateMessages(Thread):
            sleep(60)
            with self.lock:
                maxTime = datetime.utcnow()

                for key in self.caches.keys():
                    if self.caches[key] + timedelta(minutes=self.max_minutes) < maxTime:
                        del self.caches[key]

    def isDuplicated(self, message):
        with self.lock:
            message = zlib.decompress(message)
            message = simplejson.loads(message)

            if message['header']['gatewayTimestamp']:
                del message['header']['gatewayTimestamp']  # Prevent dupe with a new timestamp
            if message['message']['timestamp']:
                del message['message']['timestamp']  # Prevent dupe with a new timestamp

            message = simplejson.dumps(message)
            key = hashlib.sha256(message).hexdigest()

            if key not in self.caches:
                self.caches[key] = datetime.utcnow()
                return False
            else:
                self.caches[key] = datetime.utcnow()
                return True
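
isDuplicated() strips the two volatile timestamp fields, re-serialises the message, and keys a time-boxed cache on the SHA-256 of the result, so a re-sent payload within max_minutes hashes to the same key and is reported as a duplicate (with its expiry refreshed). A hedged usage sketch matching how the Monitor above consults it; construction details are illustrative:

# Hedged usage sketch; construction details are illustrative.
duplicateMessages = DuplicateMessages()
duplicateMessages.start()  # expiry thread drops keys older than max_minutes

def handleMessage(message):
    if duplicateMessages.isDuplicated(message):
        return  # same payload seen within the window: drop it
    # ... otherwise relay or record the message ...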