mirror of
https://github.com/EDCD/EDDN.git
synced 2025-04-28 06:02:13 +03:00
Gateway: Use "" for strings throughout
# Conflicts: # src/eddn/Gateway.py # Conflicts: # src/eddn/Gateway.py
This commit is contained in:
parent
710223494d
commit
841c7e979e
@ -36,13 +36,13 @@ logger = logging.getLogger(__name__)
|
|||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
__logger_channel = logging.StreamHandler()
|
__logger_channel = logging.StreamHandler()
|
||||||
__logger_formatter = logging.Formatter(
|
__logger_formatter = logging.Formatter(
|
||||||
'%(asctime)s - %(levelname)s - %(module)s:%(lineno)d: %(message)s'
|
"%(asctime)s - %(levelname)s - %(module)s:%(lineno)d: %(message)s"
|
||||||
)
|
)
|
||||||
__logger_formatter.default_time_format = '%Y-%m-%d %H:%M:%S'
|
__logger_formatter.default_time_format = "%Y-%m-%d %H:%M:%S"
|
||||||
__logger_formatter.default_msec_format = '%s.%03d'
|
__logger_formatter.default_msec_format = "%s.%03d"
|
||||||
__logger_channel.setFormatter(__logger_formatter)
|
__logger_channel.setFormatter(__logger_formatter)
|
||||||
logger.addHandler(__logger_channel)
|
logger.addHandler(__logger_channel)
|
||||||
logger.info('Made logger')
|
logger.info("Made logger")
|
||||||
|
|
||||||
|
|
||||||
# This socket is used to push market data out to the Announcers over ZeroMQ.
|
# This socket is used to push market data out to the Announcers over ZeroMQ.
|
||||||
@ -61,19 +61,19 @@ stats_collector.start()
|
|||||||
def parse_cl_args():
|
def parse_cl_args():
|
||||||
"""Parse command-line arguments."""
|
"""Parse command-line arguments."""
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog='Gateway',
|
prog="Gateway",
|
||||||
description='EDDN Gateway server',
|
description="EDDN Gateway server",
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--loglevel',
|
"--loglevel",
|
||||||
help='Logging level to output at',
|
help="Logging level to output at",
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-c', '--config',
|
"-c", "--config",
|
||||||
metavar='config filename',
|
metavar="config filename",
|
||||||
nargs='?',
|
nargs="?",
|
||||||
default=None,
|
default=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -87,32 +87,32 @@ def extract_message_details(parsed_message): # noqa: CCR001
|
|||||||
:param parsed_message: The message to process
|
:param parsed_message: The message to process
|
||||||
:return: Tuple of (uploader_id, software_name, software_version, schema_ref, journal_event)
|
:return: Tuple of (uploader_id, software_name, software_version, schema_ref, journal_event)
|
||||||
"""
|
"""
|
||||||
uploader_id = '<<UNKNOWN>>'
|
uploader_id = "<<UNKNOWN>>"
|
||||||
software_name = '<<UNKNOWN>>'
|
software_name = "<<UNKNOWN>>"
|
||||||
software_version = '<<UNKNOWN>>'
|
software_version = "<<UNKNOWN>>"
|
||||||
schema_ref = '<<UNKNOWN>>'
|
schema_ref = "<<UNKNOWN>>"
|
||||||
journal_event = '<<UNKNOWN>>'
|
journal_event = "<<UNKNOWN>>"
|
||||||
|
|
||||||
if 'header' in parsed_message:
|
if "header" in parsed_message:
|
||||||
if 'uploaderID' in parsed_message['header']:
|
if "uploaderID" in parsed_message["header"]:
|
||||||
uploader_id = parsed_message['header']['uploaderID']
|
uploader_id = parsed_message["header"]["uploaderID"]
|
||||||
|
|
||||||
if 'softwareName' in parsed_message['header']:
|
if "softwareName" in parsed_message["header"]:
|
||||||
software_name = parsed_message['header']['softwareName']
|
software_name = parsed_message["header"]["softwareName"]
|
||||||
|
|
||||||
if 'softwareVersion' in parsed_message['header']:
|
if "softwareVersion" in parsed_message["header"]:
|
||||||
software_version = parsed_message['header']['softwareVersion']
|
software_version = parsed_message["header"]["softwareVersion"]
|
||||||
|
|
||||||
if '$schemaRef' in parsed_message:
|
if "$schemaRef" in parsed_message:
|
||||||
schema_ref = parsed_message['$schemaRef']
|
schema_ref = parsed_message["$schemaRef"]
|
||||||
|
|
||||||
if '/journal/' in schema_ref:
|
if "/journal/" in schema_ref:
|
||||||
if 'message' in parsed_message:
|
if "message" in parsed_message:
|
||||||
if 'event' in parsed_message['message']:
|
if "event" in parsed_message["message"]:
|
||||||
journal_event = parsed_message['message']['event']
|
journal_event = parsed_message["message"]["event"]
|
||||||
|
|
||||||
else:
|
else:
|
||||||
journal_event = '-'
|
journal_event = "-"
|
||||||
|
|
||||||
return uploader_id, software_name, software_version, schema_ref, journal_event
|
return uploader_id, software_name, software_version, schema_ref, journal_event
|
||||||
|
|
||||||
@ -128,7 +128,7 @@ def configure() -> None:
|
|||||||
sender.bind(binding)
|
sender.bind(binding)
|
||||||
|
|
||||||
for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.items():
|
for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.items():
|
||||||
validator.add_schema_resource(schema_ref, resource_string('eddn.Gateway', schema_file))
|
validator.add_schema_resource(schema_ref, resource_string("eddn.Gateway", schema_file))
|
||||||
|
|
||||||
|
|
||||||
def push_message(parsed_message: Dict, topic: str) -> None:
|
def push_message(parsed_message: Dict, topic: str) -> None:
|
||||||
@ -139,7 +139,7 @@ def push_message(parsed_message: Dict, topic: str) -> None:
|
|||||||
This is a dumb method that just pushes strings; it assumes you've already
|
This is a dumb method that just pushes strings; it assumes you've already
|
||||||
validated and serialised as you want to.
|
validated and serialised as you want to.
|
||||||
"""
|
"""
|
||||||
string_message = simplejson.dumps(parsed_message, ensure_ascii=False).encode('utf-8')
|
string_message = simplejson.dumps(parsed_message, ensure_ascii=False).encode("utf-8")
|
||||||
|
|
||||||
# Push a zlib compressed JSON representation of the message to
|
# Push a zlib compressed JSON representation of the message to
|
||||||
# announcers with schema as topic
|
# announcers with schema as topic
|
||||||
@ -157,7 +157,7 @@ def get_remote_address() -> str:
|
|||||||
request.remote_addr.
|
request.remote_addr.
|
||||||
:returns: Best attempt at remote address.
|
:returns: Best attempt at remote address.
|
||||||
"""
|
"""
|
||||||
return request.headers.get('X-Forwarded-For', request.remote_addr)
|
return request.headers.get("X-Forwarded-For", request.remote_addr)
|
||||||
|
|
||||||
|
|
||||||
def get_decompressed_message() -> bytes:
|
def get_decompressed_message() -> bytes:
|
||||||
@ -168,26 +168,26 @@ def get_decompressed_message() -> bytes:
|
|||||||
:rtype: str
|
:rtype: str
|
||||||
:returns: The de-compressed request body.
|
:returns: The de-compressed request body.
|
||||||
"""
|
"""
|
||||||
content_encoding = request.headers.get('Content-Encoding', '')
|
content_encoding = request.headers.get("Content-Encoding", "")
|
||||||
logger.debug('Content-Encoding: ' + content_encoding)
|
logger.debug("Content-Encoding: %s", content_encoding)
|
||||||
|
|
||||||
if content_encoding in ['gzip', 'deflate']:
|
if content_encoding in ["gzip", "deflate"]:
|
||||||
logger.debug('Content-Encoding of gzip or deflate...')
|
logger.debug("Content-Encoding of gzip or deflate...")
|
||||||
# Compressed request. We have to decompress the body, then figure out
|
# Compressed request. We have to decompress the body, then figure out
|
||||||
# if it's form-encoded.
|
# if it's form-encoded.
|
||||||
try:
|
try:
|
||||||
# Auto header checking.
|
# Auto header checking.
|
||||||
logger.debug('Trying zlib.decompress (15 + 32)...')
|
logger.debug("Trying zlib.decompress (15 + 32)...")
|
||||||
message_body = zlib.decompress(request.body.read(), 15 + 32)
|
message_body = zlib.decompress(request.body.read(), 15 + 32)
|
||||||
|
|
||||||
except zlib.error:
|
except zlib.error:
|
||||||
logger.error('zlib.error, trying zlib.decompress (-15)')
|
logger.error("zlib.error, trying zlib.decompress (-15)")
|
||||||
# Negative wbits suppresses adler32 checksumming.
|
# Negative wbits suppresses adler32 checksumming.
|
||||||
message_body = zlib.decompress(request.body.read(), -15)
|
message_body = zlib.decompress(request.body.read(), -15)
|
||||||
logger.debug('Resulting message_body:\n%s\n', message_body)
|
logger.debug("Resulting message_body:\n%s\n", message_body)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.debug('Content-Encoding indicates *not* compressed...')
|
logger.debug("Content-Encoding indicates *not* compressed...")
|
||||||
|
|
||||||
message_body = request.body.read()
|
message_body = request.body.read()
|
||||||
|
|
||||||
@ -209,13 +209,13 @@ def parse_and_error_handle(data: bytes) -> str:
|
|||||||
# semi-useful error message, so do so.
|
# semi-useful error message, so do so.
|
||||||
try:
|
try:
|
||||||
logger.error(
|
logger.error(
|
||||||
'Error - JSON parse failed (%d, "%s", "%s", "%s", "%s", "%s") from %s:\n%s\n',
|
"Error - JSON parse failed (%d, '%s', '%s', '%s', '%s', '%s') from %s:\n%s\n",
|
||||||
request.content_length,
|
request.content_length,
|
||||||
'<<UNKNOWN>>',
|
"<<UNKNOWN>>",
|
||||||
'<<UNKNOWN>>',
|
"<<UNKNOWN>>",
|
||||||
'<<UNKNOWN>>',
|
"<<UNKNOWN>>",
|
||||||
'<<UNKNOWN>>',
|
"<<UNKNOWN>>",
|
||||||
'<<UNKNOWN>>',
|
"<<UNKNOWN>>",
|
||||||
get_remote_address(),
|
get_remote_address(),
|
||||||
data[:512]
|
data[:512]
|
||||||
)
|
)
|
||||||
@ -227,11 +227,11 @@ def parse_and_error_handle(data: bytes) -> str:
|
|||||||
|
|
||||||
response.status = 400
|
response.status = 400
|
||||||
logger.error(f"Error to {get_remote_address()}: {exc}")
|
logger.error(f"Error to {get_remote_address()}: {exc}")
|
||||||
return 'FAIL: JSON parsing: ' + str(exc)
|
return "FAIL: JSON parsing: " + str(exc)
|
||||||
|
|
||||||
# Here we check if an outdated schema has been passed
|
# Here we check if an outdated schema has been passed
|
||||||
if parsed_message["$schemaRef"] in Settings.GATEWAY_OUTDATED_SCHEMAS:
|
if parsed_message["$schemaRef"] in Settings.GATEWAY_OUTDATED_SCHEMAS:
|
||||||
response.status = '426 Upgrade Required' # Bottle (and underlying httplib) don't know this one
|
response.status = "426 Upgrade Required" # Bottle (and underlying httplib) don't know this one
|
||||||
stats_collector.tally("outdated")
|
stats_collector.tally("outdated")
|
||||||
return "FAIL: Outdated Schema: The schema you have used is no longer supported. Please check for an updated " \
|
return "FAIL: Outdated Schema: The schema you have used is no longer supported. Please check for an updated " \
|
||||||
"version of your application."
|
"version of your application."
|
||||||
@ -239,16 +239,16 @@ def parse_and_error_handle(data: bytes) -> str:
|
|||||||
validation_results = validator.validate(parsed_message)
|
validation_results = validator.validate(parsed_message)
|
||||||
|
|
||||||
if validation_results.severity <= ValidationSeverity.WARN:
|
if validation_results.severity <= ValidationSeverity.WARN:
|
||||||
parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat() + 'Z'
|
parsed_message["header"]["gatewayTimestamp"] = datetime.utcnow().isoformat() + "Z"
|
||||||
parsed_message['header']['uploaderIP'] = get_remote_address()
|
parsed_message["header"]["uploaderIP"] = get_remote_address()
|
||||||
|
|
||||||
# Sends the parsed message to the Relay/Monitor as compressed JSON.
|
# Sends the parsed message to the Relay/Monitor as compressed JSON.
|
||||||
gevent.spawn(push_message, parsed_message, parsed_message['$schemaRef'])
|
gevent.spawn(push_message, parsed_message, parsed_message["$schemaRef"])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
|
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
|
||||||
logger.info(
|
logger.info(
|
||||||
'Accepted (%d, "%s", "%s", "%s", "%s", "%s") from %s',
|
"Accepted (%d, '%s', '%s', '%s', '%s', '%s') from %s",
|
||||||
request.content_length,
|
request.content_length,
|
||||||
uploader_id, software_name, software_version, schema_ref, journal_event,
|
uploader_id, software_name, software_version, schema_ref, journal_event,
|
||||||
get_remote_address()
|
get_remote_address()
|
||||||
@ -259,13 +259,13 @@ def parse_and_error_handle(data: bytes) -> str:
|
|||||||
print(f"Logging of Accepted request failed: {str(e)}")
|
print(f"Logging of Accepted request failed: {str(e)}")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return 'OK'
|
return "OK"
|
||||||
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
|
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
|
||||||
logger.error(
|
logger.error(
|
||||||
'Failed Validation "%s" (%d, "%s", "%s", "%s", "%s", "%s") from %s',
|
"Failed Validation '%s' (%d, '%s', '%s', '%s', '%s', '%s') from %s",
|
||||||
str(validation_results.messages),
|
str(validation_results.messages),
|
||||||
request.content_length,
|
request.content_length,
|
||||||
uploader_id, software_name, software_version, schema_ref, journal_event,
|
uploader_id, software_name, software_version, schema_ref, journal_event,
|
||||||
@ -282,7 +282,7 @@ def parse_and_error_handle(data: bytes) -> str:
|
|||||||
return "FAIL: Schema Validation: " + str(validation_results.messages)
|
return "FAIL: Schema Validation: " + str(validation_results.messages)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/upload/', method=['OPTIONS', 'POST'])
|
@app.route("/upload/", method=["OPTIONS", "POST"])
|
||||||
def upload() -> str:
|
def upload() -> str:
|
||||||
"""
|
"""
|
||||||
Handle an /upload/ request.
|
Handle an /upload/ request.
|
||||||
@ -300,8 +300,8 @@ def upload() -> str:
|
|||||||
response.status = 400
|
response.status = 400
|
||||||
try:
|
try:
|
||||||
logger.error(
|
logger.error(
|
||||||
f'gzip error ({request.content_length}, "<<UNKNOWN>>", "<<UNKNOWN>>", "<<UNKNOWN>>",'
|
f"gzip error ({request.content_length}, "<<UNKNOWN>>", "<<UNKNOWN>>", "<<UNKNOWN>>","
|
||||||
' "<<UNKNOWN>>", "<<UNKNOWN>>") from {get_remote_address()}'
|
" "<<UNKNOWN>>", "<<UNKNOWN>>") from {get_remote_address()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -309,7 +309,7 @@ def upload() -> str:
|
|||||||
print(f"Logging of 'gzip error' failed: {str(e)}")
|
print(f"Logging of 'gzip error' failed: {str(e)}")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return 'FAIL: zlib.error: ' + str(exc)
|
return "FAIL: zlib.error: " + str(exc)
|
||||||
|
|
||||||
except MalformedUploadError as exc:
|
except MalformedUploadError as exc:
|
||||||
# They probably sent an encoded POST, but got the key/val wrong.
|
# They probably sent an encoded POST, but got the key/val wrong.
|
||||||
@ -317,16 +317,16 @@ def upload() -> str:
|
|||||||
# TODO: Maybe just `{exc}` ?
|
# TODO: Maybe just `{exc}` ?
|
||||||
logger.error("MalformedUploadError from %s: %s", get_remote_address(), str(exc))
|
logger.error("MalformedUploadError from %s: %s", get_remote_address(), str(exc))
|
||||||
|
|
||||||
return 'FAIL: Malformed Upload: ' + str(exc)
|
return "FAIL: Malformed Upload: " + str(exc)
|
||||||
|
|
||||||
stats_collector.tally("inbound")
|
stats_collector.tally("inbound")
|
||||||
return parse_and_error_handle(message_body)
|
return parse_and_error_handle(message_body)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/health_check/', method=['OPTIONS', 'GET'])
|
@app.route("/health_check/", method=["OPTIONS", "GET"])
|
||||||
def health_check() -> str:
|
def health_check() -> str:
|
||||||
"""
|
"""
|
||||||
Return our version string in as an 'am I awake' signal.
|
Return our version string in as an "am I awake" signal.
|
||||||
|
|
||||||
This should only be used by the gateway monitoring script. It is used
|
This should only be used by the gateway monitoring script. It is used
|
||||||
to detect whether the gateway is still alive, and whether it should remain
|
to detect whether the gateway is still alive, and whether it should remain
|
||||||
@ -337,7 +337,7 @@ def health_check() -> str:
|
|||||||
return Settings.EDDN_VERSION
|
return Settings.EDDN_VERSION
|
||||||
|
|
||||||
|
|
||||||
@app.route('/stats/', method=['OPTIONS', 'GET'])
|
@app.route("/stats/", method=["OPTIONS", "GET"])
|
||||||
def stats() -> str:
|
def stats() -> str:
|
||||||
"""
|
"""
|
||||||
Return some stats about the Gateway's operation so far.
|
Return some stats about the Gateway's operation so far.
|
||||||
@ -370,16 +370,16 @@ def apply_cors() -> None:
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
response.set_header(
|
response.set_header(
|
||||||
'Access-Control-Allow-Origin',
|
"Access-Control-Allow-Origin",
|
||||||
'*'
|
"*"
|
||||||
)
|
)
|
||||||
response.set_header(
|
response.set_header(
|
||||||
'Access-Control-Allow-Methods',
|
"Access-Control-Allow-Methods",
|
||||||
'GET, POST, PUT, OPTIONS'
|
"GET, POST, PUT, OPTIONS"
|
||||||
)
|
)
|
||||||
response.set_header(
|
response.set_header(
|
||||||
'Access-Control-Allow-Headers',
|
"Access-Control-Allow-Headers",
|
||||||
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
|
"Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -392,15 +392,15 @@ def main() -> None:
|
|||||||
load_config(cl_args)
|
load_config(cl_args)
|
||||||
configure()
|
configure()
|
||||||
|
|
||||||
app.add_hook('after_request', apply_cors)
|
app.add_hook("after_request", apply_cors)
|
||||||
app.run(
|
app.run(
|
||||||
host=Settings.GATEWAY_HTTP_BIND_ADDRESS,
|
host=Settings.GATEWAY_HTTP_BIND_ADDRESS,
|
||||||
port=Settings.GATEWAY_HTTP_PORT,
|
port=Settings.GATEWAY_HTTP_PORT,
|
||||||
server='gevent',
|
server="gevent",
|
||||||
certfile=Settings.CERT_FILE,
|
certfile=Settings.CERT_FILE,
|
||||||
keyfile=Settings.KEY_FILE
|
keyfile=Settings.KEY_FILE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user