1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-17 17:42:20 +03:00

Merge pull request #2102 from HullSeals/enhancement/2051/remainders

[2051] First Pass Remaining Files
This commit is contained in:
Phoebe 2023-12-01 04:07:54 +01:00 committed by GitHub
commit 14319d0173
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 277 additions and 270 deletions

View File

@ -9,7 +9,6 @@ import os
import shutil import shutil
import sys import sys
import pathlib import pathlib
from typing import List, Tuple
from string import Template from string import Template
from os.path import join, isdir from os.path import join, isdir
import py2exe import py2exe
@ -57,8 +56,8 @@ def system_check(dist_dir: str) -> str:
def generate_data_files( def generate_data_files(
app_name: str, gitversion_file: str, plugins: List[str] app_name: str, gitversion_file: str, plugins: list[str]
) -> List[Tuple[str, List[str]]]: ) -> list[tuple[str, list[str]]]:
"""Create the required datafiles to build.""" """Create the required datafiles to build."""
l10n_dir = "L10n" l10n_dir = "L10n"
fdevids_dir = "FDevIDs" fdevids_dir = "FDevIDs"
@ -106,7 +105,7 @@ def build() -> None:
gitversion_filename: str = system_check(dist_dir) gitversion_filename: str = system_check(dist_dir)
# Constants # Constants
plugins: List[str] = [ plugins: list[str] = [
"plugins/coriolis.py", "plugins/coriolis.py",
"plugins/eddn.py", "plugins/eddn.py",
"plugins/edsm.py", "plugins/edsm.py",
@ -141,7 +140,7 @@ def build() -> None:
} }
# Function to generate DATA_FILES list # Function to generate DATA_FILES list
data_files: List[Tuple[str, List[str]]] = generate_data_files( data_files: list[tuple[str, list[str]]] = generate_data_files(
appname, gitversion_filename, plugins appname, gitversion_filename, plugins
) )

View File

@ -1,10 +1,15 @@
""" """
Handle use of Frontier's Companion API (CAPI) service. companion.py - Handle use of Frontier's Companion API (CAPI) service.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
Deals with initiating authentication for, and use of, CAPI. Deals with initiating authentication for, and use of, CAPI.
Some associated code is in protocol.py which creates and handles the edmc:// Some associated code is in protocol.py which creates and handles the edmc://
protocol used for the callback. protocol used for the callback.
""" """
from __future__ import annotations
import base64 import base64
import collections import collections
@ -21,13 +26,10 @@ import time
import tkinter as tk import tkinter as tk
import urllib.parse import urllib.parse
import webbrowser import webbrowser
from builtins import object, range, str
from email.utils import parsedate from email.utils import parsedate
from queue import Queue from queue import Queue
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, OrderedDict, TypeVar, Union from typing import TYPE_CHECKING, Any, Mapping, OrderedDict, TypeVar
import requests import requests
import config as conf_module import config as conf_module
import killswitch import killswitch
import protocol import protocol
@ -43,7 +45,7 @@ if TYPE_CHECKING:
UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally
else: else:
UserDict = collections.UserDict # type: ignore # Otherwise simply use the actual class UserDict = collections.UserDict # Otherwise simply use the actual class
capi_query_cooldown = 60 # Minimum time between (sets of) CAPI queries capi_query_cooldown = 60 # Minimum time between (sets of) CAPI queries
@ -59,7 +61,7 @@ SERVER_LIVE = 'https://companion.orerve.net'
SERVER_LEGACY = 'https://legacy-companion.orerve.net' SERVER_LEGACY = 'https://legacy-companion.orerve.net'
SERVER_BETA = 'https://pts-companion.orerve.net' SERVER_BETA = 'https://pts-companion.orerve.net'
commodity_map: Dict = {} commodity_map: dict = {}
class CAPIData(UserDict): class CAPIData(UserDict):
@ -67,10 +69,10 @@ class CAPIData(UserDict):
def __init__( def __init__(
self, self,
data: Union[str, Dict[str, Any], 'CAPIData', None] = None, data: str | dict[str, Any] | 'CAPIData' | None = None,
source_host: Optional[str] = None, source_host: str | None = None,
source_endpoint: Optional[str] = None, source_endpoint: str | None = None,
request_cmdr: Optional[str] = None request_cmdr: str | None = None
) -> None: ) -> None:
if data is None: if data is None:
super().__init__() super().__init__()
@ -102,13 +104,13 @@ class CAPIData(UserDict):
This has side-effects of fixing `data` to be as expected in terms of This has side-effects of fixing `data` to be as expected in terms of
types of those elements. types of those elements.
""" """
modules: Dict[str, Any] = self.data['lastStarport'].get('modules') modules: dict[str, Any] = self.data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict): if modules is None or not isinstance(modules, dict):
if modules is None: if modules is None:
logger.debug('modules was None. FC or Damaged Station?') logger.debug('modules was None. FC or Damaged Station?')
elif isinstance(modules, list): elif isinstance(modules, list):
if len(modules) == 0: if not modules:
logger.debug('modules is empty list. Damaged Station?') logger.debug('modules is empty list. Damaged Station?')
else: else:
@ -120,13 +122,13 @@ class CAPIData(UserDict):
# Set a safe value # Set a safe value
self.data['lastStarport']['modules'] = modules = {} self.data['lastStarport']['modules'] = modules = {}
ships: Dict[str, Any] = self.data['lastStarport'].get('ships') ships: dict[str, Any] = self.data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict): if ships is None or not isinstance(ships, dict):
if ships is None: if ships is None:
logger.debug('ships was None') logger.debug('ships was None')
else: else:
logger.error(f'ships was neither None nor a Dict! type: {type(ships)}, content: {ships}') logger.error(f'ships was neither None nor a dict! type: {type(ships)}, content: {ships}')
# Set a safe value # Set a safe value
self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []} self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []}
@ -152,7 +154,7 @@ class CAPIDataRawEndpoint:
class CAPIDataRaw: class CAPIDataRaw:
"""The last obtained raw CAPI response for each endpoint.""" """The last obtained raw CAPI response for each endpoint."""
raw_data: Dict[str, CAPIDataRawEndpoint] = {} raw_data: dict[str, CAPIDataRawEndpoint] = {}
def record_endpoint( def record_endpoint(
self, endpoint: str, self, endpoint: str,
@ -176,14 +178,14 @@ class CAPIDataRaw:
def __iter__(self): def __iter__(self):
"""Make this iterable on its raw_data dict.""" """Make this iterable on its raw_data dict."""
yield from self.raw_data yield from self.raw_data.keys()
def __getitem__(self, item): def __getitem__(self, item):
"""Make the raw_data dict's items get'able.""" """Make the raw_data dict's items get'able."""
return self.raw_data.__getitem__(item) return self.raw_data[item]
def listify(thing: Union[List, Dict]) -> List: def listify(thing: list | dict) -> list:
""" """
Convert actual JSON array or int-indexed dict into a Python list. Convert actual JSON array or int-indexed dict into a Python list.
@ -196,11 +198,11 @@ def listify(thing: Union[List, Dict]) -> List:
if thing is None: if thing is None:
return [] # data is not present return [] # data is not present
elif isinstance(thing, list): if isinstance(thing, list):
return list(thing) # array is not sparse return list(thing) # array is not sparse
elif isinstance(thing, dict): if isinstance(thing, dict):
retval: List[Any] = [] retval: list[Any] = []
for k, v in thing.items(): for k, v in thing.items():
idx = int(k) idx = int(k)
@ -211,9 +213,7 @@ def listify(thing: Union[List, Dict]) -> List:
retval[idx] = v retval[idx] = v
return retval return retval
raise ValueError(f"expected an array or sparse array, got {thing!r}")
else:
raise ValueError(f"expected an array or sparse array, got {thing!r}")
class ServerError(Exception): class ServerError(Exception):
@ -297,7 +297,7 @@ class CmdrError(Exception):
self.args = (_('Error: Wrong Cmdr'),) self.args = (_('Error: Wrong Cmdr'),)
class Auth(object): class Auth:
"""Handles authentication with the Frontier CAPI service via oAuth2.""" """Handles authentication with the Frontier CAPI service via oAuth2."""
# Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in # Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in
@ -313,15 +313,15 @@ class Auth(object):
self.cmdr: str = cmdr self.cmdr: str = cmdr
self.requests_session = requests.Session() self.requests_session = requests.Session()
self.requests_session.headers['User-Agent'] = user_agent self.requests_session.headers['User-Agent'] = user_agent
self.verifier: Union[bytes, None] = None self.verifier: bytes | None = None
self.state: Union[str, None] = None self.state: str | None = None
def __del__(self) -> None: def __del__(self) -> None:
"""Ensure our Session is closed if we're being deleted.""" """Ensure our Session is closed if we're being deleted."""
if self.requests_session: if self.requests_session:
self.requests_session.close() self.requests_session.close()
def refresh(self) -> Optional[str]: def refresh(self) -> str | None:
""" """
Attempt use of Refresh Token to get a valid Access Token. Attempt use of Refresh Token to get a valid Access Token.
@ -347,7 +347,7 @@ class Auth(object):
logger.debug(f'idx = {idx}') logger.debug(f'idx = {idx}')
tokens = config.get_list('fdev_apikeys', default=[]) tokens = config.get_list('fdev_apikeys', default=[])
tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens += [''] * (len(cmdrs) - len(tokens))
if tokens[idx]: if tokens[idx]:
logger.debug('We have a refresh token for that idx') logger.debug('We have a refresh token for that idx')
data = { data = {
@ -358,7 +358,7 @@ class Auth(object):
logger.debug('Attempting refresh with Frontier...') logger.debug('Attempting refresh with Frontier...')
try: try:
r: Optional[requests.Response] = None r: requests.Response | None = None
r = self.requests_session.post( r = self.requests_session.post(
FRONTIER_AUTH_SERVER + self.FRONTIER_AUTH_PATH_TOKEN, FRONTIER_AUTH_SERVER + self.FRONTIER_AUTH_PATH_TOKEN,
data=data, data=data,
@ -372,11 +372,10 @@ class Auth(object):
return data.get('access_token') return data.get('access_token')
else: logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") self.dump(r)
self.dump(r)
except (ValueError, requests.RequestException, ) as e: except (ValueError, requests.RequestException) as e:
logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"\n{e!r}") logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"\n{e!r}")
if r is not None: if r is not None:
self.dump(r) self.dump(r)
@ -490,7 +489,7 @@ class Auth(object):
cmdrs = config.get_list('cmdrs', default=[]) cmdrs = config.get_list('cmdrs', default=[])
idx = cmdrs.index(self.cmdr) idx = cmdrs.index(self.cmdr)
tokens = config.get_list('fdev_apikeys', default=[]) tokens = config.get_list('fdev_apikeys', default=[])
tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens += [''] * (len(cmdrs) - len(tokens))
tokens[idx] = data_token.get('refresh_token', '') tokens[idx] = data_token.get('refresh_token', '')
config.set('fdev_apikeys', tokens) config.set('fdev_apikeys', tokens)
config.save() # Save settings now for use by command-line app config.save() # Save settings now for use by command-line app
@ -518,9 +517,9 @@ class Auth(object):
raise CredentialsError(f'{_("Error")}: {error!r}') raise CredentialsError(f'{_("Error")}: {error!r}')
@staticmethod @staticmethod
def invalidate(cmdr: Optional[str]) -> None: def invalidate(cmdr: str | None) -> None:
"""Invalidate Refresh Token for specified Commander.""" """Invalidate Refresh Token for specified Commander."""
to_set: Optional[list] = None to_set: list | None = None
if cmdr is None: if cmdr is None:
logger.info('Frontier CAPI Auth: Invalidating ALL tokens!') logger.info('Frontier CAPI Auth: Invalidating ALL tokens!')
cmdrs = config.get_list('cmdrs', default=[]) cmdrs = config.get_list('cmdrs', default=[])
@ -531,7 +530,7 @@ class Auth(object):
cmdrs = config.get_list('cmdrs', default=[]) cmdrs = config.get_list('cmdrs', default=[])
idx = cmdrs.index(cmdr) idx = cmdrs.index(cmdr)
to_set = config.get_list('fdev_apikeys', default=[]) to_set = config.get_list('fdev_apikeys', default=[])
to_set = to_set + [''] * (len(cmdrs) - len(to_set)) # type: ignore to_set += [''] * (len(cmdrs) - len(to_set))
to_set[idx] = '' to_set[idx] = ''
if to_set is None: if to_set is None:
@ -560,7 +559,7 @@ class EDMCCAPIReturn:
"""Base class for Request, Failure or Response.""" """Base class for Request, Failure or Response."""
def __init__( def __init__(
self, query_time: int, tk_response_event: Optional[str] = None, self, query_time: int, tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False play_sound: bool = False, auto_update: bool = False
): ):
self.tk_response_event = tk_response_event # Name of tk event to generate when response queued. self.tk_response_event = tk_response_event # Name of tk event to generate when response queued.
@ -577,7 +576,7 @@ class EDMCCAPIRequest(EDMCCAPIReturn):
def __init__( def __init__(
self, capi_host: str, endpoint: str, self, capi_host: str, endpoint: str,
query_time: int, query_time: int,
tk_response_event: Optional[str] = None, tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False play_sound: bool = False, auto_update: bool = False
): ):
super().__init__( super().__init__(
@ -612,7 +611,7 @@ class EDMCCAPIFailedRequest(EDMCCAPIReturn):
self.exception: Exception = exception # Exception that recipient should raise. self.exception: Exception = exception # Exception that recipient should raise.
class Session(object): class Session:
"""Methods for handling Frontier Auth and CAPI queries.""" """Methods for handling Frontier Auth and CAPI queries."""
STATE_INIT, STATE_AUTH, STATE_OK = list(range(3)) STATE_INIT, STATE_AUTH, STATE_OK = list(range(3))
@ -628,11 +627,11 @@ class Session(object):
def __init__(self) -> None: def __init__(self) -> None:
self.state = Session.STATE_INIT self.state = Session.STATE_INIT
self.credentials: Optional[Dict[str, Any]] = None self.credentials: dict[str, Any] | None = None
self.requests_session = requests.Session() self.requests_session = requests.Session()
self.auth: Optional[Auth] = None self.auth: Auth | None = None
self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query
self.tk_master: Optional[tk.Tk] = None self.tk_master: tk.Tk | None = None
self.capi_raw_data = CAPIDataRaw() # Cache of raw replies from CAPI service self.capi_raw_data = CAPIDataRaw() # Cache of raw replies from CAPI service
# Queue that holds requests for CAPI queries, the items should always # Queue that holds requests for CAPI queries, the items should always
@ -642,7 +641,7 @@ class Session(object):
# queries back to the requesting code (technically anything checking # queries back to the requesting code (technically anything checking
# this queue, but it should be either EDMarketConnector.AppWindow or # this queue, but it should be either EDMarketConnector.AppWindow or
# EDMC.py). Items may be EDMCCAPIResponse or EDMCCAPIFailedRequest. # EDMC.py). Items may be EDMCCAPIResponse or EDMCCAPIFailedRequest.
self.capi_response_queue: Queue[Union[EDMCCAPIResponse, EDMCCAPIFailedRequest]] = Queue() self.capi_response_queue: Queue[EDMCCAPIResponse | EDMCCAPIFailedRequest] = Queue()
logger.debug('Starting CAPI queries thread...') logger.debug('Starting CAPI queries thread...')
self.capi_query_thread = threading.Thread( self.capi_query_thread = threading.Thread(
target=self.capi_query_worker, target=self.capi_query_worker,
@ -667,7 +666,7 @@ class Session(object):
self.state = Session.STATE_OK self.state = Session.STATE_OK
def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> bool: def login(self, cmdr: str | None = None, is_beta: bool | None = None) -> bool:
""" """
Attempt oAuth2 login. Attempt oAuth2 login.
@ -694,7 +693,7 @@ class Session(object):
logger.error('self.credentials is None') logger.error('self.credentials is None')
raise CredentialsError('Missing credentials') # Shouldn't happen raise CredentialsError('Missing credentials') # Shouldn't happen
elif self.state == Session.STATE_OK: if self.state == Session.STATE_OK:
logger.debug('already logged in (state == STATE_OK)') logger.debug('already logged in (state == STATE_OK)')
return True # already logged in return True # already logged in
@ -704,10 +703,9 @@ class Session(object):
logger.debug(f'already logged in (is_beta = {is_beta})') logger.debug(f'already logged in (is_beta = {is_beta})')
return True # already logged in return True # already logged in
else: logger.debug('changed account or retrying login during auth')
logger.debug('changed account or retrying login during auth') self.reinit_session()
self.reinit_session() self.credentials = credentials
self.credentials = credentials
self.state = Session.STATE_INIT self.state = Session.STATE_INIT
self.auth = Auth(self.credentials['cmdr']) self.auth = Auth(self.credentials['cmdr'])
@ -719,11 +717,10 @@ class Session(object):
self.start_frontier_auth(access_token) self.start_frontier_auth(access_token)
return True return True
else: logger.debug('We do NOT have an access_token')
logger.debug('We do NOT have an access_token') self.state = Session.STATE_AUTH
self.state = Session.STATE_AUTH return False
return False # Wait for callback
# Wait for callback
# Callback from protocol handler # Callback from protocol handler
def auth_callback(self) -> None: def auth_callback(self) -> None:
@ -745,7 +742,7 @@ class Session(object):
self.auth = None self.auth = None
raise # Bad thing happened raise # Bad thing happened
if getattr(sys, 'frozen', False): if getattr(sys, 'frozen', False):
tk.messagebox.showinfo(title="Authentication Successful", tk.messagebox.showinfo(title="Authentication Successful", # type: ignore
message="Authentication with cAPI Successful.\n" message="Authentication with cAPI Successful.\n"
"You may now close the Frontier login tab if it is still open.") "You may now close the Frontier login tab if it is still open.")
@ -812,11 +809,11 @@ class Session(object):
raise ServerConnectionError(f'Pretending CAPI down: {capi_endpoint}') raise ServerConnectionError(f'Pretending CAPI down: {capi_endpoint}')
if conf_module.capi_debug_access_token is not None: if conf_module.capi_debug_access_token is not None:
self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}' # type: ignore # noqa: E501 self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}'
# This is one-shot # This is one-shot
conf_module.capi_debug_access_token = None conf_module.capi_debug_access_token = None
r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout) # type: ignore r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout)
logger.trace_if('capi.worker', '... got result...') logger.trace_if('capi.worker', '... got result...')
r.raise_for_status() # Typically 403 "Forbidden" on token expiry r.raise_for_status() # Typically 403 "Forbidden" on token expiry
@ -835,21 +832,7 @@ class Session(object):
raise ServerConnectionError(f'Unable to connect to endpoint: {capi_endpoint}') from e raise ServerConnectionError(f'Unable to connect to endpoint: {capi_endpoint}') from e
except requests.HTTPError as e: # In response to raise_for_status() except requests.HTTPError as e: # In response to raise_for_status()
logger.exception(f'Frontier CAPI Auth: GET {capi_endpoint}') handle_http_error(e.response, capi_endpoint) # type: ignore # Handle various HTTP errors
self.dump(r)
if r.status_code == 401: # CAPI doesn't think we're Auth'd
# TODO: This needs to try a REFRESH, not a full re-auth
# No need for translation, we'll go straight into trying new Auth
# and thus any message would be overwritten.
raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"') from e
if r.status_code == 418: # "I'm a teapot" - used to signal maintenance
# LANG: Frontier CAPI returned 418, meaning down for maintenance
raise ServerError(_("Frontier CAPI down for maintenance")) from e
logger.exception('Frontier CAPI: Misc. Error')
raise ServerError('Frontier CAPI: Misc. Error') from e
except ValueError as e: except ValueError as e:
logger.exception(f'decoding CAPI response content:\n{r.content.decode(encoding="utf-8")}\n') logger.exception(f'decoding CAPI response content:\n{r.content.decode(encoding="utf-8")}\n')
@ -870,6 +853,32 @@ class Session(object):
return capi_data return capi_data
def handle_http_error(response: requests.Response, endpoint: str):
"""
Handle different types of HTTP errors raised during CAPI requests.
:param response: The HTTP response object.
:param endpoint: The CAPI endpoint that was queried.
:raises: Various exceptions based on the error scenarios.
"""
logger.exception(f'Frontier CAPI Auth: GET {endpoint}')
self.dump(response)
if response.status_code == 401:
# TODO: This needs to try a REFRESH, not a full re-auth
# No need for translation, we'll go straight into trying new Auth
# and thus any message would be overwritten.
# CAPI doesn't think we're Auth'd
raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"')
if response.status_code == 418:
# "I'm a teapot" - used to signal maintenance
# LANG: Frontier CAPI returned 418, meaning down for maintenance
raise ServerError(_("Frontier CAPI down for maintenance"))
logger.exception('Frontier CAPI: Misc. Error')
raise ServerError('Frontier CAPI: Misc. Error')
def capi_station_queries( # noqa: CCR001 def capi_station_queries( # noqa: CCR001
capi_host: str, timeout: int = capi_default_requests_timeout capi_host: str, timeout: int = capi_default_requests_timeout
) -> CAPIData: ) -> CAPIData:
@ -939,9 +948,8 @@ class Session(object):
logger.warning(f"{last_starport_id!r} != {int(market_data['id'])!r}") logger.warning(f"{last_starport_id!r} != {int(market_data['id'])!r}")
raise ServerLagging() raise ServerLagging()
else: market_data['name'] = last_starport_name
market_data['name'] = last_starport_name station_data['lastStarport'].update(market_data)
station_data['lastStarport'].update(market_data)
if services.get('outfitting') or services.get('shipyard'): if services.get('outfitting') or services.get('shipyard'):
shipyard_data = capi_single_query(capi_host, self.FRONTIER_CAPI_PATH_SHIPYARD, timeout=timeout) shipyard_data = capi_single_query(capi_host, self.FRONTIER_CAPI_PATH_SHIPYARD, timeout=timeout)
@ -953,9 +961,8 @@ class Session(object):
logger.warning(f"{last_starport_id!r} != {int(shipyard_data['id'])!r}") logger.warning(f"{last_starport_id!r} != {int(shipyard_data['id'])!r}")
raise ServerLagging() raise ServerLagging()
else: shipyard_data['name'] = last_starport_name
shipyard_data['name'] = last_starport_name station_data['lastStarport'].update(shipyard_data)
station_data['lastStarport'].update(shipyard_data)
# WORKAROUND END # WORKAROUND END
return station_data return station_data
@ -1024,7 +1031,7 @@ class Session(object):
) )
def station( def station(
self, query_time: int, tk_response_event: Optional[str] = None, self, query_time: int, tk_response_event: str | None = None,
play_sound: bool = False, auto_update: bool = False play_sound: bool = False, auto_update: bool = False
) -> None: ) -> None:
""" """
@ -1178,11 +1185,9 @@ class Session(object):
logger.debug(f"Using {SERVER_LIVE} because monitor.is_live_galaxy() was True") logger.debug(f"Using {SERVER_LIVE} because monitor.is_live_galaxy() was True")
return SERVER_LIVE return SERVER_LIVE
else: logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False")
logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False") return SERVER_LEGACY
return SERVER_LEGACY
return ''
###################################################################### ######################################################################
@ -1300,11 +1305,11 @@ def ship(data: CAPIData) -> CAPIData:
V = TypeVar('V') V = TypeVar('V')
def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int) -> V: def index_possibly_sparse_list(data: Mapping[str, V] | list[V], key: int) -> V:
""" """
Index into a "list" that may or may not be sparseified into a dict. Index into a "list" that may or may not be sparseified into a dict.
:param data: List or Dict to index :param data: list or dict to index
:param key: Key to use to index :param key: Key to use to index
:raises ValueError: When data is of an unexpected type :raises ValueError: When data is of an unexpected type
:return: The value at the key :return: The value at the key
@ -1320,11 +1325,10 @@ def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int)
if isinstance(data, list): if isinstance(data, list):
return data[key] return data[key]
elif isinstance(data, (dict, OrderedDict)): if isinstance(data, (dict, OrderedDict)):
return data[str(key)] return data[str(key)]
else: raise ValueError(f'Unexpected data type {type(data)}')
raise ValueError(f'Unexpected data type {type(data)}')
###################################################################### ######################################################################

View File

@ -3,11 +3,10 @@ Example EDMC plugin.
It adds a single button to the EDMC interface that displays the number of times it has been clicked. It adds a single button to the EDMC interface that displays the number of times it has been clicked.
""" """
from __future__ import annotations
import logging import logging
import tkinter as tk import tkinter as tk
from typing import Optional
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
from config import appname, config from config import appname, config
@ -48,7 +47,7 @@ class ClickCounter:
""" """
self.on_preferences_closed("", False) # Save our prefs self.on_preferences_closed("", False) # Save our prefs
def setup_preferences(self, parent: nb.Notebook, cmdr: str, is_beta: bool) -> Optional[tk.Frame]: def setup_preferences(self, parent: nb.Notebook, cmdr: str, is_beta: bool) -> tk.Frame | None:
""" """
setup_preferences is called by plugin_prefs below. setup_preferences is called by plugin_prefs below.
@ -127,7 +126,7 @@ def plugin_stop() -> None:
return cc.on_unload() return cc.on_unload()
def plugin_prefs(parent: nb.Notebook, cmdr: str, is_beta: bool) -> Optional[tk.Frame]: def plugin_prefs(parent: nb.Notebook, cmdr: str, is_beta: bool) -> tk.Frame | None:
""" """
Handle preferences tab for the plugin. Handle preferences tab for the plugin.
@ -145,7 +144,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
return cc.on_preferences_closed(cmdr, is_beta) return cc.on_preferences_closed(cmdr, is_beta)
def plugin_app(parent: tk.Frame) -> Optional[tk.Frame]: def plugin_app(parent: tk.Frame) -> tk.Frame | None:
""" """
Set up the UI of the plugin. Set up the UI of the plugin.

View File

@ -1,10 +1,11 @@
"""Handle keyboard input for manual update triggering.""" """Handle keyboard input for manual update triggering."""
from __future__ import annotations
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import abc import abc
import sys import sys
from abc import abstractmethod from abc import abstractmethod
from typing import Optional, Tuple, Union
class AbstractHotkeyMgr(abc.ABC): class AbstractHotkeyMgr(abc.ABC):
@ -31,7 +32,7 @@ class AbstractHotkeyMgr(abc.ABC):
pass pass
@abstractmethod @abstractmethod
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: def fromevent(self, event) -> bool | tuple | None:
""" """
Return configuration (keycode, modifiers) or None=clear or False=retain previous. Return configuration (keycode, modifiers) or None=clear or False=retain previous.
@ -79,16 +80,15 @@ def get_hotkeymgr() -> AbstractHotkeyMgr:
from hotkey.darwin import MacHotkeyMgr from hotkey.darwin import MacHotkeyMgr
return MacHotkeyMgr() return MacHotkeyMgr()
elif sys.platform == 'win32': if sys.platform == 'win32':
from hotkey.windows import WindowsHotkeyMgr from hotkey.windows import WindowsHotkeyMgr
return WindowsHotkeyMgr() return WindowsHotkeyMgr()
elif sys.platform == 'linux': if sys.platform == 'linux':
from hotkey.linux import LinuxHotKeyMgr from hotkey.linux import LinuxHotKeyMgr
return LinuxHotKeyMgr() return LinuxHotKeyMgr()
else: raise ValueError(f'Unknown platform: {sys.platform}')
raise ValueError(f'Unknown platform: {sys.platform}')
# singleton # singleton

View File

@ -1,8 +1,10 @@
"""darwin/macOS implementation of hotkey.AbstractHotkeyMgr.""" """darwin/macOS implementation of hotkey.AbstractHotkeyMgr."""
from __future__ import annotations
import pathlib import pathlib
import sys import sys
import tkinter as tk import tkinter as tk
from typing import Callable, Optional, Tuple, Union from typing import Callable
assert sys.platform == 'darwin' assert sys.platform == 'darwin'
import objc import objc
@ -107,7 +109,7 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
# suppress the event by not chaining the old function # suppress the event by not chaining the old function
return the_event return the_event
elif the_event.type() in (NSKeyDown, NSKeyUp): if the_event.type() in (NSKeyDown, NSKeyUp):
c = the_event.charactersIgnoringModifiers() c = the_event.charactersIgnoringModifiers()
self.acquire_key = (c and ord(c[0]) or 0) | \ self.acquire_key = (c and ord(c[0]) or 0) | \
(the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask) (the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask)
@ -192,7 +194,7 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after(50, self._acquire_poll) self.root.after(50, self._acquire_poll)
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: def fromevent(self, event) -> bool | tuple | None:
""" """
Return configuration (keycode, modifiers) or None=clear or False=retain previous. Return configuration (keycode, modifiers) or None=clear or False=retain previous.
@ -209,17 +211,17 @@ class MacHotkeyMgr(AbstractHotkeyMgr):
return False return False
# BkSp, Del, Clear = clear hotkey # BkSp, Del, Clear = clear hotkey
elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]: if keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]:
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None return None
# don't allow keys needed for typing in System Map # don't allow keys needed for typing in System Map
elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a: if keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a:
NSBeep() NSBeep()
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None return None
return (keycode, modifiers) return keycode, modifiers
def display(self, keycode, modifiers) -> str: def display(self, keycode, modifiers) -> str:
""" """

View File

@ -1,6 +1,7 @@
"""Linux implementation of hotkey.AbstractHotkeyMgr.""" """Linux implementation of hotkey.AbstractHotkeyMgr."""
import sys from __future__ import annotations
import sys
from EDMCLogging import get_main_logger from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr from hotkey import AbstractHotkeyMgr

View File

@ -1,4 +1,6 @@
"""Windows implementation of hotkey.AbstractHotkeyMgr.""" """Windows implementation of hotkey.AbstractHotkeyMgr."""
from __future__ import annotations
import atexit import atexit
import ctypes import ctypes
import pathlib import pathlib
@ -7,8 +9,6 @@ import threading
import tkinter as tk import tkinter as tk
import winsound import winsound
from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD
from typing import Optional, Tuple, Union
from config import config from config import config
from EDMCLogging import get_main_logger from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr from hotkey import AbstractHotkeyMgr
@ -266,7 +266,7 @@ class WindowsHotkeyMgr(AbstractHotkeyMgr):
"""Stop acquiring hotkey state.""" """Stop acquiring hotkey state."""
pass pass
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001 def fromevent(self, event) -> bool | tuple | None: # noqa: CCR001
""" """
Return configuration (keycode, modifiers) or None=clear or False=retain previous. Return configuration (keycode, modifiers) or None=clear or False=retain previous.
@ -285,33 +285,32 @@ class WindowsHotkeyMgr(AbstractHotkeyMgr):
keycode = event.keycode keycode = event.keycode
if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]: if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]:
return (0, modifiers) return 0, modifiers
if not modifiers: if not modifiers:
if keycode == VK_ESCAPE: # Esc = retain previous if keycode == VK_ESCAPE: # Esc = retain previous
return False return False
elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey if keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey
return None return None
elif ( if (
keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z') keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z')
): # don't allow keys needed for typing in System Map ): # don't allow keys needed for typing in System Map
winsound.MessageBeep() winsound.MessageBeep()
return None return None
elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY] if (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY]
or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys
return (0, modifiers) return 0, modifiers
# See if the keycode is usable and available # See if the keycode is usable and available
if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode): if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode):
UnregisterHotKey(None, 2) UnregisterHotKey(None, 2)
return (keycode, modifiers) return keycode, modifiers
else: winsound.MessageBeep()
winsound.MessageBeep() return None
return None
def display(self, keycode, modifiers) -> str: def display(self, keycode, modifiers) -> str:
""" """

View File

@ -1,7 +1,11 @@
"""Monitor for new Journal files and contents of latest.""" """
# v [sic] monitor.py - Monitor for new Journal files and contents of latest.
# spell-checker: words onfoot unforseen relog fsdjump suitloadoutid slotid suitid loadoutid fauto Intimidator
# spell-checker: words joinacrew quitacrew sellshiponrebuy newbal navroute npccrewpaidwage sauto Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json import json
import pathlib import pathlib
@ -14,19 +18,16 @@ from collections import OrderedDict, defaultdict
from os import SEEK_END, SEEK_SET, listdir from os import SEEK_END, SEEK_SET, listdir
from os.path import basename, expanduser, getctime, isdir, join from os.path import basename, expanduser, getctime, isdir, join
from time import gmtime, localtime, mktime, sleep, strftime, strptime, time from time import gmtime, localtime, mktime, sleep, strftime, strptime, time
from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping, Tuple from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping
if TYPE_CHECKING:
import tkinter
import semantic_version import semantic_version
import util_ships import util_ships
from config import config from config import config
from edmc_data import edmc_suit_shortnames, edmc_suit_symbol_localised from edmc_data import edmc_suit_shortnames, edmc_suit_symbol_localised
from EDMCLogging import get_main_logger from EDMCLogging import get_main_logger
# spell-checker: words navroute if TYPE_CHECKING:
import tkinter
logger = get_main_logger() logger = get_main_logger()
STARTUP = 'journal.startup' STARTUP = 'journal.startup'
@ -76,11 +77,10 @@ else:
# Journal handler # Journal handler
class EDLogs(FileSystemEventHandler): # type: ignore # See below class EDLogs(FileSystemEventHandler):
"""Monitoring of Journal files.""" """Monitoring of Journal files."""
# Magic with FileSystemEventHandler can confuse type checkers when they do not have access to every import # Magic with FileSystemEventHandler can confuse type checkers when they do not have access to every import
_POLL = 1 # Polling is cheap, so do it often _POLL = 1 # Polling is cheap, so do it often
_RE_CANONICALISE = re.compile(r'\$(.+)_name;') _RE_CANONICALISE = re.compile(r'\$(.+)_name;')
_RE_CATEGORY = re.compile(r'\$MICRORESOURCE_CATEGORY_(.+);') _RE_CATEGORY = re.compile(r'\$MICRORESOURCE_CATEGORY_(.+);')
@ -207,7 +207,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
:return: bool - False if we couldn't access/find latest Journal file. :return: bool - False if we couldn't access/find latest Journal file.
""" """
logger.debug('Begin...') logger.debug('Begin...')
self.root = root # type: ignore self.root = root
journal_dir = config.get_str('journaldir') journal_dir = config.get_str('journaldir')
if journal_dir == '' or journal_dir is None: if journal_dir == '' or journal_dir is None:
@ -515,8 +515,6 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
else: else:
self.game_was_running = self.game_running() self.game_was_running = self.game_running()
logger.debug('Done.')
def synthesize_startup_event(self) -> dict[str, Any]: def synthesize_startup_event(self) -> dict[str, Any]:
""" """
Synthesize a 'StartUp' event to notify plugins of initial state. Synthesize a 'StartUp' event to notify plugins of initial state.
@ -570,7 +568,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try: try:
# Preserve property order because why not? # Preserve property order because why not?
entry: MutableMapping[str, Any] = json.loads(line, object_pairs_hook=OrderedDict) entry: MutableMapping[str, Any] = json.loads(line, object_pairs_hook=OrderedDict)
entry['timestamp'] # we expect this to exist # TODO: replace with assert? or an if key in check assert 'timestamp' in entry, "Timestamp does not exist in the entry"
self.__navroute_retry() self.__navroute_retry()
@ -933,7 +931,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
############################################################### ###############################################################
if 'StarPos' in entry: if 'StarPos' in entry:
# Plugins need this as well, so copy in state # Plugins need this as well, so copy in state
self.state['StarPos'] = tuple(entry['StarPos']) # type: ignore self.state['StarPos'] = tuple(entry['StarPos'])
else: else:
logger.warning(f"'{event_type}' event without 'StarPos' !!!:\n{entry}\n") logger.warning(f"'{event_type}' event without 'StarPos' !!!:\n{entry}\n")
@ -1109,7 +1107,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
while attempts < shiplocker_max_attempts: while attempts < shiplocker_max_attempts:
attempts += 1 attempts += 1
try: try:
with open(shiplocker_filename, 'rb') as h: # type: ignore with open(shiplocker_filename, 'rb') as h:
entry = json.load(h, object_pairs_hook=OrderedDict) entry = json.load(h, object_pairs_hook=OrderedDict)
self.state['ShipLockerJSON'] = entry self.state['ShipLockerJSON'] = entry
break break
@ -1551,7 +1549,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
entry = json.load(mf) entry = json.load(mf)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.exception('Failed decoding ModulesInfo.json', exc_info=True) logger.exception('Failed decoding ModulesInfo.json')
else: else:
self.state['ModuleInfo'] = entry self.state['ModuleInfo'] = entry
@ -1812,7 +1810,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
self.state['Credits'] -= entry.get('Price', 0) self.state['Credits'] -= entry.get('Price', 0)
elif event_type == 'carrierbanktransfer': elif event_type == 'carrierbanktransfer':
if (newbal := entry.get('PlayerBalance')): if newbal := entry.get('PlayerBalance'):
self.state['Credits'] = newbal self.state['Credits'] = newbal
elif event_type == 'carrierdecommission': elif event_type == 'carrierdecommission':
@ -1911,7 +1909,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return name return name
def suitloadout_store_from_event(self, entry) -> Tuple[int, int]: def suitloadout_store_from_event(self, entry) -> tuple[int, int]:
""" """
Store Suit and SuitLoadout data from a journal event. Store Suit and SuitLoadout data from a journal event.
@ -1990,64 +1988,64 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
# TODO: *This* will need refactoring and a proper validation infrastructure # TODO: *This* will need refactoring and a proper validation infrastructure
# designed for this in the future. This is a bandaid for a known issue. # designed for this in the future. This is a bandaid for a known issue.
def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001 C901 def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001
""" """
Check an `EngineerProgress` Journal event for validity. Check an `EngineerProgress` Journal event for validity.
:param entry: Journal event dict :param entry: Journal event dict
:return: True if passes validation, else False. :return: True if passes validation, else False.
""" """
# The event should have at least one of these engineers_present = 'Engineers' in entry
if 'Engineers' not in entry and 'Progress' not in entry: progress_present = 'Progress' in entry
if not (engineers_present or progress_present):
logger.warning(f"EngineerProgress has neither 'Engineers' nor 'Progress': {entry=}") logger.warning(f"EngineerProgress has neither 'Engineers' nor 'Progress': {entry=}")
return False return False
# But not both of them if engineers_present and progress_present:
if 'Engineers' in entry and 'Progress' in entry:
logger.warning(f"EngineerProgress has BOTH 'Engineers' and 'Progress': {entry=}") logger.warning(f"EngineerProgress has BOTH 'Engineers' and 'Progress': {entry=}")
return False return False
if 'Engineers' in entry: if engineers_present:
engineers = entry['Engineers']
# 'Engineers' version should have a list as value # 'Engineers' version should have a list as value
if not isinstance(entry['Engineers'], list): if not isinstance(engineers, list):
logger.warning(f"EngineerProgress 'Engineers' is not a list: {entry=}") logger.warning(f"EngineerProgress 'Engineers' is not a list: {entry=}")
return False return False
# It should have at least one entry? This might still be valid ? # It should have at least one entry? This might still be valid ?
if len(entry['Engineers']) < 1: if len(engineers) < 1:
logger.warning(f"EngineerProgress 'Engineers' list is empty ?: {entry=}") logger.warning(f"EngineerProgress 'Engineers' list is empty ?: {entry=}")
# TODO: As this might be valid, we might want to only log # TODO: As this might be valid, we might want to only log
return False return False
# And that list should have all of these keys # And that list should have all of these keys
for e in entry['Engineers']: # For some Progress there's no Rank/RankProgress yet
for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'): required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress')
if f not in e: for e in engineers:
# For some Progress there's no Rank/RankProgress yet missing_keys = [key for key in required_keys if key not in e]
if f in ('Rank', 'RankProgress'): if any(key in ('Rank', 'RankProgress') and e.get('Progress') in ('Invited', 'Known') for key in
if (progress := e.get('Progress', None)) is not None: missing_keys):
if progress in ('Invited', 'Known'): continue
continue
logger.warning(f"Engineer entry without '{f}' key: {e=} in {entry=}") if missing_keys:
return False logger.warning(f"Engineer entry without '{missing_keys[0]}' key: {e=} in {entry=}")
return False
if 'Progress' in entry: if progress_present:
# Progress is only a single Engineer, so it's not an array # Progress is only a single Engineer, so it's not an array
# { "timestamp":"2021-05-24T17:57:52Z", # { "timestamp":"2021-05-24T17:57:52Z",
# "event":"EngineerProgress", # "event":"EngineerProgress",
# "Engineer":"Felicity Farseer", # "Engineer":"Felicity Farseer",
# "EngineerID":300100, # "EngineerID":300100,
# "Progress":"Invited" } # "Progress":"Invited" }
for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'): # For some Progress there's no Rank/RankProgress yet
if f not in entry: required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress')
# For some Progress there's no Rank/RankProgress yet missing_keys = [key for key in required_keys if key not in entry]
if f in ('Rank', 'RankProgress'): if any(key in ('Rank', 'RankProgress') and entry.get('Progress') in ('Invited', 'Known') for key in
if (progress := entry.get('Progress', None)) is not None: missing_keys):
if progress in ('Invited', 'Known'): if missing_keys:
continue logger.warning(f"Progress event without '{missing_keys[0]}' key: {entry=}")
logger.warning(f"Progress event without '{f}' key: {entry=}")
return False return False
return True return True
@ -2152,7 +2150,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return True return True
elif sys.platform == 'win32': elif sys.platform == 'win32':
def WindowTitle(h): # noqa: N802 # type: ignore def WindowTitle(h): # noqa: N802
if h: if h:
length = GetWindowTextLength(h) + 1 length = GetWindowTextLength(h) + 1
buf = ctypes.create_unicode_buffer(length) buf = ctypes.create_unicode_buffer(length)
@ -2261,18 +2259,18 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
return return
ship = util_ships.ship_file_name(self.state['ShipName'], self.state['ShipType']) ship = util_ships.ship_file_name(self.state['ShipName'], self.state['ShipType'])
regexp = re.compile(re.escape(ship) + r'\.\d{4}\-\d\d\-\d\dT\d\d\.\d\d\.\d\d\.txt') regexp = re.compile(re.escape(ship) + r'\.\d{4}-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt')
oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x))) # type: ignore oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x)))
if oldfiles: if oldfiles:
try: try:
with open(join(config.get_str('outdir'), oldfiles[-1]), 'r', encoding='utf-8') as h: # type: ignore with open(join(config.get_str('outdir'), oldfiles[-1]), encoding='utf-8') as h:
if h.read() == string: if h.read() == string:
return # same as last time - don't write return # same as last time - don't write
except UnicodeError: except UnicodeError:
logger.exception("UnicodeError reading old ship loadout with utf-8 encoding, trying without...") logger.exception("UnicodeError reading old ship loadout with utf-8 encoding, trying without...")
try: try:
with open(join(config.get_str('outdir'), oldfiles[-1]), 'r') as h: # type: ignore with open(join(config.get_str('outdir'), oldfiles[-1])) as h:
if h.read() == string: if h.read() == string:
return # same as last time - don't write return # same as last time - don't write
@ -2291,9 +2289,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
# Write # Write
ts = strftime('%Y-%m-%dT%H.%M.%S', localtime(time())) ts = strftime('%Y-%m-%dT%H.%M.%S', localtime(time()))
filename = join( # type: ignore filename = join(config.get_str('outdir'), f'{ship}.{ts}.txt')
config.get_str('outdir'), f'{ship}.{ts}.txt'
)
try: try:
with open(filename, 'wt', encoding='utf-8') as h: with open(filename, 'wt', encoding='utf-8') as h:
@ -2380,7 +2376,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try: try:
with open(join(self.currentdir, 'NavRoute.json'), 'r') as f: with open(join(self.currentdir, 'NavRoute.json')) as f:
raw = f.read() raw = f.read()
except Exception as e: except Exception as e:
@ -2391,7 +2387,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
data = json.loads(raw) data = json.loads(raw)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.exception('Failed to decode NavRoute.json', exc_info=True) logger.exception('Failed to decode NavRoute.json')
return None return None
if 'timestamp' not in data: # quick sanity check if 'timestamp' not in data: # quick sanity check
@ -2406,7 +2402,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
try: try:
with open(join(self.currentdir, 'FCMaterials.json'), 'r') as f: with open(join(self.currentdir, 'FCMaterials.json')) as f:
raw = f.read() raw = f.read()
except Exception as e: except Exception as e:
@ -2417,7 +2413,7 @@ class EDLogs(FileSystemEventHandler): # type: ignore # See below
data = json.loads(raw) data = json.loads(raw)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.exception('Failed to decode FCMaterials.json', exc_info=True) logger.exception('Failed to decode FCMaterials.json')
return None return None
if 'timestamp' not in data: # quick sanity check if 'timestamp' not in data: # quick sanity check

View File

@ -10,7 +10,6 @@ from __future__ import annotations
import json import json
from collections import OrderedDict from collections import OrderedDict
from typing import OrderedDict as OrderedDictT from typing import OrderedDict as OrderedDictT
from config import config from config import config
from edmc_data import ( from edmc_data import (
outfitting_armour_map as armour_map, outfitting_armour_map as armour_map,

134
plug.py
View File

@ -1,4 +1,12 @@
"""Plugin API.""" """
plug.py - Plugin API.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import copy import copy
import importlib import importlib
import logging import logging
@ -6,9 +14,8 @@ import operator
import os import os
import sys import sys
import tkinter as tk import tkinter as tk
from builtins import object, str
from tkinter import ttk from tkinter import ttk
from typing import Any, Callable, List, Mapping, MutableMapping, Optional from typing import Any, Mapping, MutableMapping
import companion import companion
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
@ -26,7 +33,7 @@ PLUGINS_not_py3 = []
class LastError: class LastError:
"""Holds the last plugin error.""" """Holds the last plugin error."""
msg: Optional[str] msg: str | None
root: tk.Tk root: tk.Tk
def __init__(self) -> None: def __init__(self) -> None:
@ -36,10 +43,10 @@ class LastError:
last_error = LastError() last_error = LastError()
class Plugin(object): class Plugin:
"""An EDMC plugin.""" """An EDMC plugin."""
def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[logging.Logger]): def __init__(self, name: str, loadfile: str | None, plugin_logger: logging.Logger | None):
""" """
Load a single plugin. Load a single plugin.
@ -49,9 +56,9 @@ class Plugin(object):
:raises Exception: Typically ImportError or OSError :raises Exception: Typically ImportError or OSError
""" """
self.name: str = name # Display name. self.name: str = name # Display name.
self.folder: Optional[str] = name # basename of plugin folder. None for internal plugins. self.folder: str | None = name # basename of plugin folder. None for internal plugins.
self.module = None # None for disabled plugins. self.module = None # None for disabled plugins.
self.logger: Optional[logging.Logger] = plugin_logger self.logger: logging.Logger | None = plugin_logger
if loadfile: if loadfile:
logger.info(f'loading plugin "{name.replace(".", "_")}" from "{loadfile}"') logger.info(f'loading plugin "{name.replace(".", "_")}" from "{loadfile}"')
@ -64,7 +71,7 @@ class Plugin(object):
).load_module() ).load_module()
if getattr(module, 'plugin_start3', None): if getattr(module, 'plugin_start3', None):
newname = module.plugin_start3(os.path.dirname(loadfile)) newname = module.plugin_start3(os.path.dirname(loadfile))
self.name = newname and str(newname) or name self.name = str(newname) if newname else self.name
self.module = module self.module = module
elif getattr(module, 'plugin_start', None): elif getattr(module, 'plugin_start', None):
logger.warning(f'plugin {name} needs migrating\n') logger.warning(f'plugin {name} needs migrating\n')
@ -77,7 +84,7 @@ class Plugin(object):
else: else:
logger.info(f'plugin {name} disabled') logger.info(f'plugin {name} disabled')
def _get_func(self, funcname: str) -> Optional[Callable]: def _get_func(self, funcname: str):
""" """
Get a function from a plugin. Get a function from a plugin.
@ -86,7 +93,7 @@ class Plugin(object):
""" """
return getattr(self.module, funcname, None) return getattr(self.module, funcname, None)
def get_app(self, parent: tk.Frame) -> Optional[tk.Frame]: def get_app(self, parent: tk.Frame) -> tk.Frame | None:
""" """
If the plugin provides mainwindow content create and return it. If the plugin provides mainwindow content create and return it.
@ -100,7 +107,7 @@ class Plugin(object):
if appitem is None: if appitem is None:
return None return None
elif isinstance(appitem, tuple): if isinstance(appitem, tuple):
if ( if (
len(appitem) != 2 len(appitem) != 2
or not isinstance(appitem[0], tk.Widget) or not isinstance(appitem[0], tk.Widget)
@ -118,7 +125,7 @@ class Plugin(object):
return None return None
def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Optional[tk.Frame]: def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame | None:
""" """
If the plugin provides a prefs frame, create and return it. If the plugin provides a prefs frame, create and return it.
@ -132,41 +139,50 @@ class Plugin(object):
if plugin_prefs: if plugin_prefs:
try: try:
frame = plugin_prefs(parent, cmdr, is_beta) frame = plugin_prefs(parent, cmdr, is_beta)
if not isinstance(frame, nb.Frame): if isinstance(frame, nb.Frame):
raise AssertionError return frame
return frame raise AssertionError
except Exception: except Exception:
logger.exception(f'Failed for Plugin "{self.name}"') logger.exception(f'Failed for Plugin "{self.name}"')
return None return None
def load_plugins(master: tk.Tk) -> None: # noqa: CCR001 def load_plugins(master: tk.Tk) -> None:
"""Find and load all plugins.""" """Find and load all plugins."""
last_error.root = master last_error.root = master
internal = [] internal = _load_internal_plugins()
for name in sorted(os.listdir(config.internal_plugin_dir_path)):
if name.endswith('.py') and not name[0] in ['.', '_']:
try:
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir_path, name), logger)
plugin.folder = None # Suppress listing in Plugins prefs tab
internal.append(plugin)
except Exception:
logger.exception(f'Failure loading internal Plugin "{name}"')
PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower())) PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower()))
# Add plugin folder to load path so packages can be loaded from plugin folder # Add plugin folder to load path so packages can be loaded from plugin folder
sys.path.append(config.plugin_dir) sys.path.append(config.plugin_dir)
found = _load_found_plugins()
PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower()))
def _load_internal_plugins():
internal = []
for name in sorted(os.listdir(config.internal_plugin_dir_path)):
if name.endswith('.py') and name[0] not in ['.', '_']:
try:
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir_path, name), logger)
plugin.folder = None
internal.append(plugin)
except Exception:
logger.exception(f'Failure loading internal Plugin "{name}"')
return internal
def _load_found_plugins():
found = [] found = []
# Load any plugins that are also packages first, but note it's *still* # Load any plugins that are also packages first, but note it's *still*
# 100% relying on there being a `load.py`, as only that will be loaded. # 100% relying on there being a `load.py`, as only that will be loaded.
# The intent here is to e.g. have EDMC-Overlay load before any plugins # The intent here is to e.g. have EDMC-Overlay load before any plugins
# that depend on it. # that depend on it.
for name in sorted(
os.listdir(config.plugin_dir_path), for name in sorted(os.listdir(config.plugin_dir_path), key=lambda n: (
key=lambda n: (not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower()) not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower())):
):
if not os.path.isdir(os.path.join(config.plugin_dir_path, name)) or name[0] in ['.', '_']: if not os.path.isdir(os.path.join(config.plugin_dir_path, name)) or name[0] in ['.', '_']:
pass pass
elif name.endswith('.disabled'): elif name.endswith('.disabled'):
@ -177,19 +193,17 @@ def load_plugins(master: tk.Tk) -> None: # noqa: CCR001
# Add plugin's folder to load path in case plugin has internal package dependencies # Add plugin's folder to load path in case plugin has internal package dependencies
sys.path.append(os.path.join(config.plugin_dir_path, name)) sys.path.append(os.path.join(config.plugin_dir_path, name))
# Create a logger for this 'found' plugin. Must be before the
# load.py is loaded.
import EDMCLogging import EDMCLogging
# Create a logger for this 'found' plugin. Must be before the load.py is loaded.
plugin_logger = EDMCLogging.get_plugin_logger(name) plugin_logger = EDMCLogging.get_plugin_logger(name)
found.append(Plugin(name, os.path.join(config.plugin_dir_path, name, 'load.py'), plugin_logger)) found.append(Plugin(name, os.path.join(config.plugin_dir_path, name, 'load.py'), plugin_logger))
except Exception: except Exception:
logger.exception(f'Failure loading found Plugin "{name}"') logger.exception(f'Failure loading found Plugin "{name}"')
pass pass
PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower())) return found
def provides(fn_name: str) -> List[str]: def provides(fn_name: str) -> list[str]:
""" """
Find plugins that provide a function. Find plugins that provide a function.
@ -202,7 +216,7 @@ def provides(fn_name: str) -> List[str]:
def invoke( def invoke(
plugin_name: str, fallback: str | None, fn_name: str, *args: Any plugin_name: str, fallback: str | None, fn_name: str, *args: Any
) -> Optional[str]: ) -> str | None:
""" """
Invoke a function on a named plugin. Invoke a function on a named plugin.
@ -228,7 +242,7 @@ def invoke(
return None return None
def notify_stop() -> Optional[str]: def notify_stop() -> str | None:
""" """
Notify each plugin that the program is closing. Notify each plugin that the program is closing.
@ -251,6 +265,16 @@ def notify_stop() -> Optional[str]:
return error return error
def _notify_prefs_plugins(fn_name: str, cmdr: str | None, is_beta: bool) -> None:
for plugin in PLUGINS:
prefs_callback = plugin._get_func(fn_name)
if prefs_callback:
try:
prefs_callback(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None:
""" """
Notify plugins that the Cmdr was changed while the settings dialog is open. Notify plugins that the Cmdr was changed while the settings dialog is open.
@ -259,13 +283,7 @@ def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None:
:param cmdr: current Cmdr name (or None). :param cmdr: current Cmdr name (or None).
:param is_beta: whether the player is in a Beta universe. :param is_beta: whether the player is in a Beta universe.
""" """
for plugin in PLUGINS: _notify_prefs_plugins("prefs_cmdr_changed", cmdr, is_beta)
prefs_cmdr_changed = plugin._get_func('prefs_cmdr_changed')
if prefs_cmdr_changed:
try:
prefs_cmdr_changed(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None: def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None:
@ -278,20 +296,14 @@ def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None:
:param cmdr: current Cmdr name (or None). :param cmdr: current Cmdr name (or None).
:param is_beta: whether the player is in a Beta universe. :param is_beta: whether the player is in a Beta universe.
""" """
for plugin in PLUGINS: _notify_prefs_plugins("prefs_changed", cmdr, is_beta)
prefs_changed = plugin._get_func('prefs_changed')
if prefs_changed:
try:
prefs_changed(cmdr, is_beta)
except Exception:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_journal_entry( def notify_journal_entry(
cmdr: str, is_beta: bool, system: str | None, station: str | None, cmdr: str, is_beta: bool, system: str | None, station: str | None,
entry: MutableMapping[str, Any], entry: MutableMapping[str, Any],
state: Mapping[str, Any] state: Mapping[str, Any]
) -> Optional[str]: ) -> str | None:
""" """
Send a journal entry to each plugin. Send a journal entry to each plugin.
@ -303,7 +315,7 @@ def notify_journal_entry(
:param is_beta: whether the player is in a Beta universe. :param is_beta: whether the player is in a Beta universe.
:returns: Error message from the first plugin that returns one (if any) :returns: Error message from the first plugin that returns one (if any)
""" """
if entry['event'] in ('Location'): if entry['event'] in 'Location':
logger.trace_if('journal.locations', 'Notifying plugins of "Location" event') logger.trace_if('journal.locations', 'Notifying plugins of "Location" event')
error = None error = None
@ -323,7 +335,7 @@ def notify_journal_entry_cqc(
cmdr: str, is_beta: bool, cmdr: str, is_beta: bool,
entry: MutableMapping[str, Any], entry: MutableMapping[str, Any],
state: Mapping[str, Any] state: Mapping[str, Any]
) -> Optional[str]: ) -> str | None:
""" """
Send an in-CQC journal entry to each plugin. Send an in-CQC journal entry to each plugin.
@ -348,10 +360,7 @@ def notify_journal_entry_cqc(
return error return error
def notify_dashboard_entry( def notify_dashboard_entry(cmdr: str, is_beta: bool, entry: MutableMapping[str, Any],) -> str | None:
cmdr: str, is_beta: bool,
entry: MutableMapping[str, Any],
) -> Optional[str]:
""" """
Send a status entry to each plugin. Send a status entry to each plugin.
@ -373,10 +382,7 @@ def notify_dashboard_entry(
return error return error
def notify_capidata( def notify_capidata(data: companion.CAPIData, is_beta: bool) -> str | None:
data: companion.CAPIData,
is_beta: bool
) -> Optional[str]:
""" """
Send the latest EDMC data from the FD servers to each plugin. Send the latest EDMC data from the FD servers to each plugin.
@ -404,9 +410,7 @@ def notify_capidata(
return error return error
def notify_capi_fleetcarrierdata( def notify_capi_fleetcarrierdata(data: companion.CAPIData) -> str | None:
data: companion.CAPIData
) -> str | None:
""" """
Send the latest CAPI Fleetcarrier data from the FD servers to each plugin. Send the latest CAPI Fleetcarrier data from the FD servers to each plugin.

View File

@ -41,7 +41,6 @@ from typing import (
MutableMapping, MutableMapping,
) )
from typing import OrderedDict as OrderedDictT from typing import OrderedDict as OrderedDictT
from typing import Tuple, Union
import requests import requests
import companion import companion
import edmc_data import edmc_data
@ -100,8 +99,8 @@ class This:
# Avoid duplicates # Avoid duplicates
self.marketId: str | None = None self.marketId: str | None = None
self.commodities: list[OrderedDictT[str, Any]] | None = None self.commodities: list[OrderedDictT[str, Any]] | None = None
self.outfitting: Tuple[bool, list[str]] | None = None self.outfitting: tuple[bool, list[str]] | None = None
self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None self.shipyard: tuple[bool, list[Mapping[str, Any]]] | None = None
self.fcmaterials_marketid: int = 0 self.fcmaterials_marketid: int = 0
self.fcmaterials: list[OrderedDictT[str, Any]] | None = None self.fcmaterials: list[OrderedDictT[str, Any]] | None = None
self.fcmaterials_capi_marketid: int = 0 self.fcmaterials_capi_marketid: int = 0
@ -714,7 +713,7 @@ class EDDN:
# Send any FCMaterials.json-equivalent 'orders' as well # Send any FCMaterials.json-equivalent 'orders' as well
self.export_capi_fcmaterials(data, is_beta, horizons) self.export_capi_fcmaterials(data, is_beta, horizons)
def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: def safe_modules_and_ships(self, data: Mapping[str, Any]) -> tuple[dict, dict]:
""" """
Produce a sanity-checked version of ships and modules from CAPI data. Produce a sanity-checked version of ships and modules from CAPI data.
@ -1091,7 +1090,7 @@ class EDDN:
entry: MutableMapping[str, Any], entry: MutableMapping[str, Any],
system_name: str, system_name: str,
system_coordinates: list system_coordinates: list
) -> Union[str, MutableMapping[str, Any]]: ) -> str | MutableMapping[str, Any]:
""" """
Augment a journal entry with necessary system data. Augment a journal entry with necessary system data.

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""EDMC preferences library.""" """EDMC preferences library."""
from __future__ import annotations
import contextlib import contextlib
import logging import logging
@ -13,7 +14,7 @@ from os.path import expanduser, expandvars, join, normpath
from tkinter import colorchooser as tkColorChooser # type: ignore # noqa: N812 from tkinter import colorchooser as tkColorChooser # type: ignore # noqa: N812
from tkinter import ttk from tkinter import ttk
from types import TracebackType from types import TracebackType
from typing import TYPE_CHECKING, Any, Callable, Optional, Type, Union from typing import TYPE_CHECKING, Any, Callable, Optional, Type
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
import plug import plug
@ -276,7 +277,7 @@ class PreferencesDialog(tk.Toplevel):
self.resizable(tk.FALSE, tk.FALSE) self.resizable(tk.FALSE, tk.FALSE)
self.cmdr: Union[str, bool, None] = False # Note if Cmdr changes in the Journal self.cmdr: str | bool | None = False # Note if Cmdr changes in the Journal
self.is_beta: bool = False # Note if Beta status changes in the Journal self.is_beta: bool = False # Note if Beta status changes in the Journal
self.cmdrchanged_alarm: Optional[str] = None # This stores an ID that can be used to cancel a scheduled call self.cmdrchanged_alarm: Optional[str] = None # This stores an ID that can be used to cancel a scheduled call

View File

@ -1,12 +1,16 @@
"""CMDR Status information.""" """
stats.py - CMDR Status Information.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
import csv import csv
import json import json
import sys import sys
import tkinter
import tkinter as tk import tkinter as tk
from tkinter import ttk from tkinter import ttk
from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast
import companion import companion
import EDMCLogging import EDMCLogging
import myNotebook as nb # noqa: N813 import myNotebook as nb # noqa: N813
@ -488,11 +492,11 @@ class StatsResults(tk.Toplevel):
:param align: The alignment of the data, defaults to tk.W :param align: The alignment of the data, defaults to tk.W
""" """
row = -1 # To silence unbound warnings row = -1 # To silence unbound warnings
for i in range(len(content)): for i, col_content in enumerate(content):
# label = HyperlinkLabel(parent, text=content[i], popup_copy=True) # label = HyperlinkLabel(parent, text=col_content, popup_copy=True)
label = nb.Label(parent, text=content[i]) label = nb.Label(parent, text=col_content)
if with_copy: if with_copy:
label.bind('<Button-1>', self.copy_callback(label, content[i])) label.bind('<Button-1>', self.copy_callback(label, col_content))
if i == 0: if i == 0:
label.grid(padx=10, sticky=tk.W) label.grid(padx=10, sticky=tk.W)
@ -512,7 +516,7 @@ class StatsResults(tk.Toplevel):
@staticmethod @staticmethod
def copy_callback(label: tk.Label, text_to_copy: str) -> Callable[..., None]: def copy_callback(label: tk.Label, text_to_copy: str) -> Callable[..., None]:
"""Copy data in Label to clipboard.""" """Copy data in Label to clipboard."""
def do_copy(event: tkinter.Event) -> None: def do_copy(event: tk.Event) -> None:
label.clipboard_clear() label.clipboard_clear()
label.clipboard_append(text_to_copy) label.clipboard_append(text_to_copy)
old_bg = label['bg'] old_bg = label['bg']