diff --git a/EDMarketConnector.py b/EDMarketConnector.py index 283c3bf5..041f7331 100755 --- a/EDMarketConnector.py +++ b/EDMarketConnector.py @@ -922,7 +922,7 @@ class AppWindow(object): return False # Ignore possibly missing shipyard info - elif (config.get_int('output') & config.OUT_MKT_EDDN) \ + elif (config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA) \ and not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')): if not self.status['text']: # LANG: Status - Either no market or no modules data for station from Frontier CAPI diff --git a/PLUGINS.md b/PLUGINS.md index 1649b47e..e8d47a3e 100644 --- a/PLUGINS.md +++ b/PLUGINS.md @@ -617,6 +617,7 @@ Content of `state` (updated to the current journal entry): | `Modules` | `dict` | Currently fitted modules | | `NavRoute` | `dict` | Last plotted multi-hop route | | `ModuleInfo` | `dict` | Last loaded ModulesInfo.json data | +| `IsDocked` | `bool` | Whether the Cmdr is currently docked *in their own ship*. | | `OnFoot` | `bool` | Whether the Cmdr is on foot | | `Component` | `dict` | 'Component' MicroResources in Odyssey, `int` count each. | | `Item` | `dict` | 'Item' MicroResources in Odyssey, `int` count each. | @@ -710,6 +711,17 @@ NB: It *is* possible, if a player is quick enough, to plot and clear a route before we load it, in which case we'd be retaining the *previous* plotted route. +New in version 5.6.0: + +`IsDocked` boolean added to `state`. This is set True for a `Location` event +having `"Docked":true"`, or the `Docked` event. It is set back to False (its +default value) for an `Undocked` event. Being on-foot in a station at login +time does *not* count as docked for this. + +In general on-foot, including being in a taxi, might not set this 100% +correctly. Its main use in core code is to detect being docked so as to send +any stored EDDN messages due to "Delay sending until docked" option. + ___ ##### Synthetic Events diff --git a/config/__init__.py b/config/__init__.py index 56679019..70d0603e 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -52,7 +52,7 @@ appcmdname = 'EDMC' # # Major.Minor.Patch(-prerelease)(+buildmetadata) # NB: Do *not* import this, use the functions appversion() and appversion_nobuild() -_static_appversion = '5.5.1-alpha0' +_static_appversion = '5.6.0-alpha0' _cached_version: Optional[semantic_version.Version] = None copyright = '© 2015-2019 Jonathan Harris, 2020-2022 EDCD' @@ -162,7 +162,7 @@ def appversion_nobuild() -> semantic_version.Version: class AbstractConfig(abc.ABC): """Abstract root class of all platform specific Config implementations.""" - OUT_MKT_EDDN = 1 + OUT_EDDN_SEND_STATION_DATA = 1 # OUT_MKT_BPC = 2 # No longer supported OUT_MKT_TD = 4 OUT_MKT_CSV = 8 @@ -171,12 +171,12 @@ class AbstractConfig(abc.ABC): # OUT_SYS_FILE = 32 # No longer supported # OUT_STAT = 64 # No longer available # OUT_SHIP_CORIOLIS = 128 # Replaced by OUT_SHIP - OUT_STATION_ANY = OUT_MKT_EDDN | OUT_MKT_TD | OUT_MKT_CSV # OUT_SYS_EDSM = 256 # Now a plugin # OUT_SYS_AUTO = 512 # Now always automatic OUT_MKT_MANUAL = 1024 - OUT_SYS_EDDN = 2048 - OUT_SYS_DELAY = 4096 + OUT_EDDN_SEND_NON_STATION = 2048 + OUT_EDDN_DELAY = 4096 + OUT_STATION_ANY = OUT_EDDN_SEND_STATION_DATA | OUT_MKT_TD | OUT_MKT_CSV app_dir_path: pathlib.Path plugin_dir_path: pathlib.Path diff --git a/monitor.py b/monitor.py index 52a53c51..d8c5707e 100644 --- a/monitor.py +++ b/monitor.py @@ -174,6 +174,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below 'Modules': None, 'CargoJSON': None, # The raw data from the last time cargo.json was read 'Route': None, # Last plotted route from Route.json file + 'IsDocked': False, # Whether we think cmdr is docked 'OnFoot': False, # Whether we think you're on-foot 'Component': defaultdict(int), # Odyssey Components in Ship Locker 'Item': defaultdict(int), # Odyssey Items in Ship Locker @@ -315,6 +316,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below self.systemaddress = None self.is_beta = False self.state['OnFoot'] = False + self.state['IsDocked'] = False self.state['Body'] = None self.state['BodyType'] = None @@ -734,6 +736,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below self.station_marketid = None self.stationtype = None self.stationservices = None + self.state['IsDocked'] = False elif event_type == 'embark': # This event is logged when a player (on foot) gets into a ship or SRV @@ -800,6 +803,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below self.state['Dropship'] = False elif event_type == 'docked': + self.state['IsDocked'] = True self.station = entry.get('StationName') # May be None self.station_marketid = entry.get('MarketID') # May be None self.stationtype = entry.get('StationType') # May be None @@ -822,6 +826,8 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below if event_type == 'location': logger.trace_if('journal.locations', '"Location" event') + if entry.get('Docked'): + self.state['IsDocked'] = True elif event_type == 'fsdjump': self.planet = None diff --git a/plugins/eddn.py b/plugins/eddn.py index c0b80697..5921e19c 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -22,20 +22,22 @@ # # ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# # ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# +import http import itertools import json +import os import pathlib import re +import sqlite3 import sys import tkinter as tk from collections import OrderedDict -from os import SEEK_SET -from os.path import join from platform import system from textwrap import dedent +from threading import Lock from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, MutableMapping, Optional from typing import OrderedDict as OrderedDictT -from typing import TextIO, Tuple, Union +from typing import Tuple, Union import requests @@ -52,10 +54,6 @@ from prefs import prefsVersion from ttkHyperlinkLabel import HyperlinkLabel from util import text -if sys.platform != 'win32': - from fcntl import LOCK_EX, LOCK_NB, lockf - - if TYPE_CHECKING: def _(x: str) -> str: return x @@ -67,8 +65,16 @@ class This: """Holds module globals.""" def __init__(self): + # Game version and build + self.game_version = "" + self.game_build = "" + # Commander Name + self.cmdr_name = "" + # Track if we're on foot self.on_foot = False + # Track if we're docked + self.docked = False # Horizons ? self.horizons = False @@ -118,7 +124,6 @@ class This: this = This() - # This SKU is tagged on any module or ship that you must have Horizons for. HORIZONS_SKU = 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' # ELITE_HORIZONS_V_COBRA_MK_IV_1000` is for the Cobra Mk IV, but @@ -130,165 +135,395 @@ HORIZONS_SKU = 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' # one. -# TODO: a good few of these methods are static or could be classmethods. they should be created as such. +class EDDNSender: + """Handle sending of EDDN messages to the Gateway.""" -class EDDN: - """EDDN Data export.""" - - DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/' - if 'eddn' in debug_senders: - DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn' - - REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms] - REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds + SQLITE_DB_FILENAME_V1 = 'eddn_queue-v1.db' + # EDDN schema types that pertain to station data + STATION_SCHEMAS = ('commodity', 'fcmaterials_capi', 'fcmaterials_journal', 'outfitting', 'shipyard') TIMEOUT = 10 # requests timeout - MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE) - CANONICALISE_RE = re.compile(r'\$(.+)_name;') UNKNOWN_SCHEMA_RE = re.compile( r"^FAIL: \[JsonValidationException\('Schema " r"https://eddn.edcd.io/schemas/(?P.+)/(?P[0-9]+) is unknown, " r"unable to validate.',\)\]$" ) - CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+') - def __init__(self, parent: tk.Tk): - self.parent: tk.Tk = parent + def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None: + """ + Prepare the system for processing messages. + + - Ensure the sqlite3 database for EDDN replays exists and has schema. + - Convert any legacy file into the database. + - (Future) Handle any database migrations. + + :param eddn: Reference to the `EDDN` instance this is for. + :param eddn_endpoint: Where messages should be sent. + """ + self.eddn = eddn + self.eddn_endpoint = eddn_endpoint self.session = requests.Session() self.session.headers['User-Agent'] = user_agent - self.replayfile: Optional[TextIO] = None # For delayed messages - self.replaylog: List[str] = [] - self.fss_signals: List[Mapping[str, Any]] = [] - if config.eddn_url is not None: - self.eddn_url = config.eddn_url + self.db_conn = self.sqlite_queue_v1() + self.db = self.db_conn.cursor() - else: - self.eddn_url = self.DEFAULT_URL + ####################################################################### + # Queue database migration + ####################################################################### + self.convert_legacy_file() + ####################################################################### - def load_journal_replay(self) -> bool: + self.queue_processing = Lock() + # Initiate retry/send-now timer + logger.trace_if( + "plugin.eddn.send", + f"First queue run scheduled for {self.eddn.REPLAY_STARTUP_DELAY}ms from now" + ) + self.eddn.parent.after(self.eddn.REPLAY_STARTUP_DELAY, self.queue_check_and_send, True) + + def sqlite_queue_v1(self) -> sqlite3.Connection: """ - Load cached journal entries from disk. + Initialise a v1 EDDN queue database. - :return: a bool indicating success + :return: sqlite3 connection """ - # Try to obtain exclusive access to the journal cache - filename = join(config.app_dir, 'replay.jsonl') + db_conn = sqlite3.connect(config.app_dir_path / self.SQLITE_DB_FILENAME_V1) + db = db_conn.cursor() + try: - try: - # Try to open existing file - self.replayfile = open(filename, 'r+', buffering=1) + db.execute( + """ + CREATE TABLE messages + ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created TEXT NOT NULL, + cmdr TEXT NOT NULL, + edmc_version TEXT, + game_version TEXT, + game_build TEXT, + message TEXT NOT NULL + ) + """ + ) - except FileNotFoundError: - self.replayfile = open(filename, 'w+', buffering=1) # Create file + db.execute( + """ + CREATE INDEX messages_created ON messages + ( + created + ) + """ + ) - if sys.platform != 'win32': # open for writing is automatically exclusive on Windows - lockf(self.replayfile, LOCK_EX | LOCK_NB) + db.execute( + """ + CREATE INDEX messages_cmdr ON messages + ( + cmdr + ) + """ + ) - except OSError: - logger.exception('Failed opening "replay.jsonl"') - if self.replayfile: - self.replayfile.close() - - self.replayfile = None - return False + except sqlite3.OperationalError as e: + if str(e) != "table messages already exists": + # Cleanup, as schema creation failed + db.close() + db_conn.close() + raise e else: - self.replaylog = [line.strip() for line in self.replayfile] - return True + logger.info("New `eddn_queue-v1.db` created") - def flush(self): - """Flush the replay file, clearing any data currently there that is not in the replaylog list.""" - if self.replayfile is None: - logger.error('replayfile is None!') - return + # We return only the connection, so tidy up + db.close() - self.replayfile.seek(0, SEEK_SET) - self.replayfile.truncate() - for line in self.replaylog: - self.replayfile.write(f'{line}\n') + return db_conn - self.replayfile.flush() + def convert_legacy_file(self): + """Convert a legacy file's contents into the sqlite3 db.""" + filename = config.app_dir_path / 'replay.jsonl' + try: + with open(filename, 'r+', buffering=1) as replay_file: + logger.info("Converting legacy `replay.jsonl` to `eddn_queue-v1.db`") + for line in replay_file: + cmdr, msg = json.loads(line) + self.add_message(cmdr, msg) - def close(self): - """Close down the EDDN class instance.""" - logger.debug('Closing replayfile...') - if self.replayfile: - self.replayfile.close() + except FileNotFoundError: + pass - self.replayfile = None - logger.debug('Done.') + finally: + # Best effort at removing the file/contents + # NB: The legacy code assumed it could write to the file. + logger.info("Converson` to `eddn_queue-v1.db` complete, removing `replay.jsonl`") + replay_file = open(filename, 'w') # Will truncate + replay_file.close() + os.unlink(filename) + + def close(self) -> None: + """Clean up any resources.""" + logger.debug('Closing db cursor.') + if self.db: + self.db.close() + + logger.debug('Closing db connection.') + if self.db_conn: + self.db_conn.close() logger.debug('Closing EDDN requests.Session.') self.session.close() - def send(self, cmdr: str, msg: Mapping[str, Any]) -> None: + def add_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> int: """ - Send sends an update to EDDN. + Add an EDDN message to the database. - :param cmdr: the CMDR to use as the uploader ID. - :param msg: the payload to send. + `msg` absolutely needs to be the **FULL** EDDN message, including all + of `header`, `$schemaRef` and `message`. Code handling this not being + the case is only for loading the legacy `replay.json` file messages. + + NB: Although `cmdr` *should* be the same as `msg->header->uploaderID` + we choose not to assume that. + + :param cmdr: Name of the Commander that created this message. + :param msg: The full, transmission-ready, EDDN message. + :return: ID of the successfully inserted row. """ - should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', msg) + logger.trace_if("plugin.eddn.send", f"Message for {msg['$schemaRef']=}") + # Cater for legacy replay.json messages + if 'header' not in msg: + msg['header'] = { + # We have to lie and say it's *this* version, but denote that + # it might not actually be this version. + 'softwareName': f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]' + ' (legacy replay)', + 'softwareVersion': str(appversion_nobuild()), + 'uploaderID': cmdr, + 'gameversion': '', # Can't add what we don't know + 'gamebuild': '', # Can't add what we don't know + } + + created = msg['message']['timestamp'] + edmc_version = msg['header']['softwareVersion'] + game_version = msg['header'].get('gameversion', '') + game_build = msg['header'].get('gamebuild', '') + uploader = msg['header']['uploaderID'] + + try: + self.db.execute( + """ + INSERT INTO messages ( + created, cmdr, edmc_version, game_version, game_build, message + ) + VALUES ( + ?, ?, ?, ?, ?, ? + ) + """, + (created, uploader, edmc_version, game_version, game_build, json.dumps(msg)) + ) + self.db_conn.commit() + + except Exception: + logger.exception('INSERT error') + # Can't possibly be a valid row id + return -1 + + logger.trace_if("plugin.eddn.send", f"Message for {msg['$schemaRef']=} recorded, id={self.db.lastrowid}") + return self.db.lastrowid or -1 + + def delete_message(self, row_id: int) -> None: + """ + Delete a queued message by row id. + + :param row_id: id of message to be deleted. + """ + logger.trace_if("plugin.eddn.send", f"Deleting message with {row_id=}") + self.db.execute( + """ + DELETE FROM messages WHERE id = :row_id + """, + {'row_id': row_id} + ) + self.db_conn.commit() + + def send_message_by_id(self, id: int): + """ + Transmit the message identified by the given ID. + + :param id: + :return: + """ + logger.trace_if("plugin.eddn.send", f"Sending message with {id=}") + self.db.execute( + """ + SELECT * FROM messages WHERE id = :row_id + """, + {'row_id': id} + ) + row = dict(zip([c[0] for c in self.db.description], self.db.fetchone())) + + try: + if self.send_message(row['message']): + self.delete_message(id) + return True + + except requests.exceptions.HTTPError as e: + logger.warning(f"HTTPError: {str(e)}") + + return False + + def send_message(self, msg: str) -> bool: + """ + Transmit a fully-formed EDDN message to the Gateway. + + If this is called then the attempt *will* be made. This is not where + options to not send to EDDN, or to delay the sending until docked, + are checked. + + It *is* however the one 'sending' place that the EDDN killswitches are checked. + + Should catch and handle all failure conditions. A `True` return might + mean that the message was successfully sent, *or* that this message + should not be retried after a failure, i.e. too large. + + :param msg: Fully formed, string, message. + :return: `True` for "now remove this message from the queue" + """ + logger.trace_if("plugin.eddn.send", "Sending message") + should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: logger.warning('eddn.send has been disabled via killswitch. Returning.') - return + return False - msg = new_data - - uploader_id = cmdr - - to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([ - ('$schemaRef', msg['$schemaRef']), - ('header', OrderedDict([ - ('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'), - ('softwareVersion', str(appversion_nobuild())), - ('uploaderID', uploader_id), - ])), - ('message', msg['message']), - ]) - - # About the smallest request is going to be (newlines added for brevity): - # {"$schemaRef":"https://eddn.edcd.io/schemas/commodity/3","header":{"softwareName":"E:D Market - # Connector Windows","softwareVersion":"5.3.0-beta4extra","uploaderID":"abcdefghijklm"},"messag - # e":{"systemName":"delphi","stationName":"The Oracle","marketId":128782803,"timestamp":"2022-0 - # 1-26T12:00:00Z","commodities":[]}} - # - # Which comes to 315 bytes (including \n) and compresses to 244 bytes. So lets just compress everything - - encoded, compressed = text.gzip(json.dumps(to_send, separators=(',', ':')), max_size=0) + status: tk.Widget = self.eddn.parent.children['status'] + # Even the smallest possible message compresses somewhat, so always compress + encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0) headers: None | dict[str, str] = None if compressed: headers = {'Content-Encoding': 'gzip'} - r = self.session.post(self.eddn_url, data=encoded, timeout=self.TIMEOUT, headers=headers) - if r.status_code != requests.codes.ok: + try: + r = self.session.post(self.eddn_endpoint, data=encoded, timeout=self.TIMEOUT, headers=headers) + if r.status_code == requests.codes.ok: + return True - # Check if EDDN is still objecting to an empty commodities list - if ( - r.status_code == 400 - and msg['$schemaRef'] == 'https://eddn.edcd.io/schemas/commodity/3' - and msg['message']['commodities'] == [] - and r.text == "FAIL: []" - ): - logger.trace_if('plugin.eddn', "EDDN is still objecting to empty commodities data") - return # We want to silence warnings otherwise - - if r.status_code == 413: + if r.status_code == http.HTTPStatus.REQUEST_ENTITY_TOO_LARGE: extra_data = { - 'schema_ref': msg.get('$schemaRef', 'Unset $schemaRef!'), + 'schema_ref': new_data.get('$schemaRef', 'Unset $schemaRef!'), 'sent_data_len': str(len(encoded)), } if '/journal/' in extra_data['schema_ref']: - extra_data['event'] = msg.get('message', {}).get('event', 'No Event Set') + extra_data['event'] = new_data.get('message', {}).get('event', 'No Event Set') - self._log_response(r, header_msg='Got a 413 while POSTing data', **extra_data) - return # drop the error + self._log_response(r, header_msg='Got "Payload Too Large" while POSTing data', **extra_data) + return True - if not self.UNKNOWN_SCHEMA_RE.match(r.text): - self._log_response(r, header_msg='Status from POST wasn\'t 200 (OK)') + self._log_response(r, header_msg="Status from POST wasn't 200 (OK)") + r.raise_for_status() - r.raise_for_status() + except requests.exceptions.HTTPError as e: + if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text): + logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}" + f"/{unknown_schema['schema_version']}") + # This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema. + return True + + elif e.response.status_code == http.HTTPStatus.BAD_REQUEST: + # EDDN straight up says no, so drop the message + logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}") + return True + + else: + # This should catch anything else, e.g. timeouts, gateway errors + status['text'] = self.http_error_to_log(e) + + except requests.exceptions.RequestException as e: + logger.debug('Failed sending', exc_info=e) + # LANG: Error while trying to send data to EDDN + status['text'] = _("Error: Can't connect to EDDN") + + except Exception as e: + logger.debug('Failed sending', exc_info=e) + status['text'] = str(e) + + return False + + def queue_check_and_send(self, reschedule: bool = False) -> None: # noqa: CCR001 + """ + Check if we should be sending queued messages, and send if we should. + + :param reschedule: Boolean indicating if we should call `after()` again. + """ + logger.trace_if("plugin.eddn.send", "Called") + # Mutex in case we're already processing + if not self.queue_processing.acquire(blocking=False): + logger.trace_if("plugin.eddn.send", "Couldn't obtain mutex") + if reschedule: + logger.trace_if("plugin.eddn.send", f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now") + self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule) + + else: + logger.trace_if("plugin.eddn.send", "NO next run scheduled (there should be another one already set)") + + return + + logger.trace_if("plugin.eddn.send", "Obtained mutex") + # Used to indicate if we've rescheduled at the faster rate already. + have_rescheduled = False + # We send either if docked or 'Delay sending until docked' not set + if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY): + logger.trace_if("plugin.eddn.send", "Should send") + # We need our own cursor here, in case the semantics of + # tk `after()` could allow this to run in the middle of other + # database usage. + db_cursor = self.db_conn.cursor() + + # Options: + # 1. Process every queued message, regardless. + # 2. Bail if we get any sort of connection error from EDDN. + + # Every queued message that is for *this* commander. We do **NOT** + # check if it's station/not-station, as the control of if a message + # was even created, versus the Settings > EDDN options, is applied + # *then*, not at time of sending. + try: + db_cursor.execute( + """ + SELECT id FROM messages + ORDER BY created ASC + LIMIT 1 + """ + ) + + except Exception: + logger.exception("DB error querying queued messages") + + else: + row = db_cursor.fetchone() + if row: + row = dict(zip([c[0] for c in db_cursor.description], row)) + if self.send_message_by_id(row['id']): + # If `True` was returned then we're done with this message. + # `False` means "failed to send, but not because the message + # is bad", i.e. an EDDN Gateway problem. Thus, in that case + # we do *NOT* schedule attempting the next message. + # Always re-schedule as this is only a "Don't hammer EDDN" delay + logger.trace_if("plugin.eddn.send", f"Next run scheduled for {self.eddn.REPLAY_DELAY}ms from " + "now") + self.eddn.parent.after(self.eddn.REPLAY_DELAY, self.queue_check_and_send, reschedule) + have_rescheduled = True + + db_cursor.close() + + else: + logger.trace_if("plugin.eddn.send", "Should NOT send") + + self.queue_processing.release() + logger.trace_if("plugin.eddn.send", "Mutex released") + if reschedule and not have_rescheduled: + # Set us up to run again per the configured period + logger.trace_if("plugin.eddn.send", f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now") + self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule) def _log_response( self, @@ -315,91 +550,6 @@ class EDDN: Content :\t{response.text} ''')+additional_data) - def sendreplay(self) -> None: # noqa: CCR001 - """Send cached Journal lines to EDDN.""" - if not self.replayfile: - return # Probably closing app - - status: tk.Widget = self.parent.children['status'] - - if not self.replaylog: - status['text'] = '' - return - - localized: str = _('Sending data to EDDN...') # LANG: Status text shown while attempting to send data - if len(self.replaylog) == 1: - status['text'] = localized - - else: - status['text'] = f'{localized.replace("...", "")} [{len(self.replaylog)}]' - - self.parent.update_idletasks() - - # Paranoia check in case this function gets chain-called. - if not self.replaylog: - # import traceback - # logger.error( - # f'self.replaylog (type: {type(self.replaylog)}) is falsey after update_idletasks(). Traceback:\n' - # f'{"".join(traceback.format_list(traceback.extract_stack()))}') - return - - try: - cmdr, msg = json.loads(self.replaylog[0], object_pairs_hook=OrderedDict) - - except json.JSONDecodeError as e: - # Couldn't decode - shouldn't happen! - logger.debug(f'\n{self.replaylog[0]}\n', exc_info=e) - # Discard and continue - self.replaylog.pop(0) - - else: - # TODO: Check message against *current* relevant schema so we don't try - # to send an old message that's now invalid. - - # Rewrite old schema name - if msg['$schemaRef'].startswith('http://schemas.elite-markets.net/eddn/'): - msg['$schemaRef'] = str(msg['$schemaRef']).replace( - 'http://schemas.elite-markets.net/eddn/', - 'https://eddn.edcd.io/schemas/' - ) - - try: - self.send(cmdr, msg) - self.replaylog.pop(0) - if not len(self.replaylog) % self.REPLAYFLUSH: - self.flush() - - except requests.exceptions.HTTPError as e: - if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text): - logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}" - f"/{unknown_schema['schema_version']}") - # NB: This dropping is to cater for the time when EDDN - # doesn't *yet* support a new schema. - self.replaylog.pop(0) # Drop the message - self.flush() # Truncates the file, then writes the extant data - - elif e.response.status_code == 400: - # EDDN straight up says no, so drop the message - logger.debug(f"EDDN responded '400' to the message, dropping:\n{msg!r}") - self.replaylog.pop(0) # Drop the message - self.flush() # Truncates the file, then writes the extant data - - else: - status['text'] = self.http_error_to_log(e) - - except requests.exceptions.RequestException as e: - logger.debug('Failed sending', exc_info=e) - # LANG: Error while trying to send data to EDDN - status['text'] = _("Error: Can't connect to EDDN") - return # stop sending - - except Exception as e: - logger.debug('Failed sending', exc_info=e) - status['text'] = str(e) - return # stop sending - - self.parent.after(self.REPLAYPERIOD, self.sendreplay) - @staticmethod def http_error_to_log(exception: requests.exceptions.HTTPError) -> str: """Convert an exception from raise_for_status to a log message and displayed error.""" @@ -421,6 +571,45 @@ class EDDN: # LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code) + +# TODO: a good few of these methods are static or could be classmethods. they should be created as such. +class EDDN: + """EDDN Data export.""" + + DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/' + if 'eddn' in debug_senders: + DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn' + + # FIXME: Change back to `300_000` + REPLAY_STARTUP_DELAY = 10_000 # Delay during startup before checking queue [milliseconds] + REPLAY_PERIOD = 300_000 # How often to try (re-)sending the queue, [milliseconds] + REPLAY_DELAY = 400 # Roughly two messages per second, accounting for send delays [milliseconds] + REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds + MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE) + CANONICALISE_RE = re.compile(r'\$(.+)_name;') + CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+') + + def __init__(self, parent: tk.Tk): + self.parent: tk.Tk = parent + + if config.eddn_url is not None: + self.eddn_url = config.eddn_url + + else: + self.eddn_url = self.DEFAULT_URL + + self.sender = EDDNSender(self, self.eddn_url) + + self.fss_signals: List[Mapping[str, Any]] = [] + + def close(self): + """Close down the EDDN class instance.""" + logger.debug('Closing Sender...') + if self.sender: + self.sender.close() + + logger.debug('Done.') + def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None: # noqa: CCR001 """ Update EDDN with the commodities on the current (lastStarport) station. @@ -486,9 +675,10 @@ class EDDN: if 'prohibited' in data['lastStarport']: message['prohibited'] = sorted(x for x in (data['lastStarport']['prohibited'] or {}).values()) - self.send(data['commander']['name'], { + self.send_message(data['commander']['name'], { '$schemaRef': f'https://eddn.edcd.io/schemas/commodity/3{"/test" if is_beta else ""}', 'message': message, + 'header': self.standard_header(game_version='CAPI-market', game_build=''), }) this.commodities = commodities @@ -571,7 +761,7 @@ class EDDN: # Don't send empty modules list - schema won't allow it if outfitting and this.outfitting != (horizons, outfitting): - self.send(data['commander']['name'], { + self.send_message(data['commander']['name'], { '$schemaRef': f'https://eddn.edcd.io/schemas/outfitting/2{"/test" if is_beta else ""}', 'message': OrderedDict([ ('timestamp', data['timestamp']), @@ -582,6 +772,7 @@ class EDDN: ('modules', outfitting), ('odyssey', this.odyssey), ]), + 'header': self.standard_header(game_version='CAPI-shipyard', game_build=''), }) this.outfitting = (horizons, outfitting) @@ -615,7 +806,7 @@ class EDDN: ) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. if shipyard and this.shipyard != (horizons, shipyard): - self.send(data['commander']['name'], { + self.send_message(data['commander']['name'], { '$schemaRef': f'https://eddn.edcd.io/schemas/shipyard/2{"/test" if is_beta else ""}', 'message': OrderedDict([ ('timestamp', data['timestamp']), @@ -626,6 +817,7 @@ class EDDN: ('ships', shipyard), ('odyssey', this.odyssey), ]), + 'header': self.standard_header(game_version='CAPI-shipyard', game_build=''), }) this.shipyard = (horizons, shipyard) @@ -663,7 +855,7 @@ class EDDN: # none and that really does need to be recorded over EDDN so that, e.g. # EDDB can update in a timely manner. if this.commodities != commodities: - self.send(cmdr, { + self.send_message(cmdr, { '$schemaRef': f'https://eddn.edcd.io/schemas/commodity/3{"/test" if is_beta else ""}', 'message': OrderedDict([ ('timestamp', entry['timestamp']), @@ -702,7 +894,7 @@ class EDDN: ) # Don't send empty modules list - schema won't allow it if outfitting and this.outfitting != (horizons, outfitting): - self.send(cmdr, { + self.send_message(cmdr, { '$schemaRef': f'https://eddn.edcd.io/schemas/outfitting/2{"/test" if is_beta else ""}', 'message': OrderedDict([ ('timestamp', entry['timestamp']), @@ -736,7 +928,7 @@ class EDDN: shipyard = sorted(ship['ShipType'] for ship in ships) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. if shipyard and this.shipyard != (horizons, shipyard): - self.send(cmdr, { + self.send_message(cmdr, { '$schemaRef': f'https://eddn.edcd.io/schemas/shipyard/2{"/test" if is_beta else ""}', 'message': OrderedDict([ ('timestamp', entry['timestamp']), @@ -751,35 +943,59 @@ class EDDN: # this.shipyard = (horizons, shipyard) - def export_journal_entry(self, cmdr: str, entry: Mapping[str, Any], msg: Mapping[str, Any]) -> None: + def send_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> None: """ - Update EDDN with an event from the journal. - - Additionally if other lines have been saved for retry, it may send - those as well. + Send an EDDN message. :param cmdr: Commander name as passed in through `journal_entry()`. - :param entry: The full journal event dictionary (due to checks in this function). :param msg: The EDDN message body to be sent. """ - if self.replayfile or self.load_journal_replay(): - # Store the entry - self.replaylog.append(json.dumps([cmdr, msg])) - self.replayfile.write(f'{self.replaylog[-1]}\n') # type: ignore + # Check if the user configured messages to be sent. + # + # 1. If this is a 'station' data message then check config.EDDN_SEND_STATION_DATA + # 2. Else check against config.EDDN_SEND_NON_STATION *and* config.OUT_EDDN_DELAY + if any(f'{s}' in msg['$schemaRef'] for s in EDDNSender.STATION_SCHEMAS): + # 'Station data' + if config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA: + # And user has 'station data' configured to be sent + logger.trace_if("plugin.eddn.send", "Recording/sending 'station' message") + if 'header' not in msg: + msg['header'] = self.standard_header() - if ( - entry['event'] == 'Docked' or (entry['event'] == 'Location' and entry['Docked']) or not - (config.get_int('output') & config.OUT_SYS_DELAY) - ): - self.parent.after(self.REPLAYPERIOD, self.sendreplay) # Try to send this and previous entries + msg_id = self.sender.add_message(cmdr, msg) + # 'Station data' is never delayed on construction of message + self.sender.send_message_by_id(msg_id) - else: - # Can't access replay file! Send immediately. - # LANG: Status text shown while attempting to send data - self.parent.children['status']['text'] = _('Sending data to EDDN...') - self.parent.update_idletasks() - self.send(cmdr, msg) - self.parent.children['status']['text'] = '' + elif config.get_int('output') & config.OUT_EDDN_SEND_NON_STATION: + # Any data that isn't 'station' is configured to be sent + logger.trace_if("plugin.eddn.send", "Recording 'non-station' message") + if 'header' not in msg: + msg['header'] = self.standard_header() + + msg_id = self.sender.add_message(cmdr, msg) + if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY): + # No delay in sending configured, so attempt immediately + logger.trace_if("plugin.eddn.send", "Sending 'non-station' message") + self.sender.send_message_by_id(msg_id) + + def standard_header( + self, game_version: Optional[str] = None, game_build: Optional[str] = None + ) -> MutableMapping[str, Any]: + """ + Return the standard header for an EDDN message, given tracked state. + + NB: This should *only* be called for newly constructed messages, not + for either a legacy message or an already queued one! + + :return: The standard header + """ + return { + 'softwareName': f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]', + 'softwareVersion': str(appversion_nobuild()), + 'uploaderID': this.cmdr_name, + 'gameversion': game_version or this.game_version, + 'gamebuild': game_build or this.game_build, + } def export_journal_generic(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None: """ @@ -793,7 +1009,7 @@ class EDDN: '$schemaRef': f'https://eddn.edcd.io/schemas/journal/1{"/test" if is_beta else ""}', 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) def entry_augment_system_data( self, @@ -881,7 +1097,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_navbeaconscan( @@ -923,7 +1139,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_codexentry( # noqa: CCR001 @@ -1023,7 +1239,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_scanbarycentre( @@ -1077,7 +1293,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_navroute( @@ -1150,7 +1366,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_fcmaterials( @@ -1196,6 +1412,9 @@ class EDDN: # } # ] # } + # Abort if we're not configured to send 'station' data. + if not config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA: + return None # Sanity check if 'Items' not in entry: @@ -1231,7 +1450,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_capi_fcmaterials( @@ -1286,10 +1505,11 @@ class EDDN: msg = { '$schemaRef': f'https://eddn.edcd.io/schemas/fcmaterials_capi/1{"/test" if is_beta else ""}', - 'message': entry + 'message': entry, + 'header': self.standard_header(game_version='CAPI-market', game_build=''), } - this.eddn.export_journal_entry(data['commander']['name'], entry, msg) + this.eddn.send_message(data['commander']['name'], msg) return None def export_journal_approachsettlement( @@ -1364,7 +1584,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_fssallbodiesfound( @@ -1414,7 +1634,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def export_journal_fssbodysignals( @@ -1470,7 +1690,7 @@ class EDDN: 'message': entry } - this.eddn.export_journal_entry(cmdr, entry, msg) + this.eddn.send_message(cmdr, msg) return None def enqueue_journal_fsssignaldiscovered(self, entry: MutableMapping[str, Any]) -> None: @@ -1577,8 +1797,7 @@ class EDDN: logger.trace_if("plugin.eddn.fsssignaldiscovered", f"FSSSignalDiscovered batch is {json.dumps(msg)}") - # Fake an 'entry' as it's only there for some "should we send replay?" checks in the called function. - this.eddn.export_journal_entry(cmdr, {'event': 'send_fsssignaldiscovered'}, msg) + this.eddn.send_message(cmdr, msg) self.fss_signals = [] return None @@ -1611,20 +1830,13 @@ def plugin_app(parent: tk.Tk) -> Optional[tk.Frame]: Set up any plugin-specific UI. In this case we need the tkinter parent in order to later call - `update_idletasks()` on it. - - TODO: Re-work the whole replaylog and general sending to EDDN so this isn't - necessary. + `update_idletasks()` on it, or schedule things with `after()`. :param parent: tkinter parent frame. :return: Optional tk.Frame, if the tracking UI is active. """ this.parent = parent this.eddn = EDDN(parent) - # Try to obtain exclusive lock on journal cache, even if we don't need it yet - if not this.eddn.load_journal_replay(): - # Shouldn't happen - don't bother localizing - this.parent.children['status']['text'] = 'Error: Is another copy of this app already running?' if config.eddn_tracking_ui: this.ui = tk.Frame(parent) @@ -1686,7 +1898,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame: BUTTONX = 12 # noqa: N806 # indent Checkbuttons and Radiobuttons if prefsVersion.shouldSetDefaults('0.0.0.0', not bool(config.get_int('output'))): - output: int = (config.OUT_MKT_EDDN | config.OUT_SYS_EDDN) # default settings + output: int = (config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION) # default settings else: output = config.get_int('output') @@ -1701,7 +1913,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame: underline=True ).grid(padx=PADX, sticky=tk.W) # Don't translate - this.eddn_station = tk.IntVar(value=(output & config.OUT_MKT_EDDN) and 1) + this.eddn_station = tk.IntVar(value=(output & config.OUT_EDDN_SEND_STATION_DATA) and 1) this.eddn_station_button = nb.Checkbutton( eddnframe, # LANG: Enable EDDN support for station data checkbox label @@ -1711,7 +1923,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame: ) # Output setting this.eddn_station_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W) - this.eddn_system = tk.IntVar(value=(output & config.OUT_SYS_EDDN) and 1) + this.eddn_system = tk.IntVar(value=(output & config.OUT_EDDN_SEND_NON_STATION) and 1) # Output setting new in E:D 2.2 this.eddn_system_button = nb.Checkbutton( eddnframe, @@ -1722,7 +1934,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame: ) this.eddn_system_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W) - this.eddn_delay = tk.IntVar(value=(output & config.OUT_SYS_DELAY) and 1) + this.eddn_delay = tk.IntVar(value=(output & config.OUT_EDDN_DELAY) and 1) # Output setting under 'Send system and scan data to the Elite Dangerous Data Network' new in E:D 2.2 this.eddn_delay_button = nb.Checkbutton( eddnframe, @@ -1741,9 +1953,12 @@ def prefsvarchanged(event=None) -> None: :param event: tkinter event ? """ + # These two lines are legacy and probably not even needed this.eddn_station_button['state'] = tk.NORMAL this.eddn_system_button['state'] = tk.NORMAL - this.eddn_delay_button['state'] = this.eddn.replayfile and this.eddn_system.get() and tk.NORMAL or tk.DISABLED + # This line will grey out the 'Delay sending ...' option if the 'Send + # system and scan data' option is off. + this.eddn_delay_button['state'] = this.eddn_system.get() and tk.NORMAL or tk.DISABLED def prefs_changed(cmdr: str, is_beta: bool) -> None: @@ -1757,9 +1972,9 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: 'output', (config.get_int('output') & (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP | config.OUT_MKT_MANUAL)) + - (this.eddn_station.get() and config.OUT_MKT_EDDN) + - (this.eddn_system.get() and config.OUT_SYS_EDDN) + - (this.eddn_delay.get() and config.OUT_SYS_DELAY) + (this.eddn_station.get() and config.OUT_EDDN_SEND_STATION_DATA) + + (this.eddn_system.get() and config.OUT_EDDN_SEND_NON_STATION) + + (this.eddn_delay.get() and config.OUT_EDDN_DELAY) ) @@ -1829,7 +2044,7 @@ def journal_entry( # noqa: C901, CCR001 """ Process a new Journal entry. - :param cmdr: `str` - Name of currennt Cmdr. + :param cmdr: `str` - Name of current Cmdr. :param is_beta: `bool` - True if this is a beta version of the Game. :param system: `str` - Name of system Cmdr is in. :param station: `str` - Name of station Cmdr is docked at, if applicable. @@ -1849,7 +2064,11 @@ def journal_entry( # noqa: C901, CCR001 entry = new_data event_name = entry['event'].lower() + this.cmdr_name = cmdr + this.game_version = state['GameVersion'] + this.game_build = state['GameBuild'] this.on_foot = state['OnFoot'] + this.docked = state['IsDocked'] # Note if we're under Horizons and/or Odyssey # The only event these are already in is `LoadGame` which isn't sent to EDDN. @@ -1909,6 +2128,9 @@ def journal_entry( # noqa: C901, CCR001 # Yes, explicitly state `None` here, so it's crystal clear. this.systemaddress = entry.get('SystemAddress', None) # type: ignore + if event_name == 'docked': + this.eddn.parent.after(this.eddn.REPLAY_DELAY, this.eddn.sender.queue_check_and_send, False) + elif event_name == 'approachbody': this.body_name = entry['Body'] this.body_id = entry.get('BodyID') @@ -1928,7 +2150,7 @@ def journal_entry( # noqa: C901, CCR001 this.status_body_name = None # Events with their own EDDN schema - if config.get_int('output') & config.OUT_SYS_EDDN and not state['Captain']: + if config.get_int('output') & config.OUT_EDDN_SEND_NON_STATION and not state['Captain']: if event_name == 'fssdiscoveryscan': return this.eddn.export_journal_fssdiscoveryscan(cmdr, system, state['StarPos'], is_beta, entry) @@ -1985,7 +2207,7 @@ def journal_entry( # noqa: C901, CCR001 ) # Send journal schema events to EDDN, but not when on a crew - if (config.get_int('output') & config.OUT_SYS_EDDN and not state['Captain'] and + if (config.get_int('output') & config.OUT_EDDN_SEND_NON_STATION and not state['Captain'] and (event_name in ('location', 'fsdjump', 'docked', 'scan', 'saasignalsfound', 'carrierjump')) and ('StarPos' in entry or this.coordinates)): @@ -2066,15 +2288,15 @@ def journal_entry( # noqa: C901, CCR001 this.eddn.export_journal_generic(cmdr, is_beta, filter_localised(entry)) except requests.exceptions.RequestException as e: - logger.debug('Failed in export_journal_entry', exc_info=e) + logger.debug('Failed in send_message', exc_info=e) return _("Error: Can't connect to EDDN") # LANG: Error while trying to send data to EDDN except Exception as e: - logger.debug('Failed in export_journal_entry', exc_info=e) + logger.debug('Failed in export_journal_generic', exc_info=e) return str(e) - elif (config.get_int('output') & config.OUT_MKT_EDDN and not state['Captain'] and - event_name in ('market', 'outfitting', 'shipyard')): + elif (config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA and not state['Captain'] and + event_name in ('market', 'outfitting', 'shipyard')): # Market.json, Outfitting.json or Shipyard.json to process try: @@ -2116,7 +2338,7 @@ def journal_entry( # noqa: C901, CCR001 return None -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: """ Process new CAPI data. @@ -2125,7 +2347,7 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 :return: str - Error message, or `None` if no errors. """ if (data['commander'].get('docked') or (this.on_foot and monitor.station) - and config.get_int('output') & config.OUT_MKT_EDDN): + and config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA): try: if this.marketId != data['lastStarport']['id']: this.commodities = this.outfitting = this.shipyard = None diff --git a/prefs.py b/prefs.py index 03b8dcd8..376b6c67 100644 --- a/prefs.py +++ b/prefs.py @@ -1221,7 +1221,9 @@ class PreferencesDialog(tk.Toplevel): (self.out_csv.get() and config.OUT_MKT_CSV) + (config.OUT_MKT_MANUAL if not self.out_auto.get() else 0) + (self.out_ship.get() and config.OUT_SHIP) + - (config.get_int('output') & (config.OUT_MKT_EDDN | config.OUT_SYS_EDDN | config.OUT_SYS_DELAY)) + (config.get_int('output') & ( + config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION | config.OUT_EDDN_DELAY + )) ) config.set(