Merge pull request #15 from jamesremuscat/dev

Dev to Master
This commit is contained in:
AnthorNet 2015-06-08 13:49:04 +02:00
commit e79038cce5
20 changed files with 137 additions and 111 deletions

2
.gitignore vendored
View File

@ -52,5 +52,5 @@ docs/_build/
# PyBuilder
target/
*.prices
*.prices

View File

@ -357,7 +357,7 @@ var start = function(){
Highcharts.setOptions({global: {useUTC: false}});
// Grab gateways
gateways = gateways.sort();
//gateways = gateways.sort();
$.each(gateways, function(k, gateway){
gateway = gateway.replace('tcp://', '');
gateway = gateway.replace(':8500', '');
@ -406,7 +406,7 @@ var start = function(){
}, updateInterval);
// Grab relays
relays = relays.sort();
//relays = relays.sort();
$.each(relays, function(k, relay){
$("select[name=relays]").append($('<option>', {
value: 'http://' + relay + ':' + relayBottlePort + '/stats/',

View File

@ -209,8 +209,10 @@ def main():
except zmq.ZMQError, e:
echoLog('')
echoLog('ZMQSocketException: ' + str(e))
subscriber.disconnect(__relayEDDN)
echoLog('Disconnect from ' + __relayEDDN)
echoLog('')
time.sleep(10)
time.sleep(5)

View File

@ -2,6 +2,7 @@ import zlib
import zmq
import simplejson
import sys
import time
"""
" Configuration
@ -42,7 +43,8 @@ def main():
except zmq.ZMQError, e:
print 'ZMQSocketException: ' + str(e)
sys.stdout.flush()
time.sleep(10)
subscriber.disconnect(__relayEDDN)
time.sleep(5)

View File

@ -209,8 +209,10 @@ def main():
except zmq.ZMQError as e:
echoLog('')
echoLog('ZMQSocketException: ' + str(e))
subscriber.disconnect(__relayEDDN)
echoLog('Disconnect from ' + __relayEDDN)
echoLog('')
time.sleep(10)
time.sleep(5)

View File

@ -2,6 +2,7 @@ import zlib
import zmq
import simplejson
import sys
import time
"""
" Configuration
@ -42,7 +43,8 @@ def main():
except zmq.ZMQError as e:
print ('ZMQSocketException: ' + str(e))
sys.stdout.flush()
time.sleep(10)
subscriber.disconnect(__relayEDDN)
time.sleep(5)

View File

@ -3,7 +3,7 @@ import re
import glob
VERSIONFILE = "src/eddn/_Conf/Version.py"
VERSIONFILE = "src/eddn/conf/Version.py"
verstr = "unknown"
try:
verstrline = open(VERSIONFILE, "rt").read()

View File

@ -1,3 +1,5 @@
# coding: utf8
"""
Contains the necessary ZeroMQ socket and a helper function to publish
market data to the Announcer daemons.
@ -11,10 +13,11 @@ import zlib
import zmq.green as zmq
from datetime import datetime
import os
from pkg_resources import resource_string
# import os
from eddn._Conf.Settings import Settings, loadConfig
from eddn._Core.Validator import Validator, ValidationSeverity
from eddn.conf.Settings import Settings, loadConfig
from eddn.core.Validator import Validator, ValidationSeverity
from gevent import monkey
monkey.patch_all()
@ -29,7 +32,7 @@ sender = context.socket(zmq.PUB)
validator = Validator()
# This import must be done post-monkey-patching!
from eddn._Core.StatsCollector import StatsCollector
from eddn.core.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()
@ -42,7 +45,7 @@ def configure():
sender.bind(binding)
for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
validator.addSchemaResource(schemaRef, os.path.dirname(__file__) + '/' + schemaFile)
validator.addSchemaResource(schemaRef, resource_string(__name__, schemaFile))
def push_message(string_message, topic):
@ -53,7 +56,7 @@ def push_message(string_message, topic):
"""
# Push a zlib compressed JSON representation of the message to
# announcers with schema as topic
# announcers with schema as topic
compressed_msg = zlib.compress(string_message)
sender.send("%s |-| %s" % (topic, compressed_msg))
statsCollector.tally("outbound")
@ -130,16 +133,12 @@ def parse_and_error_handle(data):
validationResults = validator.validate(parsed_message)
if validationResults.severity <= ValidationSeverity.WARN:
statsCollector.tally(parsed_message["$schemaRef"])
statsCollector.tally(parsed_message['header']['softwareName'] + " " + parsed_message['header']['softwareVersion'])
parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat()
ip_hash_salt = Settings.GATEWAY_IP_KEY_SALT
if ip_hash_salt:
# If an IP hash is set, salt+hash the uploader's IP address and set
# it as the EMDR upload key value.
# it as the EDDN upload key value.
ip_hash = hashlib.sha1(ip_hash_salt + get_remote_address()).hexdigest()
parsed_message['header']['uploaderKey'] = ip_hash

View File

@ -1,3 +1,5 @@
# coding: utf8
"""
Monitor sit below gateways, or another relay, and simply parse what it receives over SUB.
"""
@ -10,16 +12,18 @@ import datetime
import collections
import zmq.green as zmq
from bottle import get, request, response, run as bottle_run
from eddn._Conf.Settings import Settings, loadConfig
from eddn.conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()
# This import must be done post-monkey-patching!
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
from eddn._Core.DuplicateMessages import DuplicateMessages
from eddn.core.DuplicateMessages import DuplicateMessages
duplicateMessages = DuplicateMessages()
duplicateMessages.start()
def date(__format):
d = datetime.datetime.utcnow()
return d.strftime(__format)
@ -28,109 +32,113 @@ def date(__format):
@get('/getTotalSoftwares/')
def getTotalSoftwares():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
softwares = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
softwares = collections.OrderedDict()
maxDays = request.GET.get('maxDays', '31').strip()
maxDays = int(maxDays) -1;
maxDays = request.GET.get('maxDays', '31').strip()
maxDays = int(maxDays) - 1
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
FROM softwares
GROUP BY name
HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
ORDER BY total DESC"""
results = db.execute(query)
query = """SELECT name, SUM(hits) AS total, MAX(dateStats) AS maxDate
FROM softwares
GROUP BY name
HAVING maxDate >= DATE('now', '""" + '-' + str(maxDays) + """ day')
ORDER BY total DESC"""
results = db.execute(query)
for row in results:
softwares[str(row[0])] = str(row[1])
softwares[row[0].encode('utf8')] = str(row[1])
db.close()
return simplejson.dumps(softwares)
@get('/getSoftwares/')
def getSoftwares():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
softwares = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
softwares = collections.OrderedDict()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
query = """SELECT *
FROM softwares
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
query = """SELECT *
FROM softwares
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
for row in results:
if not str(row[2]) in softwares.keys():
softwares[str(row[2])] = collections.OrderedDict()
if not str(row[2].encode('utf8')) in softwares.keys():
softwares[row[2].encode('utf8')] = collections.OrderedDict()
softwares[str(row[2])][str(row[0])] = str(row[1])
softwares[row[2].encode('utf8')][str(row[0])] = str(row[1])
db.close()
return simplejson.dumps(softwares)
@get('/getTotalUploaders/')
def getTotalUploaders():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
uploaders = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
uploaders = collections.OrderedDict()
limit = request.GET.get('limit', '20').strip()
limit = request.GET.get('limit', '20').strip()
query = """SELECT name, SUM(hits) AS total
FROM uploaders
GROUP BY name
ORDER BY total DESC
LIMIT """ + limit
results = db.execute(query)
query = """SELECT name, SUM(hits) AS total
FROM uploaders
GROUP BY name
ORDER BY total DESC
LIMIT """ + limit
results = db.execute(query)
for row in results:
uploaders[str(row[0])] = str(row[1])
uploaders[row[0].encode('utf8')] = row[1]
db.close()
return simplejson.dumps(uploaders)
@get('/getUploaders/')
def getUploaders():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
uploaders = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
uploaders = collections.OrderedDict()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
query = """SELECT *
FROM uploaders
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
query = """SELECT *
FROM uploaders
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
for row in results:
if not str(row[2]) in uploaders.keys():
uploaders[str(row[2])] = collections.OrderedDict()
if not row[2].encode('utf8') in uploaders.keys():
uploaders[row[2].encode('utf8')] = collections.OrderedDict()
uploaders[str(row[2])][str(row[0])] = str(row[1])
uploaders[row[2]][row[0].encode('utf8')] = row[1]
db.close()
return simplejson.dumps(uploaders)
@get('/getTotalSchemas/')
def getTotalSchemas():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
schemas = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
schemas = collections.OrderedDict()
query = """SELECT name, SUM(hits) AS total
FROM schemas
GROUP BY name
ORDER BY total DESC"""
results = db.execute(query)
query = """SELECT name, SUM(hits) AS total
FROM schemas
GROUP BY name
ORDER BY total DESC"""
results = db.execute(query)
for row in results:
schemas[str(row[0])] = str(row[1])
@ -139,20 +147,21 @@ def getTotalSchemas():
return simplejson.dumps(schemas)
@get('/getSchemas/')
def getSchemas():
response.set_header("Access-Control-Allow-Origin", "*")
db = sqlite3.connect(Settings.MONITOR_DB)
schemas = collections.OrderedDict()
db = sqlite3.connect(Settings.MONITOR_DB)
schemas = collections.OrderedDict()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
dateStart = request.GET.get('dateStart', str(date('%Y-%m-%d'))).strip()
dateEnd = request.GET.get('dateEnd', str(date('%Y-%m-%d'))).strip()
query = """SELECT *
FROM schemas
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
query = """SELECT *
FROM schemas
WHERE dateStats BETWEEN ? AND ?
ORDER BY hits DESC, dateStats ASC"""
results = db.execute(query, (dateStart, dateEnd))
for row in results:
if not str(row[2]) in schemas.keys():
@ -165,11 +174,10 @@ def getSchemas():
return simplejson.dumps(schemas)
class Monitor(Thread):
def run(self):
context = zmq.Context()
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.setsockopt(zmq.SUBSCRIBE, '')
@ -178,7 +186,7 @@ class Monitor(Thread):
receiver.connect(binding)
def monitor_worker(message):
db = sqlite3.connect(Settings.MONITOR_DB)
db = sqlite3.connect(Settings.MONITOR_DB)
# Separate topic from message
message = message.split(' |-| ')
@ -203,10 +211,10 @@ class Monitor(Thread):
if Settings.MONITOR_DECOMPRESS_MESSAGES:
message = zlib.decompress(message)
json = simplejson.loads(message)
json = simplejson.loads(message)
# Update software count
softwareID = json['header']['softwareName'] + ' | ' + json['header']['softwareVersion']
softwareID = json['header']['softwareName'].encode('utf8') + ' | ' + json['header']['softwareVersion'].encode('utf8')
c = db.cursor()
c.execute('UPDATE softwares SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (softwareID, ))
@ -214,15 +222,15 @@ class Monitor(Thread):
db.commit()
# Update uploader count
uploaderID = json['header']['uploaderID']
uploaderID = json['header']['uploaderID'].encode('utf8')
if uploaderID: # Don't get empty uploaderID
if uploaderID: # Don't get empty uploaderID
c = db.cursor()
c.execute('UPDATE uploaders SET hits = hits + 1 WHERE `name` = ? AND `dateStats` = DATE("now", "utc")', (uploaderID, ))
c.execute('INSERT OR IGNORE INTO uploaders (name, dateStats) VALUES (?, DATE("now", "utc"))', (uploaderID, ))
db.commit()
# Update schemas count
# Update schemas count
schemaID = json['$schemaRef']
c = db.cursor()

View File

@ -1,3 +1,5 @@
# coding: utf8
"""
Relays sit below an announcer, or another relay, and simply repeat what
they receive over PUB/SUB.
@ -13,18 +15,19 @@ import gevent
import simplejson
import zmq.green as zmq
from bottle import get, response, run as bottle_run
from eddn._Conf.Settings import Settings, loadConfig
from eddn.conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()
from eddn._Core.StatsCollector import StatsCollector
# This import must be done post-monkey-patching!
from eddn.core.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()
# This import must be done post-monkey-patching!
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
from eddn._Core.DuplicateMessages import DuplicateMessages
from eddn.core.DuplicateMessages import DuplicateMessages
duplicateMessages = DuplicateMessages()
duplicateMessages.start()
@ -69,11 +72,11 @@ class Relay(Thread):
def relay_worker(message):
"""
This is the worker function that re-sends the incoming messages out
to any subscribers.
:param str message: A JSON string to re-broadcast.
This is the worker function that re-sends the incoming messages out
to any subscribers.
:param str message: A JSON string to re-broadcast.
"""
# Separate topic from message
# Separate topic from message
message = message.split(' |-| ')
# Handle gateway not sending topic

View File

@ -1,12 +1,8 @@
'''
Created on 15 Nov 2014
@author: james
'''
# coding: utf8
import argparse
import simplejson
from eddn._Conf.Version import __version__ as version
from eddn.conf.Version import __version__ as version
class _Settings(object):
@ -17,7 +13,6 @@ class _Settings(object):
# Relay settings
###############################################################################
#RELAY_RECEIVER_BINDINGS = ["tcp://localhost:8500"]
RELAY_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
RELAY_SENDER_BINDINGS = ["tcp://*:9500"]
@ -56,11 +51,10 @@ class _Settings(object):
MONITOR_RECEIVER_BINDINGS = ["tcp://eddn-gateway.elite-markets.net:8500", "tcp://eddn-gateway.ed-td.space:8500"]
MONITOR_DB = "/home/EDDN_Monitor.s3db"
MONITOR_DB = "/home/EDDN_Monitor.s3db"
MONITOR_DECOMPRESS_MESSAGES = True
def loadFrom(self, fileName):
f = open(fileName, 'r')
conf = simplejson.load(f)

View File

@ -1 +1,3 @@
# coding: utf8
__version__ = "0.4"

View File

@ -1,16 +1,20 @@
# coding: utf8
from datetime import datetime, timedelta
from threading import Lock, Thread
from time import sleep
import hashlib
import zlib
import re
import simplejson
from eddn._Conf.Settings import Settings, loadConfig
from eddn.conf.Settings import Settings, loadConfig
class DuplicateMessages(Thread):
max_minutes = Settings.RELAY_DUPLICATE_MAX_MINUTES
caches = {}
caches = {}
lock = Lock()
@ -33,6 +37,10 @@ class DuplicateMessages(Thread):
message = zlib.decompress(message)
message = simplejson.loads(message)
# Test messages are not duplicate
if re.search('test', message['$schemaRef'], re.I):
return False
if message['header']['gatewayTimestamp']:
del message['header']['gatewayTimestamp'] # Prevent dupe with new timestamp ^^
if message['message']['timestamp']:

View File

@ -1,3 +1,5 @@
# coding: utf8
from collections import deque
from datetime import datetime
from itertools import islice

View File

@ -1,3 +1,5 @@
# coding: utf8
import simplejson
from enum import IntEnum
from jsonschema import validate as jsValidate, ValidationError
@ -7,10 +9,10 @@ class Validator(object):
schemas = {"http://example.com": {}}
def addSchemaResource(self, schemaRef, schemaFile):
def addSchemaResource(self, schemaRef, schema):
if schemaRef in self.schemas.keys():
raise Exception("Attempted to redefine schema for " + schemaRef)
schema = simplejson.load(open(schemaFile, "r"))
schema = simplejson.loads(schema)
self.schemas[schemaRef] = schema
def validate(self, json_object):