1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-14 08:17:13 +03:00

Merge pull request #2084 from HullSeals/enhancement/2051/config

[2051] Config Files Audit
LGTM
This commit is contained in:
Phoebe 2023-11-16 19:26:32 +01:00 committed by GitHub
commit fae7b32e43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 322 additions and 284 deletions

37
.gitignore vendored
View File

@ -1,8 +1,18 @@
# Ignore version file
.gitversion
# Ignore macOS DS_Store files
.DS_Store
# Ignore build artifacts
build
ChangeLog.html
dist.win32/
dist.*
# Ignore generated ChangeLog.html file
ChangeLog.html
# Ignore files
dump
*.bak
*.pyc
@ -11,20 +21,37 @@ dump
*.pdb
*.msi
*.wixobj
*.zip
# Ignore Update Things
EDMarketConnector_Installer_*.exe
appcast_win_*.xml
appcast_mac_*.xml
EDMarketConnector.VisualElementsManifest.xml
*.zip
EDMC_Installer_Config.iss
EDMarketConnector.wxs
wix/components.wxs
# Ignore Visual Elements Manifest file for Windows
EDMarketConnector.VisualElementsManifest.xml
# Ignore IDE and editor configuration files
.idea
.vscode
# Ignore virtual environments
.venv/
venv/
venv2
# Ignore workspace file for Visual Studio Code
*.code-workspace
# Ignore coverage reports
htmlcov/
.ignored
.coverage
EDMarketConnector.wxs
wix/components.wxs
pylintrc
pylint.txt
# Ignore Submodule data directory
coriolis-data/

View File

@ -1,11 +1,15 @@
"""
Code dealing with the configuration of the program.
__init__.py - Code dealing with the configuration of the program.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
Windows uses the Registry to store values in a flat manner.
Linux uses a file, but for commonality it's still a flat data structure.
macOS uses a 'defaults' object.
"""
from __future__ import annotations
__all__ = [
# defined in the order they appear in the file
@ -40,10 +44,8 @@ import sys
import traceback
import warnings
from abc import abstractmethod
from typing import Any, Callable, Optional, Type, TypeVar
from typing import Any, Callable, Type, TypeVar
import semantic_version
from constants import GITVERSION_FILE, applongname, appname
# Any of these may be imported by plugins
@ -52,9 +54,9 @@ appcmdname = 'EDMC'
# <https://semver.org/#semantic-versioning-specification-semver>
# Major.Minor.Patch(-prerelease)(+buildmetadata)
# NB: Do *not* import this, use the functions appversion() and appversion_nobuild()
_static_appversion = '5.9.5'
_static_appversion = '5.10.0-alpha0'
_cached_version: Optional[semantic_version.Version] = None
_cached_version: semantic_version.Version | None = None
copyright = '© 2015-2019 Jonathan Harris, 2020-2023 EDCD'
update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml'
@ -66,7 +68,7 @@ debug_senders: list[str] = []
trace_on: list[str] = []
capi_pretend_down: bool = False
capi_debug_access_token: Optional[str] = None
capi_debug_access_token: str | None = None
# This must be done here in order to avoid an import cycle with EDMCLogging.
# Other code should use EDMCLogging.get_main_logger
if os.getenv("EDMC_NO_UI"):
@ -79,7 +81,6 @@ else:
_T = TypeVar('_T')
###########################################################################
def git_shorthash_from_head() -> str:
"""
Determine short hash for current git HEAD.
@ -91,13 +92,14 @@ def git_shorthash_from_head() -> str:
shorthash: str = None # type: ignore
try:
git_cmd = subprocess.Popen('git rev-parse --short HEAD'.split(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
git_cmd = subprocess.Popen(
"git rev-parse --short HEAD".split(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
out, err = git_cmd.communicate()
except Exception as e:
except subprocess.CalledProcessError as e:
logger.info(f"Couldn't run git command for short hash: {e!r}")
else:
@ -131,7 +133,7 @@ def appversion() -> semantic_version.Version:
if getattr(sys, 'frozen', False):
# Running frozen, so we should have a .gitversion file
# Yes, .parent because if frozen we're inside library.zip
with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, 'r', encoding='utf-8') as gitv:
with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, encoding='utf-8') as gitv:
shorthash = gitv.read()
else:
@ -157,11 +159,14 @@ def appversion_nobuild() -> semantic_version.Version:
:return: App version without any build meta data.
"""
return appversion().truncate('prerelease')
###########################################################################
class AbstractConfig(abc.ABC):
"""Abstract root class of all platform specific Config implementations."""
"""
Abstract root class of all platform specific Config implementations.
Commented lines are no longer supported or replaced.
"""
OUT_EDDN_SEND_STATION_DATA = 1
# OUT_MKT_BPC = 2 # No longer supported
@ -185,7 +190,6 @@ class AbstractConfig(abc.ABC):
respath_path: pathlib.Path
home_path: pathlib.Path
default_journal_dir_path: pathlib.Path
identifier: str
__in_shutdown = False # Is the application currently shutting down ?
@ -241,7 +245,7 @@ class AbstractConfig(abc.ABC):
self.__eddn_url = eddn_url
@property
def eddn_url(self) -> Optional[str]:
def eddn_url(self) -> str | None:
"""
Provide the custom EDDN URL.
@ -296,14 +300,14 @@ class AbstractConfig(abc.ABC):
def _suppress_call(
func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception,
*args: Any, **kwargs: Any
) -> Optional[_T]:
) -> _T | None:
if exceptions is None:
exceptions = [Exception]
if not isinstance(exceptions, list):
exceptions = [exceptions]
with contextlib.suppress(*exceptions): # type: ignore # it works fine, mypy
with contextlib.suppress(*exceptions):
return func(*args, **kwargs)
return None
@ -326,16 +330,16 @@ class AbstractConfig(abc.ABC):
if (a_list := self._suppress_call(self.get_list, ValueError, key, default=None)) is not None:
return a_list
elif (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None:
if (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None:
return a_str
elif (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None:
if (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None:
return a_bool
elif (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None:
if (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None:
return an_int
return default # type: ignore
return default
@abstractmethod
def get_list(self, key: str, *, default: list | None = None) -> list:
@ -462,16 +466,15 @@ def get_config(*args, **kwargs) -> AbstractConfig:
from .darwin import MacConfig
return MacConfig(*args, **kwargs)
elif sys.platform == "win32": # pragma: sys-platform-win32
if sys.platform == "win32": # pragma: sys-platform-win32
from .windows import WinConfig
return WinConfig(*args, **kwargs)
elif sys.platform == "linux": # pragma: sys-platform-linux
if sys.platform == "linux": # pragma: sys-platform-linux
from .linux import LinuxConfig
return LinuxConfig(*args, **kwargs)
else: # pragma: sys-platform-not-known
raise ValueError(f'Unknown platform: {sys.platform=}')
raise ValueError(f'Unknown platform: {sys.platform=}')
config = get_config()

View File

@ -1,13 +1,19 @@
"""Darwin/macOS implementation of AbstractConfig."""
"""
darwin.py - Darwin/macOS implementation of AbstractConfig.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import pathlib
import sys
from typing import Any, Dict, List, Union
from typing import Any
from Foundation import ( # type: ignore
NSApplicationSupportDirectory, NSBundle, NSDocumentDirectory, NSSearchPathForDirectoriesInDomains, NSUserDefaults,
NSUserDomainMask
)
from config import AbstractConfig, appname, logger
assert sys.platform == 'darwin'
@ -48,14 +54,14 @@ class MacConfig(AbstractConfig):
self.default_journal_dir_path = support_path / 'Frontier Developments' / 'Elite Dangerous'
self._defaults: Any = NSUserDefaults.standardUserDefaults()
self._settings: Dict[str, Union[int, str, list]] = dict(
self._settings: dict[str, int | str | list] = dict(
self._defaults.persistentDomainForName_(self.identifier) or {}
) # make writeable
if (out_dir := self.get_str('out_dir')) is None or not pathlib.Path(out_dir).exists():
self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0])
def __raw_get(self, key: str) -> Union[None, list, str, int]:
def __raw_get(self, key: str) -> None | list | str | int:
"""
Retrieve the raw data for the given key.
@ -82,7 +88,7 @@ class MacConfig(AbstractConfig):
"""
res = self.__raw_get(key)
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return default # Yes it could be None, but we're _assuming_ that people gave us a default
if not isinstance(res, str):
raise ValueError(f'unexpected data returned from __raw_get: {type(res)=} {res}')
@ -97,9 +103,9 @@ class MacConfig(AbstractConfig):
"""
res = self.__raw_get(key)
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return default # Yes it could be None, but we're _assuming_ that people gave us a default
elif not isinstance(res, list):
if not isinstance(res, list):
raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}')
return res
@ -114,7 +120,7 @@ class MacConfig(AbstractConfig):
if res is None:
return default
elif not isinstance(res, (str, int)):
if not isinstance(res, (str, int)):
raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}')
try:
@ -122,7 +128,7 @@ class MacConfig(AbstractConfig):
except ValueError as e:
logger.error(f'__raw_get returned {res!r} which cannot be parsed to an int: {e}')
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return default # Yes it could be None, but we're _assuming_ that people gave us a default
def get_bool(self, key: str, *, default: bool = None) -> bool:
"""
@ -132,14 +138,14 @@ class MacConfig(AbstractConfig):
"""
res = self.__raw_get(key)
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return default # Yes it could be None, but we're _assuming_ that people gave us a default
elif not isinstance(res, bool):
if not isinstance(res, bool):
raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}')
return res
def set(self, key: str, val: Union[int, str, List[str], bool]) -> None:
def set(self, key: str, val: int | str | list[str] | bool) -> None:
"""
Set the given key's data to the given value.

View File

@ -1,9 +1,14 @@
"""Linux config implementation."""
"""
linux.py - Linux config implementation.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
import os
import pathlib
import sys
from configparser import ConfigParser
from config import AbstractConfig, appname, logger
assert sys.platform == 'linux'

View File

@ -1,6 +1,12 @@
"""Windows config implementation."""
"""
windows.py - Windows config implementation.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
# spell-checker: words folderid deps hkey edcd
import ctypes
import functools
import pathlib
@ -8,8 +14,7 @@ import sys
import uuid
import winreg
from ctypes.wintypes import DWORD, HANDLE
from typing import List, Literal, Optional, Union
from typing import Literal
from config import AbstractConfig, applongname, appname, logger, update_interval
assert sys.platform == 'win32'
@ -29,7 +34,7 @@ CoTaskMemFree = ctypes.windll.ole32.CoTaskMemFree
CoTaskMemFree.argtypes = [ctypes.c_void_p]
def known_folder_path(guid: uuid.UUID) -> Optional[str]:
def known_folder_path(guid: uuid.UUID) -> str | None:
"""Look up a Windows GUID to actual folder path name."""
buf = ctypes.c_wchar_p()
if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)):
@ -43,7 +48,8 @@ class WinConfig(AbstractConfig):
"""Implementation of AbstractConfig for Windows."""
def __init__(self, do_winsparkle=True) -> None:
self.app_dir_path = pathlib.Path(str(known_folder_path(FOLDERID_LocalAppData))) / appname
super().__init__()
self.app_dir_path = pathlib.Path(known_folder_path(FOLDERID_LocalAppData)) / appname # type: ignore
self.app_dir_path.mkdir(exist_ok=True)
self.plugin_dir_path = self.app_dir_path / 'plugins'
@ -52,19 +58,17 @@ class WinConfig(AbstractConfig):
if getattr(sys, 'frozen', False):
self.respath_path = pathlib.Path(sys.executable).parent
self.internal_plugin_dir_path = self.respath_path / 'plugins'
else:
self.respath_path = pathlib.Path(__file__).parent.parent
self.internal_plugin_dir_path = self.respath_path / 'plugins'
self.home_path = pathlib.Path.home()
journal_dir_str = known_folder_path(FOLDERID_SavedGames)
journaldir = pathlib.Path(journal_dir_str) if journal_dir_str is not None else None
self.default_journal_dir_path = None # type: ignore
if journaldir is not None:
self.default_journal_dir_path = journaldir / 'Frontier Developments' / 'Elite Dangerous'
journal_dir_path = pathlib.Path(
known_folder_path(FOLDERID_SavedGames)) / 'Frontier Developments' / 'Elite Dangerous' # type: ignore
self.default_journal_dir_path = journal_dir_path if journal_dir_path.is_dir() else None # type: ignore
REGISTRY_SUBKEY = r'Software\Marginal\EDMarketConnector' # noqa: N806
create_key_defaults = functools.partial(
winreg.CreateKeyEx,
key=winreg.HKEY_CURRENT_USER,
@ -72,20 +76,18 @@ class WinConfig(AbstractConfig):
)
try:
self.__reg_handle: winreg.HKEYType = create_key_defaults(
sub_key=r'Software\Marginal\EDMarketConnector'
)
self.__reg_handle: winreg.HKEYType = create_key_defaults(sub_key=REGISTRY_SUBKEY)
if do_winsparkle:
self.__setup_winsparkle()
except OSError:
logger.exception('could not create required registry keys')
logger.exception('Could not create required registry keys')
raise
self.identifier = applongname
if (outdir_str := self.get_str('outdir')) is None or not pathlib.Path(outdir_str).is_dir():
docs = known_folder_path(FOLDERID_Documents)
self.set('outdir', docs if docs is not None else self.home)
self.set("outdir", docs if docs is not None else self.home)
def __setup_winsparkle(self):
"""Ensure the necessary Registry keys for WinSparkle are present."""
@ -94,32 +96,30 @@ class WinConfig(AbstractConfig):
key=winreg.HKEY_CURRENT_USER,
access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY,
)
try:
edcd_handle: winreg.HKEYType = create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector')
winsparkle_reg: winreg.HKEYType = winreg.CreateKeyEx(
edcd_handle, sub_key='WinSparkle', access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY
)
try:
with create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') as edcd_handle:
with winreg.CreateKeyEx(edcd_handle, sub_key='WinSparkle',
access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY) as winsparkle_reg:
# Set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings
UPDATE_INTERVAL_NAME = 'UpdateInterval' # noqa: N806
CHECK_FOR_UPDATES_NAME = 'CheckForUpdates' # noqa: N806
REG_SZ = winreg.REG_SZ # noqa: N806
winreg.SetValueEx(winsparkle_reg, UPDATE_INTERVAL_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ,
str(update_interval))
try:
winreg.QueryValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME)
except FileNotFoundError:
# Key doesn't exist, set it to a default
winreg.SetValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ,
'1')
except OSError:
logger.exception('could not open WinSparkle handle')
logger.exception('Could not open WinSparkle handle')
raise
# set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings
winreg.SetValueEx(
winsparkle_reg, 'UpdateInterval', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, str(update_interval)
)
try:
winreg.QueryValueEx(winsparkle_reg, 'CheckForUpdates')
except FileNotFoundError:
# Key doesn't exist, set it to a default
winreg.SetValueEx(winsparkle_reg, 'CheckForUpdates', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, '1')
winsparkle_reg.Close()
edcd_handle.Close()
def __get_regentry(self, key: str) -> Union[None, list, str, int]:
def __get_regentry(self, key: str) -> None | list | str | int:
"""Access the Registry for the raw entry."""
try:
value, _type = winreg.QueryValueEx(self.__reg_handle, key)
@ -127,20 +127,18 @@ class WinConfig(AbstractConfig):
# Key doesn't exist
return None
# The type returned is actually as we'd expect for each of these. The casts are here for type checkers and
# For programmers who want to actually know what is going on
if _type == winreg.REG_SZ:
return str(value)
elif _type == winreg.REG_DWORD:
if _type == winreg.REG_DWORD:
return int(value)
elif _type == winreg.REG_MULTI_SZ:
if _type == winreg.REG_MULTI_SZ:
return list(value)
else:
logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}')
return None
logger.warning(f'Registry key {key=} returned unknown type {_type=} {value=}')
return None
def get_str(self, key: str, *, default: str | None = None) -> str:
"""
@ -152,7 +150,7 @@ class WinConfig(AbstractConfig):
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
elif not isinstance(res, str):
if not isinstance(res, str):
raise ValueError(f'Data from registry is not a string: {type(res)=} {res=}')
return res
@ -167,7 +165,7 @@ class WinConfig(AbstractConfig):
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
elif not isinstance(res, list):
if not isinstance(res, list):
raise ValueError(f'Data from registry is not a list: {type(res)=} {res}')
return res
@ -195,11 +193,11 @@ class WinConfig(AbstractConfig):
"""
res = self.get_int(key, default=default) # type: ignore
if res is None:
return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return default # Yes it could be None, but we're _assuming_ that people gave us a default
return bool(res)
def set(self, key: str, val: Union[int, str, List[str], bool]) -> None:
def set(self, key: str, val: int | str | list[str] | bool) -> None:
"""
Set the given key's data to the given value.
@ -209,9 +207,8 @@ class WinConfig(AbstractConfig):
reg_type: Literal[1] | Literal[4] | Literal[7]
if isinstance(val, str):
reg_type = winreg.REG_SZ
winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val)
elif isinstance(val, int): # The original code checked for numbers.Integral, I dont think that is needed.
elif isinstance(val, int):
reg_type = winreg.REG_DWORD
elif isinstance(val, list):
@ -224,7 +221,6 @@ class WinConfig(AbstractConfig):
else:
raise ValueError(f'Unexpected type for value {type(val)=}')
# Its complaining about the list, it works, tested on windows, ignored.
winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, reg_type, val) # type: ignore
def delete(self, key: str, *, suppress=False) -> None:

View File

@ -19,6 +19,7 @@ referenced in this file (or only in any other core plugin), and if so...
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
from __future__ import annotations
import base64
import gzip
@ -26,7 +27,7 @@ import io
import json
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Union, Optional
from typing import TYPE_CHECKING
import myNotebook as nb # noqa: N813 # its not my fault.
from EDMCLogging import get_main_logger
from plug import show_error
@ -80,7 +81,7 @@ def plugin_start3(path: str) -> str:
return 'Coriolis'
def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
"""Set up plugin preferences."""
PADX = 10 # noqa: N806
@ -130,7 +131,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk
return conf_frame
def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None:
def prefs_changed(cmdr: str | None, is_beta: bool) -> None:
"""
Update URLs and override mode based on user preferences.
@ -175,7 +176,7 @@ def _get_target_url(is_beta: bool) -> str:
return coriolis_config.normal_url
def shipyard_url(loadout, is_beta) -> Union[str, bool]:
def shipyard_url(loadout, is_beta) -> str | bool:
"""Return a URL for the current ship."""
# most compact representation
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')

View File

@ -18,6 +18,8 @@ referenced in this file (or only in any other core plugin), and if so...
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
from __future__ import annotations
import http
import itertools
import json
@ -37,9 +39,6 @@ from typing import (
Iterator,
Mapping,
MutableMapping,
Optional,
Dict,
List,
)
from typing import OrderedDict as OrderedDictT
from typing import Tuple, Union
@ -86,27 +85,27 @@ class This:
self.odyssey = False
# Track location to add to Journal events
self.system_address: Optional[str] = None
self.system_name: Optional[str] = None
self.coordinates: Optional[Tuple] = None
self.body_name: Optional[str] = None
self.body_id: Optional[int] = None
self.body_type: Optional[int] = None
self.station_name: Optional[str] = None
self.station_type: Optional[str] = None
self.station_marketid: Optional[str] = None
self.system_address: str | None = None
self.system_name: str | None = None
self.coordinates: tuple | None = None
self.body_name: str | None = None
self.body_id: int | None = None
self.body_type: int | None = None
self.station_name: str | None = None
self.station_type: str | None = None
self.station_marketid: str | None = None
# Track Status.json data
self.status_body_name: Optional[str] = None
self.status_body_name: str | None = None
# Avoid duplicates
self.marketId: Optional[str] = None
self.commodities: Optional[List[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, List[str]]] = None
self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None
self.marketId: str | None = None
self.commodities: list[OrderedDictT[str, Any]] | None = None
self.outfitting: Tuple[bool, list[str]] | None = None
self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None
self.fcmaterials_marketid: int = 0
self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials: list[OrderedDictT[str, Any]] | None = None
self.fcmaterials_capi_marketid: int = 0
self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi: list[OrderedDictT[str, Any]] | None = None
# For the tkinter parent window, so we can call update_idletasks()
self.parent: tk.Tk
@ -395,7 +394,7 @@ class EDDNSender:
"""
logger.trace_if("plugin.eddn.send", "Sending message")
should_return: bool
new_data: Dict[str, Any]
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg))
if should_return:
@ -404,7 +403,7 @@ class EDDNSender:
# Even the smallest possible message compresses somewhat, so always compress
encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0)
headers: Optional[Dict[str, str]] = None
headers: dict[str, str] | None = None
if compressed:
headers = {'Content-Encoding': 'gzip'}
@ -612,7 +611,7 @@ class EDDN:
self.sender = EDDNSender(self, self.eddn_url)
self.fss_signals: List[Mapping[str, Any]] = []
self.fss_signals: list[Mapping[str, Any]] = []
def close(self):
"""Close down the EDDN class instance."""
@ -636,7 +635,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: Dict[str, Any]
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./market', {})
if should_return:
logger.warning("capi.request./market has been disabled by killswitch. Returning.")
@ -653,7 +652,7 @@ class EDDN:
modules,
ships
)
commodities: List[OrderedDictT[str, Any]] = []
commodities: list[OrderedDictT[str, Any]] = []
for commodity in data['lastStarport'].get('commodities') or []:
# Check 'marketable' and 'not prohibited'
if (category_map.get(commodity['categoryname'], True)
@ -726,7 +725,7 @@ class EDDN:
:param data: The raw CAPI data.
:return: Sanity-checked data.
"""
modules: Dict[str, Any] = data['lastStarport'].get('modules')
modules: dict[str, Any] = data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict):
if modules is None:
logger.debug('modules was None. FC or Damaged Station?')
@ -743,13 +742,13 @@ class EDDN:
# Set a safe value
modules = {}
ships: Dict[str, Any] = data['lastStarport'].get('ships')
ships: dict[str, Any] = data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict):
if ships is None:
logger.debug('ships was None')
else:
logger.error(f'ships was neither None nor a Dict! Type = {type(ships)}')
logger.error(f'ships was neither None nor a dict! Type = {type(ships)}')
# Set a safe value
ships = {'shipyard_list': {}, 'unavailable_list': []}
@ -769,7 +768,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: Dict[str, Any]
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -796,7 +795,7 @@ class EDDN:
modules.values()
)
outfitting: List[str] = sorted(
outfitting: list[str] = sorted(
self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search
)
@ -837,7 +836,7 @@ class EDDN:
:param is_beta: whether or not we are in beta mode
"""
should_return: bool
new_data: Dict[str, Any]
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -856,7 +855,7 @@ class EDDN:
ships
)
shipyard: List[Mapping[str, Any]] = sorted(
shipyard: list[Mapping[str, Any]] = sorted(
itertools.chain(
(ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()),
(ship['name'].lower() for ship in ships['unavailable_list'] or {}),
@ -899,8 +898,8 @@ class EDDN:
:param is_beta: whether or not we're in beta mode
:param entry: the journal entry containing the commodities data
"""
items: List[Mapping[str, Any]] = entry.get('Items') or []
commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([
items: list[Mapping[str, Any]] = entry.get('Items') or []
commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([
('name', self.canonicalise(commodity['Name'])),
('meanPrice', commodity['MeanPrice']),
('buyPrice', commodity['BuyPrice']),
@ -947,11 +946,11 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: The relevant journal entry
"""
modules: List[Mapping[str, Any]] = entry.get('Items', [])
modules: list[Mapping[str, Any]] = entry.get('Items', [])
horizons: bool = entry.get('Horizons', False)
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
# for module in modules if module['Name'] != 'int_planetapproachsuite'])
outfitting: List[str] = sorted(
outfitting: list[str] = sorted(
self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in
filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules)
)
@ -986,7 +985,7 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: the relevant journal entry
"""
ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
ships: list[Mapping[str, Any]] = entry.get('Pricelist') or []
horizons: bool = entry.get('Horizons', False)
shipyard = sorted(ship['ShipType'] for ship in ships)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
@ -1042,7 +1041,7 @@ class EDDN:
self.sender.send_message_by_id(msg_id)
def standard_header(
self, game_version: Optional[str] = None, game_build: Optional[str] = None
self, game_version: str | None = None, game_build: str | None = None
) -> MutableMapping[str, Any]:
"""
Return the standard header for an EDDN message, given tracked state.
@ -1134,7 +1133,7 @@ class EDDN:
def export_journal_fssdiscoveryscan(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an FSSDiscoveryScan to EDDN on the correct schema.
@ -1176,7 +1175,7 @@ class EDDN:
def export_journal_navbeaconscan(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an NavBeaconScan to EDDN on the correct schema.
@ -1218,7 +1217,7 @@ class EDDN:
def export_journal_codexentry( # noqa: CCR001
self, cmdr: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send a CodexEntry to EDDN on the correct schema.
@ -1320,7 +1319,7 @@ class EDDN:
def export_journal_scanbarycentre(
self, cmdr: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send a ScanBaryCentre to EDDN on the correct schema.
@ -1374,7 +1373,7 @@ class EDDN:
def export_journal_navroute(
self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send a NavRoute to EDDN on the correct schema.
@ -1447,7 +1446,7 @@ class EDDN:
def export_journal_fcmaterials(
self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an FCMaterials message to EDDN on the correct schema.
@ -1531,7 +1530,7 @@ class EDDN:
def export_capi_fcmaterials(
self, data: CAPIData, is_beta: bool, horizons: bool
) -> Optional[str]:
) -> str | None:
"""
Send CAPI-sourced 'onfootmicroresources' data on `fcmaterials/1` schema.
@ -1594,7 +1593,7 @@ class EDDN:
def export_journal_approachsettlement(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an ApproachSettlement to EDDN on the correct schema.
@ -1669,7 +1668,7 @@ class EDDN:
def export_journal_fssallbodiesfound(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an FSSAllBodiesFound message to EDDN on the correct schema.
@ -1719,7 +1718,7 @@ class EDDN:
def export_journal_fssbodysignals(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an FSSBodySignals message to EDDN on the correct schema.
@ -1789,7 +1788,7 @@ class EDDN:
def export_journal_fsssignaldiscovered(
self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Send an FSSSignalDiscovered message to EDDN on the correct schema.
@ -1892,7 +1891,7 @@ class EDDN:
match = self.CANONICALISE_RE.match(item)
return match and match.group(1) or item
def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_endpoint: str) -> str:
def capi_gameversion_from_host_endpoint(self, capi_host: str | None, capi_endpoint: str) -> str:
"""
Return the correct CAPI gameversion string for the given host/endpoint.
@ -1910,7 +1909,7 @@ class EDDN:
gv = 'CAPI-Legacy-'
else:
# Technically incorrect, but it will inform Listeners
# Technically incorrect, but it will inform listeners
logger.error(f"{capi_host=} lead to bad gameversion")
gv = 'CAPI-UNKNOWN-'
#######################################################################
@ -1924,7 +1923,7 @@ class EDDN:
gv += 'shipyard'
else:
# Technically incorrect, but it will inform Listeners
# Technically incorrect, but it will inform listeners
logger.error(f"{capi_endpoint=} lead to bad gameversion")
gv += 'UNKNOWN'
#######################################################################
@ -1943,7 +1942,7 @@ def plugin_start3(plugin_dir: str) -> str:
return 'EDDN'
def plugin_app(parent: tk.Tk) -> Optional[tk.Frame]:
def plugin_app(parent: tk.Tk) -> tk.Frame | None:
"""
Set up any plugin-specific UI.
@ -2183,7 +2182,7 @@ def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]:
"""
Recursively remove any dict keys with names ending `_Localised` from a dict.
:param d: Dict to filter keys of.
:param d: dict to filter keys of.
:return: The filtered dict.
"""
filtered: OrderedDictT[str, Any] = OrderedDict()
@ -2207,7 +2206,7 @@ def capi_filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]:
"""
Recursively remove any dict keys for known CAPI 'localised' names.
:param d: Dict to filter keys of.
:param d: dict to filter keys of.
:return: The filtered dict.
"""
filtered: OrderedDictT[str, Any] = OrderedDict()
@ -2234,7 +2233,7 @@ def journal_entry( # noqa: C901, CCR001
station: str,
entry: MutableMapping[str, Any],
state: Mapping[str, Any]
) -> Optional[str]:
) -> str | None:
"""
Process a new Journal entry.
@ -2491,7 +2490,7 @@ def journal_entry( # noqa: C901, CCR001
return None
def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]:
def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> str | None:
"""
Process new CAPI data for Legacy galaxy.
@ -2510,7 +2509,7 @@ def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]:
return cmdr_data(data, is_beta)
def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001
"""
Process new CAPI data for not-Legacy galaxy (might be beta).
@ -2611,7 +2610,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST
return economies_colony or modules_horizons or ship_horizons
def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None:
def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None:
"""
Process Status.json data to track things like current Body.

View File

@ -18,6 +18,8 @@ referenced in this file (or only in any other core plugin), and if so...
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
from __future__ import annotations
import json
import threading
import tkinter as tk
@ -26,7 +28,7 @@ from queue import Queue
from threading import Thread
from time import sleep
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Literal, Mapping, MutableMapping, cast
import requests
import killswitch
import monitor
@ -72,27 +74,27 @@ class This:
self.game_build = ""
# Handle only sending Live galaxy data
self.legacy_galaxy_last_notified: Optional[datetime] = None
self.legacy_galaxy_last_notified: datetime | None = None
self.session: requests.Session = requests.Session()
self.session.headers['User-Agent'] = user_agent
self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread
self.discarded_events: Set[str] = set() # List discarded events from EDSM
self.lastlookup: Dict[str, Any] # Result of last system lookup
self.discarded_events: set[str] = set() # List discarded events from EDSM
self.lastlookup: dict[str, Any] # Result of last system lookup
# Game state
self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew
self.coordinates: Optional[Tuple[int, int, int]] = None
self.coordinates: tuple[int, int, int] | None = None
self.newgame: bool = False # starting up - batch initial burst of events
self.newgame_docked: bool = False # starting up while docked
self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan
self.system_link: Optional[tk.Widget] = None
self.system_name: Optional[tk.Tk] = None
self.system_address: Optional[int] = None # Frontier SystemAddress
self.system_population: Optional[int] = None
self.station_link: Optional[tk.Widget] = None
self.station_name: Optional[str] = None
self.station_marketid: Optional[int] = None # Frontier MarketID
self.system_link: tk.Widget | None = None
self.system_name: tk.Tk | None = None
self.system_address: int | None = None # Frontier SystemAddress
self.system_population: int | None = None
self.station_link: tk.Widget | None = None
self.station_name: str | None = None
self.station_marketid: int | None = None # Frontier MarketID
self.on_foot = False
self._IMG_KNOWN = None
@ -100,21 +102,21 @@ class This:
self._IMG_NEW = None
self._IMG_ERROR = None
self.thread: Optional[threading.Thread] = None
self.thread: threading.Thread | None = None
self.log: Optional[tk.IntVar] = None
self.log_button: Optional[ttk.Checkbutton] = None
self.log: tk.IntVar | None = None
self.log_button: ttk.Checkbutton | None = None
self.label: Optional[tk.Widget] = None
self.label: tk.Widget | None = None
self.cmdr_label: Optional[nb.Label] = None
self.cmdr_text: Optional[nb.Label] = None
self.cmdr_label: nb.Label | None = None
self.cmdr_text: nb.Label | None = None
self.user_label: Optional[nb.Label] = None
self.user: Optional[nb.Entry] = None
self.user_label: nb.Label | None = None
self.user: nb.Entry | None = None
self.apikey_label: Optional[nb.Label] = None
self.apikey: Optional[nb.Entry] = None
self.apikey_label: nb.Label | None = None
self.apikey: nb.Entry | None = None
this = This()
@ -277,7 +279,7 @@ def toggle_password_visibility():
this.apikey.config(show="*") # type: ignore
def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
"""
Plugin preferences setup hook.
@ -361,7 +363,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk
return frame
def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001
def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001
"""
Handle the Commander name changing whilst Settings was open.
@ -390,7 +392,7 @@ def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR
# LANG: We have no data on the current commander
this.cmdr_text['text'] = _('None')
to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED
to_set: Literal['normal'] | Literal['disabled'] = tk.DISABLED
if cmdr and not is_beta and this.log and this.log.get():
to_set = tk.NORMAL
@ -440,9 +442,9 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
config.set('edsm_out', this.log.get())
if cmdr and not is_beta:
cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[])
usernames: List[str] = config.get_list('edsm_usernames', default=[])
apikeys: List[str] = config.get_list('edsm_apikeys', default=[])
cmdrs: list[str] = config.get_list('edsm_cmdrs', default=[])
usernames: list[str] = config.get_list('edsm_usernames', default=[])
apikeys: list[str] = config.get_list('edsm_apikeys', default=[])
if this.user and this.apikey:
if cmdr in cmdrs:
@ -460,7 +462,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
config.set('edsm_apikeys', apikeys)
def credentials(cmdr: str) -> Optional[Tuple[str, str]]:
def credentials(cmdr: str) -> tuple[str, str] | None:
"""
Get credentials for the given commander, if they exist.
@ -635,7 +637,7 @@ Queueing: {entry!r}'''
# Update system data
def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001
"""
Process new CAPI data.
@ -722,7 +724,7 @@ def worker() -> None: # noqa: CCR001 C901
:return: None
"""
logger.debug('Starting...')
pending: List[Mapping[str, Any]] = [] # Unsent events
pending: list[Mapping[str, Any]] = [] # Unsent events
closing = False
cmdr: str = ""
last_game_version = ""
@ -744,7 +746,7 @@ def worker() -> None: # noqa: CCR001 C901
logger.debug(f'{this.shutting_down=}, so setting closing = True')
closing = True
item: Optional[Tuple[str, str, str, Mapping[str, Any]]] = this.queue.get()
item: tuple[str, str, str, Mapping[str, Any]] | None = this.queue.get()
if item:
(cmdr, game_version, game_build, entry) = item
logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})')
@ -756,7 +758,7 @@ def worker() -> None: # noqa: CCR001 C901
retrying = 0
while retrying < 3:
if item is None:
item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {}))
item = cast(tuple[str, str, str, Mapping[str, Any]], ("", {}))
should_skip, new_item = killswitch.check_killswitch(
'plugins.edsm.worker',
item,
@ -909,7 +911,7 @@ def worker() -> None: # noqa: CCR001 C901
last_game_build = game_build
def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001
def should_send(entries: list[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001
"""
Whether or not any of the given entries should be sent to EDSM.

View File

@ -18,11 +18,13 @@ referenced in this file (or only in any other core plugin), and if so...
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
from __future__ import annotations
import base64
import gzip
import io
import json
from typing import Any, Mapping, Union
from typing import Any, Mapping
def plugin_start3(plugin_dir: str) -> str:
@ -36,7 +38,7 @@ def plugin_start3(plugin_dir: str) -> str:
# Return a URL for the current ship
def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]:
def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> bool | str:
"""
Construct a URL for ship loadout.

View File

@ -18,6 +18,7 @@ referenced in this file (or only in any other core plugin), and if so...
`build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT
IN AN END-USER INSTALLATION ON WINDOWS.
"""
from __future__ import annotations
import json
import threading
@ -29,9 +30,8 @@ from datetime import datetime, timedelta, timezone
from operator import itemgetter
from threading import Lock, Thread
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Callable, Deque, Mapping, NamedTuple, Sequence, cast, Union
from typing import OrderedDict as OrderedDictT
from typing import Sequence, Union, cast
import requests
import edmc_data
import killswitch
@ -63,8 +63,8 @@ CREDITS_DELTA_MIN_ABSOLUTE = 10_000_000 # Absolute difference threshold
class Credentials(NamedTuple):
"""Credentials holds the set of credentials required to identify an inara API payload to inara."""
cmdr: Optional[str]
fid: Optional[str]
cmdr: str | None
fid: str | None
api_key: str
@ -89,25 +89,25 @@ class This:
self.parent: tk.Tk
# Handle only sending Live galaxy data
self.legacy_galaxy_last_notified: Optional[datetime] = None
self.legacy_galaxy_last_notified: datetime | None = None
self.lastlocation = None # eventData from the last Commander's Flight Log event
self.lastship = None # eventData from the last addCommanderShip or setCommanderShip event
# Cached Cmdr state
self.cmdr: Optional[str] = None
self.FID: Optional[str] = None # Frontier ID
self.cmdr: str | None = None
self.FID: str | None = None # Frontier ID
self.multicrew: bool = False # don't send captain's ship info to Inara while on a crew
self.newuser: bool = False # just entered API Key - send state immediately
self.newsession: bool = True # starting a new session - wait for Cargo event
self.undocked: bool = False # just undocked
self.suppress_docked = False # Skip initial Docked event if started docked
self.cargo: Optional[List[OrderedDictT[str, Any]]] = None
self.materials: Optional[List[OrderedDictT[str, Any]]] = None
self.cargo: list[OrderedDictT[str, Any]] | None = None
self.materials: list[OrderedDictT[str, Any]] | None = None
self.last_credits: int = 0 # Send credit update soon after Startup / new game
self.storedmodules: Optional[List[OrderedDictT[str, Any]]] = None
self.loadout: Optional[OrderedDictT[str, Any]] = None
self.fleet: Optional[List[OrderedDictT[str, Any]]] = None
self.storedmodules: list[OrderedDictT[str, Any]] | None = None
self.loadout: OrderedDictT[str, Any] | None = None
self.fleet: list[OrderedDictT[str, Any]] | None = None
self.shipswap: bool = False # just swapped ship
self.on_foot = False
@ -115,9 +115,9 @@ class This:
# Main window clicks
self.system_link: tk.Widget = None # type: ignore
self.system_name: Optional[str] = None # type: ignore
self.system_address: Optional[str] = None # type: ignore
self.system_population: Optional[int] = None
self.system_name: str | None = None # type: ignore
self.system_address: str | None = None # type: ignore
self.system_population: int | None = None
self.station_link: tk.Widget = None # type: ignore
self.station = None
self.station_marketid = None
@ -129,7 +129,7 @@ class This:
self.apikey: nb.Entry
self.apikey_label: tk.Label
self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque)
self.events: dict[Credentials, Deque[Event]] = defaultdict(deque)
self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events
def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]) -> None:
@ -361,7 +361,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
)
def credentials(cmdr: Optional[str]) -> Optional[str]:
def credentials(cmdr: str | None) -> str | None:
"""
Get the credentials for the current commander.
@ -383,7 +383,7 @@ def credentials(cmdr: Optional[str]) -> Optional[str]:
def journal_entry( # noqa: C901, CCR001
cmdr: str, is_beta: bool, system: str, station: str, entry: Dict[str, Any], state: Dict[str, Any]
cmdr: str, is_beta: bool, system: str, station: str, entry: dict[str, Any], state: dict[str, Any]
) -> str:
"""
Journal entry hook.
@ -394,7 +394,7 @@ def journal_entry( # noqa: C901, CCR001
# causing users to spam Inara with 'URL provider' queries, and we want to
# stop that.
should_return: bool
new_entry: Dict[str, Any] = {}
new_entry: dict[str, Any] = {}
should_return, new_entry = killswitch.check_killswitch('plugins.inara.journal', entry, logger)
if should_return:
@ -813,7 +813,7 @@ def journal_entry( # noqa: C901, CCR001
# Fleet
if event_name == 'StoredShips':
fleet: List[OrderedDictT[str, Any]] = sorted(
fleet: list[OrderedDictT[str, Any]] = sorted(
[OrderedDict({
'shipType': x['ShipType'],
'shipGameID': x['ShipID'],
@ -860,7 +860,7 @@ def journal_entry( # noqa: C901, CCR001
# Stored modules
if event_name == 'StoredModules':
items = {mod['StorageSlot']: mod for mod in entry['Items']} # Impose an order
modules: List[OrderedDictT[str, Any]] = []
modules: list[OrderedDictT[str, Any]] = []
for slot in sorted(items):
item = items[slot]
module: OrderedDictT[str, Any] = OrderedDict([
@ -1088,7 +1088,7 @@ def journal_entry( # noqa: C901, CCR001
#
# So we're going to do a lot of checking here and bail out if we dont like the look of ANYTHING here
to_send_data: Optional[Dict[str, Any]] = {} # This is a glorified sentinel until lower down.
to_send_data: dict[str, Any] | None = {} # This is a glorified sentinel until lower down.
# On Horizons, neither of these exist on TouchDown
star_system_name = entry.get('StarSystem', this.system_name)
body_name = entry.get('Body', state['Body'] if state['BodyType'] == 'Planet' else None)
@ -1370,7 +1370,7 @@ def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later
pass
def make_loadout(state: Dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001
def make_loadout(state: dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001
"""
Construct an inara loadout from an event.
@ -1440,8 +1440,8 @@ def new_add_event(
name: str,
timestamp: str,
data: EVENT_DATA,
cmdr: Optional[str] = None,
fid: Optional[str] = None
cmdr: str | None = None,
fid: str | None = None
):
"""
Add a journal event to the queue, to be sent to inara at the next opportunity.
@ -1470,11 +1470,11 @@ def new_add_event(
this.events[key].append(Event(name, timestamp, data))
def clean_event_list(event_list: List[Event]) -> List[Event]:
def clean_event_list(event_list: list[Event]) -> list[Event]:
"""
Check for killswitched events and remove or modify them as requested.
:param event_list: List of events to clean
:param event_list: list of events to clean
:return: Cleaned list of events
"""
cleaned_events = []
@ -1533,14 +1533,14 @@ def new_worker():
logger.debug('Done.')
def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]:
def get_events(clear: bool = True) -> dict[Credentials, list[Event]]:
"""
Fetch a copy of all events from the current queue.
:param clear: whether to clear the queues as we go, defaults to True
:return: a copy of the event dictionary
"""
events_copy: Dict[Credentials, List[Event]] = {}
events_copy: dict[Credentials, list[Event]] = {}
with this.event_lock:
for key, events in this.events.items():
@ -1590,7 +1590,7 @@ def send_data(url: str, data: Mapping[str, Any]) -> bool:
return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such
def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None:
def handle_api_error(data: Mapping[str, Any], status: int, reply: dict[str, Any]) -> None:
"""
Handle API error response.
@ -1604,7 +1604,7 @@ def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]
plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message))
def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None:
def handle_success_reply(data: Mapping[str, Any], reply: dict[str, Any]) -> None:
"""
Handle successful API response.
@ -1619,7 +1619,7 @@ def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None
handle_special_events(data_event, reply_event)
def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None:
def handle_individual_error(data_event: dict[str, Any], reply_status: int, reply_text: str) -> None:
"""
Handle individual API error.
@ -1638,7 +1638,7 @@ def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply
))
def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None:
def handle_special_events(data_event: dict[str, Any], reply_event: dict[str, Any]) -> None:
"""
Handle special events in the API response.

View File

@ -37,7 +37,7 @@ def test_class_logger(caplog: 'LogCaptureFixture') -> None:
ClassVarLogger.set_logger(logger)
ClassVarLogger.logger.debug('test') # type: ignore # its there
ClassVarLogger.logger.info('test2') # type: ignore # its there
log_stuff('test3') # type: ignore # its there
log_stuff('test3')
# Dont move these, it relies on the line numbres.
assert 'EDMarketConnector.EDMCLogging.py:test_logging_classvar.py:38 test' in caplog.text

View File

@ -1,12 +1,13 @@
# type: ignore
"""Old Configuration Test File."""
from __future__ import annotations
import numbers
import sys
import warnings
from configparser import NoOptionError
from os import getenv, makedirs, mkdir, pardir
from os.path import dirname, expanduser, isdir, join, normpath
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING
from config import applongname, appname, update_interval
from EDMCLogging import get_main_logger
@ -81,7 +82,7 @@ elif sys.platform == 'win32':
RegDeleteValue.restype = LONG
RegDeleteValue.argtypes = [HKEY, LPCWSTR]
def known_folder_path(guid: uuid.UUID) -> Optional[str]:
def known_folder_path(guid: uuid.UUID) -> str | None:
"""Look up a Windows GUID to actual folder path name."""
buf = ctypes.c_wchar_p()
if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)):
@ -95,7 +96,7 @@ elif sys.platform == 'linux':
from configparser import RawConfigParser
class OldConfig():
class OldConfig:
"""Object that holds all configuration data."""
OUT_EDDN_SEND_STATION_DATA = 1
@ -153,20 +154,19 @@ class OldConfig():
if not self.get('outdir') or not isdir(str(self.get('outdir'))):
self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0])
def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]:
def get(self, key: str, default: None | list | str = None) -> None | list | str:
"""Look up a string configuration value."""
val = self.settings.get(key)
if val is None:
return default
elif isinstance(val, str):
if isinstance(val, str):
return str(val)
elif isinstance(val, list):
if isinstance(val, list):
return list(val) # make writeable
else:
return default
return default
def getint(self, key: str, default: int = 0) -> int:
"""Look up an integer configuration value."""
@ -181,7 +181,7 @@ class OldConfig():
logger.debug('The exception type is ...', exc_info=e)
return default
def set(self, key: str, val: Union[int, str, list]) -> None:
def set(self, key: str, val: int | str | list) -> None:
"""Set value on the specified configuration key."""
self.settings[key] = val
@ -202,7 +202,7 @@ class OldConfig():
elif sys.platform == 'win32':
def __init__(self):
self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore # Not going to change
self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore
if not isdir(self.app_dir):
mkdir(self.app_dir)
@ -279,10 +279,10 @@ class OldConfig():
RegSetValueEx(sparklekey, 'UpdateInterval', 0, 1, buf, len(buf) * 2)
RegCloseKey(sparklekey)
if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change
if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore
self.set('outdir', known_folder_path(FOLDERID_Documents) or self.home)
def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]:
def get(self, key: str, default: None | list | str = None) -> None | list | str:
"""Look up a string configuration value."""
key_type = DWORD()
key_size = DWORD()
@ -304,11 +304,10 @@ class OldConfig():
if RegQueryValueEx(self.hkey, key, 0, ctypes.byref(key_type), buf, ctypes.byref(key_size)):
return default
elif key_type.value == REG_MULTI_SZ:
if key_type.value == REG_MULTI_SZ:
return list(ctypes.wstring_at(buf, len(buf)-2).split('\x00'))
else:
return str(buf.value)
return str(buf.value)
def getint(self, key: str, default: int = 0) -> int:
"""Look up an integer configuration value."""
@ -328,10 +327,9 @@ class OldConfig():
):
return default
else:
return key_val.value
return key_val.value
def set(self, key: str, val: Union[int, str, list]) -> None:
def set(self, key: str, val: int | str | list) -> None:
"""Set value on the specified configuration key."""
if isinstance(val, str):
buf = ctypes.create_unicode_buffer(val)
@ -388,7 +386,7 @@ class OldConfig():
self.config = RawConfigParser(comment_prefixes=('#',))
try:
with codecs.open(self.filename, 'r') as h:
with codecs.open(self.filename) as h:
self.config.read_file(h)
except Exception as e:
@ -398,7 +396,7 @@ class OldConfig():
if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change
self.set('outdir', expanduser('~'))
def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]:
def get(self, key: str, default: None | list | str = None) -> None | list | str:
"""Look up a string configuration value."""
try:
val = self.config.get(self.SECTION, key)
@ -407,8 +405,7 @@ class OldConfig():
# so we add a spurious ';' entry in set() and remove it here
assert val.split('\n')[-1] == ';', val.split('\n')
return [self._unescape(x) for x in val.split('\n')[:-1]]
else:
return self._unescape(val)
return self._unescape(val)
except NoOptionError:
logger.debug(f'attempted to get key {key} that does not exist')
@ -434,13 +431,13 @@ class OldConfig():
return default
def set(self, key: str, val: Union[int, str, list]) -> None:
def set(self, key: str, val: int | str | list) -> None:
"""Set value on the specified configuration key."""
if isinstance(val, bool):
self.config.set(self.SECTION, key, val and '1' or '0') # type: ignore # Not going to change
self.config.set(self.SECTION, key, val and '1' or '0')
elif isinstance(val, str) or isinstance(val, numbers.Integral):
self.config.set(self.SECTION, key, self._escape(val)) # type: ignore # Not going to change
elif isinstance(val, (numbers.Integral, str)):
self.config.set(self.SECTION, key, self._escape(val))
elif isinstance(val, list):
self.config.set(self.SECTION, key, '\n'.join([self._escape(x) for x in val] + [';']))
@ -460,7 +457,7 @@ class OldConfig():
def close(self) -> None:
"""Close the configuration."""
self.save()
self.config = None
self.config = None # type: ignore
def _escape(self, val: str) -> str:
"""Escape a string for storage."""

View File

@ -7,15 +7,13 @@ key deletions. Said modifications are to keys that are generated internally.
Most of these tests are parity tests with the "old" config, and likely one day can be
entirely removed.
"""
from __future__ import annotations
import contextlib
import itertools
import pathlib
import random
import string
import sys
from typing import Any, Iterable, List, cast
from typing import Any, Iterable, cast
import pytest
from pytest import mark
@ -30,12 +28,12 @@ from _old_config import old_config # noqa: E402
from config import config # noqa: E402
def _fuzz_list(length: int) -> List[str]:
def _fuzz_list(length: int) -> list[str]:
out = []
for _ in range(length):
out.append(_fuzz_generators[str](random.randint(0, 1337)))
return cast(List[str], out)
return cast(list[str], out)
_fuzz_generators = { # Type annotating this would be a nightmare.
@ -72,7 +70,7 @@ bool_tests = [True, False]
big_int = int(0xFFFFFFFF) # 32 bit int
def _make_params(args: List[Any], id_name: str = 'random_test_{i}') -> list:
def _make_params(args: list[Any], id_name: str = 'random_test_{i}') -> list:
return [pytest.param(x, id=id_name.format(i=i)) for i, x in enumerate(args)]
@ -81,7 +79,7 @@ def _build_test_list(static_data, random_data, random_id_name='random_test_{i}')
class TestNewConfig:
"""Test the new config with an array of hand picked and random data."""
"""Test the new config with an array of hand-picked and random data."""
def __update_linuxconfig(self) -> None:
"""On linux config uses ConfigParser, which doesn't update from disk changes. Force the update here."""
@ -117,7 +115,7 @@ class TestNewConfig:
config.delete(name)
@mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list)))
def test_list(self, lst: List[str]) -> None:
def test_list(self, lst: list[str]) -> None:
"""Save a list and then ask for it back."""
name = f'list_test_{ hash("".join(lst)) }'
config.set(name, lst)
@ -216,7 +214,7 @@ class TestOldNewConfig:
assert res == string
@mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list)))
def test_list(self, lst: List[str]) -> None:
def test_list(self, lst: list[str]) -> None:
"""Save a list though the old config, recall it using the new config."""
lst = [x.replace("\r", "") for x in lst] # OldConfig on linux fails to store these correctly
if sys.platform == 'win32':

View File

@ -1,13 +1,13 @@
"""Tests for journal_lock.py code."""
from __future__ import annotations
import multiprocessing as mp
import os
import pathlib
import sys
from typing import Generator
import pytest
from pytest import MonkeyPatch, TempdirFactory, TempPathFactory
from config import config
from journal_lock import JournalLock, JournalLockResult
@ -142,7 +142,7 @@ class TestJournalLock:
def get_str(key: str, *, default: str | None = None) -> str:
"""Mock config.*Config get_str to provide fake journaldir."""
if key == 'journaldir':
return tmp_path_factory.mktemp("changing")
return tmp_path_factory.mktemp("changing") # type: ignore
print('Other key, calling up ...')
return config.get_str(key) # Call the non-mocked
@ -301,7 +301,7 @@ class TestJournalLock:
# Need to release any handles on the lockfile else the sub-process
# might not be able to clean up properly, and that will impact
# on later tests.
jlock.journal_dir_lockfile.close()
jlock.journal_dir_lockfile.close() # type: ignore
print('Telling sub-process to quit...')
exit_q.put('quit')

View File

@ -1,6 +1,8 @@
"""Test the apply functions used by killswitch to modify data."""
from __future__ import annotations
import copy
from typing import Any, Optional
from typing import Any
import pytest
@ -33,11 +35,11 @@ def test_apply(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, resul
def test_apply_errors() -> None:
"""_apply should fail when passed something that isn't a Sequence or MutableMapping."""
with pytest.raises(ValueError, match=r'Dont know how to'):
killswitch._apply(set(), '0', None, False) # type: ignore # Its intentional that its broken
killswitch._apply(None, '', None) # type: ignore # Its intentional that its broken
killswitch._apply(set(), '0') # type: ignore # Its intentional that its broken
killswitch._apply(None, '') # type: ignore # Its intentional that its broken
with pytest.raises(ValueError, match=r'Cannot use string'):
killswitch._apply([], 'test', None, False)
killswitch._apply([], 'test')
def test_apply_no_error() -> None:
@ -61,7 +63,7 @@ def test_apply_no_error() -> None:
(False, 0), (str((1 << 63)-1), (1 << 63)-1), (True, 1), (str(1 << 1337), 1 << 1337)
]
)
def test_get_int(input: str, expected: Optional[int]) -> None:
def test_get_int(input: str, expected: int | None) -> None:
"""Check that _get_int doesn't throw when handed bad data."""
assert expected == killswitch._get_int(input)

View File

@ -1,7 +1,7 @@
"""Tests of killswitch behaviour."""
import copy
from typing import Optional
from __future__ import annotations
import copy
import pytest
import semantic_version
@ -34,7 +34,7 @@ TEST_SET = killswitch.KillSwitchSet([
],
)
def test_killswitch(
input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: Optional[killswitch.UPDATABLE_DATA],
input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: killswitch.UPDATABLE_DATA | None,
version: str
) -> None:
"""Simple killswitch tests."""