diff --git a/plugins/eddn.py b/plugins/eddn.py index 505039c4..22152f83 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -22,6 +22,7 @@ # # ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# # ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# +import http import itertools import json import os @@ -129,8 +130,14 @@ class EDDNSender: """Handle sending of EDDN messages to the Gateway.""" SQLITE_DB_FILENAME_V1 = 'eddn_queue-v1.db' + TIMEOUT = 10 # requests timeout + UNKNOWN_SCHEMA_RE = re.compile( + r"^FAIL: \[JsonValidationException\('Schema " + r"https://eddn.edcd.io/schemas/(?P.+)/(?P[0-9]+) is unknown, " + r"unable to validate.',\)\]$" + ) - def __init__(self, eddn_endpoint: str) -> None: + def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None: """ Prepare the system for processing messages. @@ -138,9 +145,13 @@ class EDDNSender: - Convert any legacy file into the database. - (Future) Handle any database migrations. + :param eddn: Reference to the `EDDN` instance this is for. :param eddn_endpoint: Where messages should be sent. """ + self.eddn = eddn self.eddn_endpoint = eddn_endpoint + self.session = requests.Session() + self.session.headers['User-Agent'] = user_agent self.db_conn = self.sqlite_queue_v1() self.db = self.db_conn.cursor() @@ -224,7 +235,7 @@ class EDDNSender: replay_file = open(filename, 'w') # Will truncate replay_file.close() os.unlink(filename) - + def close(self) -> None: """Clean up any resources.""" if self.db: @@ -263,8 +274,8 @@ class EDDNSender: created = msg['message']['timestamp'] edmc_version = msg['header']['softwareVersion'] - game_version = msg['header']['gameversion'] - game_build = msg['header']['gamebuild'] + game_version = msg['header'].get('gameversion', '') + game_build = msg['header'].get('gamebuild', '') uploader = msg['header']['uploaderID'] try: @@ -300,121 +311,103 @@ class EDDNSender: ) self.db_conn.commit() - -# TODO: a good few of these methods are static or could be classmethods. they should be created as such. -class EDDN: - """EDDN Data export.""" - - DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/' - if 'eddn' in debug_senders: - DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn' - - REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms] - REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds - TIMEOUT = 10 # requests timeout - MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE) - CANONICALISE_RE = re.compile(r'\$(.+)_name;') - UNKNOWN_SCHEMA_RE = re.compile( - r"^FAIL: \[JsonValidationException\('Schema " - r"https://eddn.edcd.io/schemas/(?P.+)/(?P[0-9]+) is unknown, " - r"unable to validate.',\)\]$" - ) - CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+') - - def __init__(self, parent: tk.Tk): - self.parent: tk.Tk = parent - self.session = requests.Session() - self.session.headers['User-Agent'] = user_agent - - if config.eddn_url is not None: - self.eddn_url = config.eddn_url - - else: - self.eddn_url = self.DEFAULT_URL - - self.sender = EDDNSender(self.eddn_url) - - self.fss_signals: List[Mapping[str, Any]] = [] - - def close(self): - """Close down the EDDN class instance.""" - logger.debug('Closing Sender...') - if self.sender: - self.sender.close() - - logger.debug('Done.') - - logger.debug('Closing EDDN requests.Session.') - self.session.close() - - def send(self, cmdr: str, msg: Mapping[str, Any]) -> None: + def send_message_by_id(self, id: int): """ - Send sends an update to EDDN. + Transmit the message identified by the given ID. - :param cmdr: the CMDR to use as the uploader ID. - :param msg: the payload to send. + :param id: + :return: """ - should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', msg) + self.db.execute( + """ + SELECT * FROM messages WHERE id = :row_id + """, + {'row_id': id} + ) + row = dict(zip([c[0] for c in self.db.description], self.db.fetchone())) + + try: + self.send_message(row['message']) + + except requests.exceptions.HTTPError as e: + logger.warning(f"HTTPError: {str(e)}") + + finally: + # Remove from queue + ... + + + def send_message(self, msg: str) -> bool: + """ + Transmit a fully-formed EDDN message to the Gateway. + + Should catch and handle all failure conditions. A `True` return might + mean that the message was successfully sent, *or* that this message + should not be retried after a failure, i.e. too large. + + :param msg: Fully formed, string, message. + :return: `True` for "now remove this message from the queue" + """ + should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: logger.warning('eddn.send has been disabled via killswitch. Returning.') return - msg = new_data + msg = json.dumps(new_data) - uploader_id = cmdr - - to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([ - ('$schemaRef', msg['$schemaRef']), - ('header', OrderedDict([ - ('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'), - ('softwareVersion', str(appversion_nobuild())), - ('uploaderID', uploader_id), - ])), - ('message', msg['message']), - ]) - - # About the smallest request is going to be (newlines added for brevity): - # {"$schemaRef":"https://eddn.edcd.io/schemas/commodity/3","header":{"softwareName":"E:D Market - # Connector Windows","softwareVersion":"5.3.0-beta4extra","uploaderID":"abcdefghijklm"},"messag - # e":{"systemName":"delphi","stationName":"The Oracle","marketId":128782803,"timestamp":"2022-0 - # 1-26T12:00:00Z","commodities":[]}} - # - # Which comes to 315 bytes (including \n) and compresses to 244 bytes. So lets just compress everything - - encoded, compressed = text.gzip(json.dumps(to_send, separators=(',', ':')), max_size=0) + status: tk.Widget = self.eddn.parent.children['status'] + # Even the smallest possible message compresses somewhat, so always compress + encoded, compressed = text.gzip(json.dumps(msg, separators=(',', ':')), max_size=0) headers: None | dict[str, str] = None if compressed: headers = {'Content-Encoding': 'gzip'} - r = self.session.post(self.eddn_url, data=encoded, timeout=self.TIMEOUT, headers=headers) - if r.status_code != requests.codes.ok: + try: + r = self.session.post(self.eddn_endpoint, data=encoded, timeout=self.TIMEOUT, headers=headers) + if r.status_code == requests.codes.ok: + return True - # Check if EDDN is still objecting to an empty commodities list - if ( - r.status_code == 400 - and msg['$schemaRef'] == 'https://eddn.edcd.io/schemas/commodity/3' - and msg['message']['commodities'] == [] - and r.text == "FAIL: []" - ): - logger.trace_if('plugin.eddn', "EDDN is still objecting to empty commodities data") - return # We want to silence warnings otherwise - - if r.status_code == 413: + if r.status_code == http.HTTPStatus.REQUEST_ENTITY_TOO_LARGE: extra_data = { - 'schema_ref': msg.get('$schemaRef', 'Unset $schemaRef!'), + 'schema_ref': new_data.get('$schemaRef', 'Unset $schemaRef!'), 'sent_data_len': str(len(encoded)), } if '/journal/' in extra_data['schema_ref']: - extra_data['event'] = msg.get('message', {}).get('event', 'No Event Set') + extra_data['event'] = new_data.get('message', {}).get('event', 'No Event Set') - self._log_response(r, header_msg='Got a 413 while POSTing data', **extra_data) - return # drop the error + self._log_response(r, header_msg='Got "Payload Too Large" while POSTing data', **extra_data) + return True - if not self.UNKNOWN_SCHEMA_RE.match(r.text): - self._log_response(r, header_msg='Status from POST wasn\'t 200 (OK)') + self._log_response(r, header_msg="Status from POST wasn't 200 (OK)") + r.raise_for_status() - r.raise_for_status() + except requests.exceptions.HTTPError as e: + if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text): + logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}" + f"/{unknown_schema['schema_version']}") + # This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema. + return True + + elif e.response.status_code == http.HTTPStatus.BAD_REQUEST: + # EDDN straight up says no, so drop the message + logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}") + return True + + else: + # This should catch anything else, e.g. timeouts, gateway errors + status['text'] = self.http_error_to_log(e) + + except requests.exceptions.RequestException as e: + logger.debug('Failed sending', exc_info=e) + # LANG: Error while trying to send data to EDDN + status['text'] = _("Error: Can't connect to EDDN") + + except Exception as e: + logger.debug('Failed sending', exc_info=e) + status['text'] = str(e) + + return False def _log_response( self, @@ -441,6 +434,92 @@ class EDDN: Content :\t{response.text} ''')+additional_data) + @staticmethod + def http_error_to_log(exception: requests.exceptions.HTTPError) -> str: + """Convert an exception from raise_for_status to a log message and displayed error.""" + status_code = exception.errno + + if status_code == 429: # HTTP UPGRADE REQUIRED + logger.warning('EDMC is sending schemas that are too old') + # LANG: EDDN has banned this version of our client + return _('EDDN Error: EDMC is too old for EDDN. Please update.') + + elif status_code == 400: + # we a validation check or something else. + logger.warning(f'EDDN Error: {status_code} -- {exception.response}') + # LANG: EDDN returned an error that indicates something about what we sent it was wrong + return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log') + + else: + logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}') + # LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number + return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code) + + +# TODO: a good few of these methods are static or could be classmethods. they should be created as such. +class EDDN: + """EDDN Data export.""" + + DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/' + if 'eddn' in debug_senders: + DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn' + + REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms] + REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds + MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE) + CANONICALISE_RE = re.compile(r'\$(.+)_name;') + CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+') + + def __init__(self, parent: tk.Tk): + self.parent: tk.Tk = parent + + if config.eddn_url is not None: + self.eddn_url = config.eddn_url + + else: + self.eddn_url = self.DEFAULT_URL + + self.sender = EDDNSender(self, self.eddn_url) + + self.fss_signals: List[Mapping[str, Any]] = [] + + def close(self): + """Close down the EDDN class instance.""" + logger.debug('Closing Sender...') + if self.sender: + self.sender.close() + + logger.debug('Done.') + + logger.debug('Closing EDDN requests.Session.') + self.session.close() + + def send(self, cmdr: str, msg: Mapping[str, Any]) -> None: + """ + Enqueue a message for transmission. + + :param cmdr: the CMDR to use as the uploader ID. + :param msg: the payload to send. + """ + to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([ + ('$schemaRef', msg['$schemaRef']), + ('header', OrderedDict([ + ('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'), + ('softwareVersion', str(appversion_nobuild())), + ('uploaderID', cmdr), + # TODO: Add `gameversion` and `gamebuild` if that change is live + # on EDDN. + ])), + ('message', msg['message']), + ]) + + # Ensure it's en-queued + msg_id = self.sender.add_message(cmdr, to_send) + # Now try to transmit it immediately + if self.sender.send_message_by_id(msg_id): + # De-queue + self.sender.delete_message(msg_id) + def sendreplay(self) -> None: # noqa: CCR001 """Send cached Journal lines to EDDN.""" # TODO: Convert to using the sqlite3 db @@ -493,63 +572,13 @@ class EDDN: 'https://eddn.edcd.io/schemas/' ) - try: - self.send(cmdr, msg) - self.replaylog.pop(0) - if not len(self.replaylog) % self.REPLAYFLUSH: - self.flush() - - except requests.exceptions.HTTPError as e: - if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text): - logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}" - f"/{unknown_schema['schema_version']}") - # NB: This dropping is to cater for the time when EDDN - # doesn't *yet* support a new schema. - self.replaylog.pop(0) # Drop the message - self.flush() # Truncates the file, then writes the extant data - - elif e.response.status_code == 400: - # EDDN straight up says no, so drop the message - logger.debug(f"EDDN responded '400' to the message, dropping:\n{msg!r}") - self.replaylog.pop(0) # Drop the message - self.flush() # Truncates the file, then writes the extant data - - else: - status['text'] = self.http_error_to_log(e) - - except requests.exceptions.RequestException as e: - logger.debug('Failed sending', exc_info=e) - # LANG: Error while trying to send data to EDDN - status['text'] = _("Error: Can't connect to EDDN") - return # stop sending - - except Exception as e: - logger.debug('Failed sending', exc_info=e) - status['text'] = str(e) - return # stop sending + self.send(cmdr, msg) + self.replaylog.pop(0) + if not len(self.replaylog) % self.REPLAYFLUSH: + self.flush() self.parent.after(self.REPLAYPERIOD, self.sendreplay) - @staticmethod - def http_error_to_log(exception: requests.exceptions.HTTPError) -> str: - """Convert an exception from raise_for_status to a log message and displayed error.""" - status_code = exception.errno - - if status_code == 429: # HTTP UPGRADE REQUIRED - logger.warning('EDMC is sending schemas that are too old') - # LANG: EDDN has banned this version of our client - return _('EDDN Error: EDMC is too old for EDDN. Please update.') - - elif status_code == 400: - # we a validation check or something else. - logger.warning(f'EDDN Error: {status_code} -- {exception.response}') - # LANG: EDDN returned an error that indicates something about what we sent it was wrong - return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log') - - else: - logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}') - # LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number - return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code) def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None: # noqa: CCR001 """ @@ -883,33 +912,13 @@ class EDDN: def export_journal_entry(self, cmdr: str, entry: Mapping[str, Any], msg: Mapping[str, Any]) -> None: """ - Update EDDN with an event from the journal. - - Additionally if other lines have been saved for retry, it may send - those as well. + Send a Journal-sourced EDDN message. :param cmdr: Commander name as passed in through `journal_entry()`. :param entry: The full journal event dictionary (due to checks in this function). :param msg: The EDDN message body to be sent. """ - if self.replayfile or self.journal_replay_load_file(): - # Store the entry - self.replaylog.append(json.dumps([cmdr, msg])) - self.replayfile.write(f'{self.replaylog[-1]}\n') # type: ignore - - if ( - entry['event'] == 'Docked' or (entry['event'] == 'Location' and entry['Docked']) or not - (config.get_int('output') & config.OUT_SYS_DELAY) - ): - self.parent.after(self.REPLAYPERIOD, self.sendreplay) # Try to send this and previous entries - - else: - # Can't access replay file! Send immediately. - # LANG: Status text shown while attempting to send data - self.parent.children['status']['text'] = _('Sending data to EDDN...') - self.parent.update_idletasks() - self.send(cmdr, msg) - self.parent.children['status']['text'] = '' + self.send(cmdr, msg) def export_journal_generic(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None: """