Relay: Full flake8/mypy pass

This commit is contained in:
Athanasius 2022-03-12 10:54:57 +00:00
parent 06e9442bea
commit 3b5d16c674
No known key found for this signature in database
GPG Key ID: 8C392035DD80FD62

View File

@ -1,22 +1,14 @@
# coding: utf8
"""
Relays sit below an announcer, or another relay, and simply repeat what
they receive over PUB/SUB.
"""
"""EDDN Relay, which passes messages from the Gateway to listeners."""
import argparse
import gevent
import hashlib
import logging
import simplejson
import time
import uuid
import zlib
from threading import Thread
import zmq.green as zmq
# Logging has to be configured first before we do anything.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -30,24 +22,29 @@ __logger_channel.setFormatter(__logger_formatter)
logger.addHandler(__logger_channel)
logger.info('Made logger')
import gevent
import simplejson
import zmq.green as zmq
from bottle import Bottle, request, response
from gevent import monkey
from eddn.conf.Settings import Settings, loadConfig
from gevent import monkey
monkey.patch_all()
from bottle import Bottle, get, request, response, run
app = Bottle()
# This import must be done post-monkey-patching!
from eddn.core.StatsCollector import StatsCollector
statsCollector = StatsCollector()
statsCollector.start()
from eddn.core.StatsCollector import StatsCollector # noqa: E402
stats_collector = StatsCollector()
stats_collector.start()
# This import must be done post-monkey-patching!
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
from eddn.core.DuplicateMessages import DuplicateMessages
duplicateMessages = DuplicateMessages()
duplicateMessages.start()
duplicate_messages = DuplicateMessages()
duplicate_messages.start()
def parse_cl_args():
@ -71,13 +68,19 @@ def parse_cl_args():
return parser.parse_args()
@app.route('/stats/', method=['OPTIONS', 'GET'])
def stats():
stats = statsCollector.getSummary()
def stats() -> str:
"""
Return some stats about the Relay's operation so far.
:return: JSON stats data
"""
stats = stats_collector.getSummary()
stats["version"] = Settings.EDDN_VERSION
return simplejson.dumps(stats)
class Relay(Thread):
"""Relay thread class."""
REGENERATE_UPLOADER_NONCE_INTERVAL = 12 * 60 * 60 # 12 hrs
@ -87,20 +90,26 @@ class Relay(Thread):
self.uploader_nonce_timestamp = 0
self.generate_uploader_nonce()
def generate_uploader_nonce(self):
def generate_uploader_nonce(self) -> None:
"""Generate an uploader nonce."""
self.uploader_nonce = str(uuid.uuid4())
self.uploader_nonce_timestamp = time.time()
def scramble_uploader(self, uploader):
def scramble_uploader(self, uploader: str) -> str:
"""
Scramble an uploader ID.
:param uploader: Plain text uploaderID.
:return: Scrambled version of uploader.
"""
now = time.time()
if now - self.uploader_nonce_timestamp > self.REGENERATE_UPLOADER_NONCE_INTERVAL:
self.generate_uploader_nonce()
return hashlib.sha1("{}-{}".format(self.uploader_nonce, uploader.encode('utf8'))).hexdigest()
def run(self):
"""
Fires up the relay process.
"""
return hashlib.sha1(f"{self.uploader_nonce!r}-{uploader.encode}".encode('utf8')).hexdigest()
def run(self) -> None: # noqa: CCR001
"""Handle receiving messages from Gateway and passing them on."""
# These form the connection to the Gateway daemon(s) upstream.
context = zmq.Context()
@ -108,10 +117,10 @@ class Relay(Thread):
# Filters on topics or not...
if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON is True:
for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
for schema_ref, schema_file in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schema_ref)
for schema_ref, schema_file in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schema_ref)
else:
receiver.setsockopt(zmq.SUBSCRIBE, '')
@ -126,29 +135,29 @@ class Relay(Thread):
# End users, or other relays, may attach here.
sender.bind(binding)
def relay_worker(message):
def relay_worker(message: bytes) -> None:
"""
This is the worker function that re-sends the incoming messages out
to any subscribers.
:param str message: A JSON string to re-broadcast.
Worker that resends messages to any subscribers.
:param message: Message to be passed on.
"""
# Separate topic from message
message = message.split(' |-| ')
message_split = message.split(b' |-| ')
# Handle gateway not sending topic
if len(message) > 1:
message = message[1]
if len(message_split) > 1:
message = message_split[1]
else:
message = message[0]
message = message_split[0]
message = zlib.decompress(message)
json = simplejson.loads(message)
message_text = zlib.decompress(message)
json = simplejson.loads(message_text)
# Handle duplicate message
if Settings.RELAY_DUPLICATE_MAX_MINUTES:
if duplicateMessages.isDuplicated(json):
if duplicate_messages.isDuplicated(json):
# We've already seen this message recently. Discard it.
statsCollector.tally("duplicate")
stats_collector.tally("duplicate")
return
# Mask the uploader with a randomised nonce but still make it unique
@ -161,21 +170,21 @@ class Relay(Thread):
del json['header']['uploaderIP']
# Convert message back to JSON
message = simplejson.dumps(json, sort_keys=True)
message_json = simplejson.dumps(json, sort_keys=True)
# Recompress message
message = zlib.compress(message)
message = zlib.compress(message_json.encode('utf8'))
# Send message
sender.send(message)
statsCollector.tally("outbound")
stats_collector.tally("outbound")
while True:
# For each incoming message, spawn a greenlet using the relay_worker
# function.
inboundMessage = receiver.recv()
statsCollector.tally("inbound")
gevent.spawn(relay_worker, inboundMessage)
inbound_message = receiver.recv()
stats_collector.tally("inbound")
gevent.spawn(relay_worker, inbound_message)
class EnableCors(object):
@ -184,7 +193,8 @@ class EnableCors(object):
name = 'enable_cors'
api = 2
def apply(self, fn, context):
@staticmethod
def apply(fn, context):
"""
Apply a CORS handler.
@ -194,7 +204,8 @@ class EnableCors(object):
"""Set CORS Headers."""
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
response.headers['Access-Control-Allow-Headers'] = \
'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
if request.method != 'OPTIONS':
# actual request; reply with the actual response
@ -203,7 +214,9 @@ class EnableCors(object):
return _enable_cors
def main():
<<<<<<< HEAD
def main() -> None:
"""Handle setting up and running the bottle app."""
cl_args = parse_cl_args()
if cl_args.loglevel:
logger.setLevel(cl_args.loglevel)