1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-05-31 15:49:41 +03:00

Merge pull request #2083 from HullSeals/enhancement/2051/plugins

[2051] Core Plugin Audit
LGTM
This commit is contained in:
Phoebe 2023-10-19 16:02:42 -07:00 committed by GitHub
commit ff668eab8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 545 additions and 653 deletions

View File

@ -1,86 +1,86 @@
"""Coriolis ship export.""" """
coriolis.py - Coriolis Ship Export.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import base64 import base64
import gzip import gzip
import io import io
import json import json
import tkinter as tk import tkinter as tk
from tkinter import ttk from tkinter import ttk
from typing import TYPE_CHECKING, Union from typing import TYPE_CHECKING, Union, Optional
import myNotebook as nb # noqa: N813 # its not my fault. import myNotebook as nb # noqa: N813 # its not my fault.
from EDMCLogging import get_main_logger from EDMCLogging import get_main_logger
from plug import show_error from plug import show_error
from config import config
if TYPE_CHECKING: if TYPE_CHECKING:
def _(s: str) -> str: def _(s: str) -> str:
... ...
# Migrate settings from <= 3.01
from config import config
if not config.get_str('shipyard_provider') and config.get_int('shipyard'): class CoriolisConfig:
config.set('shipyard_provider', 'Coriolis') """Coriolis Configuration."""
config.delete('shipyard', suppress=True) def __init__(self):
self.normal_url = ''
self.beta_url = ''
self.override_mode = ''
self.normal_textvar = tk.StringVar()
self.beta_textvar = tk.StringVar()
self.override_textvar = tk.StringVar()
def initialize_urls(self):
"""Initialize Coriolis URLs and override mode from configuration."""
self.normal_url = config.get_str('coriolis_normal_url', default=DEFAULT_NORMAL_URL)
self.beta_url = config.get_str('coriolis_beta_url', default=DEFAULT_BETA_URL)
self.override_mode = config.get_str('coriolis_overide_url_selection', default=DEFAULT_OVERRIDE_MODE)
self.normal_textvar.set(value=self.normal_url)
self.beta_textvar.set(value=self.beta_url)
self.override_textvar.set(
value={
'auto': _('Auto'), # LANG: 'Auto' label for Coriolis site override selection
'normal': _('Normal'), # LANG: 'Normal' label for Coriolis site override selection
'beta': _('Beta') # LANG: 'Beta' label for Coriolis site override selection
}.get(self.override_mode, _('Auto')) # LANG: 'Auto' label for Coriolis site override selection
)
coriolis_config = CoriolisConfig()
logger = get_main_logger() logger = get_main_logger()
DEFAULT_NORMAL_URL = 'https://coriolis.io/import?data=' DEFAULT_NORMAL_URL = 'https://coriolis.io/import?data='
DEFAULT_BETA_URL = 'https://beta.coriolis.io/import?data=' DEFAULT_BETA_URL = 'https://beta.coriolis.io/import?data='
DEFAULT_OVERRIDE_MODE = 'auto' DEFAULT_OVERRIDE_MODE = 'auto'
normal_url = ''
beta_url = ''
override_mode = ''
normal_textvar = tk.StringVar()
beta_textvar = tk.StringVar()
override_textvar = tk.StringVar() # This will always contain a _localised_ version
def plugin_start3(path: str) -> str: def plugin_start3(path: str) -> str:
"""Set up URLs.""" """Set up URLs."""
global normal_url, beta_url, override_mode coriolis_config.initialize_urls()
normal_url = config.get_str('coriolis_normal_url', default=DEFAULT_NORMAL_URL)
beta_url = config.get_str('coriolis_beta_url', default=DEFAULT_BETA_URL)
override_mode = config.get_str('coriolis_overide_url_selection', default=DEFAULT_OVERRIDE_MODE)
normal_textvar.set(value=normal_url)
beta_textvar.set(value=beta_url)
override_textvar.set(
value={
'auto': _('Auto'), # LANG: 'Auto' label for Coriolis site override selection
'normal': _('Normal'), # LANG: 'Normal' label for Coriolis site override selection
'beta': _('Beta') # LANG: 'Beta' label for Coriolis site override selection
}.get(override_mode, _('Auto')) # LANG: 'Auto' label for Coriolis site override selection
)
return 'Coriolis' return 'Coriolis'
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
"""Set up plugin preferences.""" """Set up plugin preferences."""
PADX = 10 # noqa: N806 PADX = 10 # noqa: N806
@ -95,18 +95,21 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
# LANG: Settings>Coriolis: Label for 'NOT alpha/beta game version' URL # LANG: Settings>Coriolis: Label for 'NOT alpha/beta game version' URL
nb.Label(conf_frame, text=_('Normal URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX) nb.Label(conf_frame, text=_('Normal URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.Entry(conf_frame, textvariable=normal_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX) nb.Entry(conf_frame,
textvariable=coriolis_config.normal_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
# LANG: Generic 'Reset' button label # LANG: Generic 'Reset' button label
nb.Button(conf_frame, text=_("Reset"), command=lambda: normal_textvar.set(value=DEFAULT_NORMAL_URL)).grid( nb.Button(conf_frame, text=_("Reset"),
command=lambda: coriolis_config.normal_textvar.set(value=DEFAULT_NORMAL_URL)).grid(
sticky=tk.W, row=cur_row, column=2, padx=PADX sticky=tk.W, row=cur_row, column=2, padx=PADX
) )
cur_row += 1 cur_row += 1
# LANG: Settings>Coriolis: Label for 'alpha/beta game version' URL # LANG: Settings>Coriolis: Label for 'alpha/beta game version' URL
nb.Label(conf_frame, text=_('Beta URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX) nb.Label(conf_frame, text=_('Beta URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.Entry(conf_frame, textvariable=beta_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX) nb.Entry(conf_frame, textvariable=coriolis_config.beta_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
# LANG: Generic 'Reset' button label # LANG: Generic 'Reset' button label
nb.Button(conf_frame, text=_('Reset'), command=lambda: beta_textvar.set(value=DEFAULT_BETA_URL)).grid( nb.Button(conf_frame, text=_('Reset'),
command=lambda: coriolis_config.beta_textvar.set(value=DEFAULT_BETA_URL)).grid(
sticky=tk.W, row=cur_row, column=2, padx=PADX sticky=tk.W, row=cur_row, column=2, padx=PADX
) )
cur_row += 1 cur_row += 1
@ -116,8 +119,8 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
nb.Label(conf_frame, text=_('Override Beta/Normal Selection')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX) nb.Label(conf_frame, text=_('Override Beta/Normal Selection')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.OptionMenu( nb.OptionMenu(
conf_frame, conf_frame,
override_textvar, coriolis_config.override_textvar,
override_textvar.get(), coriolis_config.override_textvar.get(),
_('Normal'), # LANG: 'Normal' label for Coriolis site override selection _('Normal'), # LANG: 'Normal' label for Coriolis site override selection
_('Beta'), # LANG: 'Beta' label for Coriolis site override selection _('Beta'), # LANG: 'Beta' label for Coriolis site override selection
_('Auto') # LANG: 'Auto' label for Coriolis site override selection _('Auto') # LANG: 'Auto' label for Coriolis site override selection
@ -127,50 +130,49 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
return conf_frame return conf_frame
def prefs_changed(cmdr: str | None, is_beta: bool) -> None: def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None:
"""Update URLs.""" """
global normal_url, beta_url, override_mode Update URLs and override mode based on user preferences.
normal_url = normal_textvar.get()
beta_url = beta_textvar.get() :param cmdr: Commander name, if available
override_mode = override_textvar.get() :param is_beta: Whether the game mode is beta
override_mode = { # Convert to unlocalised names """
coriolis_config.normal_url = coriolis_config.normal_textvar.get()
coriolis_config.beta_url = coriolis_config.beta_textvar.get()
coriolis_config.override_mode = coriolis_config.override_textvar.get()
# Convert to unlocalised names
coriolis_config.override_mode = {
_('Normal'): 'normal', # LANG: Coriolis normal/beta selection - normal _('Normal'): 'normal', # LANG: Coriolis normal/beta selection - normal
_('Beta'): 'beta', # LANG: Coriolis normal/beta selection - beta _('Beta'): 'beta', # LANG: Coriolis normal/beta selection - beta
_('Auto'): 'auto', # LANG: Coriolis normal/beta selection - auto _('Auto'): 'auto', # LANG: Coriolis normal/beta selection - auto
}.get(override_mode, override_mode) }.get(coriolis_config.override_mode, coriolis_config.override_mode)
if override_mode not in ('beta', 'normal', 'auto'): if coriolis_config.override_mode not in ('beta', 'normal', 'auto'):
logger.warning(f'Unexpected value {override_mode=!r}. defaulting to "auto"') logger.warning(f'Unexpected value {coriolis_config.override_mode=!r}. Defaulting to "auto"')
override_mode = 'auto' coriolis_config.override_mode = 'auto'
override_textvar.set(value=_('Auto')) # LANG: 'Auto' label for Coriolis site override selection coriolis_config.override_textvar.set(value=_('Auto')) # LANG: 'Auto' label for Coriolis site override selection
config.set('coriolis_normal_url', normal_url) config.set('coriolis_normal_url', coriolis_config.normal_url)
config.set('coriolis_beta_url', beta_url) config.set('coriolis_beta_url', coriolis_config.beta_url)
config.set('coriolis_overide_url_selection', override_mode) config.set('coriolis_overide_url_selection', coriolis_config.override_mode)
def _get_target_url(is_beta: bool) -> str: def _get_target_url(is_beta: bool) -> str:
global override_mode if coriolis_config.override_mode not in ('auto', 'normal', 'beta'):
if override_mode not in ('auto', 'normal', 'beta'):
# LANG: Settings>Coriolis - invalid override mode found # LANG: Settings>Coriolis - invalid override mode found
show_error(_('Invalid Coriolis override mode!')) show_error(_('Invalid Coriolis override mode!'))
logger.warning(f'Unexpected override mode {override_mode!r}! defaulting to auto!') logger.warning(f'Unexpected override mode {coriolis_config.override_mode!r}! defaulting to auto!')
override_mode = 'auto' coriolis_config.override_mode = 'auto'
if coriolis_config.override_mode == 'beta':
if override_mode == 'beta': return coriolis_config.beta_url
return beta_url if coriolis_config.override_mode == 'normal':
return coriolis_config.normal_url
elif override_mode == 'normal':
return normal_url
# Must be auto # Must be auto
if is_beta: if is_beta:
return beta_url return coriolis_config.beta_url
return normal_url return coriolis_config.normal_url
# to anyone reading this, no, this is NOT the correct return type. Its magic internal stuff that I WILL be changing
# some day. Check PLUGINS.md for the right way to do this. -A_D
def shipyard_url(loadout, is_beta) -> Union[str, bool]: def shipyard_url(loadout, is_beta) -> Union[str, bool]:
@ -179,11 +181,8 @@ def shipyard_url(loadout, is_beta) -> Union[str, bool]:
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')
if not string: if not string:
return False return False
out = io.BytesIO() out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode='w') as f: with gzip.GzipFile(fileobj=out, mode='w') as f:
f.write(string) f.write(string)
encoded = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D') encoded = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
return _get_target_url(is_beta) + encoded return _get_target_url(is_beta) + encoded

View File

@ -1,26 +1,23 @@
"""Handle exporting data to EDDN.""" """
eddn.py - Exporting Data to EDDN.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# Copyright (c) EDCD, All Rights Reserved
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# Licensed under the GNU General Public License.
# See LICENSE file.
# This is an EDMC 'core' plugin.
# This is an EDMC 'core' plugin.
# All EDMC plugins are *dynamically* loaded at run-time. All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`. We build for Windows using `py2exe`.
# `py2exe` can't possibly know about anything in the dynamically loaded core plugins.
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins. Thus, you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so... YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN IN AN END-USER INSTALLATION ON WINDOWS.
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT """
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import http import http
import itertools import itertools
import json import json
@ -34,12 +31,19 @@ from collections import OrderedDict
from platform import system from platform import system
from textwrap import dedent from textwrap import dedent
from threading import Lock from threading import Lock
from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping, Optional from typing import (
TYPE_CHECKING,
Any,
Iterator,
Mapping,
MutableMapping,
Optional,
Dict,
List,
)
from typing import OrderedDict as OrderedDictT from typing import OrderedDict as OrderedDictT
from typing import Tuple, Union from typing import Tuple, Union
import requests import requests
import companion import companion
import edmc_data import edmc_data
import killswitch import killswitch
@ -88,21 +92,21 @@ class This:
self.body_name: Optional[str] = None self.body_name: Optional[str] = None
self.body_id: Optional[int] = None self.body_id: Optional[int] = None
self.body_type: Optional[int] = None self.body_type: Optional[int] = None
self.station_name: str | None = None self.station_name: Optional[str] = None
self.station_type: str | None = None self.station_type: Optional[str] = None
self.station_marketid: str | None = None self.station_marketid: Optional[str] = None
# Track Status.json data # Track Status.json data
self.status_body_name: Optional[str] = None self.status_body_name: Optional[str] = None
# Avoid duplicates # Avoid duplicates
self.marketId: Optional[str] = None self.marketId: Optional[str] = None
self.commodities: Optional[list[OrderedDictT[str, Any]]] = None self.commodities: Optional[List[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, list[str]]] = None self.outfitting: Optional[Tuple[bool, List[str]]] = None
self.shipyard: Optional[Tuple[bool, list[Mapping[str, Any]]]] = None self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None
self.fcmaterials_marketid: int = 0 self.fcmaterials_marketid: int = 0
self.fcmaterials: Optional[list[OrderedDictT[str, Any]]] = None self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi_marketid: int = 0 self.fcmaterials_capi_marketid: int = 0
self.fcmaterials_capi: Optional[list[OrderedDictT[str, Any]]] = None self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None
# For the tkinter parent window, so we can call update_idletasks() # For the tkinter parent window, so we can call update_idletasks()
self.parent: tk.Tk self.parent: tk.Tk
@ -156,7 +160,7 @@ class EDDNSender:
UNKNOWN_SCHEMA_RE = re.compile( UNKNOWN_SCHEMA_RE = re.compile(
r"^FAIL: \[JsonValidationException\('Schema " r"^FAIL: \[JsonValidationException\('Schema "
r"https://eddn.edcd.io/schemas/(?P<schema_name>.+)/(?P<schema_version>[0-9]+) is unknown, " r"https://eddn.edcd.io/schemas/(?P<schema_name>.+)/(?P<schema_version>[0-9]+) is unknown, "
r"unable to validate.',\)\]$" r"unable to validate.',\)]$"
) )
def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None: def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None:
@ -203,10 +207,8 @@ class EDDNSender:
db = db_conn.cursor() db = db_conn.cursor()
try: try:
db.execute( db.execute("""
""" CREATE TABLE IF NOT EXISTS messages (
CREATE TABLE messages
(
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
created TEXT NOT NULL, created TEXT NOT NULL,
cmdr TEXT NOT NULL, cmdr TEXT NOT NULL,
@ -215,26 +217,12 @@ class EDDNSender:
game_build TEXT, game_build TEXT,
message TEXT NOT NULL message TEXT NOT NULL
) )
""" """)
)
db.execute( db.execute("CREATE INDEX IF NOT EXISTS messages_created ON messages (created)")
""" db.execute("CREATE INDEX IF NOT EXISTS messages_cmdr ON messages (cmdr)")
CREATE INDEX messages_created ON messages
(
created
)
"""
)
db.execute( logger.info("New 'eddn_queue-v1.db' created")
"""
CREATE INDEX messages_cmdr ON messages
(
cmdr
)
"""
)
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
if str(e) != "table messages already exists": if str(e) != "table messages already exists":
@ -243,12 +231,6 @@ class EDDNSender:
db_conn.close() db_conn.close()
raise e raise e
else:
logger.info("New `eddn_queue-v1.db` created")
# We return only the connection, so tidy up
db.close()
return db_conn return db_conn
def convert_legacy_file(self): def convert_legacy_file(self):
@ -264,11 +246,10 @@ class EDDNSender:
except FileNotFoundError: except FileNotFoundError:
return return
logger.info("Conversion to `eddn_queue-v1.db` complete, removing `replay.jsonl`")
# Best effort at removing the file/contents # Best effort at removing the file/contents
# NB: The legacy code assumed it could write to the file. with open(filename, 'w') as replay_file:
logger.info("Conversion` to `eddn_queue-v1.db` complete, removing `replay.jsonl`") replay_file.truncate()
replay_file = open(filename, 'w') # Will truncate
replay_file.close()
os.unlink(filename) os.unlink(filename)
def close(self) -> None: def close(self) -> None:
@ -414,7 +395,7 @@ class EDDNSender:
""" """
logger.trace_if("plugin.eddn.send", "Sending message") logger.trace_if("plugin.eddn.send", "Sending message")
should_return: bool should_return: bool
new_data: dict[str, Any] new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg))
if should_return: if should_return:
@ -423,7 +404,7 @@ class EDDNSender:
# Even the smallest possible message compresses somewhat, so always compress # Even the smallest possible message compresses somewhat, so always compress
encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0) encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0)
headers: None | dict[str, str] = None headers: Optional[Dict[str, str]] = None
if compressed: if compressed:
headers = {'Content-Encoding': 'gzip'} headers = {'Content-Encoding': 'gzip'}
@ -454,14 +435,13 @@ class EDDNSender:
# This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema. # This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema.
return True return True
elif e.response.status_code == http.HTTPStatus.BAD_REQUEST: if e.response.status_code == http.HTTPStatus.BAD_REQUEST:
# EDDN straight up says no, so drop the message # EDDN straight up says no, so drop the message
logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}") logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}")
return True return True
else: # This should catch anything else, e.g. timeouts, gateway errors
# This should catch anything else, e.g. timeouts, gateway errors self.set_ui_status(self.http_error_to_log(e))
self.set_ui_status(self.http_error_to_log(e))
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.debug('Failed sending', exc_info=e) logger.debug('Failed sending', exc_info=e)
@ -485,19 +465,26 @@ class EDDNSender:
if not self.queue_processing.acquire(blocking=False): if not self.queue_processing.acquire(blocking=False):
logger.trace_if("plugin.eddn.send", "Couldn't obtain mutex") logger.trace_if("plugin.eddn.send", "Couldn't obtain mutex")
if reschedule: if reschedule:
logger.trace_if("plugin.eddn.send", f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now") logger.trace_if(
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule) "plugin.eddn.send",
f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now",
)
self.eddn.parent.after(
self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule
)
else: else:
logger.trace_if("plugin.eddn.send", "NO next run scheduled (there should be another one already set)") logger.trace_if(
"plugin.eddn.send",
"NO next run scheduled (there should be another one already set)",
)
return return
logger.trace_if("plugin.eddn.send", "Obtained mutex") logger.trace_if("plugin.eddn.send", "Obtained mutex")
# Used to indicate if we've rescheduled at the faster rate already. # Used to indicate if we've rescheduled at the faster rate already.
have_rescheduled = False have_rescheduled = False
# We send either if docked or 'Delay sending until docked' not set # We send either if docked or 'Delay sending until docked' not set
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY): if this.docked or not config.get_int('output') & config.OUT_EDDN_DELAY:
logger.trace_if("plugin.eddn.send", "Should send") logger.trace_if("plugin.eddn.send", "Should send")
# We need our own cursor here, in case the semantics of # We need our own cursor here, in case the semantics of
# tk `after()` could allow this to run in the middle of other # tk `after()` could allow this to run in the middle of other
@ -516,7 +503,7 @@ class EDDNSender:
db_cursor.execute( db_cursor.execute(
""" """
SELECT id FROM messages SELECT id FROM messages
ORDER BY created ASC ORDER BY created
LIMIT 1 LIMIT 1
""" """
) )
@ -586,16 +573,15 @@ class EDDNSender:
# LANG: EDDN has banned this version of our client # LANG: EDDN has banned this version of our client
return _('EDDN Error: EDMC is too old for EDDN. Please update.') return _('EDDN Error: EDMC is too old for EDDN. Please update.')
elif status_code == 400: if status_code == 400:
# we a validation check or something else. # we a validation check or something else.
logger.warning(f'EDDN Error: {status_code} -- {exception.response}') logger.warning(f'EDDN Error: {status_code} -- {exception.response}')
# LANG: EDDN returned an error that indicates something about what we sent it was wrong # LANG: EDDN returned an error that indicates something about what we sent it was wrong
return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log') return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log')
else: logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}')
logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}') # LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number
# LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
# TODO: a good few of these methods are static or could be classmethods. they should be created as such. # TODO: a good few of these methods are static or could be classmethods. they should be created as such.
@ -626,7 +612,7 @@ class EDDN:
self.sender = EDDNSender(self, self.eddn_url) self.sender = EDDNSender(self, self.eddn_url)
self.fss_signals: list[Mapping[str, Any]] = [] self.fss_signals: List[Mapping[str, Any]] = []
def close(self): def close(self):
"""Close down the EDDN class instance.""" """Close down the EDDN class instance."""
@ -650,7 +636,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode :param is_beta: whether or not we're currently in beta mode
""" """
should_return: bool should_return: bool
new_data: dict[str, Any] new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) should_return, new_data = killswitch.check_killswitch('capi.request./market', {})
if should_return: if should_return:
logger.warning("capi.request./market has been disabled by killswitch. Returning.") logger.warning("capi.request./market has been disabled by killswitch. Returning.")
@ -667,7 +653,7 @@ class EDDN:
modules, modules,
ships ships
) )
commodities: list[OrderedDictT[str, Any]] = [] commodities: List[OrderedDictT[str, Any]] = []
for commodity in data['lastStarport'].get('commodities') or []: for commodity in data['lastStarport'].get('commodities') or []:
# Check 'marketable' and 'not prohibited' # Check 'marketable' and 'not prohibited'
if (category_map.get(commodity['categoryname'], True) if (category_map.get(commodity['categoryname'], True)
@ -740,7 +726,7 @@ class EDDN:
:param data: The raw CAPI data. :param data: The raw CAPI data.
:return: Sanity-checked data. :return: Sanity-checked data.
""" """
modules: dict[str, Any] = data['lastStarport'].get('modules') modules: Dict[str, Any] = data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict): if modules is None or not isinstance(modules, dict):
if modules is None: if modules is None:
logger.debug('modules was None. FC or Damaged Station?') logger.debug('modules was None. FC or Damaged Station?')
@ -757,7 +743,7 @@ class EDDN:
# Set a safe value # Set a safe value
modules = {} modules = {}
ships: dict[str, Any] = data['lastStarport'].get('ships') ships: Dict[str, Any] = data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict): if ships is None or not isinstance(ships, dict):
if ships is None: if ships is None:
logger.debug('ships was None') logger.debug('ships was None')
@ -783,7 +769,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode :param is_beta: whether or not we're currently in beta mode
""" """
should_return: bool should_return: bool
new_data: dict[str, Any] new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return: if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -810,7 +796,7 @@ class EDDN:
modules.values() modules.values()
) )
outfitting: list[str] = sorted( outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search
) )
@ -851,7 +837,7 @@ class EDDN:
:param is_beta: whether or not we are in beta mode :param is_beta: whether or not we are in beta mode
""" """
should_return: bool should_return: bool
new_data: dict[str, Any] new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return: if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -870,7 +856,7 @@ class EDDN:
ships ships
) )
shipyard: list[Mapping[str, Any]] = sorted( shipyard: List[Mapping[str, Any]] = sorted(
itertools.chain( itertools.chain(
(ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()), (ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()),
(ship['name'].lower() for ship in ships['unavailable_list'] or {}), (ship['name'].lower() for ship in ships['unavailable_list'] or {}),
@ -913,8 +899,8 @@ class EDDN:
:param is_beta: whether or not we're in beta mode :param is_beta: whether or not we're in beta mode
:param entry: the journal entry containing the commodities data :param entry: the journal entry containing the commodities data
""" """
items: list[Mapping[str, Any]] = entry.get('Items') or [] items: List[Mapping[str, Any]] = entry.get('Items') or []
commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([ commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([
('name', self.canonicalise(commodity['Name'])), ('name', self.canonicalise(commodity['Name'])),
('meanPrice', commodity['MeanPrice']), ('meanPrice', commodity['MeanPrice']),
('buyPrice', commodity['BuyPrice']), ('buyPrice', commodity['BuyPrice']),
@ -961,11 +947,11 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode :param is_beta: Whether or not we're in beta mode
:param entry: The relevant journal entry :param entry: The relevant journal entry
""" """
modules: list[Mapping[str, Any]] = entry.get('Items', []) modules: List[Mapping[str, Any]] = entry.get('Items', [])
horizons: bool = entry.get('Horizons', False) horizons: bool = entry.get('Horizons', False)
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) # outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
# for module in modules if module['Name'] != 'int_planetapproachsuite']) # for module in modules if module['Name'] != 'int_planetapproachsuite'])
outfitting: list[str] = sorted( outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in
filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules) filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules)
) )
@ -1000,7 +986,7 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode :param is_beta: Whether or not we're in beta mode
:param entry: the relevant journal entry :param entry: the relevant journal entry
""" """
ships: list[Mapping[str, Any]] = entry.get('PriceList') or [] ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
horizons: bool = entry.get('Horizons', False) horizons: bool = entry.get('Horizons', False)
shipyard = sorted(ship['ShipType'] for ship in ships) shipyard = sorted(ship['ShipType'] for ship in ships)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
@ -1050,7 +1036,7 @@ class EDDN:
msg['header'] = self.standard_header() msg['header'] = self.standard_header()
msg_id = self.sender.add_message(cmdr, msg) msg_id = self.sender.add_message(cmdr, msg)
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY): if this.docked or not config.get_int('output') & config.OUT_EDDN_DELAY:
# No delay in sending configured, so attempt immediately # No delay in sending configured, so attempt immediately
logger.trace_if("plugin.eddn.send", "Sending 'non-station' message") logger.trace_if("plugin.eddn.send", "Sending 'non-station' message")
self.sender.send_message_by_id(msg_id) self.sender.send_message_by_id(msg_id)
@ -1123,8 +1109,7 @@ class EDDN:
logger.warning(f'No system name in entry, and system_name was not set either! entry:\n{entry!r}\n') logger.warning(f'No system name in entry, and system_name was not set either! entry:\n{entry!r}\n')
return "passed-in system_name is empty, can't add System" return "passed-in system_name is empty, can't add System"
else: entry['StarSystem'] = system_name
entry['StarSystem'] = system_name
if 'SystemAddress' not in entry: if 'SystemAddress' not in entry:
if this.system_address is None: if this.system_address is None:
@ -1918,7 +1903,7 @@ class EDDN:
gv = '' gv = ''
####################################################################### #######################################################################
# Base string # Base string
if capi_host == companion.SERVER_LIVE or capi_host == companion.SERVER_BETA: if capi_host in (companion.SERVER_LIVE, companion.SERVER_BETA):
gv = 'CAPI-Live-' gv = 'CAPI-Live-'
elif capi_host == companion.SERVER_LEGACY: elif capi_host == companion.SERVER_LEGACY:
@ -2107,7 +2092,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame:
BUTTONX = 12 # noqa: N806 # indent Checkbuttons and Radiobuttons BUTTONX = 12 # noqa: N806 # indent Checkbuttons and Radiobuttons
if prefsVersion.shouldSetDefaults('0.0.0.0', not bool(config.get_int('output'))): if prefsVersion.shouldSetDefaults('0.0.0.0', not bool(config.get_int('output'))):
output: int = (config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION) # default settings output: int = config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION # default settings
else: else:
output = config.get_int('output') output = config.get_int('output')
@ -2167,7 +2152,7 @@ def prefsvarchanged(event=None) -> None:
this.eddn_system_button['state'] = tk.NORMAL this.eddn_system_button['state'] = tk.NORMAL
# This line will grey out the 'Delay sending ...' option if the 'Send # This line will grey out the 'Delay sending ...' option if the 'Send
# system and scan data' option is off. # system and scan data' option is off.
this.eddn_delay_button['state'] = this.eddn_system.get() and tk.NORMAL or tk.DISABLED this.eddn_delay_button['state'] = tk.NORMAL if this.eddn_system.get() else tk.DISABLED
def prefs_changed(cmdr: str, is_beta: bool) -> None: def prefs_changed(cmdr: str, is_beta: bool) -> None:
@ -2324,22 +2309,22 @@ def journal_entry( # noqa: C901, CCR001
if event_name == 'fssdiscoveryscan': if event_name == 'fssdiscoveryscan':
return this.eddn.export_journal_fssdiscoveryscan(cmdr, system, state['StarPos'], is_beta, entry) return this.eddn.export_journal_fssdiscoveryscan(cmdr, system, state['StarPos'], is_beta, entry)
elif event_name == 'navbeaconscan': if event_name == 'navbeaconscan':
return this.eddn.export_journal_navbeaconscan(cmdr, system, state['StarPos'], is_beta, entry) return this.eddn.export_journal_navbeaconscan(cmdr, system, state['StarPos'], is_beta, entry)
elif event_name == 'codexentry': if event_name == 'codexentry':
return this.eddn.export_journal_codexentry(cmdr, state['StarPos'], is_beta, entry) return this.eddn.export_journal_codexentry(cmdr, state['StarPos'], is_beta, entry)
elif event_name == 'scanbarycentre': if event_name == 'scanbarycentre':
return this.eddn.export_journal_scanbarycentre(cmdr, state['StarPos'], is_beta, entry) return this.eddn.export_journal_scanbarycentre(cmdr, state['StarPos'], is_beta, entry)
elif event_name == 'navroute': if event_name == 'navroute':
return this.eddn.export_journal_navroute(cmdr, is_beta, entry) return this.eddn.export_journal_navroute(cmdr, is_beta, entry)
elif event_name == 'fcmaterials': if event_name == 'fcmaterials':
return this.eddn.export_journal_fcmaterials(cmdr, is_beta, entry) return this.eddn.export_journal_fcmaterials(cmdr, is_beta, entry)
elif event_name == 'approachsettlement': if event_name == 'approachsettlement':
# An `ApproachSettlement` can appear *before* `Location` if you # An `ApproachSettlement` can appear *before* `Location` if you
# logged at one. We won't have necessary augmentation data # logged at one. We won't have necessary augmentation data
# at this point, so bail. # at this point, so bail.
@ -2354,10 +2339,10 @@ def journal_entry( # noqa: C901, CCR001
entry entry
) )
elif event_name == 'fsssignaldiscovered': if event_name == 'fsssignaldiscovered':
this.eddn.enqueue_journal_fsssignaldiscovered(entry) this.eddn.enqueue_journal_fsssignaldiscovered(entry)
elif event_name == 'fssallbodiesfound': if event_name == 'fssallbodiesfound':
return this.eddn.export_journal_fssallbodiesfound( return this.eddn.export_journal_fssallbodiesfound(
cmdr, cmdr,
system, system,
@ -2366,7 +2351,7 @@ def journal_entry( # noqa: C901, CCR001
entry entry
) )
elif event_name == 'fssbodysignals': if event_name == 'fssbodysignals':
return this.eddn.export_journal_fssbodysignals( return this.eddn.export_journal_fssbodysignals(
cmdr, cmdr,
system, system,
@ -2626,7 +2611,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST
return economies_colony or modules_horizons or ship_horizons return economies_colony or modules_horizons or ship_horizons
def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None: def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None:
""" """
Process Status.json data to track things like current Body. Process Status.json data to track things like current Body.

View File

@ -1,36 +1,23 @@
"""Show EDSM data in display and handle lookups.""" """
edsm.py - Handling EDSM Data and Display.
# TODO: Copyright (c) EDCD, All Rights Reserved
# 1) Re-factor EDSM API calls out of journal_entry() into own function. Licensed under the GNU General Public License.
# 2) Fix how StartJump already changes things, but only partially. See LICENSE file.
# 3) Possibly this and other two 'provider' plugins could do with being
# based on a single class that they extend. There's a lot of duplicated
# logic.
# 4) Ensure the EDSM API call(back) for setting the image at end of system
# text is always fired. i.e. CAPI cmdr_data() processing.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# This is an EDMC 'core' plugin.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# All EDMC plugins are *dynamically* loaded at run-time.
#
# This is an EDMC 'core' plugin. We build for Windows using `py2exe`.
# `py2exe` can't possibly know about anything in the dynamically loaded core plugins.
# All EDMC plugins are *dynamically* loaded at run-time.
# Thus, you **MUST** check if any imports you add in this file are only
# We build for Windows using `py2exe`. referenced in this file (or only in any other core plugin), and if so...
#
# `py2exe` can't possibly know about anything in the dynamically loaded YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# core plugins. `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
# Thus you **MUST** check if any imports you add in this file are only """
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT IN
# AN END-USER INSTALLATION ON WINDOWS.
#
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import json import json
import threading import threading
import tkinter as tk import tkinter as tk
@ -40,12 +27,9 @@ from threading import Thread
from time import sleep from time import sleep
from tkinter import ttk from tkinter import ttk
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast
import requests import requests
import killswitch import killswitch
import monitor import monitor
import myNotebook
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
import plug import plug
from companion import CAPIData from companion import CAPIData
@ -58,6 +42,15 @@ if TYPE_CHECKING:
def _(x: str) -> str: def _(x: str) -> str:
return x return x
# TODO:
# 1) Re-factor EDSM API calls out of journal_entry() into own function.
# 2) Fix how StartJump already changes things, but only partially.
# 3) Possibly this and other two 'provider' plugins could do with being
# based on a single class that they extend. There's a lot of duplicated
# logic.
# 4) Ensure the EDSM API call(back) for setting the image at end of system
# text is always fired. i.e. CAPI cmdr_data() processing.
logger = get_main_logger() logger = get_main_logger()
EDSM_POLL = 0.1 EDSM_POLL = 0.1
@ -93,13 +86,13 @@ class This:
self.newgame: bool = False # starting up - batch initial burst of events self.newgame: bool = False # starting up - batch initial burst of events
self.newgame_docked: bool = False # starting up while docked self.newgame_docked: bool = False # starting up while docked
self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan
self.system_link: tk.Widget | None = None self.system_link: Optional[tk.Widget] = None
self.system_name: tk.Tk | None = None self.system_name: Optional[tk.Tk] = None
self.system_address: int | None = None # Frontier SystemAddress self.system_address: Optional[int] = None # Frontier SystemAddress
self.system_population: int | None = None self.system_population: Optional[int] = None
self.station_link: tk.Widget | None = None self.station_link: Optional[tk.Widget] = None
self.station_name: str | None = None self.station_name: Optional[str] = None
self.station_marketid: int | None = None # Frontier MarketID self.station_marketid: Optional[int] = None # Frontier MarketID
self.on_foot = False self.on_foot = False
self._IMG_KNOWN = None self._IMG_KNOWN = None
@ -109,19 +102,19 @@ class This:
self.thread: Optional[threading.Thread] = None self.thread: Optional[threading.Thread] = None
self.log: tk.IntVar | None = None self.log: Optional[tk.IntVar] = None
self.log_button: ttk.Checkbutton | None = None self.log_button: Optional[ttk.Checkbutton] = None
self.label: tk.Widget | None = None self.label: Optional[tk.Widget] = None
self.cmdr_label: myNotebook.Label | None = None self.cmdr_label: Optional[nb.Label] = None
self.cmdr_text: myNotebook.Label | None = None self.cmdr_text: Optional[nb.Label] = None
self.user_label: myNotebook.Label | None = None self.user_label: Optional[nb.Label] = None
self.user: myNotebook.Entry | None = None self.user: Optional[nb.Entry] = None
self.apikey_label: myNotebook.Label | None = None self.apikey_label: Optional[nb.Label] = None
self.apikey: myNotebook.Entry | None = None self.apikey: Optional[nb.Entry] = None
this = This() this = This()
@ -284,7 +277,7 @@ def toggle_password_visibility():
this.apikey.config(show="*") # type: ignore this.apikey.config(show="*") # type: ignore
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
""" """
Plugin preferences setup hook. Plugin preferences setup hook.
@ -297,8 +290,8 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
:return: An instance of `myNotebook.Frame`. :return: An instance of `myNotebook.Frame`.
""" """
PADX = 10 # noqa: N806 PADX = 10 # noqa: N806
BUTTONX = 12 # indent Checkbuttons and Radiobuttons # noqa: N806 BUTTONX = 12 # noqa: N806
PADY = 2 # close spacing # noqa: N806 PADY = 2 # noqa: N806
frame = nb.Frame(parent) frame = nb.Frame(parent)
frame.columnconfigure(1, weight=1) frame.columnconfigure(1, weight=1)
@ -309,51 +302,46 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
background=nb.Label().cget('background'), background=nb.Label().cget('background'),
url='https://www.edsm.net/', url='https://www.edsm.net/',
underline=True underline=True
).grid(columnspan=2, padx=PADX, sticky=tk.W) # Don't translate ).grid(columnspan=2, padx=PADX, sticky=tk.W)
this.log = tk.IntVar(value=config.get_int('edsm_out') and 1) this.log = tk.IntVar(value=config.get_int('edsm_out') and 1)
this.log_button = nb.Checkbutton( this.log_button = nb.Checkbutton(
# LANG: Settings>EDSM - Label on checkbox for 'send data' frame,
frame, text=_('Send flight log and Cmdr status to EDSM'), variable=this.log, command=prefsvarchanged text=_('Send flight log and Cmdr status to EDSM'),
variable=this.log,
command=prefsvarchanged
) )
if this.log_button: if this.log_button:
this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W) this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
nb.Label(frame).grid(sticky=tk.W) # big spacer nb.Label(frame).grid(sticky=tk.W) # big spacer
# Section heading in settings
this.label = HyperlinkLabel( this.label = HyperlinkLabel(
frame, frame,
# LANG: Settings>EDSM - Label on header/URL to EDSM API key page
text=_('Elite Dangerous Star Map credentials'), text=_('Elite Dangerous Star Map credentials'),
background=nb.Label().cget('background'), background=nb.Label().cget('background'),
url='https://www.edsm.net/settings/api', url='https://www.edsm.net/settings/api',
underline=True underline=True
) )
cur_row = 10 cur_row = 10
if this.label: if this.label:
this.label.grid(columnspan=2, padx=PADX, sticky=tk.W) this.label.grid(columnspan=2, padx=PADX, sticky=tk.W)
# LANG: Game Commander name label in EDSM settings # LANG: Game Commander name label in EDSM settings
this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window this.cmdr_label = nb.Label(frame, text=_('Cmdr'))
this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W) this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.cmdr_text = nb.Label(frame) this.cmdr_text = nb.Label(frame)
this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W) this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W)
cur_row += 1 cur_row += 1
# LANG: EDSM Commander name label in EDSM settings # LANG: EDSM Commander name label in EDSM settings
this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting this.user_label = nb.Label(frame, text=_('Commander Name'))
this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W) this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.user = nb.Entry(frame) this.user = nb.Entry(frame)
this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
cur_row += 1 cur_row += 1
# LANG: EDSM API key label # LANG: EDSM API key label
this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting this.apikey_label = nb.Label(frame, text=_('API Key'))
this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W) this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.apikey = nb.Entry(frame, show="*", width=50) this.apikey = nb.Entry(frame, show="*", width=50)
this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
@ -361,18 +349,19 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
prefs_cmdr_changed(cmdr, is_beta) prefs_cmdr_changed(cmdr, is_beta)
show_password_var.set(False) # Password is initially masked show_password_var.set(False) # Password is initially masked
show_password_checkbox = nb.Checkbutton( show_password_checkbox = nb.Checkbutton(
frame, frame,
text="Show API Key", text="Show API Key",
variable=show_password_var, variable=show_password_var,
command=toggle_password_visibility, command=toggle_password_visibility
) )
show_password_checkbox.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W) show_password_checkbox.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
return frame return frame
def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001 def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001
""" """
Handle the Commander name changing whilst Settings was open. Handle the Commander name changing whilst Settings was open.
@ -381,28 +370,21 @@ def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001
""" """
if this.log_button: if this.log_button:
this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED
if this.user: if this.user:
this.user['state'] = tk.NORMAL this.user['state'] = tk.NORMAL
this.user.delete(0, tk.END) this.user.delete(0, tk.END)
if this.apikey: if this.apikey:
this.apikey['state'] = tk.NORMAL this.apikey['state'] = tk.NORMAL
this.apikey.delete(0, tk.END) this.apikey.delete(0, tk.END)
if cmdr: if cmdr:
if this.cmdr_text: if this.cmdr_text:
this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}' this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}'
cred = credentials(cmdr) cred = credentials(cmdr)
if cred: if cred:
if this.user: if this.user:
this.user.insert(0, cred[0]) this.user.insert(0, cred[0])
if this.apikey: if this.apikey:
this.apikey.insert(0, cred[1]) this.apikey.insert(0, cred[1])
else: else:
if this.cmdr_text: if this.cmdr_text:
# LANG: We have no data on the current commander # LANG: We have no data on the current commander
@ -429,18 +411,22 @@ def set_prefs_ui_states(state: str) -> None:
Set the state of various config UI entries. Set the state of various config UI entries.
:param state: the state to set each entry to :param state: the state to set each entry to
# NOTE: This may break things, watch out in testing. (5.10)
""" """
if ( elements = [
this.label and this.cmdr_label and this.cmdr_text and this.user_label and this.user this.label,
and this.apikey_label and this.apikey this.cmdr_label,
): this.cmdr_text,
this.label['state'] = state this.user_label,
this.cmdr_label['state'] = state this.user,
this.cmdr_text['state'] = state this.apikey_label,
this.user_label['state'] = state this.apikey
this.user['state'] = state ]
this.apikey_label['state'] = state
this.apikey['state'] = state for element in elements:
if element:
element['state'] = state
def prefs_changed(cmdr: str, is_beta: bool) -> None: def prefs_changed(cmdr: str, is_beta: bool) -> None:
@ -454,7 +440,6 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
config.set('edsm_out', this.log.get()) config.set('edsm_out', this.log.get())
if cmdr and not is_beta: if cmdr and not is_beta:
# TODO: remove this when config is rewritten.
cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[]) cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[])
usernames: List[str] = config.get_list('edsm_usernames', default=[]) usernames: List[str] = config.get_list('edsm_usernames', default=[])
apikeys: List[str] = config.get_list('edsm_apikeys', default=[]) apikeys: List[str] = config.get_list('edsm_apikeys', default=[])
@ -466,7 +451,6 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
usernames[idx] = this.user.get().strip() usernames[idx] = this.user.get().strip()
apikeys.extend([''] * (1 + idx - len(apikeys))) apikeys.extend([''] * (1 + idx - len(apikeys)))
apikeys[idx] = this.apikey.get().strip() apikeys[idx] = this.apikey.get().strip()
else: else:
config.set('edsm_cmdrs', cmdrs + [cmdr]) config.set('edsm_cmdrs', cmdrs + [cmdr])
usernames.append(this.user.get().strip()) usernames.append(this.user.get().strip())
@ -495,20 +479,17 @@ def credentials(cmdr: str) -> Optional[Tuple[str, str]]:
cmdrs = [cmdr] cmdrs = [cmdr]
config.set('edsm_cmdrs', cmdrs) config.set('edsm_cmdrs', cmdrs)
if (cmdr in cmdrs and (edsm_usernames := config.get_list('edsm_usernames')) edsm_usernames = config.get_list('edsm_usernames')
and (edsm_apikeys := config.get_list('edsm_apikeys'))): edsm_apikeys = config.get_list('edsm_apikeys')
if cmdr in cmdrs and len(cmdrs) == len(edsm_usernames) == len(edsm_apikeys):
idx = cmdrs.index(cmdr) idx = cmdrs.index(cmdr)
# The EDSM cmdr and apikey might not exist yet! if idx < len(edsm_usernames) and idx < len(edsm_apikeys):
if idx >= len(edsm_usernames) or idx >= len(edsm_apikeys): logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning ({edsm_usernames[idx]=}, {edsm_apikeys[idx]=})')
return None return edsm_usernames[idx], edsm_apikeys[idx]
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning ({edsm_usernames[idx]=}, {edsm_apikeys[idx]=})') logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning None')
return None
return (edsm_usernames[idx], edsm_apikeys[idx])
else:
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning None')
return None
def journal_entry( # noqa: C901, CCR001 def journal_entry( # noqa: C901, CCR001
@ -564,7 +545,6 @@ entry: {entry!r}'''
if not this.station_name: if not this.station_name:
if this.system_population and this.system_population > 0: if this.system_population and this.system_population > 0:
to_set = STATION_UNDOCKED to_set = STATION_UNDOCKED
else: else:
to_set = '' to_set = ''
@ -582,7 +562,6 @@ entry: {entry!r}'''
this.multicrew = bool(state['Role']) this.multicrew = bool(state['Role'])
if 'StarPos' in entry: if 'StarPos' in entry:
this.coordinates = entry['StarPos'] this.coordinates = entry['StarPos']
elif entry['event'] == 'LoadGame': elif entry['event'] == 'LoadGame':
this.coordinates = None this.coordinates = None
@ -590,20 +569,16 @@ entry: {entry!r}'''
this.newgame = True this.newgame = True
this.newgame_docked = False this.newgame_docked = False
this.navbeaconscan = 0 this.navbeaconscan = 0
elif entry['event'] == 'StartUp': elif entry['event'] == 'StartUp':
this.newgame = False this.newgame = False
this.newgame_docked = False this.newgame_docked = False
this.navbeaconscan = 0 this.navbeaconscan = 0
elif entry['event'] == 'Location': elif entry['event'] == 'Location':
this.newgame = True this.newgame = True
this.newgame_docked = entry.get('Docked', False) this.newgame_docked = entry.get('Docked', False)
this.navbeaconscan = 0 this.navbeaconscan = 0
elif entry['event'] == 'NavBeaconScan': elif entry['event'] == 'NavBeaconScan':
this.navbeaconscan = entry['NumBodies'] this.navbeaconscan = entry['NumBodies']
elif entry['event'] == 'BackPack': elif entry['event'] == 'BackPack':
# Use the stored file contents, not the empty journal event # Use the stored file contents, not the empty journal event
if state['BackpackJSON']: if state['BackpackJSON']:
@ -646,7 +621,6 @@ entry: {entry!r}'''
} }
materials.update(transient) materials.update(transient)
logger.trace_if(CMDR_EVENTS, f'"LoadGame" event, queueing Materials: {cmdr=}') logger.trace_if(CMDR_EVENTS, f'"LoadGame" event, queueing Materials: {cmdr=}')
this.queue.put((cmdr, this.game_version, this.game_build, materials)) this.queue.put((cmdr, this.game_version, this.game_build, materials))
if entry['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked'): if entry['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked'):
@ -655,7 +629,6 @@ entry: {entry!r}'''
Queueing: {entry!r}''' Queueing: {entry!r}'''
) )
logger.trace_if(CMDR_EVENTS, f'"{entry["event"]=}" event, queueing: {cmdr=}') logger.trace_if(CMDR_EVENTS, f'"{entry["event"]=}" event, queueing: {cmdr=}')
this.queue.put((cmdr, this.game_version, this.game_build, entry)) this.queue.put((cmdr, this.game_version, this.game_build, entry))
return '' return ''
@ -675,11 +648,9 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
# Always store initially, even if we're not the *current* system provider. # Always store initially, even if we're not the *current* system provider.
if not this.station_marketid and data['commander']['docked']: if not this.station_marketid and data['commander']['docked']:
this.station_marketid = data['lastStarport']['id'] this.station_marketid = data['lastStarport']['id']
# Only trust CAPI if these aren't yet set # Only trust CAPI if these aren't yet set
if not this.system_name: if not this.system_name:
this.system_name = data['lastSystem']['name'] this.system_name = data['lastSystem']['name']
if not this.station_name and data['commander']['docked']: if not this.station_name and data['commander']['docked']:
this.station_name = data['lastStarport']['name'] this.station_name = data['lastStarport']['name']
@ -691,21 +662,17 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
# Do *NOT* set 'url' here, as it's set to a function that will call # Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string. # through correctly. We don't want a static string.
this.system_link.update_idletasks() this.system_link.update_idletasks()
if config.get_str('station_provider') == 'EDSM': if config.get_str('station_provider') == 'EDSM':
if this.station_link: if this.station_link:
if data['commander']['docked'] or this.on_foot and this.station_name: if data['commander']['docked'] or this.on_foot and this.station_name:
this.station_link['text'] = this.station_name this.station_link['text'] = this.station_name
elif data['lastStarport']['name'] and data['lastStarport']['name'] != "": elif data['lastStarport']['name'] and data['lastStarport']['name'] != "":
this.station_link['text'] = STATION_UNDOCKED this.station_link['text'] = STATION_UNDOCKED
else: else:
this.station_link['text'] = '' this.station_link['text'] = ''
# Do *NOT* set 'url' here, as it's set to a function that will call # Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string. # through correctly. We don't want a static string.
this.station_link.update_idletasks() this.station_link.update_idletasks()
if this.system_link and not this.system_link['text']: if this.system_link and not this.system_link['text']:
@ -722,30 +689,37 @@ if 'edsm' in debug_senders:
def get_discarded_events_list() -> None: def get_discarded_events_list() -> None:
"""Retrieve the list of to-discard events from EDSM.""" """
Retrieve the list of events to discard from EDSM.
This function queries the EDSM API to obtain the list of events that should be discarded,
and stores them in the `discarded_events` attribute.
:return: None
"""
try: try:
r = this.session.get('https://www.edsm.net/api-journal-v1/discard', timeout=_TIMEOUT) r = this.session.get('https://www.edsm.net/api-journal-v1/discard', timeout=_TIMEOUT)
r.raise_for_status() r.raise_for_status()
this.discarded_events = set(r.json()) this.discarded_events = set(r.json())
# We discard 'Docked' events because should_send() assumes that we send them
this.discarded_events.discard('Docked') # should_send() assumes that we send 'Docked' events this.discarded_events.discard('Docked')
if not this.discarded_events: if not this.discarded_events:
logger.warning( logger.warning(
'Unexpected empty discarded events list from EDSM: ' 'Unexpected empty discarded events list from EDSM: '
f'{type(this.discarded_events)} -- {this.discarded_events}' f'{type(this.discarded_events)} -- {this.discarded_events}'
) )
except Exception as e: except Exception as e:
logger.warning('Exception whilst trying to set this.discarded_events:', exc_info=e) logger.warning('Exception while trying to set this.discarded_events:', exc_info=e)
def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently def worker() -> None: # noqa: CCR001 C901
""" """
Handle uploading events to EDSM API. Handle uploading events to EDSM API.
Target function of a thread. This function is the target function of a thread. It processes events from the queue until the
queued item is None, uploading the events to the EDSM API.
Processes `this.queue` until the queued item is None. :return: None
""" """
logger.debug('Starting...') logger.debug('Starting...')
pending: List[Mapping[str, Any]] = [] # Unsent events pending: List[Mapping[str, Any]] = [] # Unsent events
@ -753,13 +727,11 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
cmdr: str = "" cmdr: str = ""
last_game_version = "" last_game_version = ""
last_game_build = "" last_game_build = ""
entry: Mapping[str, Any] = {}
while not this.discarded_events: while not this.discarded_events:
if this.shutting_down: if this.shutting_down:
logger.debug(f'returning from discarded_events loop due to {this.shutting_down=}') logger.debug(f'returning from discarded_events loop due to {this.shutting_down=}')
return return
get_discarded_events_list() get_discarded_events_list()
if this.discarded_events: if this.discarded_events:
break break
@ -776,17 +748,15 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
if item: if item:
(cmdr, game_version, game_build, entry) = item (cmdr, game_version, game_build, entry) = item
logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})') logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})')
else: else:
logger.debug('Empty queue message, setting closing = True') logger.debug('Empty queue message, setting closing = True')
closing = True # Try to send any unsent events before we close closing = True # Try to send any unsent events before we close
entry = {'event': 'ShutDown'} # Dummy to allow for `uentry['event']` belowt entry = {'event': 'ShutDown'} # Dummy to allow for `entry['event']` below
retrying = 0 retrying = 0
while retrying < 3: while retrying < 3:
if item is None: if item is None:
item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {})) item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {}))
should_skip, new_item = killswitch.check_killswitch( should_skip, new_item = killswitch.check_killswitch(
'plugins.edsm.worker', 'plugins.edsm.worker',
item, item,
@ -795,7 +765,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
if should_skip: if should_skip:
break break
if item is not None: if item is not None:
item = new_item item = new_item
@ -817,18 +786,14 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
or last_game_version != game_version or last_game_build != game_build or last_game_version != game_version or last_game_build != game_build
): ):
pending = [] pending = []
pending.append(entry) pending.append(entry)
# drop events if required by killswitch # drop events if required by killswitch
new_pending = [] new_pending = []
for e in pending: for e in pending:
skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger) skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger)
if skip: if skip:
continue continue
new_pending.append(new) new_pending.append(new)
pending = new_pending pending = new_pending
if pending and should_send(pending, entry['event']): if pending and should_send(pending, entry['event']):
@ -840,10 +805,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
"('CarrierJump', 'FSDJump', 'Location', 'Docked')" "('CarrierJump', 'FSDJump', 'Location', 'Docked')"
" and it passed should_send()") " and it passed should_send()")
for p in pending: for p in pending:
if p['event'] in ('Location'): if p['event'] in 'Location':
logger.trace_if( logger.trace_if(
'journal.locations', 'journal.locations',
f'"Location" event in pending passed should_send(),timestamp: {p["timestamp"]}' f'"Location" event in pending passed should_send(), timestamp: {p["timestamp"]}'
) )
creds = credentials(cmdr) creds = credentials(cmdr)
@ -868,24 +833,20 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
data_elided['apiKey'] = '<elided>' data_elided['apiKey'] = '<elided>'
if isinstance(data_elided['message'], bytes): if isinstance(data_elided['message'], bytes):
data_elided['message'] = data_elided['message'].decode('utf-8') data_elided['message'] = data_elided['message'].decode('utf-8')
if isinstance(data_elided['commanderName'], bytes): if isinstance(data_elided['commanderName'], bytes):
data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8') data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8')
logger.trace_if( logger.trace_if(
'journal.locations', 'journal.locations',
"pending has at least one of ('CarrierJump', 'FSDJump', 'Location', 'Docked')" "pending has at least one of ('CarrierJump', 'FSDJump', 'Location', 'Docked')"
" Attempting API call with the following events:" " Attempting API call with the following events:"
) )
for p in pending: for p in pending:
logger.trace_if('journal.locations', f"Event: {p!r}") logger.trace_if('journal.locations', f"Event: {p!r}")
if p['event'] in ('Location'): if p['event'] in 'Location':
logger.trace_if( logger.trace_if(
'journal.locations', 'journal.locations',
f'Attempting API call for "Location" event with timestamp: {p["timestamp"]}' f'Attempting API call for "Location" event with timestamp: {p["timestamp"]}'
) )
logger.trace_if( logger.trace_if(
'journal.locations', f'Overall POST data (elided) is:\n{json.dumps(data_elided, indent=2)}' 'journal.locations', f'Overall POST data (elided) is:\n{json.dumps(data_elided, indent=2)}'
) )
@ -906,17 +867,13 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
logger.warning(f'EDSM\t{msg_num} {msg}\t{json.dumps(pending, separators=(",", ": "))}') logger.warning(f'EDSM\t{msg_num} {msg}\t{json.dumps(pending, separators=(",", ": "))}')
# LANG: EDSM Plugin - Error message from EDSM API # LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=msg)) plug.show_error(_('Error: EDSM {MSG}').format(MSG=msg))
else: else:
if msg_num // 100 == 1: if msg_num // 100 == 1:
logger.trace_if('plugin.edsm.api', 'Overall OK') logger.trace_if('plugin.edsm.api', 'Overall OK')
pass pass
elif msg_num // 100 == 5: elif msg_num // 100 == 5:
logger.trace_if('plugin.edsm.api', 'Event(s) not currently processed, but saved for later') logger.trace_if('plugin.edsm.api', 'Event(s) not currently processed, but saved for later')
pass pass
else: else:
logger.warning(f'EDSM API call status not 1XX, 2XX or 5XX: {msg.num}') logger.warning(f'EDSM API call status not 1XX, 2XX or 5XX: {msg.num}')
@ -927,13 +884,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
# calls update_status in main thread # calls update_status in main thread
if not config.shutting_down and this.system_link is not None: if not config.shutting_down and this.system_link is not None:
this.system_link.event_generate('<<EDSMStatus>>', when="tail") this.system_link.event_generate('<<EDSMStatus>>', when="tail")
if r['msgnum'] // 100 != 1:
if r['msgnum'] // 100 != 1: # type: ignore logger.warning(f'EDSM event with not-1xx status:\n{r["msgnum"]}\n'
logger.warning(f'EDSM event with not-1xx status:\n{r["msgnum"]}\n' # type: ignore
f'{r["msg"]}\n{json.dumps(e, separators = (",", ": "))}') f'{r["msg"]}\n{json.dumps(e, separators = (",", ": "))}')
pending = [] pending = []
break # No exception, so assume success break # No exception, so assume success
except Exception as e: except Exception as e:
@ -943,12 +897,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
else: else:
# LANG: EDSM Plugin - Error connecting to EDSM API # LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM")) plug.show_error(_("Error: Can't connect to EDSM"))
if entry['event'].lower() in ('shutdown', 'commander', 'fileheader'): if entry['event'].lower() in ('shutdown', 'commander', 'fileheader'):
# Game shutdown or new login, so we MUST not hang on to pending # Game shutdown or new login, so we MUST not hang on to pending
pending = [] pending = []
logger.trace_if(CMDR_EVENTS, f'Blanked pending because of event: {entry["event"]}') logger.trace_if(CMDR_EVENTS, f'Blanked pending because of event: {entry["event"]}')
if closing: if closing:
logger.debug('closing, so returning.') logger.debug('closing, so returning.')
return return
@ -956,8 +908,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
last_game_version = game_version last_game_version = game_version
last_game_build = game_build last_game_build = game_build
logger.debug('Done.')
def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001
""" """
@ -967,54 +917,42 @@ def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa:
:param event: The latest event being processed :param event: The latest event being processed
:return: bool indicating whether or not to send said entries :return: bool indicating whether or not to send said entries
""" """
# We MUST flush pending on logout, in case new login is a different Commander def should_send_entry(entry: Mapping[str, Any]) -> bool:
if entry['event'] == 'Cargo':
return not this.newgame_docked
if entry['event'] == 'Docked':
return True
if this.newgame:
return True
if entry['event'] not in (
'CommunityGoal',
'ModuleBuy',
'ModuleSell',
'ModuleSwap',
'ShipyardBuy',
'ShipyardNew',
'ShipyardSwap'
):
return True
return False
if event.lower() in ('shutdown', 'fileheader'): if event.lower() in ('shutdown', 'fileheader'):
logger.trace_if(CMDR_EVENTS, f'True because {event=}') logger.trace_if(CMDR_EVENTS, f'True because {event=}')
return True return True
# batch up burst of Scan events after NavBeaconScan
if this.navbeaconscan: if this.navbeaconscan:
if entries and entries[-1]['event'] == 'Scan': if entries and entries[-1]['event'] == 'Scan':
this.navbeaconscan -= 1 this.navbeaconscan -= 1
if this.navbeaconscan: should_send_result = this.navbeaconscan == 0
logger.trace_if(CMDR_EVENTS, f'False because {this.navbeaconscan=}') logger.trace_if(CMDR_EVENTS, f'False because {this.navbeaconscan=}' if not should_send_result else '')
return should_send_result
logger.error('Invalid state NavBeaconScan exists, but passed entries either '
"doesn't exist or doesn't have the expected content")
this.navbeaconscan = 0
return False should_send_result = any(should_send_entry(entry) for entry in entries)
logger.trace_if(CMDR_EVENTS, f'False as default: {this.newgame_docked=}' if not should_send_result else '')
else: return should_send_result
logger.error(
'Invalid state NavBeaconScan exists, but passed entries either '
"doesn't exist or doesn't have the expected content"
)
this.navbeaconscan = 0
for entry in entries:
if (entry['event'] == 'Cargo' and not this.newgame_docked) or entry['event'] == 'Docked':
# Cargo is the last event on startup, unless starting when docked in which case Docked is the last event
this.newgame = False
this.newgame_docked = False
logger.trace_if(CMDR_EVENTS, f'True because {entry["event"]=}')
return True
elif this.newgame:
pass
elif entry['event'] not in (
'CommunityGoal', # Spammed periodically
'ModuleBuy', 'ModuleSell', 'ModuleSwap', # will be shortly followed by "Loadout"
'ShipyardBuy', 'ShipyardNew', 'ShipyardSwap'): # "
logger.trace_if(CMDR_EVENTS, f'True because {entry["event"]=}')
return True
else:
logger.trace_if(CMDR_EVENTS, f'{entry["event"]=}, {this.newgame_docked=}')
logger.trace_if(CMDR_EVENTS, f'False as default: {this.newgame_docked=}')
return False
def update_status(event=None) -> None: def update_status(event=None) -> None:
@ -1033,14 +971,11 @@ def edsm_notify_system(reply: Mapping[str, Any]) -> None:
this.system_link['image'] = this._IMG_ERROR this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error connecting to EDSM API # LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM")) plug.show_error(_("Error: Can't connect to EDSM"))
elif reply['msgnum'] // 100 not in (1, 4): elif reply['msgnum'] // 100 not in (1, 4):
this.system_link['image'] = this._IMG_ERROR this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error message from EDSM API # LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg'])) plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg']))
elif reply.get('systemCreated'): elif reply.get('systemCreated'):
this.system_link['image'] = this._IMG_NEW this.system_link['image'] = this._IMG_NEW
else: else:
this.system_link['image'] = this._IMG_KNOWN this.system_link['image'] = this._IMG_KNOWN

View File

@ -1,32 +1,28 @@
"""Export data for ED Shipyard.""" """
edsy.py - Exporting Data to EDSY.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# Copyright (c) EDCD, All Rights Reserved
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# Licensed under the GNU General Public License.
# See LICENSE file.
# This is an EDMC 'core' plugin.
# This is an EDMC 'core' plugin.
# All EDMC plugins are *dynamically* loaded at run-time. All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`. We build for Windows using `py2exe`.
# `py2exe` can't possibly know about anything in the dynamically loaded core plugins.
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins. Thus, you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so... YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN IN AN END-USER INSTALLATION ON WINDOWS.
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT IN """
# AN END-USER INSTALLATION ON WINDOWS.
#
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import base64 import base64
import gzip import gzip
import io import io
import json import json
from typing import Any, Mapping from typing import Any, Mapping, Union
def plugin_start3(plugin_dir: str) -> str: def plugin_start3(plugin_dir: str) -> str:
@ -40,15 +36,15 @@ def plugin_start3(plugin_dir: str) -> str:
# Return a URL for the current ship # Return a URL for the current ship
def shipyard_url(loadout: Mapping[str, Any], is_beta) -> bool | str: def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]:
""" """
Construct a URL for ship loadout. Construct a URL for ship loadout.
:param loadout: :param loadout: The ship loadout data.
:param is_beta: :param is_beta: Whether the game is in beta.
:return: :return: The constructed URL for the ship loadout.
""" """
# most compact representation # Convert loadout to JSON and gzip compress it
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')
if not string: if not string:
return False return False
@ -57,6 +53,8 @@ def shipyard_url(loadout: Mapping[str, Any], is_beta) -> bool | str:
with gzip.GzipFile(fileobj=out, mode='w') as f: with gzip.GzipFile(fileobj=out, mode='w') as f:
f.write(string) f.write(string)
return ( # Construct the URL using the appropriate base URL based on is_beta
is_beta and 'http://edsy.org/beta/#/I=' or 'http://edsy.org/#/I=' base_url = 'https://edsy.org/beta/#/I=' if is_beta else 'https://edsy.org/#/I='
) + base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D') encoded_data = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
return base_url + encoded_data

View File

@ -1,26 +1,24 @@
"""Inara Sync.""" """
inara.py - Sync with INARA.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import json import json
import threading import threading
import time import time
@ -34,9 +32,7 @@ from tkinter import ttk
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import OrderedDict as OrderedDictT from typing import OrderedDict as OrderedDictT
from typing import Sequence, Union, cast from typing import Sequence, Union, cast
import requests import requests
import edmc_data import edmc_data
import killswitch import killswitch
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
@ -168,14 +164,15 @@ if DEBUG:
TARGET_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/inara' TARGET_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/inara'
# noinspection PyUnresolvedReferences
def system_url(system_name: str) -> str: def system_url(system_name: str) -> str:
"""Get a URL for the current system.""" """Get a URL for the current system."""
if this.system_address: if this.system_address:
return requests.utils.requote_uri(f'https://inara.cz/elite/starsystem/' return requests.utils.requote_uri(f'https://inara.cz/galaxy-starsystem/'
f'?search={this.system_address}') f'?search={this.system_address}')
elif system_name: if system_name:
return requests.utils.requote_uri(f'https://inara.cz/elite/starsystem/' return requests.utils.requote_uri(f'https://inara.cz/galaxy-starsystem/'
f'?search={system_name}') f'?search={system_name}')
return '' return ''
@ -192,13 +189,11 @@ def station_url(system_name: str, station_name: str) -> str:
:return: A URL to inara for the given system and station :return: A URL to inara for the given system and station
""" """
if system_name and station_name: if system_name and station_name:
return requests.utils.requote_uri(f'https://inara.cz/elite/station/' return requests.utils.requote_uri(f'https://inara.cz/galaxy-station/?search={system_name}%20[{station_name}]')
f'?search={system_name}%20[{station_name}]')
# monitor state might think these are gone, but we don't yet
if this.system_name and this.station: if this.system_name and this.station:
return requests.utils.requote_uri(f'https://inara.cz/elite/station/' return requests.utils.requote_uri(
f'?search={this.system_name}%20[{this.station}]') f'https://inara.cz/galaxy-station/?search={this.system_name}%20[{this.station}]')
if system_name: if system_name:
return system_url(system_name) return system_url(system_name)
@ -224,9 +219,7 @@ def plugin_start3(plugin_dir: str) -> str:
def plugin_app(parent: tk.Tk) -> None: def plugin_app(parent: tk.Tk) -> None:
"""Plugin UI setup Hook.""" """Plugin UI setup Hook."""
this.parent = parent this.parent = parent
# system label in main window
this.system_link = parent.nametowidget(f".{appname.lower()}.system") this.system_link = parent.nametowidget(f".{appname.lower()}.system")
# station label in main window
this.station_link = parent.nametowidget(f".{appname.lower()}.station") this.station_link = parent.nametowidget(f".{appname.lower()}.station")
this.system_link.bind_all('<<InaraLocation>>', update_location) this.system_link.bind_all('<<InaraLocation>>', update_location)
this.system_link.bind_all('<<InaraShip>>', update_ship) this.system_link.bind_all('<<InaraShip>>', update_ship)
@ -379,11 +372,14 @@ def credentials(cmdr: Optional[str]) -> Optional[str]:
return None return None
cmdrs = config.get_list('inara_cmdrs', default=[]) cmdrs = config.get_list('inara_cmdrs', default=[])
if cmdr in cmdrs and config.get_list('inara_apikeys'): apikeys = config.get_list('inara_apikeys', default=[])
return config.get_list('inara_apikeys')[cmdrs.index(cmdr)]
else: if cmdr in cmdrs:
return None idx = cmdrs.index(cmdr)
if idx < len(apikeys):
return apikeys[idx]
return None
def journal_entry( # noqa: C901, CCR001 def journal_entry( # noqa: C901, CCR001
@ -422,11 +418,9 @@ def journal_entry( # noqa: C901, CCR001
if not monitor.is_live_galaxy(): if not monitor.is_live_galaxy():
# Since Update 14 on 2022-11-29 Inara only accepts Live data. # Since Update 14 on 2022-11-29 Inara only accepts Live data.
if ( if (
( (this.legacy_galaxy_last_notified is None or
this.legacy_galaxy_last_notified is None (datetime.now(timezone.utc) - this.legacy_galaxy_last_notified) > timedelta(seconds=300))
or (datetime.now(timezone.utc) - this.legacy_galaxy_last_notified) > timedelta(seconds=300) and config.get_int('inara_out') and not (is_beta or this.multicrew or credentials(cmdr))
)
and config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(cmdr)
): ):
# LANG: The Inara API only accepts Live galaxy data, not Legacy galaxy data # LANG: The Inara API only accepts Live galaxy data, not Legacy galaxy data
logger.info(_("Inara only accepts Live galaxy data")) logger.info(_("Inara only accepts Live galaxy data"))
@ -475,92 +469,49 @@ def journal_entry( # noqa: C901, CCR001
if config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(cmdr): if config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(cmdr):
current_credentials = Credentials(this.cmdr, this.FID, str(credentials(this.cmdr))) current_credentials = Credentials(this.cmdr, this.FID, str(credentials(this.cmdr)))
try: try:
# Dump starting state to Inara if this.newuser or event_name == 'StartUp' or (this.newsession and event_name == 'Cargo'):
if (this.newuser or event_name == 'StartUp' or (this.newsession and event_name == 'Cargo')):
this.newuser = False this.newuser = False
this.newsession = False this.newsession = False
# Don't send the API call with no values.
if state['Reputation']: if state['Reputation']:
new_add_event( reputation_data = [
'setCommanderReputationMajorFaction', {'majorfactionName': k.lower(), 'majorfactionReputation': v / 100.0}
entry['timestamp'], for k, v in state['Reputation'].items() if v is not None
[ ]
{'majorfactionName': k.lower(), 'majorfactionReputation': v / 100.0} new_add_event('setCommanderReputationMajorFaction', entry['timestamp'], reputation_data)
for k, v in state['Reputation'].items() if v is not None
]
)
if state['Engineers']: # Not populated < 3.3
to_send_list: List[Mapping[str, Any]] = []
for k, v in state['Engineers'].items():
e = {'engineerName': k}
if isinstance(v, tuple):
e['rankValue'] = v[0]
else:
e['rankStage'] = v
to_send_list.append(e)
new_add_event(
'setCommanderRankEngineer',
entry['timestamp'],
to_send_list,
)
# Update location
# Might not be available if this event is a 'StartUp' and we're replaying
# a log.
# XXX: This interferes with other more specific setCommanderTravelLocation events in the same
# batch.
# if system:
# new_add_event(
# 'setCommanderTravelLocation',
# entry['timestamp'],
# OrderedDict([
# ('starsystemName', system),
# ('stationName', station), # Can be None
# ])
# )
if state['Engineers']:
engineer_data = [
{'engineerName': k, 'rankValue': v[0] if isinstance(v, tuple) else None, 'rankStage': v}
for k, v in state['Engineers'].items()
]
new_add_event('setCommanderRankEngineer', entry['timestamp'], engineer_data)
# Update ship # Update ship
if state['ShipID']: # Unknown if started in Fighter or SRV if state['ShipID']:
cur_ship: Dict[str, Any] = { cur_ship = {
'shipType': state['ShipType'], 'shipType': state['ShipType'],
'shipGameID': state['ShipID'], 'shipGameID': state['ShipID'],
'shipName': state['ShipName'], 'shipName': state['ShipName'],
'shipIdent': state['ShipIdent'], 'shipIdent': state['ShipIdent'],
'isCurrentShip': True, 'isCurrentShip': True,
} }
if state['HullValue']: if state['HullValue']:
cur_ship['shipHullValue'] = state['HullValue'] cur_ship['shipHullValue'] = state['HullValue']
if state['ModulesValue']: if state['ModulesValue']:
cur_ship['shipModulesValue'] = state['ModulesValue'] cur_ship['shipModulesValue'] = state['ModulesValue']
cur_ship['shipRebuyCost'] = state['Rebuy'] cur_ship['shipRebuyCost'] = state['Rebuy']
new_add_event('setCommanderShip', entry['timestamp'], cur_ship) new_add_event('setCommanderShip', entry['timestamp'], cur_ship)
this.loadout = make_loadout(state) this.loadout = make_loadout(state)
new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout)
# Trigger off the "only observed as being after Ranks" event so that # Trigger off the "only observed as being after Ranks" event so that
# we have both current Ranks *and* current Progress within them. # we have both current Ranks *and* current Progress within them.
elif event_name == 'Progress': elif event_name == 'Progress':
# Send rank info to Inara on startup rank_data = [
new_add_event( {'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': v[1] / 100.0}
'setCommanderRankPilot', for k, v in state['Rank'].items() if v is not None
entry['timestamp'], ]
[ new_add_event('setCommanderRankPilot', entry['timestamp'], rank_data)
{'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': v[1] / 100.0}
for k, v in state['Rank'].items() if v is not None
]
)
# Promotions
elif event_name == 'Promotion': elif event_name == 'Promotion':
for k, v in state['Rank'].items(): for k, v in state['Rank'].items():
if k in entry: if k in entry:
@ -571,41 +522,25 @@ def journal_entry( # noqa: C901, CCR001
) )
elif event_name == 'EngineerProgress' and 'Engineer' in entry: elif event_name == 'EngineerProgress' and 'Engineer' in entry:
# TODO: due to this var name being used above, the types are weird engineer_rank_data = {
to_send_dict = {'engineerName': entry['Engineer']} 'engineerName': entry['Engineer'],
if 'Rank' in entry: 'rankValue': entry['Rank'] if 'Rank' in entry else None,
to_send_dict['rankValue'] = entry['Rank'] 'rankStage': entry['Progress'] if 'Progress' in entry else None,
}
else: new_add_event('setCommanderRankEngineer', entry['timestamp'], engineer_rank_data)
to_send_dict['rankStage'] = entry['Progress']
new_add_event(
'setCommanderRankEngineer',
entry['timestamp'],
to_send_dict
)
# PowerPlay status change # PowerPlay status change
if event_name == 'PowerplayJoin': elif event_name == 'PowerplayJoin':
new_add_event( power_join_data = {'powerName': entry['Power'], 'rankValue': 1}
'setCommanderRankPower', new_add_event('setCommanderRankPower', entry['timestamp'], power_join_data)
entry['timestamp'],
{'powerName': entry['Power'], 'rankValue': 1}
)
elif event_name == 'PowerplayLeave': elif event_name == 'PowerplayLeave':
new_add_event( power_leave_data = {'powerName': entry['Power'], 'rankValue': 0}
'setCommanderRankPower', new_add_event('setCommanderRankPower', entry['timestamp'], power_leave_data)
entry['timestamp'],
{'powerName': entry['Power'], 'rankValue': 0}
)
elif event_name == 'PowerplayDefect': elif event_name == 'PowerplayDefect':
new_add_event( power_defect_data = {'powerName': entry["ToPower"], 'rankValue': 1}
'setCommanderRankPower', new_add_event('setCommanderRankPower', entry['timestamp'], power_defect_data)
entry['timestamp'],
{'powerName': entry['ToPower'], 'rankValue': 1}
)
# Ship change # Ship change
if event_name == 'Loadout' and this.shipswap: if event_name == 'Loadout' and this.shipswap:
@ -683,7 +618,7 @@ def journal_entry( # noqa: C901, CCR001
elif event_name == 'SupercruiseExit': elif event_name == 'SupercruiseExit':
to_send = { to_send = {
'starsystemName': entry['StarSystem'], 'starsystemName': entry['StarSystem'],
} }
if entry['BodyType'] == 'Planet': if entry['BodyType'] == 'Planet':
@ -696,9 +631,9 @@ def journal_entry( # noqa: C901, CCR001
# we might not yet have system logged for use. # we might not yet have system logged for use.
if system: if system:
to_send = { to_send = {
'starsystemName': system, 'starsystemName': system,
'stationName': entry['Name'], 'stationName': entry['Name'],
'starsystemBodyName': entry['BodyName'], 'starsystemBodyName': entry['BodyName'],
'starsystemBodyCoords': [entry['Latitude'], entry['Longitude']] 'starsystemBodyCoords': [entry['Latitude'], entry['Longitude']]
} }
# Not present on, e.g. Ancient Ruins # Not present on, e.g. Ancient Ruins
@ -775,22 +710,19 @@ def journal_entry( # noqa: C901, CCR001
# Ignore the following 'Docked' event # Ignore the following 'Docked' event
this.suppress_docked = True this.suppress_docked = True
cargo: List[OrderedDictT[str, Any]]
cargo = [OrderedDict({'itemName': k, 'itemCount': state['Cargo'][k]}) for k in sorted(state['Cargo'])]
# Send cargo and materials if changed # Send cargo and materials if changed
cargo = [OrderedDict({'itemName': k, 'itemCount': state['Cargo'][k]}) for k in sorted(state['Cargo'])]
if this.cargo != cargo: if this.cargo != cargo:
new_add_event('setCommanderInventoryCargo', entry['timestamp'], cargo) new_add_event('setCommanderInventoryCargo', entry['timestamp'], cargo)
this.cargo = cargo this.cargo = cargo
materials: List[OrderedDictT[str, Any]] = [] materials = [
for category in ('Raw', 'Manufactured', 'Encoded'): OrderedDict([('itemName', k), ('itemCount', state[category][k])])
materials.extend( for category in ('Raw', 'Manufactured', 'Encoded')
[OrderedDict([('itemName', k), ('itemCount', state[category][k])]) for k in sorted(state[category])] for k in sorted(state[category])
) ]
if this.materials != materials: if this.materials != materials:
new_add_event('setCommanderInventoryMaterials', entry['timestamp'], materials) new_add_event('setCommanderInventoryMaterials', entry['timestamp'], materials)
this.materials = materials this.materials = materials
except Exception as e: except Exception as e:
@ -1398,7 +1330,7 @@ def journal_entry( # noqa: C901, CCR001
return '' # No error return '' # No error
def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001 def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later
"""CAPI event hook.""" """CAPI event hook."""
this.cmdr = data['commander']['name'] this.cmdr = data['commander']['name']
@ -1539,17 +1471,22 @@ def new_add_event(
def clean_event_list(event_list: List[Event]) -> List[Event]: def clean_event_list(event_list: List[Event]) -> List[Event]:
"""Check for killswitched events and remove or modify them as requested.""" """
out = [] Check for killswitched events and remove or modify them as requested.
for e in event_list:
bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{e.name}', e.data, logger) :param event_list: List of events to clean
if bad: :return: Cleaned list of events
"""
cleaned_events = []
for event in event_list:
is_bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{event.name}', event.data, logger)
if is_bad:
continue continue
e.data = new_event event.data = new_event
out.append(e) cleaned_events.append(event)
return out return cleaned_events
def new_worker(): def new_worker():
@ -1561,8 +1498,9 @@ def new_worker():
logger.debug('Starting...') logger.debug('Starting...')
while True: while True:
events = get_events() events = get_events()
if (res := killswitch.get_disabled("plugins.inara.worker")).disabled: disabled_killswitch = killswitch.get_disabled("plugins.inara.worker")
logger.warning(f"Inara worker disabled via killswitch. ({res.reason})") if disabled_killswitch.disabled:
logger.warning(f"Inara worker disabled via killswitch. ({disabled_killswitch.reason})")
continue continue
for creds, event_list in events.items(): for creds, event_list in events.items():
@ -1570,6 +1508,10 @@ def new_worker():
if not event_list: if not event_list:
continue continue
event_data = [
{'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list
]
data = { data = {
'header': { 'header': {
'appName': applongname, 'appName': applongname,
@ -1578,12 +1520,10 @@ def new_worker():
'commanderName': creds.cmdr, 'commanderName': creds.cmdr,
'commanderFrontierID': creds.fid, 'commanderFrontierID': creds.fid,
}, },
'events': [ 'events': event_data
{'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list
]
} }
logger.info(f'sending {len(data["events"])} events for {creds.cmdr}') logger.info(f'Sending {len(event_data)} events for {creds.cmdr}')
logger.trace_if('plugin.inara.events', f'Events:\n{json.dumps(data)}\n') logger.trace_if('plugin.inara.events', f'Events:\n{json.dumps(data)}\n')
try_send_data(TARGET_URL, data) try_send_data(TARGET_URL, data)
@ -1595,94 +1535,129 @@ def new_worker():
def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]: def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]:
""" """
Fetch a frozen copy of all events from the current queue. Fetch a copy of all events from the current queue.
:param clear: whether or not to clear the queues as we go, defaults to True :param clear: whether to clear the queues as we go, defaults to True
:return: the frozen event list :return: a copy of the event dictionary
""" """
out: Dict[Credentials, List[Event]] = {} events_copy: Dict[Credentials, List[Event]] = {}
with this.event_lock: with this.event_lock:
for key, events in this.events.items(): for key, events in this.events.items():
out[key] = list(events) events_copy[key] = list(events)
if clear: if clear:
events.clear() events.clear()
return out return events_copy
def try_send_data(url: str, data: Mapping[str, Any]) -> None: def try_send_data(url: str, data: Mapping[str, Any]) -> None:
""" """
Attempt repeatedly to send the payload forward. Attempt repeatedly to send the payload.
:param url: target URL for the payload :param url: target URL for the payload
:param data: the payload :param data: the payload
""" """
for i in range(3): for attempt in range(3):
logger.debug(f"sending data to API, attempt #{i}") logger.debug(f"Sending data to API, attempt #{attempt + 1}")
try: try:
if send_data(url, data): if send_data(url, data):
break break
except Exception as e: except Exception as e:
logger.debug('unable to send events', exc_info=e) logger.debug('Unable to send events', exc_info=e)
return return
def send_data(url: str, data: Mapping[str, Any]) -> bool: # noqa: CCR001 def send_data(url: str, data: Mapping[str, Any]) -> bool:
""" """
Write a set of events to the inara API. Send a set of events to the Inara API.
:param url: the target URL to post to :param url: The target URL to post the data.
:param data: the data to POST :param data: The data to be POSTed.
:return: success state :return: True if the data was sent successfully, False otherwise.
""" """
# NB: As of 2022-01-25 Artie has stated the Inara API does *not* support compression response = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT)
r = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT) response.raise_for_status()
r.raise_for_status() reply = response.json()
reply = r.json()
status = reply['header']['eventStatus'] status = reply['header']['eventStatus']
if status // 100 != 2: # 2xx == OK (maybe with warnings) if status // 100 != 2: # 2xx == OK (maybe with warnings)
# Log fatal errors handle_api_error(data, status, reply)
logger.warning(f'Inara\t{status} {reply["header"].get("eventStatusText", "")}')
logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}')
# LANG: INARA API returned some kind of error (error message will be contained in {MSG})
plug.show_error(_('Error: Inara {MSG}').format(MSG=reply['header'].get('eventStatusText', status)))
else: else:
# Log individual errors and warnings handle_success_reply(data, reply)
for data_event, reply_event in zip(data['events'], reply['events']):
if reply_event['eventStatus'] != 200:
if ("Everything was alright, the near-neutral status just wasn't stored."
not in reply_event.get("eventStatusText")):
logger.warning(f'Inara\t{status} {reply_event.get("eventStatusText", "")}')
logger.debug(f'JSON data:\n{json.dumps(data_event)}')
if reply_event['eventStatus'] // 100 != 2: return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such
# LANG: INARA API returned some kind of error (error message will be contained in {MSG})
plug.show_error(_('Error: Inara {MSG}').format(
MSG=f'{data_event["eventName"]},'
f'{reply_event.get("eventStatusText", reply_event["eventStatus"])}'
))
if data_event['eventName'] in (
'addCommanderTravelCarrierJump',
'addCommanderTravelDock',
'addCommanderTravelFSDJump',
'setCommanderTravelLocation'
):
this.lastlocation = reply_event.get('eventData', {})
# calls update_location in main thread
if not config.shutting_down:
this.system_link.event_generate('<<InaraLocation>>', when="tail")
elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']: def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None:
this.lastship = reply_event.get('eventData', {}) """
# calls update_ship in main thread Handle API error response.
if not config.shutting_down:
this.system_link.event_generate('<<InaraShip>>', when="tail")
return True # regardless of errors above, we DID manage to send it, therefore inform our caller as such :param data: The original data that was sent.
:param status: The HTTP status code of the API response.
:param reply: The JSON reply from the API.
"""
error_message = reply['header'].get('eventStatusText', "")
logger.warning(f'Inara\t{status} {error_message}')
logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}')
plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message))
def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None:
"""
Handle successful API response.
:param data: The original data that was sent.
:param reply: The JSON reply from the API.
"""
for data_event, reply_event in zip(data['events'], reply['events']):
reply_status = reply_event['eventStatus']
reply_text = reply_event.get("eventStatusText", "")
if reply_status != 200:
handle_individual_error(data_event, reply_status, reply_text)
handle_special_events(data_event, reply_event)
def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None:
"""
Handle individual API error.
:param data_event: The event data that was sent.
:param reply_status: The event status code from the API response.
:param reply_text: The event status text from the API response.
"""
if ("Everything was alright, the near-neutral status just wasn't stored."
not in reply_text):
logger.warning(f'Inara\t{reply_status} {reply_text}')
logger.debug(f'JSON data:\n{json.dumps(data_event)}')
if reply_status // 100 != 2:
plug.show_error(_('Error: Inara {MSG}').format(
MSG=f'{data_event["eventName"]}, {reply_text}'
))
def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None:
"""
Handle special events in the API response.
:param data_event: The event data that was sent.
:param reply_event: The event data from the API reply.
"""
if data_event['eventName'] in (
'addCommanderTravelCarrierJump',
'addCommanderTravelDock',
'addCommanderTravelFSDJump',
'setCommanderTravelLocation'
):
this.lastlocation = reply_event.get('eventData', {})
if not config.shutting_down:
this.system_link.event_generate('<<InaraLocation>>', when="tail")
elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']:
this.lastship = reply_event.get('eventData', {})
if not config.shutting_down:
this.system_link.event_generate('<<InaraShip>>', when="tail")
def update_location(event=None) -> None: def update_location(event=None) -> None: