1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-13 07:47:14 +03:00

[2051] Core Plugin Audit

Broken out of #2068
This commit is contained in:
David Sangrey 2023-10-19 17:44:33 -04:00
parent 987a904d7f
commit 2c803d7f4c
No known key found for this signature in database
GPG Key ID: 3AEADBB0186884BC
5 changed files with 545 additions and 653 deletions

View File

@ -1,86 +1,86 @@
"""Coriolis ship export."""
"""
coriolis.py - Coriolis Ship Export.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import base64
import gzip
import io
import json
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Union, Optional
import myNotebook as nb # noqa: N813 # its not my fault.
from EDMCLogging import get_main_logger
from plug import show_error
from config import config
if TYPE_CHECKING:
def _(s: str) -> str:
...
# Migrate settings from <= 3.01
from config import config
if not config.get_str('shipyard_provider') and config.get_int('shipyard'):
config.set('shipyard_provider', 'Coriolis')
class CoriolisConfig:
"""Coriolis Configuration."""
config.delete('shipyard', suppress=True)
def __init__(self):
self.normal_url = ''
self.beta_url = ''
self.override_mode = ''
self.normal_textvar = tk.StringVar()
self.beta_textvar = tk.StringVar()
self.override_textvar = tk.StringVar()
def initialize_urls(self):
"""Initialize Coriolis URLs and override mode from configuration."""
self.normal_url = config.get_str('coriolis_normal_url', default=DEFAULT_NORMAL_URL)
self.beta_url = config.get_str('coriolis_beta_url', default=DEFAULT_BETA_URL)
self.override_mode = config.get_str('coriolis_overide_url_selection', default=DEFAULT_OVERRIDE_MODE)
self.normal_textvar.set(value=self.normal_url)
self.beta_textvar.set(value=self.beta_url)
self.override_textvar.set(
value={
'auto': _('Auto'), # LANG: 'Auto' label for Coriolis site override selection
'normal': _('Normal'), # LANG: 'Normal' label for Coriolis site override selection
'beta': _('Beta') # LANG: 'Beta' label for Coriolis site override selection
}.get(self.override_mode, _('Auto')) # LANG: 'Auto' label for Coriolis site override selection
)
coriolis_config = CoriolisConfig()
logger = get_main_logger()
DEFAULT_NORMAL_URL = 'https://coriolis.io/import?data='
DEFAULT_BETA_URL = 'https://beta.coriolis.io/import?data='
DEFAULT_OVERRIDE_MODE = 'auto'
normal_url = ''
beta_url = ''
override_mode = ''
normal_textvar = tk.StringVar()
beta_textvar = tk.StringVar()
override_textvar = tk.StringVar() # This will always contain a _localised_ version
def plugin_start3(path: str) -> str:
"""Set up URLs."""
global normal_url, beta_url, override_mode
normal_url = config.get_str('coriolis_normal_url', default=DEFAULT_NORMAL_URL)
beta_url = config.get_str('coriolis_beta_url', default=DEFAULT_BETA_URL)
override_mode = config.get_str('coriolis_overide_url_selection', default=DEFAULT_OVERRIDE_MODE)
normal_textvar.set(value=normal_url)
beta_textvar.set(value=beta_url)
override_textvar.set(
value={
'auto': _('Auto'), # LANG: 'Auto' label for Coriolis site override selection
'normal': _('Normal'), # LANG: 'Normal' label for Coriolis site override selection
'beta': _('Beta') # LANG: 'Beta' label for Coriolis site override selection
}.get(override_mode, _('Auto')) # LANG: 'Auto' label for Coriolis site override selection
)
coriolis_config.initialize_urls()
return 'Coriolis'
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
"""Set up plugin preferences."""
PADX = 10 # noqa: N806
@ -95,18 +95,21 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
# LANG: Settings>Coriolis: Label for 'NOT alpha/beta game version' URL
nb.Label(conf_frame, text=_('Normal URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.Entry(conf_frame, textvariable=normal_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
nb.Entry(conf_frame,
textvariable=coriolis_config.normal_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
# LANG: Generic 'Reset' button label
nb.Button(conf_frame, text=_("Reset"), command=lambda: normal_textvar.set(value=DEFAULT_NORMAL_URL)).grid(
nb.Button(conf_frame, text=_("Reset"),
command=lambda: coriolis_config.normal_textvar.set(value=DEFAULT_NORMAL_URL)).grid(
sticky=tk.W, row=cur_row, column=2, padx=PADX
)
cur_row += 1
# LANG: Settings>Coriolis: Label for 'alpha/beta game version' URL
nb.Label(conf_frame, text=_('Beta URL')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.Entry(conf_frame, textvariable=beta_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
nb.Entry(conf_frame, textvariable=coriolis_config.beta_textvar).grid(sticky=tk.EW, row=cur_row, column=1, padx=PADX)
# LANG: Generic 'Reset' button label
nb.Button(conf_frame, text=_('Reset'), command=lambda: beta_textvar.set(value=DEFAULT_BETA_URL)).grid(
nb.Button(conf_frame, text=_('Reset'),
command=lambda: coriolis_config.beta_textvar.set(value=DEFAULT_BETA_URL)).grid(
sticky=tk.W, row=cur_row, column=2, padx=PADX
)
cur_row += 1
@ -116,8 +119,8 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
nb.Label(conf_frame, text=_('Override Beta/Normal Selection')).grid(sticky=tk.W, row=cur_row, column=0, padx=PADX)
nb.OptionMenu(
conf_frame,
override_textvar,
override_textvar.get(),
coriolis_config.override_textvar,
coriolis_config.override_textvar.get(),
_('Normal'), # LANG: 'Normal' label for Coriolis site override selection
_('Beta'), # LANG: 'Beta' label for Coriolis site override selection
_('Auto') # LANG: 'Auto' label for Coriolis site override selection
@ -127,50 +130,49 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
return conf_frame
def prefs_changed(cmdr: str | None, is_beta: bool) -> None:
"""Update URLs."""
global normal_url, beta_url, override_mode
normal_url = normal_textvar.get()
beta_url = beta_textvar.get()
override_mode = override_textvar.get()
override_mode = { # Convert to unlocalised names
def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None:
"""
Update URLs and override mode based on user preferences.
:param cmdr: Commander name, if available
:param is_beta: Whether the game mode is beta
"""
coriolis_config.normal_url = coriolis_config.normal_textvar.get()
coriolis_config.beta_url = coriolis_config.beta_textvar.get()
coriolis_config.override_mode = coriolis_config.override_textvar.get()
# Convert to unlocalised names
coriolis_config.override_mode = {
_('Normal'): 'normal', # LANG: Coriolis normal/beta selection - normal
_('Beta'): 'beta', # LANG: Coriolis normal/beta selection - beta
_('Auto'): 'auto', # LANG: Coriolis normal/beta selection - auto
}.get(override_mode, override_mode)
_('Beta'): 'beta', # LANG: Coriolis normal/beta selection - beta
_('Auto'): 'auto', # LANG: Coriolis normal/beta selection - auto
}.get(coriolis_config.override_mode, coriolis_config.override_mode)
if override_mode not in ('beta', 'normal', 'auto'):
logger.warning(f'Unexpected value {override_mode=!r}. defaulting to "auto"')
override_mode = 'auto'
override_textvar.set(value=_('Auto')) # LANG: 'Auto' label for Coriolis site override selection
if coriolis_config.override_mode not in ('beta', 'normal', 'auto'):
logger.warning(f'Unexpected value {coriolis_config.override_mode=!r}. Defaulting to "auto"')
coriolis_config.override_mode = 'auto'
coriolis_config.override_textvar.set(value=_('Auto')) # LANG: 'Auto' label for Coriolis site override selection
config.set('coriolis_normal_url', normal_url)
config.set('coriolis_beta_url', beta_url)
config.set('coriolis_overide_url_selection', override_mode)
config.set('coriolis_normal_url', coriolis_config.normal_url)
config.set('coriolis_beta_url', coriolis_config.beta_url)
config.set('coriolis_overide_url_selection', coriolis_config.override_mode)
def _get_target_url(is_beta: bool) -> str:
global override_mode
if override_mode not in ('auto', 'normal', 'beta'):
if coriolis_config.override_mode not in ('auto', 'normal', 'beta'):
# LANG: Settings>Coriolis - invalid override mode found
show_error(_('Invalid Coriolis override mode!'))
logger.warning(f'Unexpected override mode {override_mode!r}! defaulting to auto!')
override_mode = 'auto'
if override_mode == 'beta':
return beta_url
elif override_mode == 'normal':
return normal_url
logger.warning(f'Unexpected override mode {coriolis_config.override_mode!r}! defaulting to auto!')
coriolis_config.override_mode = 'auto'
if coriolis_config.override_mode == 'beta':
return coriolis_config.beta_url
if coriolis_config.override_mode == 'normal':
return coriolis_config.normal_url
# Must be auto
if is_beta:
return beta_url
return coriolis_config.beta_url
return normal_url
# to anyone reading this, no, this is NOT the correct return type. Its magic internal stuff that I WILL be changing
# some day. Check PLUGINS.md for the right way to do this. -A_D
return coriolis_config.normal_url
def shipyard_url(loadout, is_beta) -> Union[str, bool]:
@ -179,11 +181,8 @@ def shipyard_url(loadout, is_beta) -> Union[str, bool]:
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')
if not string:
return False
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode='w') as f:
f.write(string)
encoded = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
return _get_target_url(is_beta) + encoded

View File

@ -1,26 +1,23 @@
"""Handle exporting data to EDDN."""
"""
eddn.py - Exporting Data to EDDN.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
import http
import itertools
import json
@ -34,12 +31,19 @@ from collections import OrderedDict
from platform import system
from textwrap import dedent
from threading import Lock
from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping, Optional
from typing import (
TYPE_CHECKING,
Any,
Iterator,
Mapping,
MutableMapping,
Optional,
Dict,
List,
)
from typing import OrderedDict as OrderedDictT
from typing import Tuple, Union
import requests
import companion
import edmc_data
import killswitch
@ -88,21 +92,21 @@ class This:
self.body_name: Optional[str] = None
self.body_id: Optional[int] = None
self.body_type: Optional[int] = None
self.station_name: str | None = None
self.station_type: str | None = None
self.station_marketid: str | None = None
self.station_name: Optional[str] = None
self.station_type: Optional[str] = None
self.station_marketid: Optional[str] = None
# Track Status.json data
self.status_body_name: Optional[str] = None
# Avoid duplicates
self.marketId: Optional[str] = None
self.commodities: Optional[list[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, list[str]]] = None
self.shipyard: Optional[Tuple[bool, list[Mapping[str, Any]]]] = None
self.commodities: Optional[List[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, List[str]]] = None
self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None
self.fcmaterials_marketid: int = 0
self.fcmaterials: Optional[list[OrderedDictT[str, Any]]] = None
self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi_marketid: int = 0
self.fcmaterials_capi: Optional[list[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None
# For the tkinter parent window, so we can call update_idletasks()
self.parent: tk.Tk
@ -156,7 +160,7 @@ class EDDNSender:
UNKNOWN_SCHEMA_RE = re.compile(
r"^FAIL: \[JsonValidationException\('Schema "
r"https://eddn.edcd.io/schemas/(?P<schema_name>.+)/(?P<schema_version>[0-9]+) is unknown, "
r"unable to validate.',\)\]$"
r"unable to validate.',\)]$"
)
def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None:
@ -203,10 +207,8 @@ class EDDNSender:
db = db_conn.cursor()
try:
db.execute(
"""
CREATE TABLE messages
(
db.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created TEXT NOT NULL,
cmdr TEXT NOT NULL,
@ -215,26 +217,12 @@ class EDDNSender:
game_build TEXT,
message TEXT NOT NULL
)
"""
)
""")
db.execute(
"""
CREATE INDEX messages_created ON messages
(
created
)
"""
)
db.execute("CREATE INDEX IF NOT EXISTS messages_created ON messages (created)")
db.execute("CREATE INDEX IF NOT EXISTS messages_cmdr ON messages (cmdr)")
db.execute(
"""
CREATE INDEX messages_cmdr ON messages
(
cmdr
)
"""
)
logger.info("New 'eddn_queue-v1.db' created")
except sqlite3.OperationalError as e:
if str(e) != "table messages already exists":
@ -243,12 +231,6 @@ class EDDNSender:
db_conn.close()
raise e
else:
logger.info("New `eddn_queue-v1.db` created")
# We return only the connection, so tidy up
db.close()
return db_conn
def convert_legacy_file(self):
@ -264,11 +246,10 @@ class EDDNSender:
except FileNotFoundError:
return
logger.info("Conversion to `eddn_queue-v1.db` complete, removing `replay.jsonl`")
# Best effort at removing the file/contents
# NB: The legacy code assumed it could write to the file.
logger.info("Conversion` to `eddn_queue-v1.db` complete, removing `replay.jsonl`")
replay_file = open(filename, 'w') # Will truncate
replay_file.close()
with open(filename, 'w') as replay_file:
replay_file.truncate()
os.unlink(filename)
def close(self) -> None:
@ -414,7 +395,7 @@ class EDDNSender:
"""
logger.trace_if("plugin.eddn.send", "Sending message")
should_return: bool
new_data: dict[str, Any]
new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg))
if should_return:
@ -423,7 +404,7 @@ class EDDNSender:
# Even the smallest possible message compresses somewhat, so always compress
encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0)
headers: None | dict[str, str] = None
headers: Optional[Dict[str, str]] = None
if compressed:
headers = {'Content-Encoding': 'gzip'}
@ -454,14 +435,13 @@ class EDDNSender:
# This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema.
return True
elif e.response.status_code == http.HTTPStatus.BAD_REQUEST:
if e.response.status_code == http.HTTPStatus.BAD_REQUEST:
# EDDN straight up says no, so drop the message
logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}")
return True
else:
# This should catch anything else, e.g. timeouts, gateway errors
self.set_ui_status(self.http_error_to_log(e))
# This should catch anything else, e.g. timeouts, gateway errors
self.set_ui_status(self.http_error_to_log(e))
except requests.exceptions.RequestException as e:
logger.debug('Failed sending', exc_info=e)
@ -485,19 +465,26 @@ class EDDNSender:
if not self.queue_processing.acquire(blocking=False):
logger.trace_if("plugin.eddn.send", "Couldn't obtain mutex")
if reschedule:
logger.trace_if("plugin.eddn.send", f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now")
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule)
logger.trace_if(
"plugin.eddn.send",
f"Next run scheduled for {self.eddn.REPLAY_PERIOD}ms from now",
)
self.eddn.parent.after(
self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule
)
else:
logger.trace_if("plugin.eddn.send", "NO next run scheduled (there should be another one already set)")
logger.trace_if(
"plugin.eddn.send",
"NO next run scheduled (there should be another one already set)",
)
return
logger.trace_if("plugin.eddn.send", "Obtained mutex")
# Used to indicate if we've rescheduled at the faster rate already.
have_rescheduled = False
# We send either if docked or 'Delay sending until docked' not set
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY):
if this.docked or not config.get_int('output') & config.OUT_EDDN_DELAY:
logger.trace_if("plugin.eddn.send", "Should send")
# We need our own cursor here, in case the semantics of
# tk `after()` could allow this to run in the middle of other
@ -516,7 +503,7 @@ class EDDNSender:
db_cursor.execute(
"""
SELECT id FROM messages
ORDER BY created ASC
ORDER BY created
LIMIT 1
"""
)
@ -586,16 +573,15 @@ class EDDNSender:
# LANG: EDDN has banned this version of our client
return _('EDDN Error: EDMC is too old for EDDN. Please update.')
elif status_code == 400:
if status_code == 400:
# we a validation check or something else.
logger.warning(f'EDDN Error: {status_code} -- {exception.response}')
# LANG: EDDN returned an error that indicates something about what we sent it was wrong
return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log')
else:
logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}')
# LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number
return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}')
# LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number
return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
# TODO: a good few of these methods are static or could be classmethods. they should be created as such.
@ -626,7 +612,7 @@ class EDDN:
self.sender = EDDNSender(self, self.eddn_url)
self.fss_signals: list[Mapping[str, Any]] = []
self.fss_signals: List[Mapping[str, Any]] = []
def close(self):
"""Close down the EDDN class instance."""
@ -650,7 +636,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: dict[str, Any]
new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./market', {})
if should_return:
logger.warning("capi.request./market has been disabled by killswitch. Returning.")
@ -667,7 +653,7 @@ class EDDN:
modules,
ships
)
commodities: list[OrderedDictT[str, Any]] = []
commodities: List[OrderedDictT[str, Any]] = []
for commodity in data['lastStarport'].get('commodities') or []:
# Check 'marketable' and 'not prohibited'
if (category_map.get(commodity['categoryname'], True)
@ -740,7 +726,7 @@ class EDDN:
:param data: The raw CAPI data.
:return: Sanity-checked data.
"""
modules: dict[str, Any] = data['lastStarport'].get('modules')
modules: Dict[str, Any] = data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict):
if modules is None:
logger.debug('modules was None. FC or Damaged Station?')
@ -757,7 +743,7 @@ class EDDN:
# Set a safe value
modules = {}
ships: dict[str, Any] = data['lastStarport'].get('ships')
ships: Dict[str, Any] = data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict):
if ships is None:
logger.debug('ships was None')
@ -783,7 +769,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: dict[str, Any]
new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -810,7 +796,7 @@ class EDDN:
modules.values()
)
outfitting: list[str] = sorted(
outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search
)
@ -851,7 +837,7 @@ class EDDN:
:param is_beta: whether or not we are in beta mode
"""
should_return: bool
new_data: dict[str, Any]
new_data: Dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -870,7 +856,7 @@ class EDDN:
ships
)
shipyard: list[Mapping[str, Any]] = sorted(
shipyard: List[Mapping[str, Any]] = sorted(
itertools.chain(
(ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()),
(ship['name'].lower() for ship in ships['unavailable_list'] or {}),
@ -913,8 +899,8 @@ class EDDN:
:param is_beta: whether or not we're in beta mode
:param entry: the journal entry containing the commodities data
"""
items: list[Mapping[str, Any]] = entry.get('Items') or []
commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([
items: List[Mapping[str, Any]] = entry.get('Items') or []
commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([
('name', self.canonicalise(commodity['Name'])),
('meanPrice', commodity['MeanPrice']),
('buyPrice', commodity['BuyPrice']),
@ -961,11 +947,11 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: The relevant journal entry
"""
modules: list[Mapping[str, Any]] = entry.get('Items', [])
modules: List[Mapping[str, Any]] = entry.get('Items', [])
horizons: bool = entry.get('Horizons', False)
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
# for module in modules if module['Name'] != 'int_planetapproachsuite'])
outfitting: list[str] = sorted(
outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in
filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules)
)
@ -1000,7 +986,7 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: the relevant journal entry
"""
ships: list[Mapping[str, Any]] = entry.get('PriceList') or []
ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
horizons: bool = entry.get('Horizons', False)
shipyard = sorted(ship['ShipType'] for ship in ships)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
@ -1050,7 +1036,7 @@ class EDDN:
msg['header'] = self.standard_header()
msg_id = self.sender.add_message(cmdr, msg)
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY):
if this.docked or not config.get_int('output') & config.OUT_EDDN_DELAY:
# No delay in sending configured, so attempt immediately
logger.trace_if("plugin.eddn.send", "Sending 'non-station' message")
self.sender.send_message_by_id(msg_id)
@ -1123,8 +1109,7 @@ class EDDN:
logger.warning(f'No system name in entry, and system_name was not set either! entry:\n{entry!r}\n')
return "passed-in system_name is empty, can't add System"
else:
entry['StarSystem'] = system_name
entry['StarSystem'] = system_name
if 'SystemAddress' not in entry:
if this.system_address is None:
@ -1918,7 +1903,7 @@ class EDDN:
gv = ''
#######################################################################
# Base string
if capi_host == companion.SERVER_LIVE or capi_host == companion.SERVER_BETA:
if capi_host in (companion.SERVER_LIVE, companion.SERVER_BETA):
gv = 'CAPI-Live-'
elif capi_host == companion.SERVER_LEGACY:
@ -2107,7 +2092,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame:
BUTTONX = 12 # noqa: N806 # indent Checkbuttons and Radiobuttons
if prefsVersion.shouldSetDefaults('0.0.0.0', not bool(config.get_int('output'))):
output: int = (config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION) # default settings
output: int = config.OUT_EDDN_SEND_STATION_DATA | config.OUT_EDDN_SEND_NON_STATION # default settings
else:
output = config.get_int('output')
@ -2167,7 +2152,7 @@ def prefsvarchanged(event=None) -> None:
this.eddn_system_button['state'] = tk.NORMAL
# This line will grey out the 'Delay sending ...' option if the 'Send
# system and scan data' option is off.
this.eddn_delay_button['state'] = this.eddn_system.get() and tk.NORMAL or tk.DISABLED
this.eddn_delay_button['state'] = tk.NORMAL if this.eddn_system.get() else tk.DISABLED
def prefs_changed(cmdr: str, is_beta: bool) -> None:
@ -2324,22 +2309,22 @@ def journal_entry( # noqa: C901, CCR001
if event_name == 'fssdiscoveryscan':
return this.eddn.export_journal_fssdiscoveryscan(cmdr, system, state['StarPos'], is_beta, entry)
elif event_name == 'navbeaconscan':
if event_name == 'navbeaconscan':
return this.eddn.export_journal_navbeaconscan(cmdr, system, state['StarPos'], is_beta, entry)
elif event_name == 'codexentry':
if event_name == 'codexentry':
return this.eddn.export_journal_codexentry(cmdr, state['StarPos'], is_beta, entry)
elif event_name == 'scanbarycentre':
if event_name == 'scanbarycentre':
return this.eddn.export_journal_scanbarycentre(cmdr, state['StarPos'], is_beta, entry)
elif event_name == 'navroute':
if event_name == 'navroute':
return this.eddn.export_journal_navroute(cmdr, is_beta, entry)
elif event_name == 'fcmaterials':
if event_name == 'fcmaterials':
return this.eddn.export_journal_fcmaterials(cmdr, is_beta, entry)
elif event_name == 'approachsettlement':
if event_name == 'approachsettlement':
# An `ApproachSettlement` can appear *before* `Location` if you
# logged at one. We won't have necessary augmentation data
# at this point, so bail.
@ -2354,10 +2339,10 @@ def journal_entry( # noqa: C901, CCR001
entry
)
elif event_name == 'fsssignaldiscovered':
if event_name == 'fsssignaldiscovered':
this.eddn.enqueue_journal_fsssignaldiscovered(entry)
elif event_name == 'fssallbodiesfound':
if event_name == 'fssallbodiesfound':
return this.eddn.export_journal_fssallbodiesfound(
cmdr,
system,
@ -2366,7 +2351,7 @@ def journal_entry( # noqa: C901, CCR001
entry
)
elif event_name == 'fssbodysignals':
if event_name == 'fssbodysignals':
return this.eddn.export_journal_fssbodysignals(
cmdr,
system,
@ -2626,7 +2611,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST
return economies_colony or modules_horizons or ship_horizons
def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None:
def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None:
"""
Process Status.json data to track things like current Body.

View File

@ -1,36 +1,23 @@
"""Show EDSM data in display and handle lookups."""
"""
edsm.py - Handling EDSM Data and Display.
# TODO:
# 1) Re-factor EDSM API calls out of journal_entry() into own function.
# 2) Fix how StartJump already changes things, but only partially.
# 3) Possibly this and other two 'provider' plugins could do with being
# based on a single class that they extend. There's a lot of duplicated
# logic.
# 4) Ensure the EDSM API call(back) for setting the image at end of system
# text is always fired. i.e. CAPI cmdr_data() processing.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT IN
# AN END-USER INSTALLATION ON WINDOWS.
#
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
import json
import threading
import tkinter as tk
@ -40,12 +27,9 @@ from threading import Thread
from time import sleep
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast
import requests
import killswitch
import monitor
import myNotebook
import myNotebook as nb # noqa: N813
import plug
from companion import CAPIData
@ -58,6 +42,15 @@ if TYPE_CHECKING:
def _(x: str) -> str:
return x
# TODO:
# 1) Re-factor EDSM API calls out of journal_entry() into own function.
# 2) Fix how StartJump already changes things, but only partially.
# 3) Possibly this and other two 'provider' plugins could do with being
# based on a single class that they extend. There's a lot of duplicated
# logic.
# 4) Ensure the EDSM API call(back) for setting the image at end of system
# text is always fired. i.e. CAPI cmdr_data() processing.
logger = get_main_logger()
EDSM_POLL = 0.1
@ -93,13 +86,13 @@ class This:
self.newgame: bool = False # starting up - batch initial burst of events
self.newgame_docked: bool = False # starting up while docked
self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan
self.system_link: tk.Widget | None = None
self.system_name: tk.Tk | None = None
self.system_address: int | None = None # Frontier SystemAddress
self.system_population: int | None = None
self.station_link: tk.Widget | None = None
self.station_name: str | None = None
self.station_marketid: int | None = None # Frontier MarketID
self.system_link: Optional[tk.Widget] = None
self.system_name: Optional[tk.Tk] = None
self.system_address: Optional[int] = None # Frontier SystemAddress
self.system_population: Optional[int] = None
self.station_link: Optional[tk.Widget] = None
self.station_name: Optional[str] = None
self.station_marketid: Optional[int] = None # Frontier MarketID
self.on_foot = False
self._IMG_KNOWN = None
@ -109,19 +102,19 @@ class This:
self.thread: Optional[threading.Thread] = None
self.log: tk.IntVar | None = None
self.log_button: ttk.Checkbutton | None = None
self.log: Optional[tk.IntVar] = None
self.log_button: Optional[ttk.Checkbutton] = None
self.label: tk.Widget | None = None
self.label: Optional[tk.Widget] = None
self.cmdr_label: myNotebook.Label | None = None
self.cmdr_text: myNotebook.Label | None = None
self.cmdr_label: Optional[nb.Label] = None
self.cmdr_text: Optional[nb.Label] = None
self.user_label: myNotebook.Label | None = None
self.user: myNotebook.Entry | None = None
self.user_label: Optional[nb.Label] = None
self.user: Optional[nb.Entry] = None
self.apikey_label: myNotebook.Label | None = None
self.apikey: myNotebook.Entry | None = None
self.apikey_label: Optional[nb.Label] = None
self.apikey: Optional[nb.Entry] = None
this = This()
@ -284,7 +277,7 @@ def toggle_password_visibility():
this.apikey.config(show="*") # type: ignore
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
"""
Plugin preferences setup hook.
@ -297,8 +290,8 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
:return: An instance of `myNotebook.Frame`.
"""
PADX = 10 # noqa: N806
BUTTONX = 12 # indent Checkbuttons and Radiobuttons # noqa: N806
PADY = 2 # close spacing # noqa: N806
BUTTONX = 12 # noqa: N806
PADY = 2 # noqa: N806
frame = nb.Frame(parent)
frame.columnconfigure(1, weight=1)
@ -309,51 +302,46 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
background=nb.Label().cget('background'),
url='https://www.edsm.net/',
underline=True
).grid(columnspan=2, padx=PADX, sticky=tk.W) # Don't translate
).grid(columnspan=2, padx=PADX, sticky=tk.W)
this.log = tk.IntVar(value=config.get_int('edsm_out') and 1)
this.log_button = nb.Checkbutton(
# LANG: Settings>EDSM - Label on checkbox for 'send data'
frame, text=_('Send flight log and Cmdr status to EDSM'), variable=this.log, command=prefsvarchanged
frame,
text=_('Send flight log and Cmdr status to EDSM'),
variable=this.log,
command=prefsvarchanged
)
if this.log_button:
this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
nb.Label(frame).grid(sticky=tk.W) # big spacer
# Section heading in settings
this.label = HyperlinkLabel(
frame,
# LANG: Settings>EDSM - Label on header/URL to EDSM API key page
text=_('Elite Dangerous Star Map credentials'),
background=nb.Label().cget('background'),
url='https://www.edsm.net/settings/api',
underline=True
)
cur_row = 10
if this.label:
this.label.grid(columnspan=2, padx=PADX, sticky=tk.W)
# LANG: Game Commander name label in EDSM settings
this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window
this.cmdr_label = nb.Label(frame, text=_('Cmdr'))
this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.cmdr_text = nb.Label(frame)
this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W)
cur_row += 1
# LANG: EDSM Commander name label in EDSM settings
this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting
this.user_label = nb.Label(frame, text=_('Commander Name'))
this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.user = nb.Entry(frame)
this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
cur_row += 1
# LANG: EDSM API key label
this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting
this.apikey_label = nb.Label(frame, text=_('API Key'))
this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.apikey = nb.Entry(frame, show="*", width=50)
this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
@ -361,18 +349,19 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Fr
prefs_cmdr_changed(cmdr, is_beta)
show_password_var.set(False) # Password is initially masked
show_password_checkbox = nb.Checkbutton(
frame,
text="Show API Key",
variable=show_password_var,
command=toggle_password_visibility,
command=toggle_password_visibility
)
show_password_checkbox.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
return frame
def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001
def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001
"""
Handle the Commander name changing whilst Settings was open.
@ -381,28 +370,21 @@ def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001
"""
if this.log_button:
this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED
if this.user:
this.user['state'] = tk.NORMAL
this.user.delete(0, tk.END)
if this.apikey:
this.apikey['state'] = tk.NORMAL
this.apikey.delete(0, tk.END)
if cmdr:
if this.cmdr_text:
this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}'
cred = credentials(cmdr)
if cred:
if this.user:
this.user.insert(0, cred[0])
if this.apikey:
this.apikey.insert(0, cred[1])
else:
if this.cmdr_text:
# LANG: We have no data on the current commander
@ -429,18 +411,22 @@ def set_prefs_ui_states(state: str) -> None:
Set the state of various config UI entries.
:param state: the state to set each entry to
# NOTE: This may break things, watch out in testing. (5.10)
"""
if (
this.label and this.cmdr_label and this.cmdr_text and this.user_label and this.user
and this.apikey_label and this.apikey
):
this.label['state'] = state
this.cmdr_label['state'] = state
this.cmdr_text['state'] = state
this.user_label['state'] = state
this.user['state'] = state
this.apikey_label['state'] = state
this.apikey['state'] = state
elements = [
this.label,
this.cmdr_label,
this.cmdr_text,
this.user_label,
this.user,
this.apikey_label,
this.apikey
]
for element in elements:
if element:
element['state'] = state
def prefs_changed(cmdr: str, is_beta: bool) -> None:
@ -454,7 +440,6 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
config.set('edsm_out', this.log.get())
if cmdr and not is_beta:
# TODO: remove this when config is rewritten.
cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[])
usernames: List[str] = config.get_list('edsm_usernames', default=[])
apikeys: List[str] = config.get_list('edsm_apikeys', default=[])
@ -466,7 +451,6 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
usernames[idx] = this.user.get().strip()
apikeys.extend([''] * (1 + idx - len(apikeys)))
apikeys[idx] = this.apikey.get().strip()
else:
config.set('edsm_cmdrs', cmdrs + [cmdr])
usernames.append(this.user.get().strip())
@ -495,20 +479,17 @@ def credentials(cmdr: str) -> Optional[Tuple[str, str]]:
cmdrs = [cmdr]
config.set('edsm_cmdrs', cmdrs)
if (cmdr in cmdrs and (edsm_usernames := config.get_list('edsm_usernames'))
and (edsm_apikeys := config.get_list('edsm_apikeys'))):
edsm_usernames = config.get_list('edsm_usernames')
edsm_apikeys = config.get_list('edsm_apikeys')
if cmdr in cmdrs and len(cmdrs) == len(edsm_usernames) == len(edsm_apikeys):
idx = cmdrs.index(cmdr)
# The EDSM cmdr and apikey might not exist yet!
if idx >= len(edsm_usernames) or idx >= len(edsm_apikeys):
return None
if idx < len(edsm_usernames) and idx < len(edsm_apikeys):
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning ({edsm_usernames[idx]=}, {edsm_apikeys[idx]=})')
return edsm_usernames[idx], edsm_apikeys[idx]
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning ({edsm_usernames[idx]=}, {edsm_apikeys[idx]=})')
return (edsm_usernames[idx], edsm_apikeys[idx])
else:
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning None')
return None
logger.trace_if(CMDR_CREDS, f'{cmdr=}: returning None')
return None
def journal_entry( # noqa: C901, CCR001
@ -564,7 +545,6 @@ entry: {entry!r}'''
if not this.station_name:
if this.system_population and this.system_population > 0:
to_set = STATION_UNDOCKED
else:
to_set = ''
@ -582,7 +562,6 @@ entry: {entry!r}'''
this.multicrew = bool(state['Role'])
if 'StarPos' in entry:
this.coordinates = entry['StarPos']
elif entry['event'] == 'LoadGame':
this.coordinates = None
@ -590,20 +569,16 @@ entry: {entry!r}'''
this.newgame = True
this.newgame_docked = False
this.navbeaconscan = 0
elif entry['event'] == 'StartUp':
this.newgame = False
this.newgame_docked = False
this.navbeaconscan = 0
elif entry['event'] == 'Location':
this.newgame = True
this.newgame_docked = entry.get('Docked', False)
this.navbeaconscan = 0
elif entry['event'] == 'NavBeaconScan':
this.navbeaconscan = entry['NumBodies']
elif entry['event'] == 'BackPack':
# Use the stored file contents, not the empty journal event
if state['BackpackJSON']:
@ -646,7 +621,6 @@ entry: {entry!r}'''
}
materials.update(transient)
logger.trace_if(CMDR_EVENTS, f'"LoadGame" event, queueing Materials: {cmdr=}')
this.queue.put((cmdr, this.game_version, this.game_build, materials))
if entry['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked'):
@ -655,7 +629,6 @@ entry: {entry!r}'''
Queueing: {entry!r}'''
)
logger.trace_if(CMDR_EVENTS, f'"{entry["event"]=}" event, queueing: {cmdr=}')
this.queue.put((cmdr, this.game_version, this.game_build, entry))
return ''
@ -675,11 +648,9 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
# Always store initially, even if we're not the *current* system provider.
if not this.station_marketid and data['commander']['docked']:
this.station_marketid = data['lastStarport']['id']
# Only trust CAPI if these aren't yet set
if not this.system_name:
this.system_name = data['lastSystem']['name']
if not this.station_name and data['commander']['docked']:
this.station_name = data['lastStarport']['name']
@ -691,21 +662,17 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.system_link.update_idletasks()
if config.get_str('station_provider') == 'EDSM':
if this.station_link:
if data['commander']['docked'] or this.on_foot and this.station_name:
this.station_link['text'] = this.station_name
elif data['lastStarport']['name'] and data['lastStarport']['name'] != "":
this.station_link['text'] = STATION_UNDOCKED
else:
this.station_link['text'] = ''
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.station_link.update_idletasks()
if this.system_link and not this.system_link['text']:
@ -722,30 +689,37 @@ if 'edsm' in debug_senders:
def get_discarded_events_list() -> None:
"""Retrieve the list of to-discard events from EDSM."""
"""
Retrieve the list of events to discard from EDSM.
This function queries the EDSM API to obtain the list of events that should be discarded,
and stores them in the `discarded_events` attribute.
:return: None
"""
try:
r = this.session.get('https://www.edsm.net/api-journal-v1/discard', timeout=_TIMEOUT)
r.raise_for_status()
this.discarded_events = set(r.json())
this.discarded_events.discard('Docked') # should_send() assumes that we send 'Docked' events
# We discard 'Docked' events because should_send() assumes that we send them
this.discarded_events.discard('Docked')
if not this.discarded_events:
logger.warning(
'Unexpected empty discarded events list from EDSM: '
f'{type(this.discarded_events)} -- {this.discarded_events}'
)
except Exception as e:
logger.warning('Exception whilst trying to set this.discarded_events:', exc_info=e)
logger.warning('Exception while trying to set this.discarded_events:', exc_info=e)
def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
def worker() -> None: # noqa: CCR001 C901
"""
Handle uploading events to EDSM API.
Target function of a thread.
This function is the target function of a thread. It processes events from the queue until the
queued item is None, uploading the events to the EDSM API.
Processes `this.queue` until the queued item is None.
:return: None
"""
logger.debug('Starting...')
pending: List[Mapping[str, Any]] = [] # Unsent events
@ -753,13 +727,11 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
cmdr: str = ""
last_game_version = ""
last_game_build = ""
entry: Mapping[str, Any] = {}
while not this.discarded_events:
if this.shutting_down:
logger.debug(f'returning from discarded_events loop due to {this.shutting_down=}')
return
get_discarded_events_list()
if this.discarded_events:
break
@ -776,17 +748,15 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
if item:
(cmdr, game_version, game_build, entry) = item
logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})')
else:
logger.debug('Empty queue message, setting closing = True')
closing = True # Try to send any unsent events before we close
entry = {'event': 'ShutDown'} # Dummy to allow for `uentry['event']` belowt
entry = {'event': 'ShutDown'} # Dummy to allow for `entry['event']` below
retrying = 0
while retrying < 3:
if item is None:
item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {}))
should_skip, new_item = killswitch.check_killswitch(
'plugins.edsm.worker',
item,
@ -795,7 +765,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
if should_skip:
break
if item is not None:
item = new_item
@ -817,18 +786,14 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
or last_game_version != game_version or last_game_build != game_build
):
pending = []
pending.append(entry)
# drop events if required by killswitch
new_pending = []
for e in pending:
skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger)
if skip:
continue
new_pending.append(new)
pending = new_pending
if pending and should_send(pending, entry['event']):
@ -840,10 +805,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
"('CarrierJump', 'FSDJump', 'Location', 'Docked')"
" and it passed should_send()")
for p in pending:
if p['event'] in ('Location'):
if p['event'] in 'Location':
logger.trace_if(
'journal.locations',
f'"Location" event in pending passed should_send(),timestamp: {p["timestamp"]}'
f'"Location" event in pending passed should_send(), timestamp: {p["timestamp"]}'
)
creds = credentials(cmdr)
@ -868,24 +833,20 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
data_elided['apiKey'] = '<elided>'
if isinstance(data_elided['message'], bytes):
data_elided['message'] = data_elided['message'].decode('utf-8')
if isinstance(data_elided['commanderName'], bytes):
data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8')
logger.trace_if(
'journal.locations',
"pending has at least one of ('CarrierJump', 'FSDJump', 'Location', 'Docked')"
" Attempting API call with the following events:"
)
for p in pending:
logger.trace_if('journal.locations', f"Event: {p!r}")
if p['event'] in ('Location'):
if p['event'] in 'Location':
logger.trace_if(
'journal.locations',
f'Attempting API call for "Location" event with timestamp: {p["timestamp"]}'
)
logger.trace_if(
'journal.locations', f'Overall POST data (elided) is:\n{json.dumps(data_elided, indent=2)}'
)
@ -906,17 +867,13 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
logger.warning(f'EDSM\t{msg_num} {msg}\t{json.dumps(pending, separators=(",", ": "))}')
# LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=msg))
else:
if msg_num // 100 == 1:
logger.trace_if('plugin.edsm.api', 'Overall OK')
pass
elif msg_num // 100 == 5:
logger.trace_if('plugin.edsm.api', 'Event(s) not currently processed, but saved for later')
pass
else:
logger.warning(f'EDSM API call status not 1XX, 2XX or 5XX: {msg.num}')
@ -927,13 +884,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
# calls update_status in main thread
if not config.shutting_down and this.system_link is not None:
this.system_link.event_generate('<<EDSMStatus>>', when="tail")
if r['msgnum'] // 100 != 1: # type: ignore
logger.warning(f'EDSM event with not-1xx status:\n{r["msgnum"]}\n' # type: ignore
if r['msgnum'] // 100 != 1:
logger.warning(f'EDSM event with not-1xx status:\n{r["msgnum"]}\n'
f'{r["msg"]}\n{json.dumps(e, separators = (",", ": "))}')
pending = []
break # No exception, so assume success
except Exception as e:
@ -943,12 +897,10 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
else:
# LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM"))
if entry['event'].lower() in ('shutdown', 'commander', 'fileheader'):
# Game shutdown or new login, so we MUST not hang on to pending
pending = []
logger.trace_if(CMDR_EVENTS, f'Blanked pending because of event: {entry["event"]}')
if closing:
logger.debug('closing, so returning.')
return
@ -956,8 +908,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
last_game_version = game_version
last_game_build = game_build
logger.debug('Done.')
def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001
"""
@ -967,54 +917,42 @@ def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa:
:param event: The latest event being processed
:return: bool indicating whether or not to send said entries
"""
# We MUST flush pending on logout, in case new login is a different Commander
def should_send_entry(entry: Mapping[str, Any]) -> bool:
if entry['event'] == 'Cargo':
return not this.newgame_docked
if entry['event'] == 'Docked':
return True
if this.newgame:
return True
if entry['event'] not in (
'CommunityGoal',
'ModuleBuy',
'ModuleSell',
'ModuleSwap',
'ShipyardBuy',
'ShipyardNew',
'ShipyardSwap'
):
return True
return False
if event.lower() in ('shutdown', 'fileheader'):
logger.trace_if(CMDR_EVENTS, f'True because {event=}')
return True
# batch up burst of Scan events after NavBeaconScan
if this.navbeaconscan:
if entries and entries[-1]['event'] == 'Scan':
this.navbeaconscan -= 1
if this.navbeaconscan:
logger.trace_if(CMDR_EVENTS, f'False because {this.navbeaconscan=}')
should_send_result = this.navbeaconscan == 0
logger.trace_if(CMDR_EVENTS, f'False because {this.navbeaconscan=}' if not should_send_result else '')
return should_send_result
logger.error('Invalid state NavBeaconScan exists, but passed entries either '
"doesn't exist or doesn't have the expected content")
this.navbeaconscan = 0
return False
else:
logger.error(
'Invalid state NavBeaconScan exists, but passed entries either '
"doesn't exist or doesn't have the expected content"
)
this.navbeaconscan = 0
for entry in entries:
if (entry['event'] == 'Cargo' and not this.newgame_docked) or entry['event'] == 'Docked':
# Cargo is the last event on startup, unless starting when docked in which case Docked is the last event
this.newgame = False
this.newgame_docked = False
logger.trace_if(CMDR_EVENTS, f'True because {entry["event"]=}')
return True
elif this.newgame:
pass
elif entry['event'] not in (
'CommunityGoal', # Spammed periodically
'ModuleBuy', 'ModuleSell', 'ModuleSwap', # will be shortly followed by "Loadout"
'ShipyardBuy', 'ShipyardNew', 'ShipyardSwap'): # "
logger.trace_if(CMDR_EVENTS, f'True because {entry["event"]=}')
return True
else:
logger.trace_if(CMDR_EVENTS, f'{entry["event"]=}, {this.newgame_docked=}')
logger.trace_if(CMDR_EVENTS, f'False as default: {this.newgame_docked=}')
return False
should_send_result = any(should_send_entry(entry) for entry in entries)
logger.trace_if(CMDR_EVENTS, f'False as default: {this.newgame_docked=}' if not should_send_result else '')
return should_send_result
def update_status(event=None) -> None:
@ -1033,14 +971,11 @@ def edsm_notify_system(reply: Mapping[str, Any]) -> None:
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM"))
elif reply['msgnum'] // 100 not in (1, 4):
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg']))
elif reply.get('systemCreated'):
this.system_link['image'] = this._IMG_NEW
else:
this.system_link['image'] = this._IMG_KNOWN

View File

@ -1,32 +1,28 @@
"""Export data for ED Shipyard."""
"""
edsy.py - Exporting Data to EDSY.
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT IN
# AN END-USER INSTALLATION ON WINDOWS.
#
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
import base64
import gzip
import io
import json
from typing import Any, Mapping
from typing import Any, Mapping, Union
def plugin_start3(plugin_dir: str) -> str:
@ -40,15 +36,15 @@ def plugin_start3(plugin_dir: str) -> str:
# Return a URL for the current ship
def shipyard_url(loadout: Mapping[str, Any], is_beta) -> bool | str:
def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]:
"""
Construct a URL for ship loadout.
:param loadout:
:param is_beta:
:return:
:param loadout: The ship loadout data.
:param is_beta: Whether the game is in beta.
:return: The constructed URL for the ship loadout.
"""
# most compact representation
# Convert loadout to JSON and gzip compress it
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')
if not string:
return False
@ -57,6 +53,8 @@ def shipyard_url(loadout: Mapping[str, Any], is_beta) -> bool | str:
with gzip.GzipFile(fileobj=out, mode='w') as f:
f.write(string)
return (
is_beta and 'http://edsy.org/beta/#/I=' or 'http://edsy.org/#/I='
) + base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
# Construct the URL using the appropriate base URL based on is_beta
base_url = 'https://edsy.org/beta/#/I=' if is_beta else 'https://edsy.org/#/I='
encoded_data = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
return base_url + encoded_data

View File

@ -1,26 +1,24 @@
"""Inara Sync."""
"""
inara.py - Sync with INARA.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
This is an EDMC 'core' plugin.
All EDMC plugins are *dynamically* loaded at run-time.
We build for Windows using `py2exe`.
`py2exe` can't possibly know about anything in the dynamically loaded core plugins.
Thus, you **MUST** check if any imports you add in this file are only
referenced in this file (or only in any other core plugin), and if so...
YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
#
# This is an EDMC 'core' plugin.
#
# All EDMC plugins are *dynamically* loaded at run-time.
#
# We build for Windows using `py2exe`.
#
# `py2exe` can't possibly know about anything in the dynamically loaded
# core plugins.
#
# Thus you **MUST** check if any imports you add in this file are only
# referenced in this file (or only in any other core plugin), and if so...
#
# YOU MUST ENSURE THAT PERTINENT ADJUSTMENTS ARE MADE IN
# `build.py` SO AS TO ENSURE THE FILES ARE ACTUALLY PRESENT
# IN AN END-USER INSTALLATION ON WINDOWS.
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import json
import threading
import time
@ -34,9 +32,7 @@ from tkinter import ttk
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import OrderedDict as OrderedDictT
from typing import Sequence, Union, cast
import requests
import edmc_data
import killswitch
import myNotebook as nb # noqa: N813
@ -168,14 +164,15 @@ if DEBUG:
TARGET_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/inara'
# noinspection PyUnresolvedReferences
def system_url(system_name: str) -> str:
"""Get a URL for the current system."""
if this.system_address:
return requests.utils.requote_uri(f'https://inara.cz/elite/starsystem/'
return requests.utils.requote_uri(f'https://inara.cz/galaxy-starsystem/'
f'?search={this.system_address}')
elif system_name:
return requests.utils.requote_uri(f'https://inara.cz/elite/starsystem/'
if system_name:
return requests.utils.requote_uri(f'https://inara.cz/galaxy-starsystem/'
f'?search={system_name}')
return ''
@ -192,13 +189,11 @@ def station_url(system_name: str, station_name: str) -> str:
:return: A URL to inara for the given system and station
"""
if system_name and station_name:
return requests.utils.requote_uri(f'https://inara.cz/elite/station/'
f'?search={system_name}%20[{station_name}]')
return requests.utils.requote_uri(f'https://inara.cz/galaxy-station/?search={system_name}%20[{station_name}]')
# monitor state might think these are gone, but we don't yet
if this.system_name and this.station:
return requests.utils.requote_uri(f'https://inara.cz/elite/station/'
f'?search={this.system_name}%20[{this.station}]')
return requests.utils.requote_uri(
f'https://inara.cz/galaxy-station/?search={this.system_name}%20[{this.station}]')
if system_name:
return system_url(system_name)
@ -224,9 +219,7 @@ def plugin_start3(plugin_dir: str) -> str:
def plugin_app(parent: tk.Tk) -> None:
"""Plugin UI setup Hook."""
this.parent = parent
# system label in main window
this.system_link = parent.nametowidget(f".{appname.lower()}.system")
# station label in main window
this.station_link = parent.nametowidget(f".{appname.lower()}.station")
this.system_link.bind_all('<<InaraLocation>>', update_location)
this.system_link.bind_all('<<InaraShip>>', update_ship)
@ -379,11 +372,14 @@ def credentials(cmdr: Optional[str]) -> Optional[str]:
return None
cmdrs = config.get_list('inara_cmdrs', default=[])
if cmdr in cmdrs and config.get_list('inara_apikeys'):
return config.get_list('inara_apikeys')[cmdrs.index(cmdr)]
apikeys = config.get_list('inara_apikeys', default=[])
else:
return None
if cmdr in cmdrs:
idx = cmdrs.index(cmdr)
if idx < len(apikeys):
return apikeys[idx]
return None
def journal_entry( # noqa: C901, CCR001
@ -422,11 +418,9 @@ def journal_entry( # noqa: C901, CCR001
if not monitor.is_live_galaxy():
# Since Update 14 on 2022-11-29 Inara only accepts Live data.
if (
(
this.legacy_galaxy_last_notified is None
or (datetime.now(timezone.utc) - this.legacy_galaxy_last_notified) > timedelta(seconds=300)
)
and config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(cmdr)
(this.legacy_galaxy_last_notified is None or
(datetime.now(timezone.utc) - this.legacy_galaxy_last_notified) > timedelta(seconds=300))
and config.get_int('inara_out') and not (is_beta or this.multicrew or credentials(cmdr))
):
# LANG: The Inara API only accepts Live galaxy data, not Legacy galaxy data
logger.info(_("Inara only accepts Live galaxy data"))
@ -475,92 +469,49 @@ def journal_entry( # noqa: C901, CCR001
if config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(cmdr):
current_credentials = Credentials(this.cmdr, this.FID, str(credentials(this.cmdr)))
try:
# Dump starting state to Inara
if (this.newuser or event_name == 'StartUp' or (this.newsession and event_name == 'Cargo')):
if this.newuser or event_name == 'StartUp' or (this.newsession and event_name == 'Cargo'):
this.newuser = False
this.newsession = False
# Don't send the API call with no values.
if state['Reputation']:
new_add_event(
'setCommanderReputationMajorFaction',
entry['timestamp'],
[
{'majorfactionName': k.lower(), 'majorfactionReputation': v / 100.0}
for k, v in state['Reputation'].items() if v is not None
]
)
if state['Engineers']: # Not populated < 3.3
to_send_list: List[Mapping[str, Any]] = []
for k, v in state['Engineers'].items():
e = {'engineerName': k}
if isinstance(v, tuple):
e['rankValue'] = v[0]
else:
e['rankStage'] = v
to_send_list.append(e)
new_add_event(
'setCommanderRankEngineer',
entry['timestamp'],
to_send_list,
)
# Update location
# Might not be available if this event is a 'StartUp' and we're replaying
# a log.
# XXX: This interferes with other more specific setCommanderTravelLocation events in the same
# batch.
# if system:
# new_add_event(
# 'setCommanderTravelLocation',
# entry['timestamp'],
# OrderedDict([
# ('starsystemName', system),
# ('stationName', station), # Can be None
# ])
# )
reputation_data = [
{'majorfactionName': k.lower(), 'majorfactionReputation': v / 100.0}
for k, v in state['Reputation'].items() if v is not None
]
new_add_event('setCommanderReputationMajorFaction', entry['timestamp'], reputation_data)
if state['Engineers']:
engineer_data = [
{'engineerName': k, 'rankValue': v[0] if isinstance(v, tuple) else None, 'rankStage': v}
for k, v in state['Engineers'].items()
]
new_add_event('setCommanderRankEngineer', entry['timestamp'], engineer_data)
# Update ship
if state['ShipID']: # Unknown if started in Fighter or SRV
cur_ship: Dict[str, Any] = {
if state['ShipID']:
cur_ship = {
'shipType': state['ShipType'],
'shipGameID': state['ShipID'],
'shipName': state['ShipName'],
'shipIdent': state['ShipIdent'],
'isCurrentShip': True,
}
if state['HullValue']:
cur_ship['shipHullValue'] = state['HullValue']
if state['ModulesValue']:
cur_ship['shipModulesValue'] = state['ModulesValue']
cur_ship['shipRebuyCost'] = state['Rebuy']
new_add_event('setCommanderShip', entry['timestamp'], cur_ship)
this.loadout = make_loadout(state)
new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout)
# Trigger off the "only observed as being after Ranks" event so that
# we have both current Ranks *and* current Progress within them.
elif event_name == 'Progress':
# Send rank info to Inara on startup
new_add_event(
'setCommanderRankPilot',
entry['timestamp'],
[
{'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': v[1] / 100.0}
for k, v in state['Rank'].items() if v is not None
]
)
rank_data = [
{'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': v[1] / 100.0}
for k, v in state['Rank'].items() if v is not None
]
new_add_event('setCommanderRankPilot', entry['timestamp'], rank_data)
# Promotions
elif event_name == 'Promotion':
for k, v in state['Rank'].items():
if k in entry:
@ -571,41 +522,25 @@ def journal_entry( # noqa: C901, CCR001
)
elif event_name == 'EngineerProgress' and 'Engineer' in entry:
# TODO: due to this var name being used above, the types are weird
to_send_dict = {'engineerName': entry['Engineer']}
if 'Rank' in entry:
to_send_dict['rankValue'] = entry['Rank']
else:
to_send_dict['rankStage'] = entry['Progress']
new_add_event(
'setCommanderRankEngineer',
entry['timestamp'],
to_send_dict
)
engineer_rank_data = {
'engineerName': entry['Engineer'],
'rankValue': entry['Rank'] if 'Rank' in entry else None,
'rankStage': entry['Progress'] if 'Progress' in entry else None,
}
new_add_event('setCommanderRankEngineer', entry['timestamp'], engineer_rank_data)
# PowerPlay status change
if event_name == 'PowerplayJoin':
new_add_event(
'setCommanderRankPower',
entry['timestamp'],
{'powerName': entry['Power'], 'rankValue': 1}
)
elif event_name == 'PowerplayJoin':
power_join_data = {'powerName': entry['Power'], 'rankValue': 1}
new_add_event('setCommanderRankPower', entry['timestamp'], power_join_data)
elif event_name == 'PowerplayLeave':
new_add_event(
'setCommanderRankPower',
entry['timestamp'],
{'powerName': entry['Power'], 'rankValue': 0}
)
power_leave_data = {'powerName': entry['Power'], 'rankValue': 0}
new_add_event('setCommanderRankPower', entry['timestamp'], power_leave_data)
elif event_name == 'PowerplayDefect':
new_add_event(
'setCommanderRankPower',
entry['timestamp'],
{'powerName': entry['ToPower'], 'rankValue': 1}
)
power_defect_data = {'powerName': entry["ToPower"], 'rankValue': 1}
new_add_event('setCommanderRankPower', entry['timestamp'], power_defect_data)
# Ship change
if event_name == 'Loadout' and this.shipswap:
@ -683,7 +618,7 @@ def journal_entry( # noqa: C901, CCR001
elif event_name == 'SupercruiseExit':
to_send = {
'starsystemName': entry['StarSystem'],
'starsystemName': entry['StarSystem'],
}
if entry['BodyType'] == 'Planet':
@ -696,9 +631,9 @@ def journal_entry( # noqa: C901, CCR001
# we might not yet have system logged for use.
if system:
to_send = {
'starsystemName': system,
'stationName': entry['Name'],
'starsystemBodyName': entry['BodyName'],
'starsystemName': system,
'stationName': entry['Name'],
'starsystemBodyName': entry['BodyName'],
'starsystemBodyCoords': [entry['Latitude'], entry['Longitude']]
}
# Not present on, e.g. Ancient Ruins
@ -775,22 +710,19 @@ def journal_entry( # noqa: C901, CCR001
# Ignore the following 'Docked' event
this.suppress_docked = True
cargo: List[OrderedDictT[str, Any]]
cargo = [OrderedDict({'itemName': k, 'itemCount': state['Cargo'][k]}) for k in sorted(state['Cargo'])]
# Send cargo and materials if changed
cargo = [OrderedDict({'itemName': k, 'itemCount': state['Cargo'][k]}) for k in sorted(state['Cargo'])]
if this.cargo != cargo:
new_add_event('setCommanderInventoryCargo', entry['timestamp'], cargo)
this.cargo = cargo
materials: List[OrderedDictT[str, Any]] = []
for category in ('Raw', 'Manufactured', 'Encoded'):
materials.extend(
[OrderedDict([('itemName', k), ('itemCount', state[category][k])]) for k in sorted(state[category])]
)
materials = [
OrderedDict([('itemName', k), ('itemCount', state[category][k])])
for category in ('Raw', 'Manufactured', 'Encoded')
for k in sorted(state[category])
]
if this.materials != materials:
new_add_event('setCommanderInventoryMaterials', entry['timestamp'], materials)
new_add_event('setCommanderInventoryMaterials', entry['timestamp'], materials)
this.materials = materials
except Exception as e:
@ -1398,7 +1330,7 @@ def journal_entry( # noqa: C901, CCR001
return '' # No error
def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001
def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later
"""CAPI event hook."""
this.cmdr = data['commander']['name']
@ -1539,17 +1471,22 @@ def new_add_event(
def clean_event_list(event_list: List[Event]) -> List[Event]:
"""Check for killswitched events and remove or modify them as requested."""
out = []
for e in event_list:
bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{e.name}', e.data, logger)
if bad:
"""
Check for killswitched events and remove or modify them as requested.
:param event_list: List of events to clean
:return: Cleaned list of events
"""
cleaned_events = []
for event in event_list:
is_bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{event.name}', event.data, logger)
if is_bad:
continue
e.data = new_event
out.append(e)
event.data = new_event
cleaned_events.append(event)
return out
return cleaned_events
def new_worker():
@ -1561,8 +1498,9 @@ def new_worker():
logger.debug('Starting...')
while True:
events = get_events()
if (res := killswitch.get_disabled("plugins.inara.worker")).disabled:
logger.warning(f"Inara worker disabled via killswitch. ({res.reason})")
disabled_killswitch = killswitch.get_disabled("plugins.inara.worker")
if disabled_killswitch.disabled:
logger.warning(f"Inara worker disabled via killswitch. ({disabled_killswitch.reason})")
continue
for creds, event_list in events.items():
@ -1570,6 +1508,10 @@ def new_worker():
if not event_list:
continue
event_data = [
{'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list
]
data = {
'header': {
'appName': applongname,
@ -1578,12 +1520,10 @@ def new_worker():
'commanderName': creds.cmdr,
'commanderFrontierID': creds.fid,
},
'events': [
{'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list
]
'events': event_data
}
logger.info(f'sending {len(data["events"])} events for {creds.cmdr}')
logger.info(f'Sending {len(event_data)} events for {creds.cmdr}')
logger.trace_if('plugin.inara.events', f'Events:\n{json.dumps(data)}\n')
try_send_data(TARGET_URL, data)
@ -1595,94 +1535,129 @@ def new_worker():
def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]:
"""
Fetch a frozen copy of all events from the current queue.
Fetch a copy of all events from the current queue.
:param clear: whether or not to clear the queues as we go, defaults to True
:return: the frozen event list
:param clear: whether to clear the queues as we go, defaults to True
:return: a copy of the event dictionary
"""
out: Dict[Credentials, List[Event]] = {}
events_copy: Dict[Credentials, List[Event]] = {}
with this.event_lock:
for key, events in this.events.items():
out[key] = list(events)
events_copy[key] = list(events)
if clear:
events.clear()
return out
return events_copy
def try_send_data(url: str, data: Mapping[str, Any]) -> None:
"""
Attempt repeatedly to send the payload forward.
Attempt repeatedly to send the payload.
:param url: target URL for the payload
:param data: the payload
"""
for i in range(3):
logger.debug(f"sending data to API, attempt #{i}")
for attempt in range(3):
logger.debug(f"Sending data to API, attempt #{attempt + 1}")
try:
if send_data(url, data):
break
except Exception as e:
logger.debug('unable to send events', exc_info=e)
logger.debug('Unable to send events', exc_info=e)
return
def send_data(url: str, data: Mapping[str, Any]) -> bool: # noqa: CCR001
def send_data(url: str, data: Mapping[str, Any]) -> bool:
"""
Write a set of events to the inara API.
Send a set of events to the Inara API.
:param url: the target URL to post to
:param data: the data to POST
:return: success state
:param url: The target URL to post the data.
:param data: The data to be POSTed.
:return: True if the data was sent successfully, False otherwise.
"""
# NB: As of 2022-01-25 Artie has stated the Inara API does *not* support compression
r = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT)
r.raise_for_status()
reply = r.json()
response = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT)
response.raise_for_status()
reply = response.json()
status = reply['header']['eventStatus']
if status // 100 != 2: # 2xx == OK (maybe with warnings)
# Log fatal errors
logger.warning(f'Inara\t{status} {reply["header"].get("eventStatusText", "")}')
logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}')
# LANG: INARA API returned some kind of error (error message will be contained in {MSG})
plug.show_error(_('Error: Inara {MSG}').format(MSG=reply['header'].get('eventStatusText', status)))
handle_api_error(data, status, reply)
else:
# Log individual errors and warnings
for data_event, reply_event in zip(data['events'], reply['events']):
if reply_event['eventStatus'] != 200:
if ("Everything was alright, the near-neutral status just wasn't stored."
not in reply_event.get("eventStatusText")):
logger.warning(f'Inara\t{status} {reply_event.get("eventStatusText", "")}')
logger.debug(f'JSON data:\n{json.dumps(data_event)}')
handle_success_reply(data, reply)
if reply_event['eventStatus'] // 100 != 2:
# LANG: INARA API returned some kind of error (error message will be contained in {MSG})
plug.show_error(_('Error: Inara {MSG}').format(
MSG=f'{data_event["eventName"]},'
f'{reply_event.get("eventStatusText", reply_event["eventStatus"])}'
))
return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such
if data_event['eventName'] in (
'addCommanderTravelCarrierJump',
'addCommanderTravelDock',
'addCommanderTravelFSDJump',
'setCommanderTravelLocation'
):
this.lastlocation = reply_event.get('eventData', {})
# calls update_location in main thread
if not config.shutting_down:
this.system_link.event_generate('<<InaraLocation>>', when="tail")
elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']:
this.lastship = reply_event.get('eventData', {})
# calls update_ship in main thread
if not config.shutting_down:
this.system_link.event_generate('<<InaraShip>>', when="tail")
def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None:
"""
Handle API error response.
return True # regardless of errors above, we DID manage to send it, therefore inform our caller as such
:param data: The original data that was sent.
:param status: The HTTP status code of the API response.
:param reply: The JSON reply from the API.
"""
error_message = reply['header'].get('eventStatusText', "")
logger.warning(f'Inara\t{status} {error_message}')
logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}')
plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message))
def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None:
"""
Handle successful API response.
:param data: The original data that was sent.
:param reply: The JSON reply from the API.
"""
for data_event, reply_event in zip(data['events'], reply['events']):
reply_status = reply_event['eventStatus']
reply_text = reply_event.get("eventStatusText", "")
if reply_status != 200:
handle_individual_error(data_event, reply_status, reply_text)
handle_special_events(data_event, reply_event)
def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None:
"""
Handle individual API error.
:param data_event: The event data that was sent.
:param reply_status: The event status code from the API response.
:param reply_text: The event status text from the API response.
"""
if ("Everything was alright, the near-neutral status just wasn't stored."
not in reply_text):
logger.warning(f'Inara\t{reply_status} {reply_text}')
logger.debug(f'JSON data:\n{json.dumps(data_event)}')
if reply_status // 100 != 2:
plug.show_error(_('Error: Inara {MSG}').format(
MSG=f'{data_event["eventName"]}, {reply_text}'
))
def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None:
"""
Handle special events in the API response.
:param data_event: The event data that was sent.
:param reply_event: The event data from the API reply.
"""
if data_event['eventName'] in (
'addCommanderTravelCarrierJump',
'addCommanderTravelDock',
'addCommanderTravelFSDJump',
'setCommanderTravelLocation'
):
this.lastlocation = reply_event.get('eventData', {})
if not config.shutting_down:
this.system_link.event_generate('<<InaraLocation>>', when="tail")
elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']:
this.lastship = reply_event.get('eventData', {})
if not config.shutting_down:
this.system_link.event_generate('<<InaraShip>>', when="tail")
def update_location(event=None) -> None: