diff --git a/.gitignore b/.gitignore index 35fa7565..4283fcb0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,18 @@ +# Ignore version file .gitversion + +# Ignore macOS DS_Store files .DS_Store + +# Ignore build artifacts build -ChangeLog.html +dist.win32/ dist.* + +# Ignore generated ChangeLog.html file +ChangeLog.html + +# Ignore files dump *.bak *.pyc @@ -11,20 +21,37 @@ dump *.pdb *.msi *.wixobj +*.zip + +# Ignore Update Things EDMarketConnector_Installer_*.exe appcast_win_*.xml appcast_mac_*.xml -EDMarketConnector.VisualElementsManifest.xml -*.zip EDMC_Installer_Config.iss +EDMarketConnector.wxs +wix/components.wxs +# Ignore Visual Elements Manifest file for Windows +EDMarketConnector.VisualElementsManifest.xml + +# Ignore IDE and editor configuration files .idea .vscode + +# Ignore virtual environments .venv/ venv/ +venv2 + +# Ignore workspace file for Visual Studio Code *.code-workspace + +# Ignore coverage reports htmlcov/ .ignored .coverage -EDMarketConnector.wxs -wix/components.wxs +pylintrc +pylint.txt + +# Ignore Submodule data directory +coriolis-data/ diff --git a/config/__init__.py b/config/__init__.py index a31cdd89..ca964274 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,11 +1,15 @@ """ -Code dealing with the configuration of the program. +__init__.py - Code dealing with the configuration of the program. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. Windows uses the Registry to store values in a flat manner. Linux uses a file, but for commonality it's still a flat data structure. macOS uses a 'defaults' object. """ - +from __future__ import annotations __all__ = [ # defined in the order they appear in the file @@ -40,10 +44,8 @@ import sys import traceback import warnings from abc import abstractmethod -from typing import Any, Callable, Optional, Type, TypeVar - +from typing import Any, Callable, Type, TypeVar import semantic_version - from constants import GITVERSION_FILE, applongname, appname # Any of these may be imported by plugins @@ -52,9 +54,9 @@ appcmdname = 'EDMC' # # Major.Minor.Patch(-prerelease)(+buildmetadata) # NB: Do *not* import this, use the functions appversion() and appversion_nobuild() -_static_appversion = '5.9.5' +_static_appversion = '5.10.0-alpha0' -_cached_version: Optional[semantic_version.Version] = None +_cached_version: semantic_version.Version | None = None copyright = '© 2015-2019 Jonathan Harris, 2020-2023 EDCD' update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml' @@ -66,7 +68,7 @@ debug_senders: list[str] = [] trace_on: list[str] = [] capi_pretend_down: bool = False -capi_debug_access_token: Optional[str] = None +capi_debug_access_token: str | None = None # This must be done here in order to avoid an import cycle with EDMCLogging. # Other code should use EDMCLogging.get_main_logger if os.getenv("EDMC_NO_UI"): @@ -79,7 +81,6 @@ else: _T = TypeVar('_T') -########################################################################### def git_shorthash_from_head() -> str: """ Determine short hash for current git HEAD. @@ -91,13 +92,14 @@ def git_shorthash_from_head() -> str: shorthash: str = None # type: ignore try: - git_cmd = subprocess.Popen('git rev-parse --short HEAD'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT - ) + git_cmd = subprocess.Popen( + "git rev-parse --short HEAD".split(), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) out, err = git_cmd.communicate() - except Exception as e: + except subprocess.CalledProcessError as e: logger.info(f"Couldn't run git command for short hash: {e!r}") else: @@ -131,7 +133,7 @@ def appversion() -> semantic_version.Version: if getattr(sys, 'frozen', False): # Running frozen, so we should have a .gitversion file # Yes, .parent because if frozen we're inside library.zip - with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, 'r', encoding='utf-8') as gitv: + with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, encoding='utf-8') as gitv: shorthash = gitv.read() else: @@ -157,11 +159,14 @@ def appversion_nobuild() -> semantic_version.Version: :return: App version without any build meta data. """ return appversion().truncate('prerelease') -########################################################################### class AbstractConfig(abc.ABC): - """Abstract root class of all platform specific Config implementations.""" + """ + Abstract root class of all platform specific Config implementations. + + Commented lines are no longer supported or replaced. + """ OUT_EDDN_SEND_STATION_DATA = 1 # OUT_MKT_BPC = 2 # No longer supported @@ -185,7 +190,6 @@ class AbstractConfig(abc.ABC): respath_path: pathlib.Path home_path: pathlib.Path default_journal_dir_path: pathlib.Path - identifier: str __in_shutdown = False # Is the application currently shutting down ? @@ -241,7 +245,7 @@ class AbstractConfig(abc.ABC): self.__eddn_url = eddn_url @property - def eddn_url(self) -> Optional[str]: + def eddn_url(self) -> str | None: """ Provide the custom EDDN URL. @@ -296,14 +300,14 @@ class AbstractConfig(abc.ABC): def _suppress_call( func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception, *args: Any, **kwargs: Any - ) -> Optional[_T]: + ) -> _T | None: if exceptions is None: exceptions = [Exception] if not isinstance(exceptions, list): exceptions = [exceptions] - with contextlib.suppress(*exceptions): # type: ignore # it works fine, mypy + with contextlib.suppress(*exceptions): return func(*args, **kwargs) return None @@ -326,16 +330,16 @@ class AbstractConfig(abc.ABC): if (a_list := self._suppress_call(self.get_list, ValueError, key, default=None)) is not None: return a_list - elif (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: + if (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: return a_str - elif (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: + if (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: return a_bool - elif (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: + if (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: return an_int - return default # type: ignore + return default @abstractmethod def get_list(self, key: str, *, default: list | None = None) -> list: @@ -462,16 +466,15 @@ def get_config(*args, **kwargs) -> AbstractConfig: from .darwin import MacConfig return MacConfig(*args, **kwargs) - elif sys.platform == "win32": # pragma: sys-platform-win32 + if sys.platform == "win32": # pragma: sys-platform-win32 from .windows import WinConfig return WinConfig(*args, **kwargs) - elif sys.platform == "linux": # pragma: sys-platform-linux + if sys.platform == "linux": # pragma: sys-platform-linux from .linux import LinuxConfig return LinuxConfig(*args, **kwargs) - else: # pragma: sys-platform-not-known - raise ValueError(f'Unknown platform: {sys.platform=}') + raise ValueError(f'Unknown platform: {sys.platform=}') config = get_config() diff --git a/config/darwin.py b/config/darwin.py index 895218a8..9c15ec32 100644 --- a/config/darwin.py +++ b/config/darwin.py @@ -1,13 +1,19 @@ -"""Darwin/macOS implementation of AbstractConfig.""" +""" +darwin.py - Darwin/macOS implementation of AbstractConfig. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations + import pathlib import sys -from typing import Any, Dict, List, Union - +from typing import Any from Foundation import ( # type: ignore NSApplicationSupportDirectory, NSBundle, NSDocumentDirectory, NSSearchPathForDirectoriesInDomains, NSUserDefaults, NSUserDomainMask ) - from config import AbstractConfig, appname, logger assert sys.platform == 'darwin' @@ -48,14 +54,14 @@ class MacConfig(AbstractConfig): self.default_journal_dir_path = support_path / 'Frontier Developments' / 'Elite Dangerous' self._defaults: Any = NSUserDefaults.standardUserDefaults() - self._settings: Dict[str, Union[int, str, list]] = dict( + self._settings: dict[str, int | str | list] = dict( self._defaults.persistentDomainForName_(self.identifier) or {} ) # make writeable if (out_dir := self.get_str('out_dir')) is None or not pathlib.Path(out_dir).exists(): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def __raw_get(self, key: str) -> Union[None, list, str, int]: + def __raw_get(self, key: str) -> None | list | str | int: """ Retrieve the raw data for the given key. @@ -82,7 +88,7 @@ class MacConfig(AbstractConfig): """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default if not isinstance(res, str): raise ValueError(f'unexpected data returned from __raw_get: {type(res)=} {res}') @@ -97,9 +103,9 @@ class MacConfig(AbstractConfig): """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res @@ -114,7 +120,7 @@ class MacConfig(AbstractConfig): if res is None: return default - elif not isinstance(res, (str, int)): + if not isinstance(res, (str, int)): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') try: @@ -122,7 +128,7 @@ class MacConfig(AbstractConfig): except ValueError as e: logger.error(f'__raw_get returned {res!r} which cannot be parsed to an int: {e}') - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default def get_bool(self, key: str, *, default: bool = None) -> bool: """ @@ -132,14 +138,14 @@ class MacConfig(AbstractConfig): """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, bool): + if not isinstance(res, bool): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. diff --git a/config/linux.py b/config/linux.py index 5d543d3f..73100800 100644 --- a/config/linux.py +++ b/config/linux.py @@ -1,9 +1,14 @@ -"""Linux config implementation.""" +""" +linux.py - Linux config implementation. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import os import pathlib import sys from configparser import ConfigParser - from config import AbstractConfig, appname, logger assert sys.platform == 'linux' diff --git a/config/windows.py b/config/windows.py index f7590a92..7fb53a37 100644 --- a/config/windows.py +++ b/config/windows.py @@ -1,6 +1,12 @@ -"""Windows config implementation.""" +""" +windows.py - Windows config implementation. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations -# spell-checker: words folderid deps hkey edcd import ctypes import functools import pathlib @@ -8,8 +14,7 @@ import sys import uuid import winreg from ctypes.wintypes import DWORD, HANDLE -from typing import List, Literal, Optional, Union - +from typing import Literal from config import AbstractConfig, applongname, appname, logger, update_interval assert sys.platform == 'win32' @@ -29,7 +34,7 @@ CoTaskMemFree = ctypes.windll.ole32.CoTaskMemFree CoTaskMemFree.argtypes = [ctypes.c_void_p] -def known_folder_path(guid: uuid.UUID) -> Optional[str]: +def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -43,7 +48,8 @@ class WinConfig(AbstractConfig): """Implementation of AbstractConfig for Windows.""" def __init__(self, do_winsparkle=True) -> None: - self.app_dir_path = pathlib.Path(str(known_folder_path(FOLDERID_LocalAppData))) / appname + super().__init__() + self.app_dir_path = pathlib.Path(known_folder_path(FOLDERID_LocalAppData)) / appname # type: ignore self.app_dir_path.mkdir(exist_ok=True) self.plugin_dir_path = self.app_dir_path / 'plugins' @@ -52,19 +58,17 @@ class WinConfig(AbstractConfig): if getattr(sys, 'frozen', False): self.respath_path = pathlib.Path(sys.executable).parent self.internal_plugin_dir_path = self.respath_path / 'plugins' - else: self.respath_path = pathlib.Path(__file__).parent.parent self.internal_plugin_dir_path = self.respath_path / 'plugins' self.home_path = pathlib.Path.home() - journal_dir_str = known_folder_path(FOLDERID_SavedGames) - journaldir = pathlib.Path(journal_dir_str) if journal_dir_str is not None else None - self.default_journal_dir_path = None # type: ignore - if journaldir is not None: - self.default_journal_dir_path = journaldir / 'Frontier Developments' / 'Elite Dangerous' + journal_dir_path = pathlib.Path( + known_folder_path(FOLDERID_SavedGames)) / 'Frontier Developments' / 'Elite Dangerous' # type: ignore + self.default_journal_dir_path = journal_dir_path if journal_dir_path.is_dir() else None # type: ignore + REGISTRY_SUBKEY = r'Software\Marginal\EDMarketConnector' # noqa: N806 create_key_defaults = functools.partial( winreg.CreateKeyEx, key=winreg.HKEY_CURRENT_USER, @@ -72,20 +76,18 @@ class WinConfig(AbstractConfig): ) try: - self.__reg_handle: winreg.HKEYType = create_key_defaults( - sub_key=r'Software\Marginal\EDMarketConnector' - ) + self.__reg_handle: winreg.HKEYType = create_key_defaults(sub_key=REGISTRY_SUBKEY) if do_winsparkle: self.__setup_winsparkle() except OSError: - logger.exception('could not create required registry keys') + logger.exception('Could not create required registry keys') raise self.identifier = applongname if (outdir_str := self.get_str('outdir')) is None or not pathlib.Path(outdir_str).is_dir(): docs = known_folder_path(FOLDERID_Documents) - self.set('outdir', docs if docs is not None else self.home) + self.set("outdir", docs if docs is not None else self.home) def __setup_winsparkle(self): """Ensure the necessary Registry keys for WinSparkle are present.""" @@ -94,32 +96,30 @@ class WinConfig(AbstractConfig): key=winreg.HKEY_CURRENT_USER, access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY, ) - try: - edcd_handle: winreg.HKEYType = create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') - winsparkle_reg: winreg.HKEYType = winreg.CreateKeyEx( - edcd_handle, sub_key='WinSparkle', access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY - ) + try: + with create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') as edcd_handle: + with winreg.CreateKeyEx(edcd_handle, sub_key='WinSparkle', + access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY) as winsparkle_reg: + # Set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings + UPDATE_INTERVAL_NAME = 'UpdateInterval' # noqa: N806 + CHECK_FOR_UPDATES_NAME = 'CheckForUpdates' # noqa: N806 + REG_SZ = winreg.REG_SZ # noqa: N806 + + winreg.SetValueEx(winsparkle_reg, UPDATE_INTERVAL_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + str(update_interval)) + + try: + winreg.QueryValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME) + except FileNotFoundError: + # Key doesn't exist, set it to a default + winreg.SetValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + '1') except OSError: - logger.exception('could not open WinSparkle handle') + logger.exception('Could not open WinSparkle handle') raise - # set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings - winreg.SetValueEx( - winsparkle_reg, 'UpdateInterval', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, str(update_interval) - ) - - try: - winreg.QueryValueEx(winsparkle_reg, 'CheckForUpdates') - - except FileNotFoundError: - # Key doesn't exist, set it to a default - winreg.SetValueEx(winsparkle_reg, 'CheckForUpdates', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, '1') - - winsparkle_reg.Close() - edcd_handle.Close() - - def __get_regentry(self, key: str) -> Union[None, list, str, int]: + def __get_regentry(self, key: str) -> None | list | str | int: """Access the Registry for the raw entry.""" try: value, _type = winreg.QueryValueEx(self.__reg_handle, key) @@ -127,20 +127,18 @@ class WinConfig(AbstractConfig): # Key doesn't exist return None - # The type returned is actually as we'd expect for each of these. The casts are here for type checkers and # For programmers who want to actually know what is going on if _type == winreg.REG_SZ: return str(value) - elif _type == winreg.REG_DWORD: + if _type == winreg.REG_DWORD: return int(value) - elif _type == winreg.REG_MULTI_SZ: + if _type == winreg.REG_MULTI_SZ: return list(value) - else: - logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}') - return None + logger.warning(f'Registry key {key=} returned unknown type {_type=} {value=}') + return None def get_str(self, key: str, *, default: str | None = None) -> str: """ @@ -152,7 +150,7 @@ class WinConfig(AbstractConfig): if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, str): + if not isinstance(res, str): raise ValueError(f'Data from registry is not a string: {type(res)=} {res=}') return res @@ -167,7 +165,7 @@ class WinConfig(AbstractConfig): if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'Data from registry is not a list: {type(res)=} {res}') return res @@ -195,11 +193,11 @@ class WinConfig(AbstractConfig): """ res = self.get_int(key, default=default) # type: ignore if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default return bool(res) - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. @@ -209,9 +207,8 @@ class WinConfig(AbstractConfig): reg_type: Literal[1] | Literal[4] | Literal[7] if isinstance(val, str): reg_type = winreg.REG_SZ - winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val) - elif isinstance(val, int): # The original code checked for numbers.Integral, I dont think that is needed. + elif isinstance(val, int): reg_type = winreg.REG_DWORD elif isinstance(val, list): @@ -224,7 +221,6 @@ class WinConfig(AbstractConfig): else: raise ValueError(f'Unexpected type for value {type(val)=}') - # Its complaining about the list, it works, tested on windows, ignored. winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, reg_type, val) # type: ignore def delete(self, key: str, *, suppress=False) -> None: diff --git a/plugins/coriolis.py b/plugins/coriolis.py index 7161a4e0..283b49d8 100644 --- a/plugins/coriolis.py +++ b/plugins/coriolis.py @@ -19,6 +19,7 @@ referenced in this file (or only in any other core plugin), and if so... `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import base64 import gzip @@ -26,7 +27,7 @@ import io import json import tkinter as tk from tkinter import ttk -from typing import TYPE_CHECKING, Union, Optional +from typing import TYPE_CHECKING import myNotebook as nb # noqa: N813 # its not my fault. from EDMCLogging import get_main_logger from plug import show_error @@ -80,7 +81,7 @@ def plugin_start3(path: str) -> str: return 'Coriolis' -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """Set up plugin preferences.""" PADX = 10 # noqa: N806 @@ -130,7 +131,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return conf_frame -def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None: +def prefs_changed(cmdr: str | None, is_beta: bool) -> None: """ Update URLs and override mode based on user preferences. @@ -175,7 +176,7 @@ def _get_target_url(is_beta: bool) -> str: return coriolis_config.normal_url -def shipyard_url(loadout, is_beta) -> Union[str, bool]: +def shipyard_url(loadout, is_beta) -> str | bool: """Return a URL for the current ship.""" # most compact representation string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') diff --git a/plugins/eddn.py b/plugins/eddn.py index d2e7d01e..ee70d6e9 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -18,6 +18,8 @@ referenced in this file (or only in any other core plugin), and if so... `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import http import itertools import json @@ -37,9 +39,6 @@ from typing import ( Iterator, Mapping, MutableMapping, - Optional, - Dict, - List, ) from typing import OrderedDict as OrderedDictT from typing import Tuple, Union @@ -86,27 +85,27 @@ class This: self.odyssey = False # Track location to add to Journal events - self.system_address: Optional[str] = None - self.system_name: Optional[str] = None - self.coordinates: Optional[Tuple] = None - self.body_name: Optional[str] = None - self.body_id: Optional[int] = None - self.body_type: Optional[int] = None - self.station_name: Optional[str] = None - self.station_type: Optional[str] = None - self.station_marketid: Optional[str] = None + self.system_address: str | None = None + self.system_name: str | None = None + self.coordinates: tuple | None = None + self.body_name: str | None = None + self.body_id: int | None = None + self.body_type: int | None = None + self.station_name: str | None = None + self.station_type: str | None = None + self.station_marketid: str | None = None # Track Status.json data - self.status_body_name: Optional[str] = None + self.status_body_name: str | None = None # Avoid duplicates - self.marketId: Optional[str] = None - self.commodities: Optional[List[OrderedDictT[str, Any]]] = None - self.outfitting: Optional[Tuple[bool, List[str]]] = None - self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None + self.marketId: str | None = None + self.commodities: list[OrderedDictT[str, Any]] | None = None + self.outfitting: Tuple[bool, list[str]] | None = None + self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None self.fcmaterials_marketid: int = 0 - self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials: list[OrderedDictT[str, Any]] | None = None self.fcmaterials_capi_marketid: int = 0 - self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials_capi: list[OrderedDictT[str, Any]] | None = None # For the tkinter parent window, so we can call update_idletasks() self.parent: tk.Tk @@ -395,7 +394,7 @@ class EDDNSender: """ logger.trace_if("plugin.eddn.send", "Sending message") should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: @@ -404,7 +403,7 @@ class EDDNSender: # Even the smallest possible message compresses somewhat, so always compress encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0) - headers: Optional[Dict[str, str]] = None + headers: dict[str, str] | None = None if compressed: headers = {'Content-Encoding': 'gzip'} @@ -612,7 +611,7 @@ class EDDN: self.sender = EDDNSender(self, self.eddn_url) - self.fss_signals: List[Mapping[str, Any]] = [] + self.fss_signals: list[Mapping[str, Any]] = [] def close(self): """Close down the EDDN class instance.""" @@ -636,7 +635,7 @@ class EDDN: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) if should_return: logger.warning("capi.request./market has been disabled by killswitch. Returning.") @@ -653,7 +652,7 @@ class EDDN: modules, ships ) - commodities: List[OrderedDictT[str, Any]] = [] + commodities: list[OrderedDictT[str, Any]] = [] for commodity in data['lastStarport'].get('commodities') or []: # Check 'marketable' and 'not prohibited' if (category_map.get(commodity['categoryname'], True) @@ -726,7 +725,7 @@ class EDDN: :param data: The raw CAPI data. :return: Sanity-checked data. """ - modules: Dict[str, Any] = data['lastStarport'].get('modules') + modules: dict[str, Any] = data['lastStarport'].get('modules') if modules is None or not isinstance(modules, dict): if modules is None: logger.debug('modules was None. FC or Damaged Station?') @@ -743,13 +742,13 @@ class EDDN: # Set a safe value modules = {} - ships: Dict[str, Any] = data['lastStarport'].get('ships') + ships: dict[str, Any] = data['lastStarport'].get('ships') if ships is None or not isinstance(ships, dict): if ships is None: logger.debug('ships was None') else: - logger.error(f'ships was neither None nor a Dict! Type = {type(ships)}') + logger.error(f'ships was neither None nor a dict! Type = {type(ships)}') # Set a safe value ships = {'shipyard_list': {}, 'unavailable_list': []} @@ -769,7 +768,7 @@ class EDDN: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -796,7 +795,7 @@ class EDDN: modules.values() ) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search ) @@ -837,7 +836,7 @@ class EDDN: :param is_beta: whether or not we are in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -856,7 +855,7 @@ class EDDN: ships ) - shipyard: List[Mapping[str, Any]] = sorted( + shipyard: list[Mapping[str, Any]] = sorted( itertools.chain( (ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()), (ship['name'].lower() for ship in ships['unavailable_list'] or {}), @@ -899,8 +898,8 @@ class EDDN: :param is_beta: whether or not we're in beta mode :param entry: the journal entry containing the commodities data """ - items: List[Mapping[str, Any]] = entry.get('Items') or [] - commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([ + items: list[Mapping[str, Any]] = entry.get('Items') or [] + commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([ ('name', self.canonicalise(commodity['Name'])), ('meanPrice', commodity['MeanPrice']), ('buyPrice', commodity['BuyPrice']), @@ -947,11 +946,11 @@ class EDDN: :param is_beta: Whether or not we're in beta mode :param entry: The relevant journal entry """ - modules: List[Mapping[str, Any]] = entry.get('Items', []) + modules: list[Mapping[str, Any]] = entry.get('Items', []) horizons: bool = entry.get('Horizons', False) # outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) # for module in modules if module['Name'] != 'int_planetapproachsuite']) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules) ) @@ -986,7 +985,7 @@ class EDDN: :param is_beta: Whether or not we're in beta mode :param entry: the relevant journal entry """ - ships: List[Mapping[str, Any]] = entry.get('PriceList') or [] + ships: list[Mapping[str, Any]] = entry.get('Pricelist') or [] horizons: bool = entry.get('Horizons', False) shipyard = sorted(ship['ShipType'] for ship in ships) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. @@ -1042,7 +1041,7 @@ class EDDN: self.sender.send_message_by_id(msg_id) def standard_header( - self, game_version: Optional[str] = None, game_build: Optional[str] = None + self, game_version: str | None = None, game_build: str | None = None ) -> MutableMapping[str, Any]: """ Return the standard header for an EDDN message, given tracked state. @@ -1134,7 +1133,7 @@ class EDDN: def export_journal_fssdiscoveryscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSDiscoveryScan to EDDN on the correct schema. @@ -1176,7 +1175,7 @@ class EDDN: def export_journal_navbeaconscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an NavBeaconScan to EDDN on the correct schema. @@ -1218,7 +1217,7 @@ class EDDN: def export_journal_codexentry( # noqa: CCR001 self, cmdr: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a CodexEntry to EDDN on the correct schema. @@ -1320,7 +1319,7 @@ class EDDN: def export_journal_scanbarycentre( self, cmdr: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a ScanBaryCentre to EDDN on the correct schema. @@ -1374,7 +1373,7 @@ class EDDN: def export_journal_navroute( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a NavRoute to EDDN on the correct schema. @@ -1447,7 +1446,7 @@ class EDDN: def export_journal_fcmaterials( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FCMaterials message to EDDN on the correct schema. @@ -1531,7 +1530,7 @@ class EDDN: def export_capi_fcmaterials( self, data: CAPIData, is_beta: bool, horizons: bool - ) -> Optional[str]: + ) -> str | None: """ Send CAPI-sourced 'onfootmicroresources' data on `fcmaterials/1` schema. @@ -1594,7 +1593,7 @@ class EDDN: def export_journal_approachsettlement( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an ApproachSettlement to EDDN on the correct schema. @@ -1669,7 +1668,7 @@ class EDDN: def export_journal_fssallbodiesfound( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSAllBodiesFound message to EDDN on the correct schema. @@ -1719,7 +1718,7 @@ class EDDN: def export_journal_fssbodysignals( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSBodySignals message to EDDN on the correct schema. @@ -1789,7 +1788,7 @@ class EDDN: def export_journal_fsssignaldiscovered( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSSignalDiscovered message to EDDN on the correct schema. @@ -1892,7 +1891,7 @@ class EDDN: match = self.CANONICALISE_RE.match(item) return match and match.group(1) or item - def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_endpoint: str) -> str: + def capi_gameversion_from_host_endpoint(self, capi_host: str | None, capi_endpoint: str) -> str: """ Return the correct CAPI gameversion string for the given host/endpoint. @@ -1910,7 +1909,7 @@ class EDDN: gv = 'CAPI-Legacy-' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_host=} lead to bad gameversion") gv = 'CAPI-UNKNOWN-' ####################################################################### @@ -1924,7 +1923,7 @@ class EDDN: gv += 'shipyard' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_endpoint=} lead to bad gameversion") gv += 'UNKNOWN' ####################################################################### @@ -1943,7 +1942,7 @@ def plugin_start3(plugin_dir: str) -> str: return 'EDDN' -def plugin_app(parent: tk.Tk) -> Optional[tk.Frame]: +def plugin_app(parent: tk.Tk) -> tk.Frame | None: """ Set up any plugin-specific UI. @@ -2183,7 +2182,7 @@ def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys with names ending `_Localised` from a dict. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2207,7 +2206,7 @@ def capi_filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys for known CAPI 'localised' names. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2234,7 +2233,7 @@ def journal_entry( # noqa: C901, CCR001 station: str, entry: MutableMapping[str, Any], state: Mapping[str, Any] -) -> Optional[str]: +) -> str | None: """ Process a new Journal entry. @@ -2491,7 +2490,7 @@ def journal_entry( # noqa: C901, CCR001 return None -def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: +def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> str | None: """ Process new CAPI data for Legacy galaxy. @@ -2510,7 +2509,7 @@ def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: return cmdr_data(data, is_beta) -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data for not-Legacy galaxy (might be beta). @@ -2611,7 +2610,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST return economies_colony or modules_horizons or ship_horizons -def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None: +def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None: """ Process Status.json data to track things like current Body. diff --git a/plugins/edsm.py b/plugins/edsm.py index 2f644dcb..33af692b 100644 --- a/plugins/edsm.py +++ b/plugins/edsm.py @@ -18,6 +18,8 @@ referenced in this file (or only in any other core plugin), and if so... `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import json import threading import tkinter as tk @@ -26,7 +28,7 @@ from queue import Queue from threading import Thread from time import sleep from tkinter import ttk -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Mapping, MutableMapping, cast import requests import killswitch import monitor @@ -72,27 +74,27 @@ class This: self.game_build = "" # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.session: requests.Session = requests.Session() self.session.headers['User-Agent'] = user_agent self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread - self.discarded_events: Set[str] = set() # List discarded events from EDSM - self.lastlookup: Dict[str, Any] # Result of last system lookup + self.discarded_events: set[str] = set() # List discarded events from EDSM + self.lastlookup: dict[str, Any] # Result of last system lookup # Game state self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew - self.coordinates: Optional[Tuple[int, int, int]] = None + self.coordinates: tuple[int, int, int] | None = None self.newgame: bool = False # starting up - batch initial burst of events self.newgame_docked: bool = False # starting up while docked self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan - self.system_link: Optional[tk.Widget] = None - self.system_name: Optional[tk.Tk] = None - self.system_address: Optional[int] = None # Frontier SystemAddress - self.system_population: Optional[int] = None - self.station_link: Optional[tk.Widget] = None - self.station_name: Optional[str] = None - self.station_marketid: Optional[int] = None # Frontier MarketID + self.system_link: tk.Widget | None = None + self.system_name: tk.Tk | None = None + self.system_address: int | None = None # Frontier SystemAddress + self.system_population: int | None = None + self.station_link: tk.Widget | None = None + self.station_name: str | None = None + self.station_marketid: int | None = None # Frontier MarketID self.on_foot = False self._IMG_KNOWN = None @@ -100,21 +102,21 @@ class This: self._IMG_NEW = None self._IMG_ERROR = None - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None - self.log: Optional[tk.IntVar] = None - self.log_button: Optional[ttk.Checkbutton] = None + self.log: tk.IntVar | None = None + self.log_button: ttk.Checkbutton | None = None - self.label: Optional[tk.Widget] = None + self.label: tk.Widget | None = None - self.cmdr_label: Optional[nb.Label] = None - self.cmdr_text: Optional[nb.Label] = None + self.cmdr_label: nb.Label | None = None + self.cmdr_text: nb.Label | None = None - self.user_label: Optional[nb.Label] = None - self.user: Optional[nb.Entry] = None + self.user_label: nb.Label | None = None + self.user: nb.Entry | None = None - self.apikey_label: Optional[nb.Label] = None - self.apikey: Optional[nb.Entry] = None + self.apikey_label: nb.Label | None = None + self.apikey: nb.Entry | None = None this = This() @@ -277,7 +279,7 @@ def toggle_password_visibility(): this.apikey.config(show="*") # type: ignore -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """ Plugin preferences setup hook. @@ -361,7 +363,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return frame -def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001 +def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001 """ Handle the Commander name changing whilst Settings was open. @@ -390,7 +392,7 @@ def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR # LANG: We have no data on the current commander this.cmdr_text['text'] = _('None') - to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED + to_set: Literal['normal'] | Literal['disabled'] = tk.DISABLED if cmdr and not is_beta and this.log and this.log.get(): to_set = tk.NORMAL @@ -440,9 +442,9 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_out', this.log.get()) if cmdr and not is_beta: - cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[]) - usernames: List[str] = config.get_list('edsm_usernames', default=[]) - apikeys: List[str] = config.get_list('edsm_apikeys', default=[]) + cmdrs: list[str] = config.get_list('edsm_cmdrs', default=[]) + usernames: list[str] = config.get_list('edsm_usernames', default=[]) + apikeys: list[str] = config.get_list('edsm_apikeys', default=[]) if this.user and this.apikey: if cmdr in cmdrs: @@ -460,7 +462,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_apikeys', apikeys) -def credentials(cmdr: str) -> Optional[Tuple[str, str]]: +def credentials(cmdr: str) -> tuple[str, str] | None: """ Get credentials for the given commander, if they exist. @@ -635,7 +637,7 @@ Queueing: {entry!r}''' # Update system data -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data. @@ -722,7 +724,7 @@ def worker() -> None: # noqa: CCR001 C901 :return: None """ logger.debug('Starting...') - pending: List[Mapping[str, Any]] = [] # Unsent events + pending: list[Mapping[str, Any]] = [] # Unsent events closing = False cmdr: str = "" last_game_version = "" @@ -744,7 +746,7 @@ def worker() -> None: # noqa: CCR001 C901 logger.debug(f'{this.shutting_down=}, so setting closing = True') closing = True - item: Optional[Tuple[str, str, str, Mapping[str, Any]]] = this.queue.get() + item: tuple[str, str, str, Mapping[str, Any]] | None = this.queue.get() if item: (cmdr, game_version, game_build, entry) = item logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})') @@ -756,7 +758,7 @@ def worker() -> None: # noqa: CCR001 C901 retrying = 0 while retrying < 3: if item is None: - item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {})) + item = cast(tuple[str, str, str, Mapping[str, Any]], ("", {})) should_skip, new_item = killswitch.check_killswitch( 'plugins.edsm.worker', item, @@ -909,7 +911,7 @@ def worker() -> None: # noqa: CCR001 C901 last_game_build = game_build -def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 +def should_send(entries: list[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 """ Whether or not any of the given entries should be sent to EDSM. diff --git a/plugins/edsy.py b/plugins/edsy.py index 0c78a429..a02d3424 100644 --- a/plugins/edsy.py +++ b/plugins/edsy.py @@ -18,11 +18,13 @@ referenced in this file (or only in any other core plugin), and if so... `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import base64 import gzip import io import json -from typing import Any, Mapping, Union +from typing import Any, Mapping def plugin_start3(plugin_dir: str) -> str: @@ -36,7 +38,7 @@ def plugin_start3(plugin_dir: str) -> str: # Return a URL for the current ship -def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]: +def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> bool | str: """ Construct a URL for ship loadout. diff --git a/plugins/inara.py b/plugins/inara.py index efe010df..2a124f54 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -18,6 +18,7 @@ referenced in this file (or only in any other core plugin), and if so... `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import json import threading @@ -29,9 +30,8 @@ from datetime import datetime, timedelta, timezone from operator import itemgetter from threading import Lock, Thread from tkinter import ttk -from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional +from typing import TYPE_CHECKING, Any, Callable, Deque, Mapping, NamedTuple, Sequence, cast, Union from typing import OrderedDict as OrderedDictT -from typing import Sequence, Union, cast import requests import edmc_data import killswitch @@ -63,8 +63,8 @@ CREDITS_DELTA_MIN_ABSOLUTE = 10_000_000 # Absolute difference threshold class Credentials(NamedTuple): """Credentials holds the set of credentials required to identify an inara API payload to inara.""" - cmdr: Optional[str] - fid: Optional[str] + cmdr: str | None + fid: str | None api_key: str @@ -89,25 +89,25 @@ class This: self.parent: tk.Tk # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.lastlocation = None # eventData from the last Commander's Flight Log event self.lastship = None # eventData from the last addCommanderShip or setCommanderShip event # Cached Cmdr state - self.cmdr: Optional[str] = None - self.FID: Optional[str] = None # Frontier ID + self.cmdr: str | None = None + self.FID: str | None = None # Frontier ID self.multicrew: bool = False # don't send captain's ship info to Inara while on a crew self.newuser: bool = False # just entered API Key - send state immediately self.newsession: bool = True # starting a new session - wait for Cargo event self.undocked: bool = False # just undocked self.suppress_docked = False # Skip initial Docked event if started docked - self.cargo: Optional[List[OrderedDictT[str, Any]]] = None - self.materials: Optional[List[OrderedDictT[str, Any]]] = None + self.cargo: list[OrderedDictT[str, Any]] | None = None + self.materials: list[OrderedDictT[str, Any]] | None = None self.last_credits: int = 0 # Send credit update soon after Startup / new game - self.storedmodules: Optional[List[OrderedDictT[str, Any]]] = None - self.loadout: Optional[OrderedDictT[str, Any]] = None - self.fleet: Optional[List[OrderedDictT[str, Any]]] = None + self.storedmodules: list[OrderedDictT[str, Any]] | None = None + self.loadout: OrderedDictT[str, Any] | None = None + self.fleet: list[OrderedDictT[str, Any]] | None = None self.shipswap: bool = False # just swapped ship self.on_foot = False @@ -115,9 +115,9 @@ class This: # Main window clicks self.system_link: tk.Widget = None # type: ignore - self.system_name: Optional[str] = None # type: ignore - self.system_address: Optional[str] = None # type: ignore - self.system_population: Optional[int] = None + self.system_name: str | None = None # type: ignore + self.system_address: str | None = None # type: ignore + self.system_population: int | None = None self.station_link: tk.Widget = None # type: ignore self.station = None self.station_marketid = None @@ -129,7 +129,7 @@ class This: self.apikey: nb.Entry self.apikey_label: tk.Label - self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque) + self.events: dict[Credentials, Deque[Event]] = defaultdict(deque) self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]) -> None: @@ -361,7 +361,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: ) -def credentials(cmdr: Optional[str]) -> Optional[str]: +def credentials(cmdr: str | None) -> str | None: """ Get the credentials for the current commander. @@ -383,7 +383,7 @@ def credentials(cmdr: Optional[str]) -> Optional[str]: def journal_entry( # noqa: C901, CCR001 - cmdr: str, is_beta: bool, system: str, station: str, entry: Dict[str, Any], state: Dict[str, Any] + cmdr: str, is_beta: bool, system: str, station: str, entry: dict[str, Any], state: dict[str, Any] ) -> str: """ Journal entry hook. @@ -394,7 +394,7 @@ def journal_entry( # noqa: C901, CCR001 # causing users to spam Inara with 'URL provider' queries, and we want to # stop that. should_return: bool - new_entry: Dict[str, Any] = {} + new_entry: dict[str, Any] = {} should_return, new_entry = killswitch.check_killswitch('plugins.inara.journal', entry, logger) if should_return: @@ -813,7 +813,7 @@ def journal_entry( # noqa: C901, CCR001 # Fleet if event_name == 'StoredShips': - fleet: List[OrderedDictT[str, Any]] = sorted( + fleet: list[OrderedDictT[str, Any]] = sorted( [OrderedDict({ 'shipType': x['ShipType'], 'shipGameID': x['ShipID'], @@ -860,7 +860,7 @@ def journal_entry( # noqa: C901, CCR001 # Stored modules if event_name == 'StoredModules': items = {mod['StorageSlot']: mod for mod in entry['Items']} # Impose an order - modules: List[OrderedDictT[str, Any]] = [] + modules: list[OrderedDictT[str, Any]] = [] for slot in sorted(items): item = items[slot] module: OrderedDictT[str, Any] = OrderedDict([ @@ -1088,7 +1088,7 @@ def journal_entry( # noqa: C901, CCR001 # # So we're going to do a lot of checking here and bail out if we dont like the look of ANYTHING here - to_send_data: Optional[Dict[str, Any]] = {} # This is a glorified sentinel until lower down. + to_send_data: dict[str, Any] | None = {} # This is a glorified sentinel until lower down. # On Horizons, neither of these exist on TouchDown star_system_name = entry.get('StarSystem', this.system_name) body_name = entry.get('Body', state['Body'] if state['BodyType'] == 'Planet' else None) @@ -1370,7 +1370,7 @@ def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later pass -def make_loadout(state: Dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 +def make_loadout(state: dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 """ Construct an inara loadout from an event. @@ -1440,8 +1440,8 @@ def new_add_event( name: str, timestamp: str, data: EVENT_DATA, - cmdr: Optional[str] = None, - fid: Optional[str] = None + cmdr: str | None = None, + fid: str | None = None ): """ Add a journal event to the queue, to be sent to inara at the next opportunity. @@ -1470,11 +1470,11 @@ def new_add_event( this.events[key].append(Event(name, timestamp, data)) -def clean_event_list(event_list: List[Event]) -> List[Event]: +def clean_event_list(event_list: list[Event]) -> list[Event]: """ Check for killswitched events and remove or modify them as requested. - :param event_list: List of events to clean + :param event_list: list of events to clean :return: Cleaned list of events """ cleaned_events = [] @@ -1533,14 +1533,14 @@ def new_worker(): logger.debug('Done.') -def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]: +def get_events(clear: bool = True) -> dict[Credentials, list[Event]]: """ Fetch a copy of all events from the current queue. :param clear: whether to clear the queues as we go, defaults to True :return: a copy of the event dictionary """ - events_copy: Dict[Credentials, List[Event]] = {} + events_copy: dict[Credentials, list[Event]] = {} with this.event_lock: for key, events in this.events.items(): @@ -1590,7 +1590,7 @@ def send_data(url: str, data: Mapping[str, Any]) -> bool: return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such -def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None: +def handle_api_error(data: Mapping[str, Any], status: int, reply: dict[str, Any]) -> None: """ Handle API error response. @@ -1604,7 +1604,7 @@ def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any] plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message)) -def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None: +def handle_success_reply(data: Mapping[str, Any], reply: dict[str, Any]) -> None: """ Handle successful API response. @@ -1619,7 +1619,7 @@ def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None handle_special_events(data_event, reply_event) -def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None: +def handle_individual_error(data_event: dict[str, Any], reply_status: int, reply_text: str) -> None: """ Handle individual API error. @@ -1638,7 +1638,7 @@ def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply )) -def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None: +def handle_special_events(data_event: dict[str, Any], reply_event: dict[str, Any]) -> None: """ Handle special events in the API response. diff --git a/tests/EDMCLogging.py/test_logging_classvar.py b/tests/EDMCLogging.py/test_logging_classvar.py index 24ab009e..89d3220e 100644 --- a/tests/EDMCLogging.py/test_logging_classvar.py +++ b/tests/EDMCLogging.py/test_logging_classvar.py @@ -37,7 +37,7 @@ def test_class_logger(caplog: 'LogCaptureFixture') -> None: ClassVarLogger.set_logger(logger) ClassVarLogger.logger.debug('test') # type: ignore # its there ClassVarLogger.logger.info('test2') # type: ignore # its there - log_stuff('test3') # type: ignore # its there + log_stuff('test3') # Dont move these, it relies on the line numbres. assert 'EDMarketConnector.EDMCLogging.py:test_logging_classvar.py:38 test' in caplog.text diff --git a/tests/config/_old_config.py b/tests/config/_old_config.py index 211c1e72..22f0b18f 100644 --- a/tests/config/_old_config.py +++ b/tests/config/_old_config.py @@ -1,12 +1,13 @@ -# type: ignore +"""Old Configuration Test File.""" +from __future__ import annotations + import numbers import sys import warnings from configparser import NoOptionError from os import getenv, makedirs, mkdir, pardir from os.path import dirname, expanduser, isdir, join, normpath -from typing import TYPE_CHECKING, Optional, Union - +from typing import TYPE_CHECKING from config import applongname, appname, update_interval from EDMCLogging import get_main_logger @@ -81,7 +82,7 @@ elif sys.platform == 'win32': RegDeleteValue.restype = LONG RegDeleteValue.argtypes = [HKEY, LPCWSTR] - def known_folder_path(guid: uuid.UUID) -> Optional[str]: + def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -95,7 +96,7 @@ elif sys.platform == 'linux': from configparser import RawConfigParser -class OldConfig(): +class OldConfig: """Object that holds all configuration data.""" OUT_EDDN_SEND_STATION_DATA = 1 @@ -153,20 +154,19 @@ class OldConfig(): if not self.get('outdir') or not isdir(str(self.get('outdir'))): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" val = self.settings.get(key) if val is None: return default - elif isinstance(val, str): + if isinstance(val, str): return str(val) - elif isinstance(val, list): + if isinstance(val, list): return list(val) # make writeable - else: - return default + return default def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -181,7 +181,7 @@ class OldConfig(): logger.debug('The exception type is ...', exc_info=e) return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" self.settings[key] = val @@ -202,7 +202,7 @@ class OldConfig(): elif sys.platform == 'win32': def __init__(self): - self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore # Not going to change + self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore if not isdir(self.app_dir): mkdir(self.app_dir) @@ -279,10 +279,10 @@ class OldConfig(): RegSetValueEx(sparklekey, 'UpdateInterval', 0, 1, buf, len(buf) * 2) RegCloseKey(sparklekey) - if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change + if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore self.set('outdir', known_folder_path(FOLDERID_Documents) or self.home) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" key_type = DWORD() key_size = DWORD() @@ -304,11 +304,10 @@ class OldConfig(): if RegQueryValueEx(self.hkey, key, 0, ctypes.byref(key_type), buf, ctypes.byref(key_size)): return default - elif key_type.value == REG_MULTI_SZ: + if key_type.value == REG_MULTI_SZ: return list(ctypes.wstring_at(buf, len(buf)-2).split('\x00')) - else: - return str(buf.value) + return str(buf.value) def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -328,10 +327,9 @@ class OldConfig(): ): return default - else: - return key_val.value + return key_val.value - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, str): buf = ctypes.create_unicode_buffer(val) @@ -388,7 +386,7 @@ class OldConfig(): self.config = RawConfigParser(comment_prefixes=('#',)) try: - with codecs.open(self.filename, 'r') as h: + with codecs.open(self.filename) as h: self.config.read_file(h) except Exception as e: @@ -398,7 +396,7 @@ class OldConfig(): if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change self.set('outdir', expanduser('~')) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" try: val = self.config.get(self.SECTION, key) @@ -407,8 +405,7 @@ class OldConfig(): # so we add a spurious ';' entry in set() and remove it here assert val.split('\n')[-1] == ';', val.split('\n') return [self._unescape(x) for x in val.split('\n')[:-1]] - else: - return self._unescape(val) + return self._unescape(val) except NoOptionError: logger.debug(f'attempted to get key {key} that does not exist') @@ -434,13 +431,13 @@ class OldConfig(): return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, bool): - self.config.set(self.SECTION, key, val and '1' or '0') # type: ignore # Not going to change + self.config.set(self.SECTION, key, val and '1' or '0') - elif isinstance(val, str) or isinstance(val, numbers.Integral): - self.config.set(self.SECTION, key, self._escape(val)) # type: ignore # Not going to change + elif isinstance(val, (numbers.Integral, str)): + self.config.set(self.SECTION, key, self._escape(val)) elif isinstance(val, list): self.config.set(self.SECTION, key, '\n'.join([self._escape(x) for x in val] + [';'])) @@ -460,7 +457,7 @@ class OldConfig(): def close(self) -> None: """Close the configuration.""" self.save() - self.config = None + self.config = None # type: ignore def _escape(self, val: str) -> str: """Escape a string for storage.""" diff --git a/tests/config/test_config.py b/tests/config/test_config.py index ad6bbd6f..e839afc7 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -7,15 +7,13 @@ key deletions. Said modifications are to keys that are generated internally. Most of these tests are parity tests with the "old" config, and likely one day can be entirely removed. """ -from __future__ import annotations - import contextlib import itertools import pathlib import random import string import sys -from typing import Any, Iterable, List, cast +from typing import Any, Iterable, cast import pytest from pytest import mark @@ -30,12 +28,12 @@ from _old_config import old_config # noqa: E402 from config import config # noqa: E402 -def _fuzz_list(length: int) -> List[str]: +def _fuzz_list(length: int) -> list[str]: out = [] for _ in range(length): out.append(_fuzz_generators[str](random.randint(0, 1337))) - return cast(List[str], out) + return cast(list[str], out) _fuzz_generators = { # Type annotating this would be a nightmare. @@ -72,7 +70,7 @@ bool_tests = [True, False] big_int = int(0xFFFFFFFF) # 32 bit int -def _make_params(args: List[Any], id_name: str = 'random_test_{i}') -> list: +def _make_params(args: list[Any], id_name: str = 'random_test_{i}') -> list: return [pytest.param(x, id=id_name.format(i=i)) for i, x in enumerate(args)] @@ -81,7 +79,7 @@ def _build_test_list(static_data, random_data, random_id_name='random_test_{i}') class TestNewConfig: - """Test the new config with an array of hand picked and random data.""" + """Test the new config with an array of hand-picked and random data.""" def __update_linuxconfig(self) -> None: """On linux config uses ConfigParser, which doesn't update from disk changes. Force the update here.""" @@ -117,7 +115,7 @@ class TestNewConfig: config.delete(name) @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list and then ask for it back.""" name = f'list_test_{ hash("".join(lst)) }' config.set(name, lst) @@ -216,7 +214,7 @@ class TestOldNewConfig: assert res == string @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list though the old config, recall it using the new config.""" lst = [x.replace("\r", "") for x in lst] # OldConfig on linux fails to store these correctly if sys.platform == 'win32': diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index c617d52c..649140c8 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -1,13 +1,13 @@ """Tests for journal_lock.py code.""" +from __future__ import annotations + import multiprocessing as mp import os import pathlib import sys from typing import Generator - import pytest from pytest import MonkeyPatch, TempdirFactory, TempPathFactory - from config import config from journal_lock import JournalLock, JournalLockResult @@ -142,7 +142,7 @@ class TestJournalLock: def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': - return tmp_path_factory.mktemp("changing") + return tmp_path_factory.mktemp("changing") # type: ignore print('Other key, calling up ...') return config.get_str(key) # Call the non-mocked @@ -301,7 +301,7 @@ class TestJournalLock: # Need to release any handles on the lockfile else the sub-process # might not be able to clean up properly, and that will impact # on later tests. - jlock.journal_dir_lockfile.close() + jlock.journal_dir_lockfile.close() # type: ignore print('Telling sub-process to quit...') exit_q.put('quit') diff --git a/tests/killswitch.py/test_apply.py b/tests/killswitch.py/test_apply.py index c199ec48..ab9bb267 100644 --- a/tests/killswitch.py/test_apply.py +++ b/tests/killswitch.py/test_apply.py @@ -1,6 +1,8 @@ """Test the apply functions used by killswitch to modify data.""" +from __future__ import annotations + import copy -from typing import Any, Optional +from typing import Any import pytest @@ -33,11 +35,11 @@ def test_apply(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, resul def test_apply_errors() -> None: """_apply should fail when passed something that isn't a Sequence or MutableMapping.""" with pytest.raises(ValueError, match=r'Dont know how to'): - killswitch._apply(set(), '0', None, False) # type: ignore # Its intentional that its broken - killswitch._apply(None, '', None) # type: ignore # Its intentional that its broken + killswitch._apply(set(), '0') # type: ignore # Its intentional that its broken + killswitch._apply(None, '') # type: ignore # Its intentional that its broken with pytest.raises(ValueError, match=r'Cannot use string'): - killswitch._apply([], 'test', None, False) + killswitch._apply([], 'test') def test_apply_no_error() -> None: @@ -61,7 +63,7 @@ def test_apply_no_error() -> None: (False, 0), (str((1 << 63)-1), (1 << 63)-1), (True, 1), (str(1 << 1337), 1 << 1337) ] ) -def test_get_int(input: str, expected: Optional[int]) -> None: +def test_get_int(input: str, expected: int | None) -> None: """Check that _get_int doesn't throw when handed bad data.""" assert expected == killswitch._get_int(input) diff --git a/tests/killswitch.py/test_killswitch.py b/tests/killswitch.py/test_killswitch.py index cda672ac..3c681ded 100644 --- a/tests/killswitch.py/test_killswitch.py +++ b/tests/killswitch.py/test_killswitch.py @@ -1,7 +1,7 @@ """Tests of killswitch behaviour.""" -import copy -from typing import Optional +from __future__ import annotations +import copy import pytest import semantic_version @@ -34,7 +34,7 @@ TEST_SET = killswitch.KillSwitchSet([ ], ) def test_killswitch( - input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: Optional[killswitch.UPDATABLE_DATA], + input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: killswitch.UPDATABLE_DATA | None, version: str ) -> None: """Simple killswitch tests."""