Gateway: Use "" for strings throughout

This commit is contained in:
Athanasius 2022-03-12 12:52:48 +00:00
parent 1e39ea37af
commit a08fc09586

View File

@ -36,13 +36,13 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
__logger_channel = logging.StreamHandler() __logger_channel = logging.StreamHandler()
__logger_formatter = logging.Formatter( __logger_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(module)s:%(lineno)d: %(message)s' "%(asctime)s - %(levelname)s - %(module)s:%(lineno)d: %(message)s"
) )
__logger_formatter.default_time_format = '%Y-%m-%d %H:%M:%S' __logger_formatter.default_time_format = "%Y-%m-%d %H:%M:%S"
__logger_formatter.default_msec_format = '%s.%03d' __logger_formatter.default_msec_format = "%s.%03d"
__logger_channel.setFormatter(__logger_formatter) __logger_channel.setFormatter(__logger_formatter)
logger.addHandler(__logger_channel) logger.addHandler(__logger_channel)
logger.info('Made logger') logger.info("Made logger")
# This socket is used to push market data out to the Announcers over ZeroMQ. # This socket is used to push market data out to the Announcers over ZeroMQ.
@ -61,19 +61,19 @@ stats_collector.start()
def parse_cl_args(): def parse_cl_args():
"""Parse command-line arguments.""" """Parse command-line arguments."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='Gateway', prog="Gateway",
description='EDDN Gateway server', description="EDDN Gateway server",
) )
parser.add_argument( parser.add_argument(
'--loglevel', "--loglevel",
help='Logging level to output at', help="Logging level to output at",
) )
parser.add_argument( parser.add_argument(
'-c', '--config', "-c", "--config",
metavar='config filename', metavar="config filename",
nargs='?', nargs="?",
default=None, default=None,
) )
@ -87,32 +87,32 @@ def extract_message_details(parsed_message): # noqa: CCR001
:param parsed_message: The message to process :param parsed_message: The message to process
:return: Tuple of (uploader_id, software_name, software_version, schema_ref, journal_event) :return: Tuple of (uploader_id, software_name, software_version, schema_ref, journal_event)
""" """
uploader_id = '<<UNKNOWN>>' uploader_id = "<<UNKNOWN>>"
software_name = '<<UNKNOWN>>' software_name = "<<UNKNOWN>>"
software_version = '<<UNKNOWN>>' software_version = "<<UNKNOWN>>"
schema_ref = '<<UNKNOWN>>' schema_ref = "<<UNKNOWN>>"
journal_event = '<<UNKNOWN>>' journal_event = "<<UNKNOWN>>"
if 'header' in parsed_message: if "header" in parsed_message:
if 'uploaderID' in parsed_message['header']: if "uploaderID" in parsed_message["header"]:
uploader_id = parsed_message['header']['uploaderID'] uploader_id = parsed_message["header"]["uploaderID"]
if 'softwareName' in parsed_message['header']: if "softwareName" in parsed_message["header"]:
software_name = parsed_message['header']['softwareName'] software_name = parsed_message["header"]["softwareName"]
if 'softwareVersion' in parsed_message['header']: if "softwareVersion" in parsed_message["header"]:
software_version = parsed_message['header']['softwareVersion'] software_version = parsed_message["header"]["softwareVersion"]
if '$schemaRef' in parsed_message: if "$schemaRef" in parsed_message:
schema_ref = parsed_message['$schemaRef'] schema_ref = parsed_message["$schemaRef"]
if '/journal/' in schema_ref: if "/journal/" in schema_ref:
if 'message' in parsed_message: if "message" in parsed_message:
if 'event' in parsed_message['message']: if "event" in parsed_message["message"]:
journal_event = parsed_message['message']['event'] journal_event = parsed_message["message"]["event"]
else: else:
journal_event = '-' journal_event = "-"
return uploader_id, software_name, software_version, schema_ref, journal_event return uploader_id, software_name, software_version, schema_ref, journal_event
@ -128,7 +128,7 @@ def configure() -> None:
sender.bind(binding) sender.bind(binding)
for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.items(): for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.items():
validator.add_schema_resource(schema_ref, resource_string('eddn.Gateway', schema_file)) validator.add_schema_resource(schema_ref, resource_string("eddn.Gateway", schema_file))
def push_message(parsed_message: Dict, topic: str) -> None: def push_message(parsed_message: Dict, topic: str) -> None:
@ -139,7 +139,7 @@ def push_message(parsed_message: Dict, topic: str) -> None:
This is a dumb method that just pushes strings; it assumes you've already This is a dumb method that just pushes strings; it assumes you've already
validated and serialised as you want to. validated and serialised as you want to.
""" """
string_message = simplejson.dumps(parsed_message, ensure_ascii=False).encode('utf-8') string_message = simplejson.dumps(parsed_message, ensure_ascii=False).encode("utf-8")
# Push a zlib compressed JSON representation of the message to # Push a zlib compressed JSON representation of the message to
# announcers with schema as topic # announcers with schema as topic
@ -157,7 +157,7 @@ def get_remote_address() -> str:
request.remote_addr. request.remote_addr.
:returns: Best attempt at remote address. :returns: Best attempt at remote address.
""" """
return request.headers.get('X-Forwarded-For', request.remote_addr) return request.headers.get("X-Forwarded-For", request.remote_addr)
def get_decompressed_message() -> bytes: def get_decompressed_message() -> bytes:
@ -168,38 +168,38 @@ def get_decompressed_message() -> bytes:
:rtype: str :rtype: str
:returns: The de-compressed request body. :returns: The de-compressed request body.
""" """
content_encoding = request.headers.get('Content-Encoding', '') content_encoding = request.headers.get("Content-Encoding", "")
logger.debug('Content-Encoding: ' + content_encoding) logger.debug("Content-Encoding: %s", content_encoding)
if content_encoding in ['gzip', 'deflate']: if content_encoding in ["gzip", "deflate"]:
logger.debug('Content-Encoding of gzip or deflate...') logger.debug("Content-Encoding of gzip or deflate...")
# Compressed request. We have to decompress the body, then figure out # Compressed request. We have to decompress the body, then figure out
# if it's form-encoded. # if it's form-encoded.
try: try:
# Auto header checking. # Auto header checking.
logger.debug('Trying zlib.decompress (15 + 32)...') logger.debug("Trying zlib.decompress (15 + 32)...")
message_body = zlib.decompress(request.body.read(), 15 + 32) message_body = zlib.decompress(request.body.read(), 15 + 32)
except zlib.error: except zlib.error:
logger.error('zlib.error, trying zlib.decompress (-15)') logger.error("zlib.error, trying zlib.decompress (-15)")
# Negative wbits suppresses adler32 checksumming. # Negative wbits suppresses adler32 checksumming.
message_body = zlib.decompress(request.body.read(), -15) message_body = zlib.decompress(request.body.read(), -15)
logger.debug('Resulting message_body:\n%s\n', message_body) logger.debug("Resulting message_body:\n%s\n", message_body)
# At this point, we're not sure whether we're dealing with a straight # At this point, we're not sure whether we're dealing with a straight
# un-encoded POST body, or a form-encoded POST. Attempt to parse the # un-encoded POST body, or a form-encoded POST. Attempt to parse the
# body. If it's not form-encoded, this will return an empty dict. # body. If it's not form-encoded, this will return an empty dict.
form_enc_parsed = parse_qs(message_body) form_enc_parsed = parse_qs(message_body)
if form_enc_parsed: if form_enc_parsed:
logger.info('Request is form-encoded, compressed, from %s', get_remote_address()) logger.info("Request is form-encoded, compressed, from %s", get_remote_address())
# This is a form-encoded POST. The value of the data attrib will # This is a form-encoded POST. The value of the data attrib will
# be the body we're looking for. # be the body we're looking for.
try: try:
message_body = form_enc_parsed[b'data'][0] message_body = form_enc_parsed[b"data"][0]
except (KeyError, IndexError): except (KeyError, IndexError):
logger.error( logger.error(
'form-encoded, compressed, upload did not contain a "data" key. From %s', "form-encoded, compressed, upload did not contain a 'data' key. From %s",
get_remote_address() get_remote_address()
) )
raise MalformedUploadError( raise MalformedUploadError(
@ -208,21 +208,21 @@ def get_decompressed_message() -> bytes:
) )
else: else:
logger.debug('Request is *NOT* form-encoded') logger.debug("Request is *NOT* form-encoded")
else: else:
logger.debug('Content-Encoding indicates *not* compressed...') logger.debug("Content-Encoding indicates *not* compressed...")
# Uncompressed request. Bottle handles all of the parsing of the # Uncompressed request. Bottle handles all of the parsing of the
# POST key/vals, or un-encoded body. # POST key/vals, or un-encoded body.
data_key = request.forms.get('data') data_key = request.forms.get("data")
if data_key: if data_key:
logger.info('Request is form-encoded, uncompressed, from %s', get_remote_address()) logger.info("Request is form-encoded, uncompressed, from %s", get_remote_address())
# This is a form-encoded POST. Support the silly people. # This is a form-encoded POST. Support the silly people.
message_body = data_key message_body = data_key
else: else:
logger.debug('Plain POST request detected...') logger.debug("Plain POST request detected...")
# This is a non form-encoded POST body. # This is a non form-encoded POST body.
message_body = request.body.read() message_body = request.body.read()
@ -244,13 +244,13 @@ def parse_and_error_handle(data: bytes) -> str:
# semi-useful error message, so do so. # semi-useful error message, so do so.
try: try:
logger.error( logger.error(
'Error - JSON parse failed (%d, "%s", "%s", "%s", "%s", "%s") from %s:\n%s\n', "Error - JSON parse failed (%d, '%s', '%s', '%s', '%s', '%s') from %s:\n%s\n",
request.content_length, request.content_length,
'<<UNKNOWN>>', "<<UNKNOWN>>",
'<<UNKNOWN>>', "<<UNKNOWN>>",
'<<UNKNOWN>>', "<<UNKNOWN>>",
'<<UNKNOWN>>', "<<UNKNOWN>>",
'<<UNKNOWN>>', "<<UNKNOWN>>",
get_remote_address(), get_remote_address(),
data[:512] data[:512]
) )
@ -262,11 +262,11 @@ def parse_and_error_handle(data: bytes) -> str:
response.status = 400 response.status = 400
logger.error(f"Error to {get_remote_address()}: {exc}") logger.error(f"Error to {get_remote_address()}: {exc}")
return 'FAIL: JSON parsing: ' + str(exc) return "FAIL: JSON parsing: " + str(exc)
# Here we check if an outdated schema has been passed # Here we check if an outdated schema has been passed
if parsed_message["$schemaRef"] in Settings.GATEWAY_OUTDATED_SCHEMAS: if parsed_message["$schemaRef"] in Settings.GATEWAY_OUTDATED_SCHEMAS:
response.status = '426 Upgrade Required' # Bottle (and underlying httplib) don't know this one response.status = "426 Upgrade Required" # Bottle (and underlying httplib) don't know this one
stats_collector.tally("outdated") stats_collector.tally("outdated")
return "FAIL: Outdated Schema: The schema you have used is no longer supported. Please check for an updated " \ return "FAIL: Outdated Schema: The schema you have used is no longer supported. Please check for an updated " \
"version of your application." "version of your application."
@ -274,16 +274,16 @@ def parse_and_error_handle(data: bytes) -> str:
validation_results = validator.validate(parsed_message) validation_results = validator.validate(parsed_message)
if validation_results.severity <= ValidationSeverity.WARN: if validation_results.severity <= ValidationSeverity.WARN:
parsed_message['header']['gatewayTimestamp'] = datetime.utcnow().isoformat() + 'Z' parsed_message["header"]["gatewayTimestamp"] = datetime.utcnow().isoformat() + "Z"
parsed_message['header']['uploaderIP'] = get_remote_address() parsed_message["header"]["uploaderIP"] = get_remote_address()
# Sends the parsed message to the Relay/Monitor as compressed JSON. # Sends the parsed message to the Relay/Monitor as compressed JSON.
gevent.spawn(push_message, parsed_message, parsed_message['$schemaRef']) gevent.spawn(push_message, parsed_message, parsed_message["$schemaRef"])
try: try:
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501 uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
logger.info( logger.info(
'Accepted (%d, "%s", "%s", "%s", "%s", "%s") from %s', "Accepted (%d, '%s', '%s', '%s', '%s', '%s') from %s",
request.content_length, request.content_length,
uploader_id, software_name, software_version, schema_ref, journal_event, uploader_id, software_name, software_version, schema_ref, journal_event,
get_remote_address() get_remote_address()
@ -294,13 +294,13 @@ def parse_and_error_handle(data: bytes) -> str:
print(f"Logging of Accepted request failed: {str(e)}") print(f"Logging of Accepted request failed: {str(e)}")
pass pass
return 'OK' return "OK"
else: else:
try: try:
uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501 uploader_id, software_name, software_version, schema_ref, journal_event = extract_message_details(parsed_message) # noqa: E501
logger.error( logger.error(
'Failed Validation "%s" (%d, "%s", "%s", "%s", "%s", "%s") from %s', "Failed Validation '%s' (%d, '%s', '%s', '%s', '%s', '%s') from %s",
str(validation_results.messages), str(validation_results.messages),
request.content_length, request.content_length,
uploader_id, software_name, software_version, schema_ref, journal_event, uploader_id, software_name, software_version, schema_ref, journal_event,
@ -317,7 +317,7 @@ def parse_and_error_handle(data: bytes) -> str:
return "FAIL: Schema Validation: " + str(validation_results.messages) return "FAIL: Schema Validation: " + str(validation_results.messages)
@app.route('/upload/', method=['OPTIONS', 'POST']) @app.route("/upload/", method=["OPTIONS", "POST"])
def upload() -> str: def upload() -> str:
""" """
Handle an /upload/ request. Handle an /upload/ request.
@ -335,8 +335,8 @@ def upload() -> str:
response.status = 400 response.status = 400
try: try:
logger.error( logger.error(
f'gzip error ({request.content_length}, "<<UNKNOWN>>", "<<UNKNOWN>>", "<<UNKNOWN>>",' f"gzip error ({request.content_length}, "<<UNKNOWN>>", "<<UNKNOWN>>", "<<UNKNOWN>>","
' "<<UNKNOWN>>", "<<UNKNOWN>>") from {get_remote_address()}' " "<<UNKNOWN>>", "<<UNKNOWN>>") from {get_remote_address()}"
) )
except Exception as e: except Exception as e:
@ -344,7 +344,7 @@ def upload() -> str:
print(f"Logging of 'gzip error' failed: {str(e)}") print(f"Logging of 'gzip error' failed: {str(e)}")
pass pass
return 'FAIL: zlib.error: ' + str(exc) return "FAIL: zlib.error: " + str(exc)
except MalformedUploadError as exc: except MalformedUploadError as exc:
# They probably sent an encoded POST, but got the key/val wrong. # They probably sent an encoded POST, but got the key/val wrong.
@ -352,16 +352,16 @@ def upload() -> str:
# TODO: Maybe just `{exc}` ? # TODO: Maybe just `{exc}` ?
logger.error("MalformedUploadError from %s: %s", get_remote_address(), str(exc)) logger.error("MalformedUploadError from %s: %s", get_remote_address(), str(exc))
return 'FAIL: Malformed Upload: ' + str(exc) return "FAIL: Malformed Upload: " + str(exc)
stats_collector.tally("inbound") stats_collector.tally("inbound")
return parse_and_error_handle(message_body) return parse_and_error_handle(message_body)
@app.route('/health_check/', method=['OPTIONS', 'GET']) @app.route("/health_check/", method=["OPTIONS", "GET"])
def health_check() -> str: def health_check() -> str:
""" """
Return our version string in as an 'am I awake' signal. Return our version string in as an "am I awake" signal.
This should only be used by the gateway monitoring script. It is used This should only be used by the gateway monitoring script. It is used
to detect whether the gateway is still alive, and whether it should remain to detect whether the gateway is still alive, and whether it should remain
@ -372,7 +372,7 @@ def health_check() -> str:
return Settings.EDDN_VERSION return Settings.EDDN_VERSION
@app.route('/stats/', method=['OPTIONS', 'GET']) @app.route("/stats/", method=["OPTIONS", "GET"])
def stats() -> str: def stats() -> str:
""" """
Return some stats about the Gateway's operation so far. Return some stats about the Gateway's operation so far.
@ -405,16 +405,16 @@ def apply_cors() -> None:
:return: :return:
""" """
response.set_header( response.set_header(
'Access-Control-Allow-Origin', "Access-Control-Allow-Origin",
'*' "*"
) )
response.set_header( response.set_header(
'Access-Control-Allow-Methods', "Access-Control-Allow-Methods",
'GET, POST, PUT, OPTIONS' "GET, POST, PUT, OPTIONS"
) )
response.set_header( response.set_header(
'Access-Control-Allow-Headers', "Access-Control-Allow-Headers",
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"
) )
@ -427,15 +427,15 @@ def main() -> None:
load_config(cl_args) load_config(cl_args)
configure() configure()
app.add_hook('after_request', apply_cors) app.add_hook("after_request", apply_cors)
app.run( app.run(
host=Settings.GATEWAY_HTTP_BIND_ADDRESS, host=Settings.GATEWAY_HTTP_BIND_ADDRESS,
port=Settings.GATEWAY_HTTP_PORT, port=Settings.GATEWAY_HTTP_PORT,
server='gevent', server="gevent",
certfile=Settings.CERT_FILE, certfile=Settings.CERT_FILE,
keyfile=Settings.KEY_FILE keyfile=Settings.KEY_FILE
) )
if __name__ == '__main__': if __name__ == "__main__":
main() main()