1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-12 07:20:02 +03:00

[2051] First Pass Remaining Files

This commit is contained in:
David Sangrey 2023-11-30 21:30:18 -05:00
parent 386b0f69b5
commit cb2a18025c
No known key found for this signature in database
GPG Key ID: 3AEADBB0186884BC
8 changed files with 255 additions and 250 deletions

View File

@ -1,10 +1,15 @@
"""
Handle use of Frontier's Companion API (CAPI) service.
companion.py - Handle use of Frontier's Companion API (CAPI) service.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
Deals with initiating authentication for, and use of, CAPI.
Some associated code is in protocol.py which creates and handles the edmc://
protocol used for the callback.
"""
from __future__ import annotations
import base64
import collections
@ -21,13 +26,10 @@ import time
import tkinter as tk
import urllib.parse
import webbrowser
from builtins import object, range, str
from email.utils import parsedate
from queue import Queue
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, OrderedDict, TypeVar, Union
from typing import TYPE_CHECKING, Any, Mapping, OrderedDict, TypeVar
import requests
import config as conf_module
import killswitch
import protocol
@ -43,7 +45,7 @@ if TYPE_CHECKING:
UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally
else:
UserDict = collections.UserDict # type: ignore # Otherwise simply use the actual class
UserDict = collections.UserDict # Otherwise simply use the actual class
capi_query_cooldown = 60 # Minimum time between (sets of) CAPI queries
@ -59,7 +61,7 @@ SERVER_LIVE = 'https://companion.orerve.net'
SERVER_LEGACY = 'https://legacy-companion.orerve.net'
SERVER_BETA = 'https://pts-companion.orerve.net'
commodity_map: Dict = {}
commodity_map: dict = {}
class CAPIData(UserDict):
@ -67,10 +69,10 @@ class CAPIData(UserDict):
def __init__(
self,
data: Union[str, Dict[str, Any], 'CAPIData', None] = None,
source_host: Optional[str] = None,
source_endpoint: Optional[str] = None,
request_cmdr: Optional[str] = None
data: str | dict[str, Any] | 'CAPIData' | None = None,
source_host: str | None = None,
source_endpoint: str | None = None,
request_cmdr: str | None = None
) -> None:
if data is None:
super().__init__()
@ -102,13 +104,13 @@ class CAPIData(UserDict):
This has side-effects of fixing `data` to be as expected in terms of
types of those elements.
"""
modules: Dict[str, Any] = self.data['lastStarport'].get('modules')
modules: dict[str, Any] = self.data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict):
if modules is None:
logger.debug('modules was None. FC or Damaged Station?')
elif isinstance(modules, list):
if len(modules) == 0:
if not modules:
logger.debug('modules is empty list. Damaged Station?')
else:
@ -120,13 +122,13 @@ class CAPIData(UserDict):
# Set a safe value
self.data['lastStarport']['modules'] = modules = {}
ships: Dict[str, Any] = self.data['lastStarport'].get('ships')
ships: dict[str, Any] = self.data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict):
if ships is None:
logger.debug('ships was None')
else:
logger.error(f'ships was neither None nor a Dict! type: {type(ships)}, content: {ships}')
logger.error(f'ships was neither None nor a dict! type: {type(ships)}, content: {ships}')
# Set a safe value
self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []}
@ -152,7 +154,7 @@ class CAPIDataRawEndpoint:
class CAPIDataRaw:
"""The last obtained raw CAPI response for each endpoint."""
raw_data: Dict[str, CAPIDataRawEndpoint] = {}
raw_data: dict[str, CAPIDataRawEndpoint] = {}
def record_endpoint(
self, endpoint: str,
@ -176,14 +178,14 @@ class CAPIDataRaw:
def __iter__(self):
"""Make this iterable on its raw_data dict."""
yield from self.raw_data
yield from self.raw_data.keys()
def __getitem__(self, item):
"""Make the raw_data dict's items get'able."""
return self.raw_data.__getitem__(item)
return self.raw_data[item]
def listify(thing: Union[List, Dict]) -> List:
def listify(thing: list | dict) -> list:
"""
Convert actual JSON array or int-indexed dict into a Python list.
@ -196,11 +198,11 @@ def listify(thing: Union[List, Dict]) -> List:
if thing is None:
return [] # data is not present
elif isinstance(thing, list):
if isinstance(thing, list):
return list(thing) # array is not sparse
elif isinstance(thing, dict):
retval: List[Any] = []
if isinstance(thing, dict):
retval: list[Any] = []
for k, v in thing.items():
idx = int(k)
@ -211,9 +213,7 @@ def listify(thing: Union[List, Dict]) -> List:
retval[idx] = v
return retval
else:
raise ValueError(f"expected an array or sparse array, got {thing!r}")
raise ValueError(f"expected an array or sparse array, got {thing!r}")
class ServerError(Exception):
@ -297,7 +297,7 @@ class CmdrError(Exception):
self.args = (_('Error: Wrong Cmdr'),)
class Auth(object):
class Auth:
"""Handles authentication with the Frontier CAPI service via oAuth2."""
# Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in
@ -313,15 +313,15 @@ class Auth(object):
self.cmdr: str = cmdr
self.requests_session = requests.Session()
self.requests_session.headers['User-Agent'] = user_agent
self.verifier: Union[bytes, None] = None
self.state: Union[str, None] = None
self.verifier: bytes | None = None
self.state: str | None = None
def __del__(self) -> None:
"""Ensure our Session is closed if we're being deleted."""
if self.requests_session:
self.requests_session.close()
def refresh(self) -> Optional[str]:
def refresh(self) -> str | None:
"""
Attempt use of Refresh Token to get a valid Access Token.
@ -347,7 +347,7 @@ class Auth(object):
logger.debug(f'idx = {idx}')
tokens = config.get_list('fdev_apikeys', default=[])
tokens = tokens + [''] * (len(cmdrs) - len(tokens))
tokens += [''] * (len(cmdrs) - len(tokens))
if tokens[idx]:
logger.debug('We have a refresh token for that idx')
data = {
@ -358,7 +358,7 @@ class Auth(object):
logger.debug('Attempting refresh with Frontier...')
try:
r: Optional[requests.Response] = None
r: requests.Response | None = None
r = self.requests_session.post(
FRONTIER_AUTH_SERVER + self.FRONTIER_AUTH_PATH_TOKEN,
data=data,
@ -372,11 +372,10 @@ class Auth(object):
return data.get('access_token')
else:
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
self.dump(r)
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
self.dump(r)
except (ValueError, requests.RequestException, ) as e:
except (ValueError, requests.RequestException) as e:
logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"\n{e!r}")
if r is not None:
self.dump(r)
@ -490,7 +489,7 @@ class Auth(object):
cmdrs = config.get_list('cmdrs', default=[])
idx = cmdrs.index(self.cmdr)
tokens = config.get_list('fdev_apikeys', default=[])
tokens = tokens + [''] * (len(cmdrs) - len(tokens))
tokens += [''] * (len(cmdrs) - len(tokens))
tokens[idx] = data_token.get('refresh_token', '')
config.set('fdev_apikeys', tokens)
config.save() # Save settings now for use by command-line app
@ -518,9 +517,9 @@ class Auth(object):
raise CredentialsError(f'{_("Error")}: {error!r}')
@staticmethod
def invalidate(cmdr: Optional[str]) -> None:
def invalidate(cmdr: str | None) -> None:
"""Invalidate Refresh Token for specified Commander."""
to_set: Optional[list] = None
to_set: list | None = None
if cmdr is None:
logger.info('Frontier CAPI Auth: Invalidating ALL tokens!')
cmdrs = config.get_list('cmdrs', default=[])
@ -531,7 +530,7 @@ class Auth(object):
cmdrs = config.get_list('cmdrs', default=[])
idx = cmdrs.index(cmdr)
to_set = config.get_list('fdev_apikeys', default=[])
to_set = to_set + [''] * (len(cmdrs) - len(to_set)) # type: ignore
to_set += [''] * (len(cmdrs) - len(to_set))
to_set[idx] = ''
if to_set is None:
@ -560,7 +559,7 @@ class EDMCCAPIReturn:
"""Base class for Request, Failure or Response."""
def __init__(
self, query_time: int, tk_response_event: Optional[str] = None,
self, query_time: int, tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False
):
self.tk_response_event = tk_response_event # Name of tk event to generate when response queued.
@ -577,7 +576,7 @@ class EDMCCAPIRequest(EDMCCAPIReturn):
def __init__(
self, capi_host: str, endpoint: str,
query_time: int,
tk_response_event: Optional[str] = None,
tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False
):
super().__init__(
@ -612,7 +611,7 @@ class EDMCCAPIFailedRequest(EDMCCAPIReturn):
self.exception: Exception = exception # Exception that recipient should raise.
class Session(object):
class Session:
"""Methods for handling Frontier Auth and CAPI queries."""
STATE_INIT, STATE_AUTH, STATE_OK = list(range(3))
@ -628,11 +627,11 @@ class Session(object):
def __init__(self) -> None:
self.state = Session.STATE_INIT
self.credentials: Optional[Dict[str, Any]] = None
self.credentials: dict[str, Any] | None = None
self.requests_session = requests.Session()
self.auth: Optional[Auth] = None
self.auth: Auth | None = None
self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query
self.tk_master: Optional[tk.Tk] = None
self.tk_master: tk.Tk | None = None
self.capi_raw_data = CAPIDataRaw() # Cache of raw replies from CAPI service
# Queue that holds requests for CAPI queries, the items should always
@ -642,7 +641,7 @@ class Session(object):
# queries back to the requesting code (technically anything checking
# this queue, but it should be either EDMarketConnector.AppWindow or
# EDMC.py). Items may be EDMCCAPIResponse or EDMCCAPIFailedRequest.
self.capi_response_queue: Queue[Union[EDMCCAPIResponse, EDMCCAPIFailedRequest]] = Queue()
self.capi_response_queue: Queue[EDMCCAPIResponse | EDMCCAPIFailedRequest] = Queue()
logger.debug('Starting CAPI queries thread...')
self.capi_query_thread = threading.Thread(
target=self.capi_query_worker,
@ -667,7 +666,7 @@ class Session(object):
self.state = Session.STATE_OK
def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> bool:
def login(self, cmdr: str | None = None, is_beta: bool | None = None) -> bool:
"""
Attempt oAuth2 login.
@ -694,7 +693,7 @@ class Session(object):
logger.error('self.credentials is None')
raise CredentialsError('Missing credentials') # Shouldn't happen
elif self.state == Session.STATE_OK:
if self.state == Session.STATE_OK:
logger.debug('already logged in (state == STATE_OK)')
return True # already logged in
@ -704,10 +703,9 @@ class Session(object):
logger.debug(f'already logged in (is_beta = {is_beta})')
return True # already logged in
else:
logger.debug('changed account or retrying login during auth')
self.reinit_session()
self.credentials = credentials
logger.debug('changed account or retrying login during auth')
self.reinit_session()
self.credentials = credentials
self.state = Session.STATE_INIT
self.auth = Auth(self.credentials['cmdr'])
@ -719,11 +717,10 @@ class Session(object):
self.start_frontier_auth(access_token)
return True
else:
logger.debug('We do NOT have an access_token')
self.state = Session.STATE_AUTH
return False
# Wait for callback
logger.debug('We do NOT have an access_token')
self.state = Session.STATE_AUTH
return False
# Wait for callback
# Callback from protocol handler
def auth_callback(self) -> None:
@ -745,7 +742,7 @@ class Session(object):
self.auth = None
raise # Bad thing happened
if getattr(sys, 'frozen', False):
tk.messagebox.showinfo(title="Authentication Successful",
tk.messagebox.showinfo(title="Authentication Successful", # type: ignore
message="Authentication with cAPI Successful.\n"
"You may now close the Frontier login tab if it is still open.")
@ -812,11 +809,11 @@ class Session(object):
raise ServerConnectionError(f'Pretending CAPI down: {capi_endpoint}')
if conf_module.capi_debug_access_token is not None:
self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}' # type: ignore # noqa: E501
self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}'
# This is one-shot
conf_module.capi_debug_access_token = None
r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout) # type: ignore
r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout)
logger.trace_if('capi.worker', '... got result...')
r.raise_for_status() # Typically 403 "Forbidden" on token expiry
@ -835,21 +832,7 @@ class Session(object):
raise ServerConnectionError(f'Unable to connect to endpoint: {capi_endpoint}') from e
except requests.HTTPError as e: # In response to raise_for_status()
logger.exception(f'Frontier CAPI Auth: GET {capi_endpoint}')
self.dump(r)
if r.status_code == 401: # CAPI doesn't think we're Auth'd
# TODO: This needs to try a REFRESH, not a full re-auth
# No need for translation, we'll go straight into trying new Auth
# and thus any message would be overwritten.
raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"') from e
if r.status_code == 418: # "I'm a teapot" - used to signal maintenance
# LANG: Frontier CAPI returned 418, meaning down for maintenance
raise ServerError(_("Frontier CAPI down for maintenance")) from e
logger.exception('Frontier CAPI: Misc. Error')
raise ServerError('Frontier CAPI: Misc. Error') from e
handle_http_error(e.response, capi_endpoint) # type: ignore # Handle various HTTP errors
except ValueError as e:
logger.exception(f'decoding CAPI response content:\n{r.content.decode(encoding="utf-8")}\n')
@ -870,6 +853,28 @@ class Session(object):
return capi_data
def handle_http_error(response: requests.Response, endpoint: str):
"""
Handle different types of HTTP errors raised during CAPI requests.
:param response: The HTTP response object.
:param endpoint: The CAPI endpoint that was queried.
:raises: Various exceptions based on the error scenarios.
"""
logger.exception(f'Frontier CAPI Auth: GET {endpoint}')
self.dump(response)
if response.status_code == 401:
# CAPI doesn't think we're Auth'd
raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"')
if response.status_code == 418:
# "I'm a teapot" - used to signal maintenance
raise ServerError(_("Frontier CAPI down for maintenance"))
logger.exception('Frontier CAPI: Misc. Error')
raise ServerError('Frontier CAPI: Misc. Error')
def capi_station_queries( # noqa: CCR001
capi_host: str, timeout: int = capi_default_requests_timeout
) -> CAPIData:
@ -939,9 +944,8 @@ class Session(object):
logger.warning(f"{last_starport_id!r} != {int(market_data['id'])!r}")
raise ServerLagging()
else:
market_data['name'] = last_starport_name
station_data['lastStarport'].update(market_data)
market_data['name'] = last_starport_name
station_data['lastStarport'].update(market_data)
if services.get('outfitting') or services.get('shipyard'):
shipyard_data = capi_single_query(capi_host, self.FRONTIER_CAPI_PATH_SHIPYARD, timeout=timeout)
@ -953,9 +957,8 @@ class Session(object):
logger.warning(f"{last_starport_id!r} != {int(shipyard_data['id'])!r}")
raise ServerLagging()
else:
shipyard_data['name'] = last_starport_name
station_data['lastStarport'].update(shipyard_data)
shipyard_data['name'] = last_starport_name
station_data['lastStarport'].update(shipyard_data)
# WORKAROUND END
return station_data
@ -1024,7 +1027,7 @@ class Session(object):
)
def station(
self, query_time: int, tk_response_event: Optional[str] = None,
self, query_time: int, tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False
) -> None:
"""
@ -1178,11 +1181,9 @@ class Session(object):
logger.debug(f"Using {SERVER_LIVE} because monitor.is_live_galaxy() was True")
return SERVER_LIVE
else:
logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False")
return SERVER_LEGACY
logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False")
return SERVER_LEGACY
return ''
######################################################################
@ -1300,11 +1301,11 @@ def ship(data: CAPIData) -> CAPIData:
V = TypeVar('V')
def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int) -> V:
def index_possibly_sparse_list(data: Mapping[str, V] | list[V], key: int) -> V:
"""
Index into a "list" that may or may not be sparseified into a dict.
:param data: List or Dict to index
:param data: list or dict to index
:param key: Key to use to index
:raises ValueError: When data is of an unexpected type
:return: The value at the key
@ -1320,11 +1321,10 @@ def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int)
if isinstance(data, list):
return data[key]
elif isinstance(data, (dict, OrderedDict)):
if isinstance(data, (dict, OrderedDict)):
return data[str(key)]
else:
raise ValueError(f'Unexpected data type {type(data)}')
raise ValueError(f'Unexpected data type {type(data)}')
######################################################################

View File

@ -79,16 +79,15 @@ def get_hotkeymgr() -> AbstractHotkeyMgr:
from hotkey.darwin import MacHotkeyMgr
return MacHotkeyMgr()
elif sys.platform == 'win32':
if sys.platform == 'win32':
from hotkey.windows import WindowsHotkeyMgr
return WindowsHotkeyMgr()
elif sys.platform == 'linux':
if sys.platform == 'linux':
from hotkey.linux import LinuxHotKeyMgr
return LinuxHotKeyMgr()
else:
raise ValueError(f'Unknown platform: {sys.platform}')
raise ValueError(f'Unknown platform: {sys.platform}')
# singleton

View File

@ -1,8 +1,10 @@
"""darwin/macOS implementation of hotkey.AbstractHotkeyMgr."""
from __future__ import annotations
import pathlib
import sys
import tkinter as tk
from typing import Callable, Optional, Tuple, Union
from typing import Callable
assert sys.platform == 'darwin'
import objc
@ -107,7 +109,7 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
# suppress the event by not chaining the old function
return the_event
elif the_event.type() in (NSKeyDown, NSKeyUp):
if the_event.type() in (NSKeyDown, NSKeyUp):
c = the_event.charactersIgnoringModifiers()
self.acquire_key = (c and ord(c[0]) or 0) | \
(the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask)
@ -192,7 +194,7 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after(50, self._acquire_poll)
def fromevent(self, event) -> Optional[Union[bool, Tuple]]:
def fromevent(self, event) -> bool | tuple | None:
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
@ -209,17 +211,17 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
return False
# BkSp, Del, Clear = clear hotkey
elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]:
if keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]:
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
# don't allow keys needed for typing in System Map
elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a:
if keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a:
NSBeep()
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
return (keycode, modifiers)
return keycode, modifiers
def display(self, keycode, modifiers) -> str:
"""

View File

@ -1,6 +1,7 @@
"""Linux implementation of hotkey.AbstractHotkeyMgr."""
import sys
from __future__ import annotations
import sys
from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr

View File

@ -1,4 +1,6 @@
"""Windows implementation of hotkey.AbstractHotkeyMgr."""
from __future__ import annotations
import atexit
import ctypes
import pathlib
@ -7,8 +9,6 @@ import threading
import tkinter as tk
import winsound
from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD
from typing import Optional, Tuple, Union
from config import config
from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr
@ -266,7 +266,7 @@ class WindowsHotkeyMgr(AbstractHotkeyMgr):
"""Stop acquiring hotkey state."""
pass
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001
def fromevent(self, event) -> bool | tuple | None: # noqa: CCR001
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
@ -285,33 +285,32 @@ class WindowsHotkeyMgr(AbstractHotkeyMgr):
keycode = event.keycode
if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]:
return (0, modifiers)
return 0, modifiers
if not modifiers:
if keycode == VK_ESCAPE: # Esc = retain previous
return False
elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey
if keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey
return None
elif (
if (
keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z')
): # don't allow keys needed for typing in System Map
winsound.MessageBeep()
return None
elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY]
or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys
return (0, modifiers)
if (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY]
or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys
return 0, modifiers
# See if the keycode is usable and available
if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode):
UnregisterHotKey(None, 2)
return (keycode, modifiers)
return keycode, modifiers
else:
winsound.MessageBeep()
return None
winsound.MessageBeep()
return None
def display(self, keycode, modifiers) -> str:
"""

View File

@ -1,7 +1,11 @@
"""Monitor for new Journal files and contents of latest."""
# v [sic]
# spell-checker: words onfoot unforseen relog fsdjump suitloadoutid slotid suitid loadoutid fauto Intimidator
# spell-checker: words joinacrew quitacrew sellshiponrebuy newbal navroute npccrewpaidwage sauto
"""
monitor.py - Monitor for new Journal files and contents of latest.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json
import pathlib
@ -14,19 +18,16 @@ from collections import OrderedDict, defaultdict
from os import SEEK_END, SEEK_SET, listdir
from os.path import basename, expanduser, getctime, isdir, join
from time import gmtime, localtime, mktime, sleep, strftime, strptime, time
from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping, Tuple
if TYPE_CHECKING:
import tkinter
from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping
import semantic_version
import util_ships
from config import config
from edmc_data import edmc_suit_shortnames, edmc_suit_symbol_localised
from EDMCLogging import get_main_logger
# spell-checker: words navroute
if TYPE_CHECKING:
import tkinter
logger = get_main_logger()
STARTUP = 'journal.startup'
@ -76,11 +77,10 @@ else:
# Journal handler
class EDLogs(FileSystemEventHandler): # type: ignore # See below
class EDLogs(FileSystemEventHandler):
"""Monitoring of Journal files."""
# Magic with FileSystemEventHandler can confuse type checkers when they do not have access to every import
_POLL = 1 # Polling is cheap, so do it often
_RE_CANONICALISE = re.compile(r'\$(.+)_name;')
_RE_CATEGORY = re.compile(r'\$MICRORESOURCE_CATEGORY_(.+);')
@ -207,7 +207,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
:return: bool - False if we couldn't access/find latest Journal file.
"""
logger.debug('Begin...')
self.root = root # type: ignore
self.root = root
journal_dir = config.get_str('journaldir')
if journal_dir == '' or journal_dir is None:
@ -515,8 +515,6 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
else:
self.game_was_running = self.game_running()
logger.debug('Done.')
def synthesize_startup_event(self) -> dict[str, Any]:
"""
Synthesize a 'StartUp' event to notify plugins of initial state.
@ -570,7 +568,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try:
# Preserve property order because why not?
entry: MutableMapping[str, Any] = json.loads(line, object_pairs_hook=OrderedDict)
entry['timestamp'] # we expect this to exist # TODO: replace with assert? or an if key in check
assert 'timestamp' in entry, "Timestamp does not exist in the entry"
self.__navroute_retry()
@ -933,7 +931,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
###############################################################
if 'StarPos' in entry:
# Plugins need this as well, so copy in state
self.state['StarPos'] = tuple(entry['StarPos']) # type: ignore
self.state['StarPos'] = tuple(entry['StarPos'])
else:
logger.warning(f"'{event_type}' event without 'StarPos' !!!:\n{entry}\n")
@ -1109,7 +1107,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
while attempts < shiplocker_max_attempts:
attempts += 1
try:
with open(shiplocker_filename, 'rb') as h: # type: ignore
with open(shiplocker_filename, 'rb') as h:
entry = json.load(h, object_pairs_hook=OrderedDict)
self.state['ShipLockerJSON'] = entry
break
@ -1551,7 +1549,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
entry = json.load(mf)
except json.JSONDecodeError:
logger.exception('Failed decoding ModulesInfo.json', exc_info=True)
logger.exception('Failed decoding ModulesInfo.json')
else:
self.state['ModuleInfo'] = entry
@ -1812,7 +1810,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
self.state['Credits'] -= entry.get('Price', 0)
elif event_type == 'carrierbanktransfer':
if (newbal := entry.get('PlayerBalance')):
if newbal := entry.get('PlayerBalance'):
self.state['Credits'] = newbal
elif event_type == 'carrierdecommission':
@ -1911,7 +1909,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return name
def suitloadout_store_from_event(self, entry) -> Tuple[int, int]:
def suitloadout_store_from_event(self, entry) -> tuple[int, int]:
"""
Store Suit and SuitLoadout data from a journal event.
@ -1990,64 +1988,64 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
# TODO: *This* will need refactoring and a proper validation infrastructure
# designed for this in the future. This is a bandaid for a known issue.
def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001 C901
def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001
"""
Check an `EngineerProgress` Journal event for validity.
:param entry: Journal event dict
:return: True if passes validation, else False.
"""
# The event should have at least one of these
if 'Engineers' not in entry and 'Progress' not in entry:
engineers_present = 'Engineers' in entry
progress_present = 'Progress' in entry
if not (engineers_present or progress_present):
logger.warning(f"EngineerProgress has neither 'Engineers' nor 'Progress': {entry=}")
return False
# But not both of them
if 'Engineers' in entry and 'Progress' in entry:
if engineers_present and progress_present:
logger.warning(f"EngineerProgress has BOTH 'Engineers' and 'Progress': {entry=}")
return False
if 'Engineers' in entry:
if engineers_present:
engineers = entry['Engineers']
# 'Engineers' version should have a list as value
if not isinstance(entry['Engineers'], list):
if not isinstance(engineers, list):
logger.warning(f"EngineerProgress 'Engineers' is not a list: {entry=}")
return False
# It should have at least one entry? This might still be valid ?
if len(entry['Engineers']) < 1:
if len(engineers) < 1:
logger.warning(f"EngineerProgress 'Engineers' list is empty ?: {entry=}")
# TODO: As this might be valid, we might want to only log
return False
# And that list should have all of these keys
for e in entry['Engineers']:
for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'):
if f not in e:
# For some Progress there's no Rank/RankProgress yet
if f in ('Rank', 'RankProgress'):
if (progress := e.get('Progress', None)) is not None:
if progress in ('Invited', 'Known'):
continue
# For some Progress there's no Rank/RankProgress yet
required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress')
for e in engineers:
missing_keys = [key for key in required_keys if key not in e]
if any(key in ('Rank', 'RankProgress') and e.get('Progress') in ('Invited', 'Known') for key in
missing_keys):
continue
logger.warning(f"Engineer entry without '{f}' key: {e=} in {entry=}")
return False
if missing_keys:
logger.warning(f"Engineer entry without '{missing_keys[0]}' key: {e=} in {entry=}")
return False
if 'Progress' in entry:
if progress_present:
# Progress is only a single Engineer, so it's not an array
# { "timestamp":"2021-05-24T17:57:52Z",
# "event":"EngineerProgress",
# "Engineer":"Felicity Farseer",
# "EngineerID":300100,
# "Progress":"Invited" }
for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'):
if f not in entry:
# For some Progress there's no Rank/RankProgress yet
if f in ('Rank', 'RankProgress'):
if (progress := entry.get('Progress', None)) is not None:
if progress in ('Invited', 'Known'):
continue
logger.warning(f"Progress event without '{f}' key: {entry=}")
# For some Progress there's no Rank/RankProgress yet
required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress')
missing_keys = [key for key in required_keys if key not in entry]
if any(key in ('Rank', 'RankProgress') and entry.get('Progress') in ('Invited', 'Known') for key in
missing_keys):
if missing_keys:
logger.warning(f"Progress event without '{missing_keys[0]}' key: {entry=}")
return False
return True
@ -2152,7 +2150,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return True
elif sys.platform == 'win32':
def WindowTitle(h): # noqa: N802 # type: ignore
def WindowTitle(h): # noqa: N802
if h:
length = GetWindowTextLength(h) + 1
buf = ctypes.create_unicode_buffer(length)
@ -2261,18 +2259,18 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return
ship = util_ships.ship_file_name(self.state['ShipName'], self.state['ShipType'])
regexp = re.compile(re.escape(ship) + r'\.\d{4}\-\d\d\-\d\dT\d\d\.\d\d\.\d\d\.txt')
oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x))) # type: ignore
regexp = re.compile(re.escape(ship) + r'\.\d{4}-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt')
oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x)))
if oldfiles:
try:
with open(join(config.get_str('outdir'), oldfiles[-1]), 'r', encoding='utf-8') as h: # type: ignore
with open(join(config.get_str('outdir'), oldfiles[-1]), encoding='utf-8') as h:
if h.read() == string:
return # same as last time - don't write
except UnicodeError:
logger.exception("UnicodeError reading old ship loadout with utf-8 encoding, trying without...")
try:
with open(join(config.get_str('outdir'), oldfiles[-1]), 'r') as h: # type: ignore
with open(join(config.get_str('outdir'), oldfiles[-1])) as h:
if h.read() == string:
return # same as last time - don't write
@ -2291,9 +2289,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
# Write
ts = strftime('%Y-%m-%dT%H.%M.%S', localtime(time()))
filename = join( # type: ignore
config.get_str('outdir'), f'{ship}.{ts}.txt'
)
filename = join(config.get_str('outdir'), f'{ship}.{ts}.txt')
try:
with open(filename, 'wt', encoding='utf-8') as h:
@ -2380,7 +2376,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try:
with open(join(self.currentdir, 'NavRoute.json'), 'r') as f:
with open(join(self.currentdir, 'NavRoute.json')) as f:
raw = f.read()
except Exception as e:
@ -2391,7 +2387,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
data = json.loads(raw)
except json.JSONDecodeError:
logger.exception('Failed to decode NavRoute.json', exc_info=True)
logger.exception('Failed to decode NavRoute.json')
return None
if 'timestamp' not in data: # quick sanity check
@ -2406,7 +2402,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try:
with open(join(self.currentdir, 'FCMaterials.json'), 'r') as f:
with open(join(self.currentdir, 'FCMaterials.json')) as f:
raw = f.read()
except Exception as e:
@ -2417,7 +2413,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
data = json.loads(raw)
except json.JSONDecodeError:
logger.exception('Failed to decode FCMaterials.json', exc_info=True)
logger.exception('Failed to decode FCMaterials.json')
return None
if 'timestamp' not in data: # quick sanity check

134
plug.py
View File

@ -1,4 +1,12 @@
"""Plugin API."""
"""
plug.py - Plugin API.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import copy
import importlib
import logging
@ -6,9 +14,8 @@ import operator
import os
import sys
import tkinter as tk
from builtins import object, str
from tkinter import ttk
from typing import Any, Callable, List, Mapping, MutableMapping, Optional
from typing import Any, Mapping, MutableMapping
import companion
import myNotebook as nb # noqa: N813
@ -26,7 +33,7 @@ PLUGINS_not_py3 = []
class LastError:
"""Holds the last plugin error."""
msg: Optional[str]
msg: str | None
root: tk.Tk
def __init__(self) -> None:
@ -36,10 +43,10 @@ class LastError:
last_error = LastError()
class Plugin(object):
class Plugin:
"""An EDMC plugin."""
def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[logging.Logger]):
def __init__(self, name: str, loadfile: str | None, plugin_logger: logging.Logger | None):
"""
Load a single plugin.
@ -49,9 +56,9 @@ class Plugin(object):
:raises Exception: Typically ImportError or OSError
"""
self.name: str = name # Display name.
self.folder: Optional[str] = name # basename of plugin folder. None for internal plugins.
self.folder: str | None = name # basename of plugin folder. None for internal plugins.
self.module = None # None for disabled plugins.
self.logger: Optional[logging.Logger] = plugin_logger
self.logger: logging.Logger | None = plugin_logger
if loadfile:
logger.info(f'loading plugin "{name.replace(".", "_")}" from "{loadfile}"')
@ -64,7 +71,7 @@ class Plugin(object):
).load_module()
if getattr(module, 'plugin_start3', None):
newname = module.plugin_start3(os.path.dirname(loadfile))
self.name = newname and str(newname) or name
self.name = str(newname) if newname else self.name
self.module = module
elif getattr(module, 'plugin_start', None):
logger.warning(f'plugin {name} needs migrating\n')
@ -77,7 +84,7 @@ class Plugin(object):
else:
logger.info(f'plugin {name} disabled')
def _get_func(self, funcname: str) -> Optional[Callable]:
def _get_func(self, funcname: str): # Removing Unhelpful Type Hint
"""
Get a function from a plugin.
@ -86,7 +93,7 @@ class Plugin(object):
"""
return getattr(self.module, funcname, None)
def get_app(self, parent: tk.Frame) -> Optional[tk.Frame]:
def get_app(self, parent: tk.Frame) -> tk.Frame | None:
"""
If the plugin provides mainwindow content create and return it.
@ -100,7 +107,7 @@ class Plugin(object):
if appitem is None:
return None
elif isinstance(appitem, tuple):
if isinstance(appitem, tuple):
if (
len(appitem) != 2
or not isinstance(appitem[0], tk.Widget)
@ -118,7 +125,7 @@ class Plugin(object):
return None
def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Optional[tk.Frame]:
def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame | None:
"""
If the plugin provides a prefs frame, create and return it.
@ -132,41 +139,50 @@ class Plugin(object):
if plugin_prefs:
try:
frame = plugin_prefs(parent, cmdr, is_beta)
if not isinstance(frame, nb.Frame):
raise AssertionError
return frame
if isinstance(frame, nb.Frame):
return frame
raise AssertionError
except Exception:
logger.exception(f'Failed for Plugin "{self.name}"')
return None
def load_plugins(master: tk.Tk) -> None: # noqa: CCR001
def load_plugins(master: tk.Tk) -> None:
"""Find and load all plugins."""
last_error.root = master
internal = []
for name in sorted(os.listdir(config.internal_plugin_dir_path)):
if name.endswith('.py') and not name[0] in ['.', '_']:
try:
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir_path, name), logger)
plugin.folder = None # Suppress listing in Plugins prefs tab
internal.append(plugin)
except Exception:
logger.exception(f'Failure loading internal Plugin "{name}"')
internal = _load_internal_plugins()
PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower()))
# Add plugin folder to load path so packages can be loaded from plugin folder
sys.path.append(config.plugin_dir)
found = _load_found_plugins()
PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower()))
def _load_internal_plugins():
internal = []
for name in sorted(os.listdir(config.internal_plugin_dir_path)):
if name.endswith('.py') and name[0] not in ['.', '_']:
try:
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir_path, name), logger)
plugin.folder = None
internal.append(plugin)
except Exception:
logger.exception(f'Failure loading internal Plugin "{name}"')
return internal
def _load_found_plugins():
found = []
# Load any plugins that are also packages first, but note it's *still*
# 100% relying on there being a `load.py`, as only that will be loaded.
# The intent here is to e.g. have EDMC-Overlay load before any plugins
# that depend on it.
for name in sorted(
os.listdir(config.plugin_dir_path),
key=lambda n: (not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower())
):
for name in sorted(os.listdir(config.plugin_dir_path), key=lambda n: (
not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower())):
if not os.path.isdir(os.path.join(config.plugin_dir_path, name)) or name[0] in ['.', '_']:
pass
elif name.endswith('.disabled'):
@ -177,19 +193,17 @@ def load_plugins(master: tk.Tk) -> None: # noqa: CCR001
# Add plugin's folder to load path in case plugin has internal package dependencies
sys.path.append(os.path.join(config.plugin_dir_path, name))
# Create a logger for this 'found' plugin. Must be before the
# load.py is loaded.
import EDMCLogging
# Create a logger for this 'found' plugin. Must be before the load.py is loaded.
plugin_logger = EDMCLogging.get_plugin_logger(name)
found.append(Plugin(name, os.path.join(config.plugin_dir_path, name, 'load.py'), plugin_logger))
except Exception:
logger.exception(f'Failure loading found Plugin "{name}"')
pass
PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower()))
return found
def provides(fn_name: str) -> List[str]:
def provides(fn_name: str) -> list[str]:
"""
Find plugins that provide a function.
@ -202,7 +216,7 @@ def provides(fn_name: str) -> List[str]:
def invoke(
plugin_name: str, fallback: str | None, fn_name: str, *args: Any
) -> Optional[str]:
) -> str | None:
"""
Invoke a function on a named plugin.
@ -228,7 +242,7 @@ def invoke(
return None
def notify_stop() -> Optional[str]:
def notify_stop() -> str | None:
"""
Notify each plugin that the program is closing.
@ -251,6 +265,16 @@ def notify_stop() -> Optional[str]:
return error
def _notify_prefs_plugins(fn_name: str, cmdr: str | None, is_beta: bool) -> None:
for plugin in PLUGINS:
prefs_callback = plugin._get_func(fn_name)
if prefs_callback:
try:
prefs_callback(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None:
"""
Notify plugins that the Cmdr was changed while the settings dialog is open.
@ -259,13 +283,7 @@ def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None:
:param cmdr: current Cmdr name (or None).
:param is_beta: whether the player is in a Beta universe.
"""
for plugin in PLUGINS:
prefs_cmdr_changed = plugin._get_func('prefs_cmdr_changed')
if prefs_cmdr_changed:
try:
prefs_cmdr_changed(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
_notify_prefs_plugins("prefs_cmdr_changed", cmdr, is_beta)
def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None:
@ -278,20 +296,14 @@ def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None:
:param cmdr: current Cmdr name (or None).
:param is_beta: whether the player is in a Beta universe.
"""
for plugin in PLUGINS:
prefs_changed = plugin._get_func('prefs_changed')
if prefs_changed:
try:
prefs_changed(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
_notify_prefs_plugins("prefs_changed", cmdr, is_beta)
def notify_journal_entry(
cmdr: str, is_beta: bool, system: str | None, station: str | None,
entry: MutableMapping[str, Any],
state: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send a journal entry to each plugin.
@ -303,7 +315,7 @@ def notify_journal_entry(
:param is_beta: whether the player is in a Beta universe.
:returns: Error message from the first plugin that returns one (if any)
"""
if entry['event'] in ('Location'):
if entry['event'] in 'Location':
logger.trace_if('journal.locations', 'Notifying plugins of "Location" event')
error = None
@ -323,7 +335,7 @@ def notify_journal_entry_cqc(
cmdr: str, is_beta: bool,
entry: MutableMapping[str, Any],
state: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an in-CQC journal entry to each plugin.
@ -348,10 +360,7 @@ def notify_journal_entry_cqc(
return error
def notify_dashboard_entry(
cmdr: str, is_beta: bool,
entry: MutableMapping[str, Any],
) -> Optional[str]:
def notify_dashboard_entry(cmdr: str, is_beta: bool, entry: MutableMapping[str, Any],) -> str | None:
"""
Send a status entry to each plugin.
@ -373,10 +382,7 @@ def notify_dashboard_entry(
return error
def notify_capidata(
data: companion.CAPIData,
is_beta: bool
) -> Optional[str]:
def notify_capidata(data: companion.CAPIData, is_beta: bool) -> str | None:
"""
Send the latest EDMC data from the FD servers to each plugin.
@ -404,9 +410,7 @@ def notify_capidata(
return error
def notify_capi_fleetcarrierdata(
data: companion.CAPIData
) -> str | None:
def notify_capi_fleetcarrierdata(data: companion.CAPIData) -> str | None:
"""
Send the latest CAPI Fleetcarrier data from the FD servers to each plugin.

View File

@ -1,12 +1,16 @@
"""CMDR Status information."""
"""
stats.py - CMDR Status Information.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
import csv
import json
import sys
import tkinter
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast
import companion
import EDMCLogging
import myNotebook as nb # noqa: N813
@ -488,11 +492,11 @@ class StatsResults(tk.Toplevel):
:param align: The alignment of the data, defaults to tk.W
"""
row = -1 # To silence unbound warnings
for i in range(len(content)):
# label = HyperlinkLabel(parent, text=content[i], popup_copy=True)
label = nb.Label(parent, text=content[i])
for i, col_content in enumerate(content):
# label = HyperlinkLabel(parent, text=col_content, popup_copy=True)
label = nb.Label(parent, text=col_content)
if with_copy:
label.bind('<Button-1>', self.copy_callback(label, content[i]))
label.bind('<Button-1>', self.copy_callback(label, col_content))
if i == 0:
label.grid(padx=10, sticky=tk.W)
@ -512,7 +516,7 @@ class StatsResults(tk.Toplevel):
@staticmethod
def copy_callback(label: tk.Label, text_to_copy: str) -> Callable[..., None]:
"""Copy data in Label to clipboard."""
def do_copy(event: tkinter.Event) -> None:
def do_copy(event: tk.Event) -> None:
label.clipboard_clear()
label.clipboard_append(text_to_copy)
old_bg = label['bg']