From 86313dba286d4cc87ab987f126bc01fc88a2a5cd Mon Sep 17 00:00:00 2001 From: Athanasius Date: Sat, 12 Mar 2022 10:59:52 +0000 Subject: [PATCH] Bouncer: Full flake8 and mypy pass --- src/eddn/Bouncer.py | 155 ++++++++++++++++++++++++++++---------------- 1 file changed, 98 insertions(+), 57 deletions(-) diff --git a/src/eddn/Bouncer.py b/src/eddn/Bouncer.py index 82b5921..652d3da 100644 --- a/src/eddn/Bouncer.py +++ b/src/eddn/Bouncer.py @@ -27,24 +27,21 @@ Architecture: Gateway. """ import argparse -import gevent -import hashlib import logging +import zlib +from datetime import datetime + +import gevent import requests import simplejson import urlparse -import zlib -from datetime import datetime -from functools import wraps - -from pkg_resources import resource_string -# import os +from bottle import Bottle, request, response +from gevent import monkey from eddn.conf.Settings import Settings, load_config -from gevent import monkey monkey.patch_all() -from bottle import Bottle, run, request, response, get, post + app = Bottle() logger = logging.getLogger(__name__) @@ -61,10 +58,7 @@ logger.info('Made logger') # This import must be done post-monkey-patching! -from eddn.core.StatsCollector import StatsCollector -statsCollector = StatsCollector() -statsCollector.start() - +from eddn.core.StatsCollector import StatsCollector # noqa: E402 def parse_cl_args(): parser = argparse.ArgumentParser( @@ -86,11 +80,17 @@ def parse_cl_args(): return parser.parse_args() -def push_message(message_body): +stats_collector = StatsCollector() +stats_collector.start() + + +def push_message(message_body: str) -> None: """ + Push a message our to subscribed listeners. + Spawned as a greenlet to push messages (strings) through ZeroMQ. - This is a dumb method that just pushes strings; it assumes you've already validated - and serialised as you want to. + This is a dumb method that just pushes strings; it assumes you've already + validated and serialised as you want to. """ try: r = requests.post( @@ -103,27 +103,28 @@ def push_message(message_body): else: if r.status_code != requests.codes.ok: - logger.error('Response from %s:\n%s\n' % (BOUNCER_LIVE_GATEWAY_URL, - r.text)) + logger.error(f'Response from {Settings.BOUNCER_LIVE_GATEWAY_URL}:\n{r.text}\n') else: - statsCollector.tally("outbound") + stats_collector.tally("outbound") -def get_remote_address(): +def get_remote_address() -> str: """ - Determines the address of the uploading client. First checks the for - proxy-forwarded headers, then falls back to request.remote_addr. - :rtype: str + Determine the address of the uploading client. + + First checks the for proxy-forwarded headers, then falls back to + request.remote_addr. + :returns: Best attempt at remote address. """ return request.headers.get('X-Forwarded-For', request.remote_addr) -def get_decompressed_message(): +def get_decompressed_message() -> bytes: """ - For upload formats that support it, detect gzip Content-Encoding headers - and de-compress on the fly. - :rtype: str + Detect gzip Content-Encoding headers and de-compress on the fly. + + For upload formats that support it. :returns: The de-compressed request body. """ content_encoding = request.headers.get('Content-Encoding', '') @@ -166,17 +167,27 @@ def get_decompressed_message(): return message_body -def forward_message(message_body): +def forward_message(message_body: bytes) -> str: + """ + Send the parsed message to the Relay/Monitor as compressed JSON. + + :param message_body: Incoming message. + :returns: 'OK' assuming it is. + """ # TODO: This instead needs to send the message to remote Gateway - # Sends the parsed message to the Relay/Monitor as compressed JSON. gevent.spawn(push_message, message_body) - logger.info("Accepted upload from %s" % ( - get_remote_address() - )) + logger.info(f'Accepted upload from {get_remote_address()}') + return 'OK' + @app.route('/upload/', method=['OPTIONS', 'POST']) -def upload(): +def upload() -> str: + """ + Handle an /upload/ request. + + :returns: The processed message, else error string. + """ try: # Body may or may not be compressed. message_body = get_decompressed_message() @@ -186,55 +197,80 @@ def upload(): # at least some kind of feedback for them to try to get pointed in # the correct direction. response.status = 400 - logger.error("gzip error with %s: %s" % (get_remote_address(), exc.message)) - return exc.message + logger.error(f'gzip error with {get_remote_address()}: {exc}') + + return f'{exc}' except MalformedUploadError as exc: # They probably sent an encoded POST, but got the key/val wrong. response.status = 400 - logger.error("Error to %s: %s" % (get_remote_address(), exc.message)) - return exc.message + logger.error(f'Error to {get_remote_address()}: {exc}') - statsCollector.tally("inbound") + return f'{exc}' + + stats_collector.tally("inbound") return forward_message(message_body) @app.route('/health_check/', method=['OPTIONS', 'GET']) -def health_check(): +def health_check() -> str: """ + Return our version string in as an 'am I awake' signal. + This should only be used by the gateway monitoring script. It is used to detect whether the gateway is still alive, and whether it should remain in the DNS rotation. + + :returns: Version of this software. """ return Settings.EDDN_VERSION @app.route('/stats/', method=['OPTIONS', 'GET']) -def stats(): - stats = statsCollector.getSummary() - stats["version"] = Settings.EDDN_VERSION - return simplejson.dumps(stats) +def stats() -> str: + """ + Return some stats about the Gateway's operation so far. + + :return: JSON stats data + """ + stats_current = stats_collector.getSummary() + stats_current["version"] = Settings.EDDN_VERSION + + return simplejson.dumps(stats_current) class MalformedUploadError(Exception): """ + Exception for malformed upload. + Raise this when an upload is structurally incorrect. This isn't so much to do with something like a bogus region ID, this is more like "You are missing a POST key/val, or a body". """ + pass class EnableCors(object): + """Handle enabling CORS headers in all responses.""" + name = 'enable_cors' api = 2 def apply(self, fn, context): + """ + Apply CORS headers to the calling bottle app. + + :param fn: + :param context: + :return: + """ def _enable_cors(*args, **kwargs): # set CORS headers response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS' - response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' + response.headers['Access-Control-Allow-Headers'] = \ + 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' if request.method != 'OPTIONS': # actual request; reply with the actual response @@ -244,11 +280,19 @@ class EnableCors(object): class CustomLogging(object): - """Wrap a Bottle request so that a log line is emitted after it's handled. """ + """Wrap a Bottle request so that a log line is emitted after it's handled.""" + name = 'custom_logging' api = 2 def apply(self, fn, context): + """ + Apply custom logging to bottle request. + + :param fn: + :param context: + :return: + """ def _log_to_logger(*args, **kwargs): request_time = datetime.utcnow() actual_response = fn(*args, **kwargs) @@ -259,19 +303,15 @@ class CustomLogging(object): else: remote_addr = request.remote_addr - - logger.info('%s %s %s %s %s' % (remote_addr, - request_time, - request.method, - request.url, - response.status) - ) + + logger.info(f'{remote_addr} {request_time} {request.method} {request.url} {response.status}') return actual_response return _log_to_logger -def main(): +def main() -> None: + """Handle setting up and running the bottle app.""" cl_args = parse_cl_args() if cl_args.loglevel: logger.setLevel(cl_args.loglevel) @@ -285,13 +325,14 @@ def main(): app.install(CustomLogging()) logger.info('Running bottle app ...') app.run( - host=Settings.BOUNCER_HTTP_BIND_ADDRESS, - port=Settings.BOUNCER_HTTP_PORT, - server='gevent', + host=Settings.BOUNCER_HTTP_BIND_ADDRESS, + port=Settings.BOUNCER_HTTP_PORT, + server='gevent', certfile=Settings.CERT_FILE, keyfile=Settings.KEY_FILE, quiet=True, ) + if __name__ == '__main__': main()