From 555c9dd813451beb297b239611ee3ce33627b0ae Mon Sep 17 00:00:00 2001 From: Athanasius Date: Thu, 8 Jul 2021 12:16:50 +0000 Subject: [PATCH] Bouncer: Accept messages as if a Gateway, but then forward them to another This is to be used during migration, running on the old host so as to forward all messages to the new Gateway. The destination is hard-coded in `LIVE_GATEWAY_URL`. --- src/eddn/Bouncer.py | 268 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 src/eddn/Bouncer.py diff --git a/src/eddn/Bouncer.py b/src/eddn/Bouncer.py new file mode 100644 index 0000000..5ef64fd --- /dev/null +++ b/src/eddn/Bouncer.py @@ -0,0 +1,268 @@ +# coding: utf8 + +""" +Bouncer script to forward received messages to a Gateway. + +This is intended to be used if performing a server migration of the live +service. + +DNS updates can be slow, especially with multiple layers of caching +involved, some of which might not respect declared TTLs. So use this to +listen for new messages from EDDN client/senders and forward them on to +where the new Gateway is running. + +Architecture: + + 1. The /upload/ bottle route, `upload()`, receives messages, as if this + was a Gateway. + + 2. The payload is extracted using `get_decompressed_message()`, as the + message *may* be compressed, but might not be. + + 3. The result of that, which XXX, + is passed on to forward_message() + which spawns a greenlet to actually send the message on using + `push_message()`. + + 4. `push_message()` then sends the message to the configured live/real + Gateway. +""" +import gevent +import hashlib +import logging +import requests +import simplejson +import urlparse +import zlib +from datetime import datetime +from functools import wraps + +from pkg_resources import resource_string +# import os + +from eddn.conf.Settings import Settings, loadConfig +from eddn.core.Validator import Validator, ValidationSeverity + +from gevent import monkey +monkey.patch_all() +from bottle import Bottle, run, request, response, get, post +app = Bottle() + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(logging.StreamHandler()) +logger.info('Made logger') + + +# This import must be done post-monkey-patching! +from eddn.core.StatsCollector import StatsCollector +statsCollector = StatsCollector() +statsCollector.start() + +LIVE_GATEWAY_URL = 'https://beta.eddn.edcd.io:4431/upload/' + +def push_message(message_body): + """ + Spawned as a greenlet to push messages (strings) through ZeroMQ. + This is a dumb method that just pushes strings; it assumes you've already validated + and serialised as you want to. + """ + try: + r = requests.post( + LIVE_GATEWAY_URL, + data=message_body, + ) + + except Exception as e: + logger.error('Failed sending message on', exc_info=e) + + else: + if r.status_code != requests.codes.ok: + logger.error('Response from %s:\n%s\n' % (LIVE_GATEWAY_URL, + r.text)) + + else: + statsCollector.tally("outbound") + + +def get_remote_address(): + """ + Determines the address of the uploading client. First checks the for + proxy-forwarded headers, then falls back to request.remote_addr. + :rtype: str + """ + return request.headers.get('X-Forwarded-For', request.remote_addr) + + +def get_decompressed_message(): + """ + For upload formats that support it, detect gzip Content-Encoding headers + and de-compress on the fly. + :rtype: str + :returns: The de-compressed request body. + """ + content_encoding = request.headers.get('Content-Encoding', '') + + if content_encoding in ['gzip', 'deflate']: + # Compressed request. We have to decompress the body, then figure out + # if it's form-encoded. + try: + # Auto header checking. + message_body = zlib.decompress(request.body.read(), 15 + 32) + except zlib.error: + # Negative wbits suppresses adler32 checksumming. + message_body = zlib.decompress(request.body.read(), -15) + + # At this point, we're not sure whether we're dealing with a straight + # un-encoded POST body, or a form-encoded POST. Attempt to parse the + # body. If it's not form-encoded, this will return an empty dict. + form_enc_parsed = urlparse.parse_qs(message_body) + if form_enc_parsed: + # This is a form-encoded POST. The value of the data attrib will + # be the body we're looking for. + try: + message_body = form_enc_parsed['data'][0] + except (KeyError, IndexError): + raise MalformedUploadError( + "No 'data' POST key/value found. Check your POST key " + "name for spelling, and make sure you're passing a value." + ) + else: + # Uncompressed request. Bottle handles all of the parsing of the + # POST key/vals, or un-encoded body. + data_key = request.forms.get('data') + if data_key: + # This is a form-encoded POST. Support the silly people. + message_body = data_key + else: + # This is a non form-encoded POST body. + message_body = request.body.read() + + return message_body + + +def forward_message(message_body): + # TODO: This instead needs to send the message to remote Gateway + # Sends the parsed message to the Relay/Monitor as compressed JSON. + gevent.spawn(push_message, message_body) + logger.info("Accepted upload from %s" % ( + get_remote_address() + )) + return 'OK' + +@app.route('/upload/', method=['OPTIONS', 'POST']) +def upload(): + try: + # Body may or may not be compressed. + message_body = get_decompressed_message() + + except zlib.error as exc: + # Some languages and libs do a crap job zlib compressing stuff. Provide + # at least some kind of feedback for them to try to get pointed in + # the correct direction. + response.status = 400 + logger.error("gzip error with %s: %s" % (get_remote_address(), exc.message)) + return exc.message + + except MalformedUploadError as exc: + # They probably sent an encoded POST, but got the key/val wrong. + response.status = 400 + logger.error("Error to %s: %s" % (get_remote_address(), exc.message)) + return exc.message + + statsCollector.tally("inbound") + return forward_message(message_body) + + +@app.route('/health_check/', method=['OPTIONS', 'GET']) +def health_check(): + """ + This should only be used by the gateway monitoring script. It is used + to detect whether the gateway is still alive, and whether it should remain + in the DNS rotation. + """ + return Settings.EDDN_VERSION + + +@app.route('/stats/', method=['OPTIONS', 'GET']) +def stats(): + stats = statsCollector.getSummary() + stats["version"] = Settings.EDDN_VERSION + return simplejson.dumps(stats) + + +class MalformedUploadError(Exception): + """ + Raise this when an upload is structurally incorrect. This isn't so much + to do with something like a bogus region ID, this is more like "You are + missing a POST key/val, or a body". + """ + pass + + +class EnableCors(object): + name = 'enable_cors' + api = 2 + + def apply(self, fn, context): + def _enable_cors(*args, **kwargs): + # set CORS headers + response.headers['Access-Control-Allow-Origin'] = '*' + response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS' + response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' + + if request.method != 'OPTIONS': + # actual request; reply with the actual response + return fn(*args, **kwargs) + + return _enable_cors + + +class CustomLogging(object): + """Wrap a Bottle request so that a log line is emitted after it's handled. """ + name = 'custom_logging' + api = 2 + + def apply(self, fn, context): + def _log_to_logger(*args, **kwargs): + request_time = datetime.utcnow() + actual_response = fn(*args, **kwargs) + + # logger.info('Request:\n%s\n' % (request ) ) + if len(request.remote_route) > 1: + remote_addr = request.remote_route[1] + + else: + remote_addr = request.remote_addr + + logger.info('%s %s %s %s %s' % (remote_addr, + request_time, + request.method, + request.url, + response.status) + ) + + return actual_response + + return _log_to_logger + +def main(): + logger.info('Loading config...') + loadConfig() + + logger.info('Installing EnableCors ...') + app.install(EnableCors()) + logger.info('Installing CustomLogging ...') + app.install(CustomLogging()) + logger.info('Running bottle app ...') + app.run( + host=Settings.GATEWAY_HTTP_BIND_ADDRESS, + port=Settings.GATEWAY_HTTP_PORT, + server='gevent', + certfile=Settings.CERT_FILE, + keyfile=Settings.KEY_FILE, + quiet=True, + ) + +if __name__ == '__main__': + main()