From 72f3f046d04a2413d2d8702778e60805697ada6e Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Fri, 5 Jun 2015 15:23:46 +0200 Subject: [PATCH 1/2] Removed spaces on empty lines --- src/eddn/Monitor.py | 107 ++++++++++++++-------------- src/eddn/Relay.py | 11 ++- src/eddn/_Conf/Settings.py | 21 +++--- src/eddn/_Core/DuplicateMessages.py | 16 ++--- 4 files changed, 74 insertions(+), 81 deletions(-) diff --git a/src/eddn/Monitor.py b/src/eddn/Monitor.py index d3675d3..e668dfe 100644 --- a/src/eddn/Monitor.py +++ b/src/eddn/Monitor.py @@ -30,138 +30,138 @@ def getTotalSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) softwares = collections.OrderedDict() - + maxDays = request.GET.get('maxDays', '31').strip() maxDays = int(maxDays) -1; - + query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate FROM softwares GROUP BY name HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day') ORDER BY total DESC""" results = db.execute(query) - + for row in results: softwares[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + @get('/getSoftwares/') def getSoftwares(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) softwares = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM softwares WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: if not str(row[2]) in softwares.keys(): softwares[str(row[2])] = collections.OrderedDict() - + softwares[str(row[2])][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(softwares) - + @get('/getTotalUploaders/') def getTotalUploaders(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) uploaders = collections.OrderedDict() - + limit = request.GET.get('limit', '20').strip() - + query = """SELECT name, SUM(hits) AS total FROM uploaders GROUP BY name ORDER BY total DESC LIMIT """ + limit results = db.execute(query) - + for row in results: uploaders[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(uploaders) - + @get('/getUploaders/') def getUploaders(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) uploaders = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM uploaders WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: if not str(row[2]) in uploaders.keys(): uploaders[str(row[2])] = collections.OrderedDict() - + uploaders[str(row[2])][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(uploaders) - + @get('/getTotalSchemas/') def getTotalSchemas(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) schemas = collections.OrderedDict() - + query = """SELECT name, SUM(hits) AS total FROM schemas GROUP BY name ORDER BY total DESC""" results = db.execute(query) - + for row in results: schemas[str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) - + @get('/getSchemas/') def getSchemas(): response.set_header("Access-Control-Allow-Origin", "*") db = sqlite3.connect(Settings.MONITOR_DB) schemas = collections.OrderedDict() - + dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip() dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip() - + query = """SELECT * FROM schemas WHERE dateStats BETWEEN ? AND ? ORDER BY hits DESC, dateStats ASC""" results = db.execute(query, (dateStart, dateEnd)) - + for row in results: if not str(row[2]) in schemas.keys(): schemas[str(row[2])] = collections.OrderedDict() - + schemas[str(row[2])][str(row[0])] = str(row[1]) - + db.close() - + return simplejson.dumps(schemas) @@ -170,69 +170,66 @@ class Monitor(Thread): def run(self): context = zmq.Context() - + receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SUBSCRIBE, '') - + for binding in Settings.MONITOR_RECEIVER_BINDINGS: receiver.connect(binding) def monitor_worker(message): db = sqlite3.connect(Settings.MONITOR_DB) - + # Separate topic from message message = message.split(' |-| ') - + # Handle gateway not sending topic if len(message) > 1: message = message[1] else: message = message[0] - if Settings.RELAY_DUPLICATE_MAX_MINUTES: if duplicateMessages.isDuplicated(message): schemaID = 'DUPLICATE MESSAGE' - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + return - + if Settings.MONITOR_DECOMPRESS_MESSAGES: message = zlib.decompress(message) - + json = simplejson.loads(message) - + # Update software count softwareID = json['header']['softwareName'] + ' | ' + json['header']['softwareVersion'] - + c = db.cursor() c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, )) c.execute('INSERT OR IGNORE INTO softwares (name, dateStats) VALUES (?, DATE("now", "utc"))', (softwareID, )) db.commit() - - + # Update uploader count uploaderID = json['header']['uploaderID'] - + if uploaderID: # Don't get empty uploaderID c = db.cursor() c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, )) c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, )) db.commit() - - + # Update schemas count schemaID = json['$schemaRef'] - + c = db.cursor() c.execute('UPDATE schemas SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (schemaID, )) c.execute('INSERT OR IGNORE INTO schemas (name, dateStats) VALUES (?, DATE("now", "utc"))', (schemaID, )) db.commit() - + db.close() while True: diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 9d2f903..581c3e8 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -47,23 +47,22 @@ class Relay(Thread): context = zmq.Context() receiver = context.socket(zmq.SUB) - + # Filters on topics or not... - if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON == True: + if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON is True: for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems(): receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems(): receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) else: receiver.setsockopt(zmq.SUBSCRIBE, '') - - + for binding in Settings.RELAY_RECEIVER_BINDINGS: # Relays bind upstream to an Announcer, or another Relay. receiver.connect(binding) sender = context.socket(zmq.PUB) - + for binding in Settings.RELAY_SENDER_BINDINGS: # End users, or other relays, may attach here. sender.bind(binding) @@ -76,7 +75,7 @@ class Relay(Thread): """ # Separate topic from message message = message.split(' |-| ') - + # Handle gateway not sending topic if len(message) > 1: message = message[1] diff --git a/src/eddn/_Conf/Settings.py b/src/eddn/_Conf/Settings.py index 6a59e4d..36673ca 100644 --- a/src/eddn/_Conf/Settings.py +++ b/src/eddn/_Conf/Settings.py @@ -23,16 +23,14 @@ class _Settings(object): RELAY_SENDER_BINDINGS = ["tcp://*:9500"] RELAY_DECOMPRESS_MESSAGES = False - + # If set to False, no deduplicate is made RELAY_DUPLICATE_MAX_MINUTES = 15 - + # If set to false, don't listen to topic and accept all incoming messages RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON = True - - RELAY_EXTRA_JSON_SCHEMAS = { - - } + + RELAY_EXTRA_JSON_SCHEMAS = {} ############################################################################### # Gateway settings @@ -47,7 +45,7 @@ class _Settings(object): GATEWAY_JSON_SCHEMAS = { "http://schemas.elite-markets.net/eddn/commodity/1": "schemas/commodity-v0.1.json", "http://schemas.elite-markets.net/eddn/commodity/1/test": "schemas/commodity-v0.1.json", - + "http://schemas.elite-markets.net/eddn/commodity/2": "schemas/commodity-v2.0.json", "http://schemas.elite-markets.net/eddn/commodity/2/test": "schemas/commodity-v2.0.json" } @@ -55,14 +53,13 @@ class _Settings(object): ############################################################################### # Monitor settings ############################################################################### - + MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"] - + MONITOR_DB = "/home/EDDN_Monitor.s3db" - + MONITOR_DECOMPRESS_MESSAGES = True - - + def loadFrom(self, fileName): f = open(fileName, 'r') diff --git a/src/eddn/_Core/DuplicateMessages.py b/src/eddn/_Core/DuplicateMessages.py index 48a6055..5d79107 100644 --- a/src/eddn/_Core/DuplicateMessages.py +++ b/src/eddn/_Core/DuplicateMessages.py @@ -23,27 +23,27 @@ class DuplicateMessages(Thread): sleep(60) with self.lock: maxTime = datetime.utcnow() - + for key in self.caches.keys(): if self.caches[key] + timedelta(minutes=self.max_minutes) < maxTime: del self.caches[key] - + def isDuplicated(self, message): with self.lock: message = zlib.decompress(message) message = simplejson.loads(message) - + if message['header']['gatewayTimestamp']: - del message['header']['gatewayTimestamp'] # Prevent dupe with new timestamp ^^ + del message['header']['gatewayTimestamp'] # Prevent dupe with new timestamp ^^ if message['message']['timestamp']: - del message['message']['timestamp'] # Prevent dupe with new timestamp ^^ - + del message['message']['timestamp'] # Prevent dupe with new timestamp ^^ + message = simplejson.dumps(message) key = hashlib.sha256(message).hexdigest() - + if key not in self.caches: self.caches[key] = datetime.utcnow() return False else: self.caches[key] = datetime.utcnow() - return True \ No newline at end of file + return True From 1504d99d498497ee2aa64a5fc764fb9b894e325a Mon Sep 17 00:00:00 2001 From: AnthorNet Date: Fri, 5 Jun 2015 15:26:59 +0200 Subject: [PATCH 2/2] Ignore .prices files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 3b1dc68..b4e601e 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,4 @@ docs/_build/ # PyBuilder target/ *.prices +*.prices