From 3b5d16c6747ee8ff4d268fc202a120d856b5ae3d Mon Sep 17 00:00:00 2001 From: Athanasius Date: Sat, 12 Mar 2022 10:54:57 +0000 Subject: [PATCH] Relay: Full flake8/mypy pass --- src/eddn/Relay.py | 115 ++++++++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 4fbd990..40ac605 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -1,22 +1,14 @@ # coding: utf8 -""" -Relays sit below an announcer, or another relay, and simply repeat what -they receive over PUB/SUB. -""" +"""EDDN Relay, which passes messages from the Gateway to listeners.""" import argparse -import gevent import hashlib import logging -import simplejson import time import uuid import zlib from threading import Thread -import zmq.green as zmq - - # Logging has to be configured first before we do anything. logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -30,24 +22,29 @@ __logger_channel.setFormatter(__logger_formatter) logger.addHandler(__logger_channel) logger.info('Made logger') +import gevent +import simplejson +import zmq.green as zmq +from bottle import Bottle, request, response +from gevent import monkey + from eddn.conf.Settings import Settings, loadConfig -from gevent import monkey monkey.patch_all() -from bottle import Bottle, get, request, response, run + app = Bottle() - # This import must be done post-monkey-patching! -from eddn.core.StatsCollector import StatsCollector -statsCollector = StatsCollector() -statsCollector.start() +from eddn.core.StatsCollector import StatsCollector # noqa: E402 + +stats_collector = StatsCollector() +stats_collector.start() # This import must be done post-monkey-patching! if Settings.RELAY_DUPLICATE_MAX_MINUTES: from eddn.core.DuplicateMessages import DuplicateMessages - duplicateMessages = DuplicateMessages() - duplicateMessages.start() + duplicate_messages = DuplicateMessages() + duplicate_messages.start() def parse_cl_args(): @@ -71,13 +68,19 @@ def parse_cl_args(): return parser.parse_args() @app.route('/stats/', method=['OPTIONS', 'GET']) -def stats(): - stats = statsCollector.getSummary() +def stats() -> str: + """ + Return some stats about the Relay's operation so far. + + :return: JSON stats data + """ + stats = stats_collector.getSummary() stats["version"] = Settings.EDDN_VERSION return simplejson.dumps(stats) class Relay(Thread): + """Relay thread class.""" REGENERATE_UPLOADER_NONCE_INTERVAL = 12 * 60 * 60 # 12 hrs @@ -87,20 +90,26 @@ class Relay(Thread): self.uploader_nonce_timestamp = 0 self.generate_uploader_nonce() - def generate_uploader_nonce(self): + def generate_uploader_nonce(self) -> None: + """Generate an uploader nonce.""" self.uploader_nonce = str(uuid.uuid4()) self.uploader_nonce_timestamp = time.time() - def scramble_uploader(self, uploader): + def scramble_uploader(self, uploader: str) -> str: + """ + Scramble an uploader ID. + + :param uploader: Plain text uploaderID. + :return: Scrambled version of uploader. + """ now = time.time() if now - self.uploader_nonce_timestamp > self.REGENERATE_UPLOADER_NONCE_INTERVAL: self.generate_uploader_nonce() - return hashlib.sha1("{}-{}".format(self.uploader_nonce, uploader.encode('utf8'))).hexdigest() - def run(self): - """ - Fires up the relay process. - """ + return hashlib.sha1(f"{self.uploader_nonce!r}-{uploader.encode}".encode('utf8')).hexdigest() + + def run(self) -> None: # noqa: CCR001 + """Handle receiving messages from Gateway and passing them on.""" # These form the connection to the Gateway daemon(s) upstream. context = zmq.Context() @@ -108,10 +117,10 @@ class Relay(Thread): # Filters on topics or not... if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON is True: - for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems(): - receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) - for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems(): - receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) + for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.iteritems(): + receiver.setsockopt(zmq.SUBSCRIBE, schema_ref) + for schema_ref, schema_file in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems(): + receiver.setsockopt(zmq.SUBSCRIBE, schema_ref) else: receiver.setsockopt(zmq.SUBSCRIBE, '') @@ -126,29 +135,29 @@ class Relay(Thread): # End users, or other relays, may attach here. sender.bind(binding) - def relay_worker(message): + def relay_worker(message: bytes) -> None: """ - This is the worker function that re-sends the incoming messages out - to any subscribers. - :param str message: A JSON string to re-broadcast. + Worker that resends messages to any subscribers. + + :param message: Message to be passed on. """ # Separate topic from message - message = message.split(' |-| ') + message_split = message.split(b' |-| ') # Handle gateway not sending topic - if len(message) > 1: - message = message[1] + if len(message_split) > 1: + message = message_split[1] else: - message = message[0] + message = message_split[0] - message = zlib.decompress(message) - json = simplejson.loads(message) + message_text = zlib.decompress(message) + json = simplejson.loads(message_text) # Handle duplicate message if Settings.RELAY_DUPLICATE_MAX_MINUTES: - if duplicateMessages.isDuplicated(json): + if duplicate_messages.isDuplicated(json): # We've already seen this message recently. Discard it. - statsCollector.tally("duplicate") + stats_collector.tally("duplicate") return # Mask the uploader with a randomised nonce but still make it unique @@ -161,21 +170,21 @@ class Relay(Thread): del json['header']['uploaderIP'] # Convert message back to JSON - message = simplejson.dumps(json, sort_keys=True) + message_json = simplejson.dumps(json, sort_keys=True) # Recompress message - message = zlib.compress(message) + message = zlib.compress(message_json.encode('utf8')) # Send message sender.send(message) - statsCollector.tally("outbound") + stats_collector.tally("outbound") while True: # For each incoming message, spawn a greenlet using the relay_worker # function. - inboundMessage = receiver.recv() - statsCollector.tally("inbound") - gevent.spawn(relay_worker, inboundMessage) + inbound_message = receiver.recv() + stats_collector.tally("inbound") + gevent.spawn(relay_worker, inbound_message) class EnableCors(object): @@ -184,7 +193,8 @@ class EnableCors(object): name = 'enable_cors' api = 2 - def apply(self, fn, context): + @staticmethod + def apply(fn, context): """ Apply a CORS handler. @@ -194,7 +204,8 @@ class EnableCors(object): """Set CORS Headers.""" response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS' - response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' + response.headers['Access-Control-Allow-Headers'] = \ + 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' if request.method != 'OPTIONS': # actual request; reply with the actual response @@ -203,7 +214,9 @@ class EnableCors(object): return _enable_cors -def main(): +<<<<<<< HEAD +def main() -> None: + """Handle setting up and running the bottle app.""" cl_args = parse_cl_args() if cl_args.loglevel: logger.setLevel(cl_args.loglevel)