diff --git a/.flake8 b/.flake8 index c8dc0cc6..7207bf81 100644 --- a/.flake8 +++ b/.flake8 @@ -8,6 +8,7 @@ exclude = venv/ .venv/ wix/ + hotkey/darwin.py # FIXME: Check under macOS VM at some point # Show exactly where in a line the error happened #show-source = True diff --git a/.github/workflows/push-checks.yml b/.github/workflows/push-checks.yml index e0c98897..07f45e75 100644 --- a/.github/workflows/push-checks.yml +++ b/.github/workflows/push-checks.yml @@ -9,10 +9,15 @@ name: Push-Checks on: push: - branches: [ develop ] + # We'll catch issues on `develop` or any PR branch. + branches-ignore: + - 'main' + - 'stable' + - 'releases' + - 'beta' jobs: - build: + push_checks: runs-on: ubuntu-22.04 @@ -59,3 +64,7 @@ jobs: grep -E -z -Z '\.py$' | \ xargs -0 flake8 --count --statistics --extend-ignore D ####################################################################### + + - name: mypy type checks + run: | + ./scripts/mypy-all.sh --platform win32 diff --git a/.mypy.ini b/.mypy.ini index 10ef53b7..75f4ccdb 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -1,5 +1,5 @@ [mypy] -follow_imports = skip +follow_imports = normal ignore_missing_imports = True scripts_are_modules = True ; Without this bare `mypy ` may get warnings for e.g. @@ -7,3 +7,4 @@ scripts_are_modules = True ; i.e. no typing info. check_untyped_defs = True ; platform = darwin +explicit_package_bases = True diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d841cda1..5b534279 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -51,7 +51,7 @@ repos: - id: mypy # verbose: true # log_file: 'pre-commit_mypy.log' - additional_dependencies: [ types-requests ] + additional_dependencies: [ types-pkg-resources, types-requests, types-urllib3 ] # args: [ "--follow-imports", "skip", "--ignore-missing-imports", "--scripts-are-modules" ] ### # pydocstyle.exe diff --git a/Build-exe-and-msi.py b/Build-exe-and-msi.py index adc04432..71e1eace 100644 --- a/Build-exe-and-msi.py +++ b/Build-exe-and-msi.py @@ -28,6 +28,9 @@ if sys.platform == 'win32': else: raise AssertionError(f'Unsupported platform {sys.platform}') + +# This added to make mypy happy +assert sys.platform == 'win32' ########################################################################### ########################################################################### diff --git a/EDMC.py b/EDMC.py index 4daf4af3..2a7d7cd4 100755 --- a/EDMC.py +++ b/EDMC.py @@ -71,7 +71,7 @@ def versioncmp(versionstring) -> List: return list(map(int, versionstring.split('.'))) -def deep_get(target: dict, *args: str, default=None) -> Any: +def deep_get(target: dict | companion.CAPIData, *args: str, default=None) -> Any: """ Walk into a dict and return the specified deep value. @@ -224,12 +224,15 @@ sys.path: {sys.path}''' logger.debug(f'logdir = "{monitor.currentdir}"') logfile = monitor.journal_newest_filename(monitor.currentdir) + if logfile is None: + raise ValueError("None from monitor.journal_newest_filename") logger.debug(f'Using logfile "{logfile}"') - with open(logfile, 'r', encoding='utf-8') as loghandle: + with open(logfile, 'rb', 0) as loghandle: for line in loghandle: try: monitor.parse_entry(line) + except Exception: logger.debug(f'Invalid journal entry {line!r}') @@ -410,7 +413,23 @@ sys.path: {sys.path}''' # Retry for shipyard sleep(SERVER_RETRY) - new_data = companion.session.station() + companion.session.station(int(time())) + # Wait for the response + _capi_request_timeout = 60 + try: + capi_response = companion.session.capi_response_queue.get( + block=True, timeout=_capi_request_timeout + ) + + except queue.Empty: + logger.error(f'CAPI requests timed out after {_capi_request_timeout} seconds') + sys.exit(EXIT_SERVER) + + if isinstance(capi_response, companion.EDMCCAPIFailedRequest): + logger.error(f'Failed Request: {capi_response.message}') + sys.exit(EXIT_SERVER) + + new_data = capi_response.capi_data # might have undocked while we were waiting for retry in which case station data is unreliable if new_data['commander'].get('docked') and \ deep_get(new_data, 'lastSystem', 'name') == monitor.system and \ diff --git a/EDMCLogging.py b/EDMCLogging.py index 1bf83bc7..e698b35b 100644 --- a/EDMCLogging.py +++ b/EDMCLogging.py @@ -145,7 +145,7 @@ class Logger: logging.Logger instance. """ - def __init__(self, logger_name: str, loglevel: int = _default_loglevel): + def __init__(self, logger_name: str, loglevel: int | str = _default_loglevel): """ Set up a `logging.Logger` with our preferred configuration. @@ -216,7 +216,7 @@ class Logger: """ return self.logger_channel - def set_channels_loglevel(self, level: int) -> None: + def set_channels_loglevel(self, level: int | str) -> None: """ Set the specified log level on the channels. @@ -226,7 +226,7 @@ class Logger: self.logger_channel.setLevel(level) self.logger_channel_rotating.setLevel(level) - def set_console_loglevel(self, level: int) -> None: + def set_console_loglevel(self, level: int | str) -> None: """ Set the specified log level on the console channel. @@ -541,7 +541,7 @@ def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin': # Singleton -loglevel = config.get_str('loglevel') +loglevel: str | int = config.get_str('loglevel') if not loglevel: loglevel = logging.INFO diff --git a/EDMarketConnector.py b/EDMarketConnector.py index 4cabfa94..ad043c93 100755 --- a/EDMarketConnector.py +++ b/EDMarketConnector.py @@ -16,7 +16,7 @@ from builtins import object, str from os import chdir, environ from os.path import dirname, join from time import localtime, strftime, time -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union # Have this as early as possible for people running EDMarketConnector.exe # from cmd.exe or a bat file or similar. Else they might not be in the correct @@ -594,7 +594,7 @@ class AppWindow(object): # The type needs defining for adding the menu entry, but won't be # properly set until later - self.updater: update.Updater = None + self.updater: update.Updater | None = None self.menubar = tk.Menu() if sys.platform == 'darwin': @@ -649,7 +649,9 @@ class AppWindow(object): self.help_menu.add_command(command=self.help_general) self.help_menu.add_command(command=self.help_privacy) self.help_menu.add_command(command=self.help_releases) - self.help_menu.add_command(command=lambda: self.updater.check_for_updates()) + if self.updater is not None: + self.help_menu.add_command(command=lambda: self.updater.check_for_updates()) + self.help_menu.add_command(command=lambda: not self.HelpAbout.showing and self.HelpAbout(self.w)) self.menubar.add_cascade(menu=self.help_menu) @@ -917,7 +919,7 @@ class AppWindow(object): def login(self): """Initiate CAPI/Frontier login and set other necessary state.""" should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.auth', {}) if should_return: @@ -1011,8 +1013,7 @@ class AppWindow(object): """ logger.trace_if('capi.worker', 'Begin') should_return: bool - new_data: Dict[str, Any] = {} - + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.auth', {}) if should_return: logger.warning('capi.auth has been disabled via killswitch. Returning.') @@ -1100,7 +1101,7 @@ class AppWindow(object): """ logger.trace_if('capi.worker', 'Begin') should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request.fleetcarrier', {}) if should_return: @@ -1311,7 +1312,7 @@ class AppWindow(object): play_bad = True should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) if should_return: @@ -1504,7 +1505,7 @@ class AppWindow(object): config.set('cmdrs', config.get_list('cmdrs', default=[]) + [monitor.cmdr]) self.login() - if monitor.mode == 'CQC' and entry['event']: + if monitor.cmdr and monitor.mode == 'CQC' and entry['event']: err = plug.notify_journal_entry_cqc(monitor.cmdr, monitor.is_beta, entry, monitor.state) if err: self.status['text'] = err @@ -1522,7 +1523,9 @@ class AppWindow(object): # Disable WinSparkle automatic update checks, IFF configured to do so when in-game if config.get_int('disable_autoappupdatecheckingame') and 1: - self.updater.set_automatic_updates_check(False) + if self.updater is not None: + self.updater.set_automatic_updates_check(False) + logger.info('Monitor: Disable WinSparkle automatic update checks') # Can't start dashboard monitoring @@ -1534,12 +1537,16 @@ class AppWindow(object): and config.get_int('output') & config.OUT_SHIP: monitor.export_ship() - err = plug.notify_journal_entry(monitor.cmdr, - monitor.is_beta, - monitor.system, - monitor.station, - entry, - monitor.state) + if monitor.cmdr and monitor.system and monitor.station: + err = plug.notify_journal_entry( + monitor.cmdr, + monitor.is_beta, + monitor.system, + monitor.station, + entry, + monitor.state + ) + if err: self.status['text'] = err if not config.get_int('hotkey_mute'): @@ -1568,7 +1575,7 @@ class AppWindow(object): auto_update = True should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] if auto_update: should_return, new_data = killswitch.check_killswitch('capi.auth', {}) @@ -1583,7 +1590,9 @@ class AppWindow(object): if entry['event'] == 'ShutDown': # Enable WinSparkle automatic update checks # NB: Do this blindly, in case option got changed whilst in-game - self.updater.set_automatic_updates_check(True) + if self.updater is not None: + self.updater.set_automatic_updates_check(True) + logger.info('Monitor: Enable WinSparkle automatic update checks') def auth(self, event=None) -> None: @@ -1626,7 +1635,9 @@ class AppWindow(object): entry = dashboard.status # Currently we don't do anything with these events - err = plug.notify_dashboard_entry(monitor.cmdr, monitor.is_beta, entry) + if monitor.cmdr: + err = plug.notify_dashboard_entry(monitor.cmdr, monitor.is_beta, entry) + if err: self.status['text'] = err if not config.get_int('hotkey_mute'): @@ -1634,13 +1645,13 @@ class AppWindow(object): def plugin_error(self, event=None) -> None: """Display asynchronous error from plugin.""" - if plug.last_error.get('msg'): - self.status['text'] = plug.last_error['msg'] + if plug.last_error.msg: + self.status['text'] = plug.last_error.msg self.w.update_idletasks() if not config.get_int('hotkey_mute'): hotkeymgr.play_bad() - def shipyard_url(self, shipname: str) -> str: + def shipyard_url(self, shipname: str) -> str | None: """Despatch a ship URL to the configured handler.""" if not (loadout := monitor.ship()): logger.warning('No ship loadout, aborting.') @@ -1667,11 +1678,11 @@ class AppWindow(object): return f'file://localhost/{file_name}' - def system_url(self, system: str) -> str: + def system_url(self, system: str) -> str | None: """Despatch a system URL to the configured handler.""" return plug.invoke(config.get_str('system_provider'), 'EDSM', 'system_url', monitor.system) - def station_url(self, station: str) -> str: + def station_url(self, station: str) -> str | None: """Despatch a station URL to the configured handler.""" return plug.invoke(config.get_str('station_provider'), 'eddb', 'station_url', monitor.system, monitor.station) @@ -1694,10 +1705,11 @@ class AppWindow(object): monitor.system and tk.NORMAL or tk.DISABLED) - def ontop_changed(self, event=None) -> None: - """Set main window 'on top' state as appropriate.""" - config.set('always_ontop', self.always_ontop.get()) - self.w.wm_attributes('-topmost', self.always_ontop.get()) + if sys.platform == 'win32': + def ontop_changed(self, event=None) -> None: + """Set main window 'on top' state as appropriate.""" + config.set('always_ontop', self.always_ontop.get()) + self.w.wm_attributes('-topmost', self.always_ontop.get()) def copy(self, event=None) -> None: """Copy system, and possible station, name to clipboard.""" @@ -1748,7 +1760,7 @@ class AppWindow(object): self.resizable(tk.FALSE, tk.FALSE) - frame = ttk.Frame(self) + frame = tk.Frame(self) frame.grid(sticky=tk.NSEW) row = 1 @@ -1761,7 +1773,7 @@ class AppWindow(object): ############################################################ # version - ttk.Label(frame).grid(row=row, column=0) # spacer + tk.Label(frame).grid(row=row, column=0) # spacer row += 1 self.appversion_label = tk.Label(frame, text=appversion()) self.appversion_label.grid(row=row, column=0, sticky=tk.E) @@ -1837,13 +1849,14 @@ class AppWindow(object): with open(f, 'wb') as h: h.write(str(companion.session.capi_raw_data).encode(encoding='utf-8')) - def exit_tray(self, systray: 'SysTrayIcon') -> None: - """Tray icon is shutting down.""" - exit_thread = threading.Thread( - target=self.onexit, - daemon=True, - ) - exit_thread.start() + if sys.platform == 'win32': + def exit_tray(self, systray: 'SysTrayIcon') -> None: + """Tray icon is shutting down.""" + exit_thread = threading.Thread( + target=self.onexit, + daemon=True, + ) + exit_thread.start() def onexit(self, event=None) -> None: """Application shutdown procedure.""" @@ -1869,7 +1882,8 @@ class AppWindow(object): # First so it doesn't interrupt us logger.info('Closing update checker...') - self.updater.close() + if self.updater is not None: + self.updater.close() # Earlier than anything else so plugin code can't interfere *and* it # won't still be running in a manner that might rely on something @@ -2018,11 +2032,9 @@ def show_killswitch_poppup(root=None): idx += 1 idx += 1 - ok_button = tk.Button(frame, text="ok", command=tl.destroy) + ok_button = tk.Button(frame, text="Ok", command=tl.destroy) ok_button.grid(columnspan=2, sticky=tk.EW) - theme.apply(tl) - # Run the app if __name__ == "__main__": # noqa: C901 @@ -2167,10 +2179,13 @@ sys.path: {sys.path}''' if not ui_scale: ui_scale = 100 config.set('ui_scale', ui_scale) + theme.default_ui_scale = root.tk.call('tk', 'scaling') logger.trace_if('tk', f'Default tk scaling = {theme.default_ui_scale}') theme.startup_ui_scale = ui_scale - root.tk.call('tk', 'scaling', theme.default_ui_scale * float(ui_scale) / 100.0) + if theme.default_ui_scale is not None: + root.tk.call('tk', 'scaling', theme.default_ui_scale * float(ui_scale) / 100.0) + app = AppWindow(root) def messagebox_not_py3(): diff --git a/collate.py b/collate.py index 3ab5224b..48007527 100755 --- a/collate.py +++ b/collate.py @@ -100,7 +100,7 @@ def addmodules(data): # noqa: C901, CCR001 if not data['lastStarport'].get('modules'): return - outfile = 'outfitting.csv' + outfile = pathlib.Path('outfitting.csv') modules = {} fields = ('id', 'symbol', 'category', 'name', 'mount', 'guidance', 'ship', 'class', 'rating', 'entitlement') diff --git a/companion.py b/companion.py index fe12ecde..f464e0d6 100644 --- a/companion.py +++ b/companion.py @@ -329,7 +329,7 @@ class Auth(object): logger.debug(f'Trying for "{self.cmdr}"') should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.auth', {}) if should_return: @@ -675,7 +675,7 @@ class Session(object): :return: True if login succeeded, False if re-authorization initiated. """ should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.auth', {}) if should_return: @@ -796,7 +796,7 @@ class Session(object): """ capi_data: CAPIData = CAPIData() should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request.' + capi_endpoint, {}) if should_return: diff --git a/config/__init__.py b/config/__init__.py index b3b240ed..ae2fad73 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -40,7 +40,7 @@ import sys import traceback import warnings from abc import abstractmethod -from typing import Any, Callable, List, Optional, Type, TypeVar, Union +from typing import Any, Callable, Optional, Type, TypeVar import semantic_version @@ -59,10 +59,10 @@ copyright = '© 2015-2019 Jonathan Harris, 2020-2022 EDCD' update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml' update_interval = 8*60*60 # Providers marked to be in debug mode. Generally this is expected to switch to sending data to a log file -debug_senders: List[str] = [] +debug_senders: list[str] = [] # TRACE logging code that should actually be used. Means not spamming it # *all* if only interested in some things. -trace_on: List[str] = [] +trace_on: list[str] = [] capi_pretend_down: bool = False capi_debug_access_token: Optional[str] = None @@ -293,7 +293,7 @@ class AbstractConfig(abc.ABC): @staticmethod def _suppress_call( - func: Callable[..., _T], exceptions: Union[Type[BaseException], List[Type[BaseException]]] = Exception, + func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception, *args: Any, **kwargs: Any ) -> Optional[_T]: if exceptions is None: @@ -307,7 +307,10 @@ class AbstractConfig(abc.ABC): return None - def get(self, key: str, default: Union[list, str, bool, int] = None) -> Union[list, str, bool, int]: + def get( + self, key: str, + default: list | str | bool | int | None = None + ) -> list | str | bool | int | None: """ Return the data for the requested key, or a default. @@ -334,7 +337,7 @@ class AbstractConfig(abc.ABC): return default # type: ignore @abstractmethod - def get_list(self, key: str, *, default: list = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -343,7 +346,7 @@ class AbstractConfig(abc.ABC): raise NotImplementedError @abstractmethod - def get_str(self, key: str, *, default: str = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -356,7 +359,7 @@ class AbstractConfig(abc.ABC): raise NotImplementedError @abstractmethod - def get_bool(self, key: str, *, default: bool = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -396,7 +399,7 @@ class AbstractConfig(abc.ABC): raise NotImplementedError @abstractmethod - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. diff --git a/config/linux.py b/config/linux.py index 04087b32..5d543d3f 100644 --- a/config/linux.py +++ b/config/linux.py @@ -3,7 +3,6 @@ import os import pathlib import sys from configparser import ConfigParser -from typing import List, Optional, Union from config import AbstractConfig, appname, logger @@ -18,7 +17,7 @@ class LinuxConfig(AbstractConfig): __unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'} __escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'} - def __init__(self, filename: Optional[str] = None) -> None: + def __init__(self, filename: str | None = None) -> None: super().__init__() # http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser() @@ -42,7 +41,7 @@ class LinuxConfig(AbstractConfig): self.filename.parent.mkdir(exist_ok=True, parents=True) - self.config: Optional[ConfigParser] = ConfigParser(comment_prefixes=('#',), interpolation=None) + self.config: ConfigParser | None = ConfigParser(comment_prefixes=('#',), interpolation=None) self.config.read(self.filename) # read() ignores files that dont exist # Ensure that our section exists. This is here because configparser will happily create files for us, but it @@ -85,7 +84,7 @@ class LinuxConfig(AbstractConfig): :param s: str - The string to unescape. :return: str - The unescaped string. """ - out: List[str] = [] + out: list[str] = [] i = 0 while i < len(s): c = s[i] @@ -107,7 +106,7 @@ class LinuxConfig(AbstractConfig): return "".join(out) - def __raw_get(self, key: str) -> Optional[str]: + def __raw_get(self, key: str) -> str | None: """ Get a raw data value from the config file. @@ -119,7 +118,7 @@ class LinuxConfig(AbstractConfig): return self.config[self.SECTION].get(key) - def get_str(self, key: str, *, default: str = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -134,7 +133,7 @@ class LinuxConfig(AbstractConfig): return self.__unescape(data) - def get_list(self, key: str, *, default: list = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -168,7 +167,7 @@ class LinuxConfig(AbstractConfig): except ValueError as e: raise ValueError(f'requested {key=} as int cannot be converted to int') from e - def get_bool(self, key: str, *, default: bool = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -183,7 +182,7 @@ class LinuxConfig(AbstractConfig): return bool(int(data)) - def set(self, key: str, val: Union[int, str, List[str]]) -> None: + def set(self, key: str, val: int | str | list[str]) -> None: """ Set the given key's data to the given value. @@ -192,7 +191,7 @@ class LinuxConfig(AbstractConfig): if self.config is None: raise ValueError('attempt to use a closed config') - to_set: Optional[str] = None + to_set: str | None = None if isinstance(val, bool): to_set = str(int(val)) diff --git a/config/windows.py b/config/windows.py index d29a2b42..f7590a92 100644 --- a/config/windows.py +++ b/config/windows.py @@ -8,7 +8,7 @@ import sys import uuid import winreg from ctypes.wintypes import DWORD, HANDLE -from typing import List, Optional, Union +from typing import List, Literal, Optional, Union from config import AbstractConfig, applongname, appname, logger, update_interval @@ -142,7 +142,7 @@ class WinConfig(AbstractConfig): logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}') return None - def get_str(self, key: str, *, default: str = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -157,7 +157,7 @@ class WinConfig(AbstractConfig): return res - def get_list(self, key: str, *, default: list = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -187,7 +187,7 @@ class WinConfig(AbstractConfig): return res - def get_bool(self, key: str, *, default: bool = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -205,7 +205,8 @@ class WinConfig(AbstractConfig): Implements :meth:`AbstractConfig.set`. """ - reg_type = None + # These are the types that winreg.REG_* below resolve to. + reg_type: Literal[1] | Literal[4] | Literal[7] if isinstance(val, str): reg_type = winreg.REG_SZ winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val) diff --git a/coriolis.py b/coriolis-update-files.py similarity index 100% rename from coriolis.py rename to coriolis-update-files.py diff --git a/dashboard.py b/dashboard.py index c371e2b7..c88f9a21 100644 --- a/dashboard.py +++ b/dashboard.py @@ -167,7 +167,7 @@ class Dashboard(FileSystemEventHandler): # Can get on_modified events when the file is emptied self.process(event.src_path if not event.is_directory else None) - def process(self, logfile: str = None) -> None: + def process(self, logfile: str | None = None) -> None: """ Process the contents of current Status.json file. diff --git a/docs/Releasing.md b/docs/Releasing.md index 9840c694..a8c4dc80 100644 --- a/docs/Releasing.md +++ b/docs/Releasing.md @@ -191,17 +191,17 @@ that it's actually included in the installer. Before you create a new install each time you should: 1. Ensure the data sourced from coriolis.io is up to date and works: - 1. Update the `coriolis-data` repo. **NB: You will need 'npm' installed for +2. Update the `coriolis-data` repo. **NB: You will need 'npm' installed for this.** - 1. `cd coriolis-data` - 1. `git pull` - 1. `npm install` - to check it's worked. - 1. Run `coriolis.py` to update `modules.p` and `ships.p`. **NB: The - submodule might have been updated by a GitHub workflow/PR/merge, so - be sure to perform this step for every build.** - 1. XXX: Test ? - 1. `git commit` the changes to the repo and the `.p` files. -1. Ensure translations are up to date, see [Translations.md](Translations.md). + 1. `cd coriolis-data` + 2. `git pull` + 3. `npm install` - to check it's worked. +3. Run `coriolis-update-files.py` to update `modules.p` and `ships.p`. **NB: + The submodule might have been updated by a GitHub workflow/PR/merge, so + be sure to perform this step for every build.** +4. XXX: Test ? +5. `git commit` the changes to the repo and the `.p` files. +6. Ensure translations are up to date, see [Translations.md](Translations.md). # Preparing to Package diff --git a/docs/examples/click_counter/load.py b/docs/examples/click_counter/load.py index 776e9d4b..70c24f53 100644 --- a/docs/examples/click_counter/load.py +++ b/docs/examples/click_counter/load.py @@ -27,7 +27,7 @@ class ClickCounter: def __init__(self) -> None: # Be sure to use names that wont collide in our config variables - self.click_count: Optional[tk.StringVar] = tk.StringVar(value=str(config.get_int('click_counter_count'))) + self.click_count = tk.StringVar(value=str(config.get_int('click_counter_count'))) logger.info("ClickCounter instantiated") def on_load(self) -> str: @@ -99,8 +99,8 @@ class ClickCounter: ) button.grid(row=current_row) current_row += 1 - nb.Label(frame, text="Count:").grid(row=current_row, sticky=tk.W) - nb.Label(frame, textvariable=self.click_count).grid(row=current_row, column=1) + tk.Label(frame, text="Count:").grid(row=current_row, sticky=tk.W) + tk.Label(frame, textvariable=self.click_count).grid(row=current_row, column=1) return frame diff --git a/docs/examples/plugintest/load.py b/docs/examples/plugintest/load.py index 2639cfd9..314eef19 100644 --- a/docs/examples/plugintest/load.py +++ b/docs/examples/plugintest/load.py @@ -13,19 +13,6 @@ from SubA import SubA from config import appname, appversion, config -# For compatibility with pre-5.0.0 -if not hasattr(config, 'get_int'): - config.get_int = config.getint - -if not hasattr(config, 'get_str'): - config.get_str = config.get - -if not hasattr(config, 'get_bool'): - config.get_bool = lambda key: bool(config.getint(key)) - -if not hasattr(config, 'get_list'): - config.get_list = config.get - # This could also be returned from plugin_start3() plugin_name = os.path.basename(os.path.dirname(__file__)) diff --git a/edshipyard.py b/edshipyard.py index d709a2e1..1bfa77e6 100644 --- a/edshipyard.py +++ b/edshipyard.py @@ -81,7 +81,7 @@ def export(data, filename=None) -> None: # noqa: C901, CCR001 if not v: continue - module: __Module = outfitting.lookup(v['module'], ship_map) + module: __Module | None = outfitting.lookup(v['module'], ship_map) if not module: continue diff --git a/hotkey.py b/hotkey.py deleted file mode 100644 index 5a72cde1..00000000 --- a/hotkey.py +++ /dev/null @@ -1,703 +0,0 @@ -"""Handle keyboard input for manual update triggering.""" -# -*- coding: utf-8 -*- - -import abc -import pathlib -import sys -import tkinter as tk -from abc import abstractmethod -from typing import Optional, Tuple, Union - -from config import config -from EDMCLogging import get_main_logger - -logger = get_main_logger() - - -class AbstractHotkeyMgr(abc.ABC): - """Abstract root class of all platforms specific HotKeyMgr.""" - - @abstractmethod - def register(self, root, keycode, modifiers) -> None: - """Register the hotkey handler.""" - pass - - @abstractmethod - def unregister(self) -> None: - """Unregister the hotkey handling.""" - pass - - @abstractmethod - def play_good(self) -> None: - """Play the 'good' sound.""" - pass - - @abstractmethod - def play_bad(self) -> None: - """Play the 'bad' sound.""" - pass - - -if sys.platform == 'darwin': - - import objc - from AppKit import ( - NSAlternateKeyMask, NSApplication, NSBeep, NSClearLineFunctionKey, NSCommandKeyMask, NSControlKeyMask, - NSDeleteFunctionKey, NSDeviceIndependentModifierFlagsMask, NSEvent, NSF1FunctionKey, NSF35FunctionKey, - NSFlagsChanged, NSKeyDown, NSKeyDownMask, NSKeyUp, NSNumericPadKeyMask, NSShiftKeyMask, NSSound, NSWorkspace - ) - - -class MacHotkeyMgr(AbstractHotkeyMgr): - """Hot key management.""" - - POLL = 250 - # https://developer.apple.com/library/mac/documentation/Cocoa/Reference/ApplicationKit/Classes/NSEvent_Class/#//apple_ref/doc/constant_group/Function_Key_Unicodes - DISPLAY = { - 0x03: u'⌅', 0x09: u'⇥', 0xd: u'↩', 0x19: u'⇤', 0x1b: u'esc', 0x20: u'⏘', 0x7f: u'⌫', - 0xf700: u'↑', 0xf701: u'↓', 0xf702: u'←', 0xf703: u'→', - 0xf727: u'Ins', - 0xf728: u'⌦', 0xf729: u'↖', 0xf72a: u'Fn', 0xf72b: u'↘', - 0xf72c: u'⇞', 0xf72d: u'⇟', 0xf72e: u'PrtScr', 0xf72f: u'ScrollLock', - 0xf730: u'Pause', 0xf731: u'SysReq', 0xf732: u'Break', 0xf733: u'Reset', - 0xf739: u'⌧', - } - (ACQUIRE_INACTIVE, ACQUIRE_ACTIVE, ACQUIRE_NEW) = range(3) - - def __init__(self): - self.MODIFIERMASK = NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask \ - | NSNumericPadKeyMask - self.root = None - - self.keycode = 0 - self.modifiers = 0 - self.activated = False - self.observer = None - - self.acquire_key = 0 - self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE - - self.tkProcessKeyEvent_old = None - - self.snd_good = NSSound.alloc().initWithContentsOfFile_byReference_( - pathlib.Path(config.respath_path) / 'snd_good.wav', False - ) - self.snd_bad = NSSound.alloc().initWithContentsOfFile_byReference_( - pathlib.Path(config.respath_path) / 'snd_bad.wav', False - ) - - def register(self, root: tk.Tk, keycode, modifiers) -> None: - """ - Register current hotkey for monitoring. - - :param root: parent window. - :param keycode: Key to monitor. - :param modifiers: Any modifiers to take into account. - """ - self.root = root - self.keycode = keycode - self.modifiers = modifiers - self.activated = False - - if keycode: - if not self.observer: - self.root.after_idle(self._observe) - self.root.after(MacHotkeyMgr.POLL, self._poll) - - # Monkey-patch tk (tkMacOSXKeyEvent.c) - if not self.tkProcessKeyEvent_old: - sel = b'tkProcessKeyEvent:' - cls = NSApplication.sharedApplication().class__() # type: ignore - self.tkProcessKeyEvent_old = NSApplication.sharedApplication().methodForSelector_(sel) # type: ignore - newmethod = objc.selector( # type: ignore - self.tkProcessKeyEvent, - selector=self.tkProcessKeyEvent_old.selector, - signature=self.tkProcessKeyEvent_old.signature - ) - objc.classAddMethod(cls, sel, newmethod) # type: ignore - - def tkProcessKeyEvent(self, cls, the_event): # noqa: N802 - """ - Monkey-patch tk (tkMacOSXKeyEvent.c). - - - workaround crash on OSX 10.9 & 10.10 on seeing a composing character - - notice when modifier key state changes - - keep a copy of NSEvent.charactersIgnoringModifiers, which is what we need for the hotkey - - (Would like to use a decorator but need to ensure the application is created before this is installed) - :param cls: ??? - :param the_event: tk event - :return: ??? - """ - if self.acquire_state: - if the_event.type() == NSFlagsChanged: - self.acquire_key = the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask - self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW - # suppress the event by not chaining the old function - return the_event - - elif the_event.type() in (NSKeyDown, NSKeyUp): - c = the_event.charactersIgnoringModifiers() - self.acquire_key = (c and ord(c[0]) or 0) | \ - (the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask) - self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW - # suppress the event by not chaining the old function - return the_event - - # replace empty characters with charactersIgnoringModifiers to avoid crash - elif the_event.type() in (NSKeyDown, NSKeyUp) and not the_event.characters(): - the_event = NSEvent.keyEventWithType_location_modifierFlags_timestamp_windowNumber_context_characters_charactersIgnoringModifiers_isARepeat_keyCode_( # noqa: E501 - # noqa: E501 - the_event.type(), - the_event.locationInWindow(), - the_event.modifierFlags(), - the_event.timestamp(), - the_event.windowNumber(), - the_event.context(), - the_event.charactersIgnoringModifiers(), - the_event.charactersIgnoringModifiers(), - the_event.isARepeat(), - the_event.keyCode() - ) - return self.tkProcessKeyEvent_old(cls, the_event) - - def _observe(self): - # Must be called after root.mainloop() so that the app's message loop has been created - self.observer = NSEvent.addGlobalMonitorForEventsMatchingMask_handler_(NSKeyDownMask, self._handler) - - def _poll(self): - if config.shutting_down: - return - - # No way of signalling to Tkinter from within the callback handler block that doesn't - # cause Python to crash, so poll. - if self.activated: - self.activated = False - self.root.event_generate('<>', when="tail") - - if self.keycode or self.modifiers: - self.root.after(MacHotkeyMgr.POLL, self._poll) - - def unregister(self) -> None: - """Remove hotkey registration.""" - self.keycode = None - self.modifiers = None - - if sys.platform == 'darwin': # noqa: C901 - @objc.callbackFor(NSEvent.addGlobalMonitorForEventsMatchingMask_handler_) - def _handler(self, event) -> None: - # use event.charactersIgnoringModifiers to handle composing characters like Alt-e - if ((event.modifierFlags() & self.MODIFIERMASK) == self.modifiers - and ord(event.charactersIgnoringModifiers()[0]) == self.keycode): - if config.get_int('hotkey_always'): - self.activated = True - - else: # Only trigger if game client is front process - front = NSWorkspace.sharedWorkspace().frontmostApplication() - if front and front.bundleIdentifier() == 'uk.co.frontier.EliteDangerous': - self.activated = True - - def acquire_start(self) -> None: - """Start acquiring hotkey state via polling.""" - self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE - self.root.after_idle(self._acquire_poll) - - def acquire_stop(self) -> None: - """Stop acquiring hotkey state.""" - self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE - - def _acquire_poll(self) -> None: - """Perform a poll of current hotkey state.""" - if config.shutting_down: - return - - # No way of signalling to Tkinter from within the monkey-patched event handler that doesn't - # cause Python to crash, so poll. - if self.acquire_state: - if self.acquire_state == MacHotkeyMgr.ACQUIRE_NEW: - # Abuse tkEvent's keycode field to hold our acquired key & modifier - self.root.event_generate('', keycode=self.acquire_key) - self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE - self.root.after(50, self._acquire_poll) - - def fromevent(self, event) -> Optional[Union[bool, Tuple]]: - """ - Return configuration (keycode, modifiers) or None=clear or False=retain previous. - - :param event: tk event ? - :return: False to retain previous, None to not use, else (keycode, modifiers) - """ - (keycode, modifiers) = (event.keycode & 0xffff, event.keycode & 0xffff0000) # Set by _acquire_poll() - if (keycode - and not (modifiers & (NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask))): - if keycode == 0x1b: # Esc = retain previous - self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE - return False - - # BkSp, Del, Clear = clear hotkey - elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]: - self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE - return None - - # don't allow keys needed for typing in System Map - elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a: - NSBeep() - self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE - return None - - return (keycode, modifiers) - - def display(self, keycode, modifiers) -> str: - """ - Return displayable form of given hotkey + modifiers. - - :param keycode: - :param modifiers: - :return: string form - """ - text = '' - if modifiers & NSControlKeyMask: - text += u'⌃' - - if modifiers & NSAlternateKeyMask: - text += u'⌥' - - if modifiers & NSShiftKeyMask: - text += u'⇧' - - if modifiers & NSCommandKeyMask: - text += u'⌘' - - if (modifiers & NSNumericPadKeyMask) and keycode <= 0x7f: - text += u'№' - - if not keycode: - pass - - elif ord(NSF1FunctionKey) <= keycode <= ord(NSF35FunctionKey): - text += f'F{keycode + 1 - ord(NSF1FunctionKey)}' - - elif keycode in MacHotkeyMgr.DISPLAY: # specials - text += MacHotkeyMgr.DISPLAY[keycode] - - elif keycode < 0x20: # control keys - text += chr(keycode + 0x40) - - elif keycode < 0xf700: # key char - text += chr(keycode).upper() - - else: - text += u'⁈' - - return text - - def play_good(self): - """Play the 'good' sound.""" - self.snd_good.play() - - def play_bad(self): - """Play the 'bad' sound.""" - self.snd_bad.play() - - -if sys.platform == 'win32': - - import atexit - import ctypes - import threading - import winsound - from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD - - RegisterHotKey = ctypes.windll.user32.RegisterHotKey - UnregisterHotKey = ctypes.windll.user32.UnregisterHotKey - MOD_ALT = 0x0001 - MOD_CONTROL = 0x0002 - MOD_SHIFT = 0x0004 - MOD_WIN = 0x0008 - MOD_NOREPEAT = 0x4000 - - GetMessage = ctypes.windll.user32.GetMessageW - TranslateMessage = ctypes.windll.user32.TranslateMessage - DispatchMessage = ctypes.windll.user32.DispatchMessageW - PostThreadMessage = ctypes.windll.user32.PostThreadMessageW - WM_QUIT = 0x0012 - WM_HOTKEY = 0x0312 - WM_APP = 0x8000 - WM_SND_GOOD = WM_APP + 1 - WM_SND_BAD = WM_APP + 2 - - GetKeyState = ctypes.windll.user32.GetKeyState - MapVirtualKey = ctypes.windll.user32.MapVirtualKeyW - VK_BACK = 0x08 - VK_CLEAR = 0x0c - VK_RETURN = 0x0d - VK_SHIFT = 0x10 - VK_CONTROL = 0x11 - VK_MENU = 0x12 - VK_CAPITAL = 0x14 - VK_MODECHANGE = 0x1f - VK_ESCAPE = 0x1b - VK_SPACE = 0x20 - VK_DELETE = 0x2e - VK_LWIN = 0x5b - VK_RWIN = 0x5c - VK_NUMPAD0 = 0x60 - VK_DIVIDE = 0x6f - VK_F1 = 0x70 - VK_F24 = 0x87 - VK_OEM_MINUS = 0xbd - VK_NUMLOCK = 0x90 - VK_SCROLL = 0x91 - VK_PROCESSKEY = 0xe5 - VK_OEM_CLEAR = 0xfe - - GetForegroundWindow = ctypes.windll.user32.GetForegroundWindow - GetWindowText = ctypes.windll.user32.GetWindowTextW - GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int] - GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW - - def window_title(h) -> str: - """ - Determine the title for a window. - - :param h: Window handle. - :return: Window title. - """ - if h: - title_length = GetWindowTextLength(h) + 1 - buf = ctypes.create_unicode_buffer(title_length) - if GetWindowText(h, buf, title_length): - return buf.value - - return '' - - class MOUSEINPUT(ctypes.Structure): - """Mouse Input structure.""" - - _fields_ = [ - ('dx', LONG), - ('dy', LONG), - ('mouseData', DWORD), - ('dwFlags', DWORD), - ('time', DWORD), - ('dwExtraInfo', ctypes.POINTER(ULONG)) - ] - - class KEYBDINPUT(ctypes.Structure): - """Keyboard Input structure.""" - - _fields_ = [ - ('wVk', WORD), - ('wScan', WORD), - ('dwFlags', DWORD), - ('time', DWORD), - ('dwExtraInfo', ctypes.POINTER(ULONG)) - ] - - class HARDWAREINPUT(ctypes.Structure): - """Hardware Input structure.""" - - _fields_ = [ - ('uMsg', DWORD), - ('wParamL', WORD), - ('wParamH', WORD) - ] - - class INPUTUNION(ctypes.Union): - """Input union.""" - - _fields_ = [ - ('mi', MOUSEINPUT), - ('ki', KEYBDINPUT), - ('hi', HARDWAREINPUT) - ] - - class INPUT(ctypes.Structure): - """Input structure.""" - - _fields_ = [ - ('type', DWORD), - ('union', INPUTUNION) - ] - - SendInput = ctypes.windll.user32.SendInput - SendInput.argtypes = [ctypes.c_uint, ctypes.POINTER(INPUT), ctypes.c_int] - - INPUT_MOUSE = 0 - INPUT_KEYBOARD = 1 - INPUT_HARDWARE = 2 - - -class WindowsHotkeyMgr(AbstractHotkeyMgr): - """Hot key management.""" - - # https://msdn.microsoft.com/en-us/library/windows/desktop/dd375731%28v=vs.85%29.aspx - # Limit ourselves to symbols in Windows 7 Segoe UI - DISPLAY = { - 0x03: 'Break', 0x08: 'Bksp', 0x09: u'↹', 0x0c: 'Clear', 0x0d: u'↵', 0x13: 'Pause', - 0x14: u'Ⓐ', 0x1b: 'Esc', - 0x20: u'⏘', 0x21: 'PgUp', 0x22: 'PgDn', 0x23: 'End', 0x24: 'Home', - 0x25: u'←', 0x26: u'↑', 0x27: u'→', 0x28: u'↓', - 0x2c: 'PrtScn', 0x2d: 'Ins', 0x2e: 'Del', 0x2f: 'Help', - 0x5d: u'▤', 0x5f: u'☾', - 0x90: u'➀', 0x91: 'ScrLk', - 0xa6: u'⇦', 0xa7: u'⇨', 0xa9: u'⊗', 0xab: u'☆', 0xac: u'⌂', 0xb4: u'✉', - } - - def __init__(self) -> None: - self.root: tk.Tk = None # type: ignore - self.thread: threading.Thread = None # type: ignore - with open(pathlib.Path(config.respath) / 'snd_good.wav', 'rb') as sg: - self.snd_good = sg.read() - with open(pathlib.Path(config.respath) / 'snd_bad.wav', 'rb') as sb: - self.snd_bad = sb.read() - atexit.register(self.unregister) - - def register(self, root: tk.Tk, keycode, modifiers) -> None: - """Register the hotkey handler.""" - self.root = root - - if self.thread: - logger.debug('Was already registered, unregistering...') - self.unregister() - - if keycode or modifiers: - logger.debug('Creating thread worker...') - self.thread = threading.Thread( - target=self.worker, - name=f'Hotkey "{keycode}:{modifiers}"', - args=(keycode, modifiers) - ) - self.thread.daemon = True - logger.debug('Starting thread worker...') - self.thread.start() - logger.debug('Done.') - - def unregister(self) -> None: - """Unregister the hotkey handling.""" - thread = self.thread - - if thread: - logger.debug('Thread is/was running') - self.thread = None # type: ignore - logger.debug('Telling thread WM_QUIT') - PostThreadMessage(thread.ident, WM_QUIT, 0, 0) - logger.debug('Joining thread') - thread.join() # Wait for it to unregister hotkey and quit - - else: - logger.debug('No thread') - - logger.debug('Done.') - - def worker(self, keycode, modifiers) -> None: # noqa: CCR001 - """Handle hotkeys.""" - logger.debug('Begin...') - # Hotkey must be registered by the thread that handles it - if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode): - logger.debug("We're not the right thread?") - self.thread = None # type: ignore - return - - fake = INPUT(INPUT_KEYBOARD, INPUTUNION(ki=KEYBDINPUT(keycode, keycode, 0, 0, None))) - - msg = MSG() - logger.debug('Entering GetMessage() loop...') - while GetMessage(ctypes.byref(msg), None, 0, 0) != 0: - logger.debug('Got message') - if msg.message == WM_HOTKEY: - logger.debug('WM_HOTKEY') - - if ( - config.get_int('hotkey_always') - or window_title(GetForegroundWindow()).startswith('Elite - Dangerous') - ): - if not config.shutting_down: - logger.debug('Sending event <>') - self.root.event_generate('<>', when="tail") - - else: - logger.debug('Passing key on') - UnregisterHotKey(None, 1) - SendInput(1, fake, ctypes.sizeof(INPUT)) - if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode): - logger.debug("We aren't registered for this ?") - break - - elif msg.message == WM_SND_GOOD: - logger.debug('WM_SND_GOOD') - winsound.PlaySound(self.snd_good, winsound.SND_MEMORY) # synchronous - - elif msg.message == WM_SND_BAD: - logger.debug('WM_SND_BAD') - winsound.PlaySound(self.snd_bad, winsound.SND_MEMORY) # synchronous - - else: - logger.debug('Something else') - TranslateMessage(ctypes.byref(msg)) - DispatchMessage(ctypes.byref(msg)) - - logger.debug('Exited GetMessage() loop.') - UnregisterHotKey(None, 1) - self.thread = None # type: ignore - logger.debug('Done.') - - def acquire_start(self) -> None: - """Start acquiring hotkey state via polling.""" - pass - - def acquire_stop(self) -> None: - """Stop acquiring hotkey state.""" - pass - - def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001 - """ - Return configuration (keycode, modifiers) or None=clear or False=retain previous. - - event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed. - event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter - by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate. - - :param event: tk event ? - :return: False to retain previous, None to not use, else (keycode, modifiers) - """ - modifiers = ((GetKeyState(VK_MENU) & 0x8000) and MOD_ALT) \ - | ((GetKeyState(VK_CONTROL) & 0x8000) and MOD_CONTROL) \ - | ((GetKeyState(VK_SHIFT) & 0x8000) and MOD_SHIFT) \ - | ((GetKeyState(VK_LWIN) & 0x8000) and MOD_WIN) \ - | ((GetKeyState(VK_RWIN) & 0x8000) and MOD_WIN) - keycode = event.keycode - - if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]: - return (0, modifiers) - - if not modifiers: - if keycode == VK_ESCAPE: # Esc = retain previous - return False - - elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey - return None - - elif keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord( - 'Z'): # don't allow keys needed for typing in System Map - winsound.MessageBeep() - return None - - elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY] - or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys - return (0, modifiers) - - # See if the keycode is usable and available - if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode): - UnregisterHotKey(None, 2) - return (keycode, modifiers) - - else: - winsound.MessageBeep() - return None - - def display(self, keycode, modifiers) -> str: - """ - Return displayable form of given hotkey + modifiers. - - :param keycode: - :param modifiers: - :return: string form - """ - text = '' - if modifiers & MOD_WIN: - text += u'❖+' - - if modifiers & MOD_CONTROL: - text += u'Ctrl+' - - if modifiers & MOD_ALT: - text += u'Alt+' - - if modifiers & MOD_SHIFT: - text += u'⇧+' - - if VK_NUMPAD0 <= keycode <= VK_DIVIDE: - text += u'№' - - if not keycode: - pass - - elif VK_F1 <= keycode <= VK_F24: - text += f'F{keycode + 1 - VK_F1}' - - elif keycode in WindowsHotkeyMgr.DISPLAY: # specials - text += WindowsHotkeyMgr.DISPLAY[keycode] - - else: - c = MapVirtualKey(keycode, 2) # printable ? - if not c: # oops not printable - text += u'⁈' - - elif c < 0x20: # control keys - text += chr(c + 0x40) - - else: - text += chr(c).upper() - - return text - - def play_good(self) -> None: - """Play the 'good' sound.""" - if self.thread: - PostThreadMessage(self.thread.ident, WM_SND_GOOD, 0, 0) - - def play_bad(self) -> None: - """Play the 'bad' sound.""" - if self.thread: - PostThreadMessage(self.thread.ident, WM_SND_BAD, 0, 0) - - -class LinuxHotKeyMgr(AbstractHotkeyMgr): - """ - Hot key management. - - Not actually implemented on Linux. It's a no-op instead. - """ - - def register(self, root, keycode, modifiers) -> None: - """Register the hotkey handler.""" - pass - - def unregister(self) -> None: - """Unregister the hotkey handling.""" - pass - - def play_good(self) -> None: - """Play the 'good' sound.""" - pass - - def play_bad(self) -> None: - """Play the 'bad' sound.""" - pass - - -def get_hotkeymgr() -> AbstractHotkeyMgr: - """ - Determine platform-specific HotkeyMgr. - - :param args: - :param kwargs: - :return: Appropriate class instance. - :raises ValueError: If unsupported platform. - """ - if sys.platform == 'darwin': - return MacHotkeyMgr() - - elif sys.platform == 'win32': - return WindowsHotkeyMgr() - - elif sys.platform == 'linux': - return LinuxHotKeyMgr() - - else: - raise ValueError(f'Unknown platform: {sys.platform}') - - -# singleton -hotkeymgr = get_hotkeymgr() diff --git a/hotkey/__init__.py b/hotkey/__init__.py new file mode 100644 index 00000000..5162621c --- /dev/null +++ b/hotkey/__init__.py @@ -0,0 +1,95 @@ +"""Handle keyboard input for manual update triggering.""" +# -*- coding: utf-8 -*- + +import abc +import sys +from abc import abstractmethod +from typing import Optional, Tuple, Union + + +class AbstractHotkeyMgr(abc.ABC): + """Abstract root class of all platforms specific HotKeyMgr.""" + + @abstractmethod + def register(self, root, keycode, modifiers) -> None: + """Register the hotkey handler.""" + pass + + @abstractmethod + def unregister(self) -> None: + """Unregister the hotkey handling.""" + pass + + @abstractmethod + def acquire_start(self) -> None: + """Start acquiring hotkey state via polling.""" + pass + + @abstractmethod + def acquire_stop(self) -> None: + """Stop acquiring hotkey state.""" + pass + + @abstractmethod + def fromevent(self, event) -> Optional[Union[bool, Tuple]]: + """ + Return configuration (keycode, modifiers) or None=clear or False=retain previous. + + event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed. + event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter + by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate. + + :param event: tk event ? + :return: False to retain previous, None to not use, else (keycode, modifiers) + """ + pass + + @abstractmethod + def display(self, keycode: int, modifiers: int) -> str: + """ + Return displayable form of given hotkey + modifiers. + + :param keycode: + :param modifiers: + :return: string form + """ + pass + + @abstractmethod + def play_good(self) -> None: + """Play the 'good' sound.""" + pass + + @abstractmethod + def play_bad(self) -> None: + """Play the 'bad' sound.""" + pass + + +def get_hotkeymgr() -> AbstractHotkeyMgr: + """ + Determine platform-specific HotkeyMgr. + + :param args: + :param kwargs: + :return: Appropriate class instance. + :raises ValueError: If unsupported platform. + """ + if sys.platform == 'darwin': + from hotkey.darwin import MacHotkeyMgr + return MacHotkeyMgr() + + elif sys.platform == 'win32': + from hotkey.windows import WindowsHotkeyMgr + return WindowsHotkeyMgr() + + elif sys.platform == 'linux': + from hotkey.linux import LinuxHotKeyMgr + return LinuxHotKeyMgr() + + else: + raise ValueError(f'Unknown platform: {sys.platform}') + + +# singleton +hotkeymgr = get_hotkeymgr() diff --git a/hotkey/darwin.py b/hotkey/darwin.py new file mode 100644 index 00000000..cbf9d260 --- /dev/null +++ b/hotkey/darwin.py @@ -0,0 +1,274 @@ +"""darwin/macOS implementation of hotkey.AbstractHotkeyMgr.""" +import pathlib +import sys +import tkinter as tk +from typing import Callable, Optional, Tuple, Union +assert sys.platform == 'darwin' + +import objc +from AppKit import ( + NSAlternateKeyMask, NSApplication, NSBeep, NSClearLineFunctionKey, NSCommandKeyMask, NSControlKeyMask, + NSDeleteFunctionKey, NSDeviceIndependentModifierFlagsMask, NSEvent, NSF1FunctionKey, NSF35FunctionKey, + NSFlagsChanged, NSKeyDown, NSKeyDownMask, NSKeyUp, NSNumericPadKeyMask, NSShiftKeyMask, NSSound, NSWorkspace +) + +from config import config +from EDMCLogging import get_main_logger +from hotkey import AbstractHotkeyMgr + +logger = get_main_logger() + + +class MacHotkeyMgr(AbstractHotkeyMgr): + """Hot key management.""" + + POLL = 250 + # https://developer.apple.com/library/mac/documentation/Cocoa/Reference/ApplicationKit/Classes/NSEvent_Class/#//apple_ref/doc/constant_group/Function_Key_Unicodes + DISPLAY = { + 0x03: u'⌅', 0x09: u'⇥', 0xd: u'↩', 0x19: u'⇤', 0x1b: u'esc', 0x20: u'⏘', 0x7f: u'⌫', + 0xf700: u'↑', 0xf701: u'↓', 0xf702: u'←', 0xf703: u'→', + 0xf727: u'Ins', + 0xf728: u'⌦', 0xf729: u'↖', 0xf72a: u'Fn', 0xf72b: u'↘', + 0xf72c: u'⇞', 0xf72d: u'⇟', 0xf72e: u'PrtScr', 0xf72f: u'ScrollLock', + 0xf730: u'Pause', 0xf731: u'SysReq', 0xf732: u'Break', 0xf733: u'Reset', + 0xf739: u'⌧', + } + (ACQUIRE_INACTIVE, ACQUIRE_ACTIVE, ACQUIRE_NEW) = range(3) + + def __init__(self): + self.MODIFIERMASK = NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask \ + | NSNumericPadKeyMask + self.root: tk.Tk + + self.keycode = 0 + self.modifiers = 0 + self.activated = False + self.observer = None + + self.acquire_key = 0 + self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE + + self.tkProcessKeyEvent_old: Callable + + self.snd_good = NSSound.alloc().initWithContentsOfFile_byReference_( + pathlib.Path(config.respath_path) / 'snd_good.wav', False + ) + self.snd_bad = NSSound.alloc().initWithContentsOfFile_byReference_( + pathlib.Path(config.respath_path) / 'snd_bad.wav', False + ) + + def register(self, root: tk.Tk, keycode: int, modifiers: int) -> None: + """ + Register current hotkey for monitoring. + + :param root: parent window. + :param keycode: Key to monitor. + :param modifiers: Any modifiers to take into account. + """ + self.root = root + self.keycode = keycode + self.modifiers = modifiers + self.activated = False + + if keycode: + if not self.observer: + self.root.after_idle(self._observe) + self.root.after(MacHotkeyMgr.POLL, self._poll) + + # Monkey-patch tk (tkMacOSXKeyEvent.c) + if not callable(self.tkProcessKeyEvent_old): + sel = b'tkProcessKeyEvent:' + cls = NSApplication.sharedApplication().class__() # type: ignore + self.tkProcessKeyEvent_old = NSApplication.sharedApplication().methodForSelector_(sel) # type: ignore + newmethod = objc.selector( # type: ignore + self.tkProcessKeyEvent, + selector=self.tkProcessKeyEvent_old.selector, + signature=self.tkProcessKeyEvent_old.signature + ) + objc.classAddMethod(cls, sel, newmethod) # type: ignore + + def tkProcessKeyEvent(self, cls, the_event): # noqa: N802 + """ + Monkey-patch tk (tkMacOSXKeyEvent.c). + + - workaround crash on OSX 10.9 & 10.10 on seeing a composing character + - notice when modifier key state changes + - keep a copy of NSEvent.charactersIgnoringModifiers, which is what we need for the hotkey + + (Would like to use a decorator but need to ensure the application is created before this is installed) + :param cls: ??? + :param the_event: tk event + :return: ??? + """ + if self.acquire_state: + if the_event.type() == NSFlagsChanged: + self.acquire_key = the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask + self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW + # suppress the event by not chaining the old function + return the_event + + elif the_event.type() in (NSKeyDown, NSKeyUp): + c = the_event.charactersIgnoringModifiers() + self.acquire_key = (c and ord(c[0]) or 0) | \ + (the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask) + self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW + # suppress the event by not chaining the old function + return the_event + + # replace empty characters with charactersIgnoringModifiers to avoid crash + elif the_event.type() in (NSKeyDown, NSKeyUp) and not the_event.characters(): + the_event = NSEvent.keyEventWithType_location_modifierFlags_timestamp_windowNumber_context_characters_charactersIgnoringModifiers_isARepeat_keyCode_( # noqa: E501 + # noqa: E501 + the_event.type(), + the_event.locationInWindow(), + the_event.modifierFlags(), + the_event.timestamp(), + the_event.windowNumber(), + the_event.context(), + the_event.charactersIgnoringModifiers(), + the_event.charactersIgnoringModifiers(), + the_event.isARepeat(), + the_event.keyCode() + ) + return self.tkProcessKeyEvent_old(cls, the_event) + + def _observe(self): + # Must be called after root.mainloop() so that the app's message loop has been created + self.observer = NSEvent.addGlobalMonitorForEventsMatchingMask_handler_(NSKeyDownMask, self._handler) + + def _poll(self): + if config.shutting_down: + return + + # No way of signalling to Tkinter from within the callback handler block that doesn't + # cause Python to crash, so poll. + if self.activated: + self.activated = False + self.root.event_generate('<>', when="tail") + + if self.keycode or self.modifiers: + self.root.after(MacHotkeyMgr.POLL, self._poll) + + def unregister(self) -> None: + """Remove hotkey registration.""" + self.keycode = 0 + self.modifiers = 0 + + @objc.callbackFor(NSEvent.addGlobalMonitorForEventsMatchingMask_handler_) + def _handler(self, event) -> None: + # use event.charactersIgnoringModifiers to handle composing characters like Alt-e + if ( + (event.modifierFlags() & self.MODIFIERMASK) == self.modifiers + and ord(event.charactersIgnoringModifiers()[0]) == self.keycode + ): + if config.get_int('hotkey_always'): + self.activated = True + + else: # Only trigger if game client is front process + front = NSWorkspace.sharedWorkspace().frontmostApplication() + if front and front.bundleIdentifier() == 'uk.co.frontier.EliteDangerous': + self.activated = True + + def acquire_start(self) -> None: + """Start acquiring hotkey state via polling.""" + self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE + self.root.after_idle(self._acquire_poll) + + def acquire_stop(self) -> None: + """Stop acquiring hotkey state.""" + self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE + + def _acquire_poll(self) -> None: + """Perform a poll of current hotkey state.""" + if config.shutting_down: + return + + # No way of signalling to Tkinter from within the monkey-patched event handler that doesn't + # cause Python to crash, so poll. + if self.acquire_state: + if self.acquire_state == MacHotkeyMgr.ACQUIRE_NEW: + # Abuse tkEvent's keycode field to hold our acquired key & modifier + self.root.event_generate('', keycode=self.acquire_key) + self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE + self.root.after(50, self._acquire_poll) + + def fromevent(self, event) -> Optional[Union[bool, Tuple]]: + """ + Return configuration (keycode, modifiers) or None=clear or False=retain previous. + + :param event: tk event ? + :return: False to retain previous, None to not use, else (keycode, modifiers) + """ + (keycode, modifiers) = (event.keycode & 0xffff, event.keycode & 0xffff0000) # Set by _acquire_poll() + if ( + keycode + and not (modifiers & (NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask)) + ): + if keycode == 0x1b: # Esc = retain previous + self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE + return False + + # BkSp, Del, Clear = clear hotkey + elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]: + self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE + return None + + # don't allow keys needed for typing in System Map + elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a: + NSBeep() + self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE + return None + + return (keycode, modifiers) + + def display(self, keycode, modifiers) -> str: + """ + Return displayable form of given hotkey + modifiers. + + :param keycode: + :param modifiers: + :return: string form + """ + text = '' + if modifiers & NSControlKeyMask: + text += u'⌃' + + if modifiers & NSAlternateKeyMask: + text += u'⌥' + + if modifiers & NSShiftKeyMask: + text += u'⇧' + + if modifiers & NSCommandKeyMask: + text += u'⌘' + + if (modifiers & NSNumericPadKeyMask) and keycode <= 0x7f: + text += u'№' + + if not keycode: + pass + + elif ord(NSF1FunctionKey) <= keycode <= ord(NSF35FunctionKey): + text += f'F{keycode + 1 - ord(NSF1FunctionKey)}' + + elif keycode in MacHotkeyMgr.DISPLAY: # specials + text += MacHotkeyMgr.DISPLAY[keycode] + + elif keycode < 0x20: # control keys + text += chr(keycode + 0x40) + + elif keycode < 0xf700: # key char + text += chr(keycode).upper() + + else: + text += u'⁈' + + return text + + def play_good(self): + """Play the 'good' sound.""" + self.snd_good.play() + + def play_bad(self): + """Play the 'bad' sound.""" + self.snd_bad.play() diff --git a/hotkey/linux.py b/hotkey/linux.py new file mode 100644 index 00000000..927c4d26 --- /dev/null +++ b/hotkey/linux.py @@ -0,0 +1,64 @@ +"""Linux implementation of hotkey.AbstractHotkeyMgr.""" +import sys + +from EDMCLogging import get_main_logger +from hotkey import AbstractHotkeyMgr + +assert sys.platform == 'linux' + +logger = get_main_logger() + + +class LinuxHotKeyMgr(AbstractHotkeyMgr): + """ + Hot key management. + + Not actually implemented on Linux. It's a no-op instead. + """ + + def register(self, root, keycode, modifiers) -> None: + """Register the hotkey handler.""" + pass + + def unregister(self) -> None: + """Unregister the hotkey handling.""" + pass + + def acquire_start(self) -> None: + """Start acquiring hotkey state via polling.""" + pass + + def acquire_stop(self) -> None: + """Stop acquiring hotkey state.""" + pass + + def fromevent(self, event) -> bool | tuple | None: + """ + Return configuration (keycode, modifiers) or None=clear or False=retain previous. + + event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed. + event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter + by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate. + + :param event: tk event ? + :return: False to retain previous, None to not use, else (keycode, modifiers) + """ + pass + + def display(self, keycode: int, modifiers: int) -> str: + """ + Return displayable form of given hotkey + modifiers. + + :param keycode: + :param modifiers: + :return: string form + """ + return "Unsupported on linux" + + def play_good(self) -> None: + """Play the 'good' sound.""" + pass + + def play_bad(self) -> None: + """Play the 'bad' sound.""" + pass diff --git a/hotkey/windows.py b/hotkey/windows.py new file mode 100644 index 00000000..8a1c7acd --- /dev/null +++ b/hotkey/windows.py @@ -0,0 +1,370 @@ +"""Windows implementation of hotkey.AbstractHotkeyMgr.""" +import atexit +import ctypes +import pathlib +import sys +import threading +import tkinter as tk +import winsound +from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD +from typing import Optional, Tuple, Union + +from config import config +from EDMCLogging import get_main_logger +from hotkey import AbstractHotkeyMgr + +assert sys.platform == 'win32' + +logger = get_main_logger() + +RegisterHotKey = ctypes.windll.user32.RegisterHotKey +UnregisterHotKey = ctypes.windll.user32.UnregisterHotKey +MOD_ALT = 0x0001 +MOD_CONTROL = 0x0002 +MOD_SHIFT = 0x0004 +MOD_WIN = 0x0008 +MOD_NOREPEAT = 0x4000 + +GetMessage = ctypes.windll.user32.GetMessageW +TranslateMessage = ctypes.windll.user32.TranslateMessage +DispatchMessage = ctypes.windll.user32.DispatchMessageW +PostThreadMessage = ctypes.windll.user32.PostThreadMessageW +WM_QUIT = 0x0012 +WM_HOTKEY = 0x0312 +WM_APP = 0x8000 +WM_SND_GOOD = WM_APP + 1 +WM_SND_BAD = WM_APP + 2 + +GetKeyState = ctypes.windll.user32.GetKeyState +MapVirtualKey = ctypes.windll.user32.MapVirtualKeyW +VK_BACK = 0x08 +VK_CLEAR = 0x0c +VK_RETURN = 0x0d +VK_SHIFT = 0x10 +VK_CONTROL = 0x11 +VK_MENU = 0x12 +VK_CAPITAL = 0x14 +VK_MODECHANGE = 0x1f +VK_ESCAPE = 0x1b +VK_SPACE = 0x20 +VK_DELETE = 0x2e +VK_LWIN = 0x5b +VK_RWIN = 0x5c +VK_NUMPAD0 = 0x60 +VK_DIVIDE = 0x6f +VK_F1 = 0x70 +VK_F24 = 0x87 +VK_OEM_MINUS = 0xbd +VK_NUMLOCK = 0x90 +VK_SCROLL = 0x91 +VK_PROCESSKEY = 0xe5 +VK_OEM_CLEAR = 0xfe + +GetForegroundWindow = ctypes.windll.user32.GetForegroundWindow +GetWindowText = ctypes.windll.user32.GetWindowTextW +GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int] +GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW + + +def window_title(h) -> str: + """ + Determine the title for a window. + + :param h: Window handle. + :return: Window title. + """ + if h: + title_length = GetWindowTextLength(h) + 1 + buf = ctypes.create_unicode_buffer(title_length) + if GetWindowText(h, buf, title_length): + return buf.value + + return '' + + +class MOUSEINPUT(ctypes.Structure): + """Mouse Input structure.""" + + _fields_ = [ + ('dx', LONG), + ('dy', LONG), + ('mouseData', DWORD), + ('dwFlags', DWORD), + ('time', DWORD), + ('dwExtraInfo', ctypes.POINTER(ULONG)) + ] + + +class KEYBDINPUT(ctypes.Structure): + """Keyboard Input structure.""" + + _fields_ = [ + ('wVk', WORD), + ('wScan', WORD), + ('dwFlags', DWORD), + ('time', DWORD), + ('dwExtraInfo', ctypes.POINTER(ULONG)) + ] + + +class HARDWAREINPUT(ctypes.Structure): + """Hardware Input structure.""" + + _fields_ = [ + ('uMsg', DWORD), + ('wParamL', WORD), + ('wParamH', WORD) + ] + + +class INPUTUNION(ctypes.Union): + """Input union.""" + + _fields_ = [ + ('mi', MOUSEINPUT), + ('ki', KEYBDINPUT), + ('hi', HARDWAREINPUT) + ] + + +class INPUT(ctypes.Structure): + """Input structure.""" + + _fields_ = [ + ('type', DWORD), + ('union', INPUTUNION) + ] + + +SendInput = ctypes.windll.user32.SendInput +SendInput.argtypes = [ctypes.c_uint, ctypes.POINTER(INPUT), ctypes.c_int] + +INPUT_MOUSE = 0 +INPUT_KEYBOARD = 1 +INPUT_HARDWARE = 2 + + +class WindowsHotkeyMgr(AbstractHotkeyMgr): + """Hot key management.""" + + # https://msdn.microsoft.com/en-us/library/windows/desktop/dd375731%28v=vs.85%29.aspx + # Limit ourselves to symbols in Windows 7 Segoe UI + DISPLAY = { + 0x03: 'Break', 0x08: 'Bksp', 0x09: '↹', 0x0c: 'Clear', 0x0d: '↵', 0x13: 'Pause', + 0x14: 'Ⓐ', 0x1b: 'Esc', + 0x20: '⏘', 0x21: 'PgUp', 0x22: 'PgDn', 0x23: 'End', 0x24: 'Home', + 0x25: '←', 0x26: '↑', 0x27: '→', 0x28: '↓', + 0x2c: 'PrtScn', 0x2d: 'Ins', 0x2e: 'Del', 0x2f: 'Help', + 0x5d: '▤', 0x5f: '☾', + 0x90: '➀', 0x91: 'ScrLk', + 0xa6: '⇦', 0xa7: '⇨', 0xa9: '⊗', 0xab: '☆', 0xac: '⌂', 0xb4: '✉', + } + + def __init__(self) -> None: + self.root: tk.Tk = None # type: ignore + self.thread: threading.Thread = None # type: ignore + with open(pathlib.Path(config.respath) / 'snd_good.wav', 'rb') as sg: + self.snd_good = sg.read() + with open(pathlib.Path(config.respath) / 'snd_bad.wav', 'rb') as sb: + self.snd_bad = sb.read() + atexit.register(self.unregister) + + def register(self, root: tk.Tk, keycode, modifiers) -> None: + """Register the hotkey handler.""" + self.root = root + + if self.thread: + logger.debug('Was already registered, unregistering...') + self.unregister() + + if keycode or modifiers: + logger.debug('Creating thread worker...') + self.thread = threading.Thread( + target=self.worker, + name=f'Hotkey "{keycode}:{modifiers}"', + args=(keycode, modifiers) + ) + self.thread.daemon = True + logger.debug('Starting thread worker...') + self.thread.start() + logger.debug('Done.') + + def unregister(self) -> None: + """Unregister the hotkey handling.""" + thread = self.thread + + if thread: + logger.debug('Thread is/was running') + self.thread = None # type: ignore + logger.debug('Telling thread WM_QUIT') + PostThreadMessage(thread.ident, WM_QUIT, 0, 0) + logger.debug('Joining thread') + thread.join() # Wait for it to unregister hotkey and quit + + else: + logger.debug('No thread') + + logger.debug('Done.') + + def worker(self, keycode, modifiers) -> None: # noqa: CCR001 + """Handle hotkeys.""" + logger.debug('Begin...') + # Hotkey must be registered by the thread that handles it + if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode): + logger.debug("We're not the right thread?") + self.thread = None # type: ignore + return + + fake = INPUT(INPUT_KEYBOARD, INPUTUNION(ki=KEYBDINPUT(keycode, keycode, 0, 0, None))) + + msg = MSG() + logger.debug('Entering GetMessage() loop...') + while GetMessage(ctypes.byref(msg), None, 0, 0) != 0: + logger.debug('Got message') + if msg.message == WM_HOTKEY: + logger.debug('WM_HOTKEY') + + if ( + config.get_int('hotkey_always') + or window_title(GetForegroundWindow()).startswith('Elite - Dangerous') + ): + if not config.shutting_down: + logger.debug('Sending event <>') + self.root.event_generate('<>', when="tail") + + else: + logger.debug('Passing key on') + UnregisterHotKey(None, 1) + SendInput(1, fake, ctypes.sizeof(INPUT)) + if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode): + logger.debug("We aren't registered for this ?") + break + + elif msg.message == WM_SND_GOOD: + logger.debug('WM_SND_GOOD') + winsound.PlaySound(self.snd_good, winsound.SND_MEMORY) # synchronous + + elif msg.message == WM_SND_BAD: + logger.debug('WM_SND_BAD') + winsound.PlaySound(self.snd_bad, winsound.SND_MEMORY) # synchronous + + else: + logger.debug('Something else') + TranslateMessage(ctypes.byref(msg)) + DispatchMessage(ctypes.byref(msg)) + + logger.debug('Exited GetMessage() loop.') + UnregisterHotKey(None, 1) + self.thread = None # type: ignore + logger.debug('Done.') + + def acquire_start(self) -> None: + """Start acquiring hotkey state via polling.""" + pass + + def acquire_stop(self) -> None: + """Stop acquiring hotkey state.""" + pass + + def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001 + """ + Return configuration (keycode, modifiers) or None=clear or False=retain previous. + + event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed. + event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter + by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate. + + :param event: tk event ? + :return: False to retain previous, None to not use, else (keycode, modifiers) + """ + modifiers = ((GetKeyState(VK_MENU) & 0x8000) and MOD_ALT) \ + | ((GetKeyState(VK_CONTROL) & 0x8000) and MOD_CONTROL) \ + | ((GetKeyState(VK_SHIFT) & 0x8000) and MOD_SHIFT) \ + | ((GetKeyState(VK_LWIN) & 0x8000) and MOD_WIN) \ + | ((GetKeyState(VK_RWIN) & 0x8000) and MOD_WIN) + keycode = event.keycode + + if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]: + return (0, modifiers) + + if not modifiers: + if keycode == VK_ESCAPE: # Esc = retain previous + return False + + elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey + return None + + elif ( + keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z') + ): # don't allow keys needed for typing in System Map + winsound.MessageBeep() + return None + + elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY] + or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys + return (0, modifiers) + + # See if the keycode is usable and available + if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode): + UnregisterHotKey(None, 2) + return (keycode, modifiers) + + else: + winsound.MessageBeep() + return None + + def display(self, keycode, modifiers) -> str: + """ + Return displayable form of given hotkey + modifiers. + + :param keycode: + :param modifiers: + :return: string form + """ + text = '' + if modifiers & MOD_WIN: + text += '❖+' + + if modifiers & MOD_CONTROL: + text += 'Ctrl+' + + if modifiers & MOD_ALT: + text += 'Alt+' + + if modifiers & MOD_SHIFT: + text += '⇧+' + + if VK_NUMPAD0 <= keycode <= VK_DIVIDE: + text += '№' + + if not keycode: + pass + + elif VK_F1 <= keycode <= VK_F24: + text += f'F{keycode + 1 - VK_F1}' + + elif keycode in WindowsHotkeyMgr.DISPLAY: # specials + text += WindowsHotkeyMgr.DISPLAY[keycode] + + else: + c = MapVirtualKey(keycode, 2) # printable ? + if not c: # oops not printable + text += '⁈' + + elif c < 0x20: # control keys + text += chr(c + 0x40) + + else: + text += chr(c).upper() + + return text + + def play_good(self) -> None: + """Play the 'good' sound.""" + if self.thread: + PostThreadMessage(self.thread.ident, WM_SND_GOOD, 0, 0) + + def play_bad(self) -> None: + """Play the 'bad' sound.""" + if self.thread: + PostThreadMessage(self.thread.ident, WM_SND_BAD, 0, 0) diff --git a/journal_lock.py b/journal_lock.py index b8882a51..7945ef91 100644 --- a/journal_lock.py +++ b/journal_lock.py @@ -33,7 +33,7 @@ class JournalLock: def __init__(self) -> None: """Initialise where the journal directory and lock file are.""" - self.journal_dir: str = config.get_str('journaldir') or config.default_journal_dir + self.journal_dir: str | None = config.get_str('journaldir') or config.default_journal_dir self.journal_dir_path: Optional[pathlib.Path] = None self.set_path_from_journaldir() self.journal_dir_lockfile_name: Optional[pathlib.Path] = None diff --git a/l10n.py b/l10n.py index 83600383..e2c1143f 100755 --- a/l10n.py +++ b/l10n.py @@ -83,7 +83,7 @@ class _Translations: self.translations = {None: {}} builtins.__dict__['_'] = lambda x: str(x).replace(r'\"', '"').replace('{CR}', '\n') - def install(self, lang: str = None) -> None: # noqa: CCR001 + def install(self, lang: str | None = None) -> None: # noqa: CCR001 """ Install the translation function to the _ builtin. @@ -250,7 +250,7 @@ class _Locale: self.float_formatter.setMinimumFractionDigits_(5) self.float_formatter.setMaximumFractionDigits_(5) - def stringFromNumber(self, number: Union[float, int], decimals: int = None) -> str: # noqa: N802 + def stringFromNumber(self, number: Union[float, int], decimals: int | None = None) -> str: # noqa: N802 warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.')) return self.string_from_number(number, decimals) # type: ignore diff --git a/myNotebook.py b/myNotebook.py index eb17bf2f..30f95274 100644 --- a/myNotebook.py +++ b/myNotebook.py @@ -58,7 +58,7 @@ class Notebook(ttk.Notebook): class Frame(sys.platform == 'darwin' and tk.Frame or ttk.Frame): # type: ignore """Custom t(t)k.Frame class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Notebook | None = None, **kw): if sys.platform == 'darwin': kw['background'] = kw.pop('background', PAGEBG) tk.Frame.__init__(self, master, **kw) @@ -76,7 +76,8 @@ class Label(tk.Label): """Custom tk.Label class to fix some display issues.""" def __init__(self, master: Optional[ttk.Frame] = None, **kw): - if sys.platform in ['darwin', 'win32']: + # This format chosen over `sys.platform in (...)` as mypy and friends dont understand that + if sys.platform == 'darwin' or sys.platform == 'win32': kw['foreground'] = kw.pop('foreground', PAGEFG) kw['background'] = kw.pop('background', PAGEBG) else: diff --git a/outfitting.py b/outfitting.py index e59aa8d0..174e24b8 100644 --- a/outfitting.py +++ b/outfitting.py @@ -93,13 +93,13 @@ def lookup(module, ship_map, entitled=False) -> Optional[dict]: # noqa: C901, C # Countermeasures - e.g. Hpt_PlasmaPointDefence_Turret_Tiny elif name[0] == 'hpt' and name[1] in countermeasure_map: new['category'] = 'utility' - new['name'], new['rating'] = countermeasure_map[len(name) > 4 and (name[1], name[4]) or name[1]] + new['name'], new['rating'] = countermeasure_map[name[1]] new['class'] = weaponclass_map[name[-1]] # Utility - e.g. Hpt_CargoScanner_Size0_Class1 elif name[0] == 'hpt' and name[1] in utility_map: new['category'] = 'utility' - new['name'] = utility_map[len(name) > 4 and (name[1], name[4]) or name[1]] + new['name'] = utility_map[name[1]] if not name[2].startswith('size') or not name[3].startswith('class'): raise AssertionError(f'{module["id"]}: Unknown class/rating "{name[2]}/{name[3]}"') diff --git a/plug.py b/plug.py index d77c5fa9..355fcdde 100644 --- a/plug.py +++ b/plug.py @@ -7,7 +7,8 @@ import os import sys import tkinter as tk from builtins import object, str -from typing import Any, Callable, List, Mapping, MutableMapping, Optional, Tuple +from tkinter import ttk +from typing import Any, Callable, List, Mapping, MutableMapping, Optional import companion import myNotebook as nb # noqa: N813 @@ -26,7 +27,7 @@ class LastError: """Holds the last plugin error.""" msg: Optional[str] - root: tk.Frame + root: tk.Tk def __init__(self) -> None: self.msg = None @@ -118,7 +119,7 @@ class Plugin(object): return None - def get_prefs(self, parent: tk.Frame, cmdr: str, is_beta: bool) -> Optional[tk.Frame]: + def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Optional[tk.Frame]: """ If the plugin provides a prefs frame, create and return it. @@ -140,7 +141,7 @@ class Plugin(object): return None -def load_plugins(master: tk.Frame) -> None: # noqa: CCR001 +def load_plugins(master: tk.Tk) -> None: # noqa: CCR001 """Find and load all plugins.""" last_error.root = master @@ -198,7 +199,7 @@ def provides(fn_name: str) -> List[str]: def invoke( - plugin_name: str, fallback: str, fn_name: str, *args: Tuple + plugin_name: str, fallback: str | None, fn_name: str, *args: Any ) -> Optional[str]: """ Invoke a function on a named plugin. @@ -248,7 +249,7 @@ def notify_stop() -> Optional[str]: return error -def notify_prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None: +def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: """ Notify plugins that the Cmdr was changed while the settings dialog is open. @@ -265,7 +266,7 @@ def notify_prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None: logger.exception(f'Plugin "{plugin.name}" failed') -def notify_prefs_changed(cmdr: str, is_beta: bool) -> None: +def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None: """ Notify plugins that the settings dialog has been closed. diff --git a/plugins/coriolis.py b/plugins/coriolis.py index fef683b9..29fddf26 100644 --- a/plugins/coriolis.py +++ b/plugins/coriolis.py @@ -26,6 +26,7 @@ import gzip import io import json import tkinter as tk +from tkinter import ttk from typing import TYPE_CHECKING, Union import myNotebook as nb # noqa: N813 # its not my fault. @@ -79,7 +80,7 @@ def plugin_start3(path: str) -> str: return 'Coriolis' -def plugin_prefs(parent: tk.Widget, cmdr: str, is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """Set up plugin preferences.""" PADX = 10 # noqa: N806 @@ -126,7 +127,7 @@ def plugin_prefs(parent: tk.Widget, cmdr: str, is_beta: bool) -> tk.Frame: return conf_frame -def prefs_changed(cmdr: str, is_beta: bool) -> None: +def prefs_changed(cmdr: str | None, is_beta: bool) -> None: """Update URLs.""" global normal_url, beta_url, override_mode normal_url = normal_textvar.get() diff --git a/plugins/eddn.py b/plugins/eddn.py index cecdd0e0..96c839ad 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -34,7 +34,7 @@ from collections import OrderedDict from platform import system from textwrap import dedent from threading import Lock -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, MutableMapping, Optional +from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping, Optional from typing import OrderedDict as OrderedDictT from typing import Tuple, Union @@ -91,13 +91,13 @@ class This: # Avoid duplicates self.marketId: Optional[str] = None - self.commodities: Optional[List[OrderedDictT[str, Any]]] = None - self.outfitting: Optional[Tuple[bool, List[str]]] = None - self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None + self.commodities: Optional[list[OrderedDictT[str, Any]]] = None + self.outfitting: Optional[Tuple[bool, list[str]]] = None + self.shipyard: Optional[Tuple[bool, list[Mapping[str, Any]]]] = None self.fcmaterials_marketid: int = 0 - self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials: Optional[list[OrderedDictT[str, Any]]] = None self.fcmaterials_capi_marketid: int = 0 - self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials_capi: Optional[list[OrderedDictT[str, Any]]] = None # For the tkinter parent window, so we can call update_idletasks() self.parent: tk.Tk @@ -379,7 +379,7 @@ class EDDNSender: :param text: The status text to be set/logged. """ if os.getenv('EDMC_NO_UI'): - logger.INFO(text) + logger.info(text) return self.eddn.parent.children['status']['text'] = text @@ -403,7 +403,7 @@ class EDDNSender: """ logger.trace_if("plugin.eddn.send", "Sending message") should_return: bool - new_data: Dict[str, Any] = {} + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: @@ -615,7 +615,7 @@ class EDDN: self.sender = EDDNSender(self, self.eddn_url) - self.fss_signals: List[Mapping[str, Any]] = [] + self.fss_signals: list[Mapping[str, Any]] = [] def close(self): """Close down the EDDN class instance.""" @@ -639,8 +639,7 @@ class EDDN: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] = {} - + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) if should_return: logger.warning("capi.request./market has been disabled by killswitch. Returning.") @@ -657,7 +656,7 @@ class EDDN: modules, ships ) - commodities: List[OrderedDictT[str, Any]] = [] + commodities: list[OrderedDictT[str, Any]] = [] for commodity in data['lastStarport'].get('commodities') or []: # Check 'marketable' and 'not prohibited' if (category_map.get(commodity['categoryname'], True) @@ -719,7 +718,7 @@ class EDDN: # Send any FCMaterials.json-equivalent 'orders' as well self.export_capi_fcmaterials(data, is_beta, horizons) - def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[Dict, Dict]: + def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: """ Produce a sanity-checked version of ships and modules from CAPI data. @@ -730,7 +729,7 @@ class EDDN: :param data: The raw CAPI data. :return: Sanity-checked data. """ - modules: Dict[str, Any] = data['lastStarport'].get('modules') + modules: dict[str, Any] = data['lastStarport'].get('modules') if modules is None or not isinstance(modules, dict): if modules is None: logger.debug('modules was None. FC or Damaged Station?') @@ -747,7 +746,7 @@ class EDDN: # Set a safe value modules = {} - ships: Dict[str, Any] = data['lastStarport'].get('ships') + ships: dict[str, Any] = data['lastStarport'].get('ships') if ships is None or not isinstance(ships, dict): if ships is None: logger.debug('ships was None') @@ -773,8 +772,7 @@ class EDDN: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] = {} - + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -801,7 +799,7 @@ class EDDN: modules.values() ) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search ) @@ -842,8 +840,7 @@ class EDDN: :param is_beta: whether or not we are in beta mode """ should_return: bool - new_data: Dict[str, Any] = {} - + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -862,7 +859,7 @@ class EDDN: ships ) - shipyard: List[Mapping[str, Any]] = sorted( + shipyard: list[Mapping[str, Any]] = sorted( itertools.chain( (ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()), (ship['name'].lower() for ship in ships['unavailable_list'] or {}), @@ -905,8 +902,8 @@ class EDDN: :param is_beta: whether or not we're in beta mode :param entry: the journal entry containing the commodities data """ - items: List[Mapping[str, Any]] = entry.get('Items') or [] - commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([ + items: list[Mapping[str, Any]] = entry.get('Items') or [] + commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([ ('name', self.canonicalise(commodity['Name'])), ('meanPrice', commodity['MeanPrice']), ('buyPrice', commodity['BuyPrice']), @@ -953,11 +950,11 @@ class EDDN: :param is_beta: Whether or not we're in beta mode :param entry: The relevant journal entry """ - modules: List[Mapping[str, Any]] = entry.get('Items', []) + modules: list[Mapping[str, Any]] = entry.get('Items', []) horizons: bool = entry.get('Horizons', False) # outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) # for module in modules if module['Name'] != 'int_planetapproachsuite']) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules) ) @@ -992,7 +989,7 @@ class EDDN: :param is_beta: Whether or not we're in beta mode :param entry: the relevant journal entry """ - ships: List[Mapping[str, Any]] = entry.get('PriceList') or [] + ships: list[Mapping[str, Any]] = entry.get('PriceList') or [] horizons: bool = entry.get('Horizons', False) shipyard = sorted(ship['ShipType'] for ship in ships) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. @@ -1834,7 +1831,7 @@ class EDDN: ####################################################################### # Build basis of message - msg: Dict = { + msg: dict = { '$schemaRef': f'https://eddn.edcd.io/schemas/fsssignaldiscovered/1{"/test" if is_beta else ""}', 'message': { "event": "FSSSignalDiscovered", @@ -2176,9 +2173,6 @@ def journal_entry( # noqa: C901, CCR001 :param state: `dict` - Current `monitor.state` data. :return: `str` - Error message, or `None` if no errors. """ - should_return: bool - new_data: Dict[str, Any] = {} - should_return, new_data = killswitch.check_killswitch('plugins.eddn.journal', entry) if should_return: plug.show_error(_('EDDN journal handler disabled. See Log.')) # LANG: Killswitch disabled EDDN @@ -2585,7 +2579,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST return economies_colony or modules_horizons or ship_horizons -def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None: +def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None: """ Process Status.json data to track things like current Body. diff --git a/plugins/edsm.py b/plugins/edsm.py index 4c6ba02b..635b3559 100644 --- a/plugins/edsm.py +++ b/plugins/edsm.py @@ -38,12 +38,14 @@ from datetime import datetime, timedelta, timezone from queue import Queue from threading import Thread from time import sleep +from tkinter import ttk from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast import requests import killswitch import monitor +import myNotebook import myNotebook as nb # noqa: N813 import plug from companion import CAPIData @@ -82,8 +84,8 @@ class This: self.session: requests.Session = requests.Session() self.session.headers['User-Agent'] = user_agent self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread - self.discarded_events: Set[str] = [] # List discarded events from EDSM - self.lastlookup: requests.Response # Result of last system lookup + self.discarded_events: Set[str] = set() # List discarded events from EDSM + self.lastlookup: Dict[str, Any] # Result of last system lookup # Game state self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew @@ -91,13 +93,13 @@ class This: self.newgame: bool = False # starting up - batch initial burst of events self.newgame_docked: bool = False # starting up while docked self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan - self.system_link: tk.Widget = None - self.system: tk.Tk = None - self.system_address: Optional[int] = None # Frontier SystemAddress - self.system_population: Optional[int] = None - self.station_link: tk.Widget = None - self.station: Optional[str] = None - self.station_marketid: Optional[int] = None # Frontier MarketID + self.system_link: tk.Widget | None = None + self.system: tk.Tk | None = None + self.system_address: int | None = None # Frontier SystemAddress + self.system_population: int | None = None + self.station_link: tk.Widget | None = None + self.station: str | None = None + self.station_marketid: int | None = None # Frontier MarketID self.on_foot = False self._IMG_KNOWN = None @@ -107,19 +109,19 @@ class This: self.thread: Optional[threading.Thread] = None - self.log = None - self.log_button = None + self.log: tk.IntVar | None = None + self.log_button: ttk.Checkbutton | None = None - self.label = None + self.label: tk.Widget | None = None - self.cmdr_label = None - self.cmdr_text = None + self.cmdr_label: myNotebook.Label | None = None + self.cmdr_text: myNotebook.Label | None = None - self.user_label = None - self.user = None + self.user_label: myNotebook.Label | None = None + self.user: myNotebook.Entry | None = None - self.apikey_label = None - self.apikey = None + self.apikey_label: myNotebook.Label | None = None + self.apikey: myNotebook.Entry | None = None this = This() @@ -267,7 +269,7 @@ def plugin_stop() -> None: logger.debug('Done.') -def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """ Plugin preferences setup hook. @@ -300,7 +302,8 @@ def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame: frame, text=_('Send flight log and Cmdr status to EDSM'), variable=this.log, command=prefsvarchanged ) - this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W) + if this.log_button: + this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W) nb.Label(frame).grid(sticky=tk.W) # big spacer # Section heading in settings @@ -315,61 +318,77 @@ def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame: cur_row = 10 - this.label.grid(columnspan=2, padx=PADX, sticky=tk.W) + if this.label: + this.label.grid(columnspan=2, padx=PADX, sticky=tk.W) - # LANG: Game Commander name label in EDSM settings - this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window - this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W) - this.cmdr_text = nb.Label(frame) - this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W) + if this.cmdr_label and this.cmdr_text: + # LANG: Game Commander name label in EDSM settings + this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window + this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W) + this.cmdr_text = nb.Label(frame) + this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W) cur_row += 1 - # LANG: EDSM Commander name label in EDSM settings - this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting - this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W) - this.user = nb.Entry(frame) - this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) + if this.user_label and this.label: + # LANG: EDSM Commander name label in EDSM settings + this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting + this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W) + this.user = nb.Entry(frame) + this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) cur_row += 1 - # LANG: EDSM API key label - this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting - this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W) - this.apikey = nb.Entry(frame) - this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) + if this.apikey_label and this.apikey: + # LANG: EDSM API key label + this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting + this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W) + this.apikey = nb.Entry(frame) + this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW) prefs_cmdr_changed(cmdr, is_beta) return frame -def prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None: +def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001 """ Handle the Commander name changing whilst Settings was open. :param cmdr: The new current Commander name. :param is_beta: Whether game beta was detected. """ - this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED - this.user['state'] = tk.NORMAL - this.user.delete(0, tk.END) - this.apikey['state'] = tk.NORMAL - this.apikey.delete(0, tk.END) + if this.log_button: + this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED + + if this.user: + this.user['state'] = tk.NORMAL + this.user.delete(0, tk.END) + + if this.apikey: + this.apikey['state'] = tk.NORMAL + this.apikey.delete(0, tk.END) + if cmdr: - this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}' + if this.cmdr_text: + this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}' + cred = credentials(cmdr) if cred: - this.user.insert(0, cred[0]) - this.apikey.insert(0, cred[1]) + if this.user: + this.user.insert(0, cred[0]) + + if this.apikey: + this.apikey.insert(0, cred[1]) else: - # LANG: We have no data on the current commander - this.cmdr_text['text'] = _('None') + if this.cmdr_text: + # LANG: We have no data on the current commander + this.cmdr_text['text'] = _('None') to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED - if cmdr and not is_beta and this.log.get(): + if cmdr and not is_beta and this.log and this.log.get(): to_set = tk.NORMAL set_prefs_ui_states(to_set) @@ -378,7 +397,7 @@ def prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None: def prefsvarchanged() -> None: """Handle the 'Send data to EDSM' tickbox changing state.""" to_set = tk.DISABLED - if this.log.get(): + if this.log and this.log.get() and this.log_button: to_set = this.log_button['state'] set_prefs_ui_states(to_set) @@ -390,13 +409,17 @@ def set_prefs_ui_states(state: str) -> None: :param state: the state to set each entry to """ - this.label['state'] = state - this.cmdr_label['state'] = state - this.cmdr_text['state'] = state - this.user_label['state'] = state - this.user['state'] = state - this.apikey_label['state'] = state - this.apikey['state'] = state + if ( + this.label and this.cmdr_label and this.cmdr_text and this.user_label and this.user + and this.apikey_label and this.apikey + ): + this.label['state'] = state + this.cmdr_label['state'] = state + this.cmdr_text['state'] = state + this.user_label['state'] = state + this.user['state'] = state + this.apikey_label['state'] = state + this.apikey['state'] = state def prefs_changed(cmdr: str, is_beta: bool) -> None: @@ -406,7 +429,8 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: :param cmdr: Name of Commander. :param is_beta: Whether game beta was detected. """ - config.set('edsm_out', this.log.get()) + if this.log: + config.set('edsm_out', this.log.get()) if cmdr and not is_beta: # TODO: remove this when config is rewritten. @@ -414,17 +438,18 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: usernames: List[str] = config.get_list('edsm_usernames', default=[]) apikeys: List[str] = config.get_list('edsm_apikeys', default=[]) - if cmdr in cmdrs: - idx = cmdrs.index(cmdr) - usernames.extend([''] * (1 + idx - len(usernames))) - usernames[idx] = this.user.get().strip() - apikeys.extend([''] * (1 + idx - len(apikeys))) - apikeys[idx] = this.apikey.get().strip() + if this.user and this.apikey: + if cmdr in cmdrs: + idx = cmdrs.index(cmdr) + usernames.extend([''] * (1 + idx - len(usernames))) + usernames[idx] = this.user.get().strip() + apikeys.extend([''] * (1 + idx - len(apikeys))) + apikeys[idx] = this.apikey.get().strip() - else: - config.set('edsm_cmdrs', cmdrs + [cmdr]) - usernames.append(this.user.get().strip()) - apikeys.append(this.apikey.get().strip()) + else: + config.set('edsm_cmdrs', cmdrs + [cmdr]) + usernames.append(this.user.get().strip()) + apikeys.append(this.apikey.get().strip()) config.set('edsm_usernames', usernames) config.set('edsm_apikeys', apikeys) @@ -479,9 +504,6 @@ def journal_entry( # noqa: C901, CCR001 :param state: `monitor.state` :return: None if no error, else an error string. """ - should_return: bool - new_entry: Dict[str, Any] = {} - should_return, new_entry = killswitch.check_killswitch('plugins.edsm.journal', entry, logger) if should_return: # LANG: EDSM plugin - Journal handling disabled by killswitch @@ -548,12 +570,13 @@ entry: {entry!r}''' else: to_set = '' - this.station_link['text'] = to_set - this.station_link['url'] = station_url(str(this.system), str(this.station)) - this.station_link.update_idletasks() + if this.station_link: + this.station_link['text'] = to_set + this.station_link['url'] = station_url(str(this.system), str(this.station)) + this.station_link.update_idletasks() # Update display of 'EDSM Status' image - if this.system_link['text'] != system: + if this.system_link and this.system_link['text'] != system: this.system_link['text'] = system if system else '' this.system_link['image'] = '' this.system_link.update_idletasks() @@ -642,7 +665,7 @@ Queueing: {entry!r}''' # Update system data -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: +def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 """ Process new CAPI data. @@ -666,27 +689,29 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # TODO: Fire off the EDSM API call to trigger the callback for the icons if config.get_str('system_provider') == 'EDSM': - this.system_link['text'] = this.system - # Do *NOT* set 'url' here, as it's set to a function that will call - # through correctly. We don't want a static string. - this.system_link.update_idletasks() + if this.system_link: + this.system_link['text'] = this.system + # Do *NOT* set 'url' here, as it's set to a function that will call + # through correctly. We don't want a static string. + this.system_link.update_idletasks() if config.get_str('station_provider') == 'EDSM': - if data['commander']['docked'] or this.on_foot and this.station: - this.station_link['text'] = this.station + if this.station_link: + if data['commander']['docked'] or this.on_foot and this.station: + this.station_link['text'] = this.station - elif data['lastStarport']['name'] and data['lastStarport']['name'] != "": - this.station_link['text'] = STATION_UNDOCKED + elif data['lastStarport']['name'] and data['lastStarport']['name'] != "": + this.station_link['text'] = STATION_UNDOCKED - else: - this.station_link['text'] = '' + else: + this.station_link['text'] = '' - # Do *NOT* set 'url' here, as it's set to a function that will call - # through correctly. We don't want a static string. + # Do *NOT* set 'url' here, as it's set to a function that will call + # through correctly. We don't want a static string. - this.station_link.update_idletasks() + this.station_link.update_idletasks() - if not this.system_link['text']: + if this.system_link and not this.system_link['text']: this.system_link['text'] = system this.system_link['image'] = '' this.system_link.update_idletasks() @@ -762,12 +787,12 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently retrying = 0 while retrying < 3: - should_skip: bool - new_item: Dict[str, Any] = {} + if item is None: + item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {})) should_skip, new_item = killswitch.check_killswitch( 'plugins.edsm.worker', - item if item is not None else cast(Tuple[str, Mapping[str, Any]], ("", {})), + item, logger ) @@ -801,9 +826,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently # drop events if required by killswitch new_pending = [] for e in pending: - skip: bool - new: Dict[str, Any] = {} - skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger) if skip: continue @@ -847,8 +869,12 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently if any(p for p in pending if p['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked')): data_elided = data.copy() data_elided['apiKey'] = '' - data_elided['message'] = data_elided['message'].decode('utf-8') - data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8') + if isinstance(data_elided['message'], bytes): + data_elided['message'] = data_elided['message'].decode('utf-8') + + if isinstance(data_elided['commanderName'], bytes): + data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8') + logger.trace_if( 'journal.locations', "pending has at least one of ('CarrierJump', 'FSDJump', 'Location', 'Docked')" @@ -867,11 +893,11 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently 'journal.locations', f'Overall POST data (elided) is:\n{json.dumps(data_elided, indent=2)}' ) - r = this.session.post(TARGET_URL, data=data, timeout=_TIMEOUT) - logger.trace_if('plugin.edsm.api', f'API response content: {r.content!r}') - r.raise_for_status() + response = this.session.post(TARGET_URL, data=data, timeout=_TIMEOUT) + logger.trace_if('plugin.edsm.api', f'API response content: {response.content!r}') + response.raise_for_status() - reply = r.json() + reply = response.json() msg_num = reply['msgnum'] msg = reply['msg'] # 1xx = OK @@ -902,7 +928,7 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently # Update main window's system status this.lastlookup = r # calls update_status in main thread - if not config.shutting_down: + if not config.shutting_down and this.system_link is not None: this.system_link.event_generate('<>', when="tail") if r['msgnum'] // 100 != 1: # type: ignore @@ -1005,18 +1031,19 @@ def update_status(event=None) -> None: # msgnum: 1xx = OK, 2xx = fatal error, 3xx = error, 4xx = ignorable errors. def edsm_notify_system(reply: Mapping[str, Any]) -> None: """Update the image next to the system link.""" - if not reply: - this.system_link['image'] = this._IMG_ERROR - # LANG: EDSM Plugin - Error connecting to EDSM API - plug.show_error(_("Error: Can't connect to EDSM")) + if this.system_link is not None: + if not reply: + this.system_link['image'] = this._IMG_ERROR + # LANG: EDSM Plugin - Error connecting to EDSM API + plug.show_error(_("Error: Can't connect to EDSM")) - elif reply['msgnum'] // 100 not in (1, 4): - this.system_link['image'] = this._IMG_ERROR - # LANG: EDSM Plugin - Error message from EDSM API - plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg'])) + elif reply['msgnum'] // 100 not in (1, 4): + this.system_link['image'] = this._IMG_ERROR + # LANG: EDSM Plugin - Error message from EDSM API + plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg'])) - elif reply.get('systemCreated'): - this.system_link['image'] = this._IMG_NEW + elif reply.get('systemCreated'): + this.system_link['image'] = this._IMG_NEW - else: - this.system_link['image'] = this._IMG_KNOWN + else: + this.system_link['image'] = this._IMG_KNOWN diff --git a/plugins/inara.py b/plugins/inara.py index 163fdcb7..15fdea5b 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -30,6 +30,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from operator import itemgetter from threading import Lock, Thread +from tkinter import ttk from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional from typing import OrderedDict as OrderedDictT from typing import Sequence, Union, cast @@ -130,7 +131,7 @@ class This: self.log_button: nb.Checkbutton self.label: HyperlinkLabel self.apikey: nb.Entry - self.apikey_label: HyperlinkLabel + self.apikey_label: tk.Label self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque) self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events @@ -235,7 +236,7 @@ def plugin_stop() -> None: logger.debug('Done.') -def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str, is_beta: bool) -> tk.Frame: """Plugin Preferences UI hook.""" x_padding = 10 x_button_padding = 12 # indent Checkbuttons and Radiobuttons @@ -1539,9 +1540,6 @@ def new_add_event( def clean_event_list(event_list: List[Event]) -> List[Event]: """Check for killswitched events and remove or modify them as requested.""" out = [] - bad: bool - new_event: Dict[str, Any] = {} - for e in event_list: bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{e.name}', e.data, logger) if bad: diff --git a/prefs.py b/prefs.py index 985ef581..e5ab242b 100644 --- a/prefs.py +++ b/prefs.py @@ -19,7 +19,6 @@ from EDMCLogging import edmclogger, get_main_logger from hotkey import hotkeymgr from l10n import Translations from monitor import monitor -from myNotebook import Notebook from theme import theme from ttkHyperlinkLabel import HyperlinkLabel @@ -279,7 +278,7 @@ class PreferencesDialog(tk.Toplevel): frame = ttk.Frame(self) frame.grid(sticky=tk.NSEW) - notebook = nb.Notebook(frame) + notebook: ttk.Notebook = nb.Notebook(frame) notebook.bind('<>', self.tabchanged) # Recompute on tab change self.PADX = 10 @@ -333,7 +332,7 @@ class PreferencesDialog(tk.Toplevel): ): self.geometry(f"+{position.left}+{position.top}") - def __setup_output_tab(self, root_notebook: nb.Notebook) -> None: + def __setup_output_tab(self, root_notebook: ttk.Notebook) -> None: output_frame = nb.Frame(root_notebook) output_frame.columnconfigure(0, weight=1) @@ -418,13 +417,13 @@ class PreferencesDialog(tk.Toplevel): # LANG: Label for 'Output' Settings/Preferences tab root_notebook.add(output_frame, text=_('Output')) # Tab heading in settings - def __setup_plugin_tabs(self, notebook: Notebook) -> None: + def __setup_plugin_tabs(self, notebook: ttk.Notebook) -> None: for plugin in plug.PLUGINS: plugin_frame = plugin.get_prefs(notebook, monitor.cmdr, monitor.is_beta) if plugin_frame: notebook.add(plugin_frame, text=plugin.name) - def __setup_config_tab(self, notebook: Notebook) -> None: # noqa: CCR001 + def __setup_config_tab(self, notebook: ttk.Notebook) -> None: # noqa: CCR001 config_frame = nb.Frame(notebook) config_frame.columnconfigure(1, weight=1) row = AutoInc(start=1) @@ -675,7 +674,7 @@ class PreferencesDialog(tk.Toplevel): # LANG: Label for 'Configuration' tab in Settings notebook.add(config_frame, text=_('Configuration')) - def __setup_privacy_tab(self, notebook: Notebook) -> None: + def __setup_privacy_tab(self, notebook: ttk.Notebook) -> None: frame = nb.Frame(notebook) self.hide_multicrew_captain = tk.BooleanVar(value=config.get_bool('hide_multicrew_captain', default=False)) self.hide_private_group = tk.BooleanVar(value=config.get_bool('hide_private_group', default=False)) @@ -697,7 +696,7 @@ class PreferencesDialog(tk.Toplevel): notebook.add(frame, text=_('Privacy')) # LANG: Preferences privacy tab title - def __setup_appearance_tab(self, notebook: Notebook) -> None: + def __setup_appearance_tab(self, notebook: ttk.Notebook) -> None: self.languages = Translations.available_names() # Appearance theme and language setting # LANG: The system default language choice in Settings > Appearance @@ -887,7 +886,7 @@ class PreferencesDialog(tk.Toplevel): # LANG: Label for Settings > Appearance tab notebook.add(appearance_frame, text=_('Appearance')) # Tab heading in settings - def __setup_plugin_tab(self, notebook: Notebook) -> None: # noqa: CCR001 + def __setup_plugin_tab(self, notebook: ttk.Notebook) -> None: # noqa: CCR001 # Plugin settings and info plugins_frame = nb.Frame(notebook) plugins_frame.columnconfigure(0, weight=1) @@ -1188,8 +1187,8 @@ class PreferencesDialog(tk.Toplevel): :return: "break" as a literal, to halt processing """ good = hotkeymgr.fromevent(event) - if good: - (hotkey_code, hotkey_mods) = good + if good and isinstance(good, tuple): + hotkey_code, hotkey_mods = good event.widget.delete(0, tk.END) event.widget.insert(0, hotkeymgr.display(hotkey_code, hotkey_mods)) if hotkey_code: diff --git a/protocol.py b/protocol.py index 290a7031..f0e1b4a8 100644 --- a/protocol.py +++ b/protocol.py @@ -121,9 +121,13 @@ elif (config.auth_force_edmc_protocol and not is_wine and not config.auth_force_localserver )): + # This could be false if you use auth_force_edmc_protocol, but then you get to keep the pieces + assert sys.platform == 'win32' # spell-checker: words HBRUSH HICON WPARAM wstring WNDCLASS HMENU HGLOBAL from ctypes import windll # type: ignore - from ctypes import POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at + from ctypes import ( # type: ignore + POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at + ) from ctypes.wintypes import ( ATOM, BOOL, DWORD, HBRUSH, HGLOBAL, HICON, HINSTANCE, HMENU, HWND, INT, LPARAM, LPCWSTR, LPVOID, LPWSTR, MSG, UINT, WPARAM diff --git a/requirements-dev.txt b/requirements-dev.txt index e0098f88..4207743e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,8 +13,8 @@ flake8-annotations-coverage==0.0.6 flake8-cognitive-complexity==0.1.0 flake8-comprehensions==3.10.1 flake8-docstrings==1.6.0 -isort==5.11.3 -flake8-isort==5.0.3 +isort==5.11.4 +flake8-isort==6.0.0 flake8-json==21.7.0 flake8-noqa==1.3.0 flake8-polyfill==1.0.2 @@ -23,7 +23,8 @@ flake8-use-fstring==1.4 mypy==0.991 pep8-naming==0.13.3 safety==2.3.5 -types-requests==2.28.11.6 +types-requests==2.28.11.7 +types-pkg-resources==0.1.3 # Code formatting tools autopep8==2.0.1 @@ -45,8 +46,8 @@ py2exe==0.13.0.0; sys_platform == 'win32' # Testing pytest==7.2.0 pytest-cov==4.0.0 # Pytest code coverage support -coverage[toml]==6.5.0 # pytest-cov dep. This is here to ensure that it includes TOML support for pyproject.toml configs -coverage-conditional-plugin==0.7.0 +coverage[toml]==7.0.0 # pytest-cov dep. This is here to ensure that it includes TOML support for pyproject.toml configs +coverage-conditional-plugin==0.8.0 # For manipulating folder permissions and the like. pywin32==305; sys_platform == 'win32' diff --git a/scripts/mypy-all.sh b/scripts/mypy-all.sh new file mode 100755 index 00000000..72a37f0b --- /dev/null +++ b/scripts/mypy-all.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# +# Run mypy checks against all the relevant files + +# We assume that all `.py` files in git should be checked, and *only* those. +mypy $@ $(git ls-tree --full-tree -r --name-only HEAD | grep -E '\.py$') diff --git a/scripts/pip_rev_deps.py b/scripts/pip_rev_deps.py new file mode 100644 index 00000000..b97757d5 --- /dev/null +++ b/scripts/pip_rev_deps.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +"""Find the reverse dependencies of a package according to pip.""" +import sys + +import pkg_resources + + +def find_reverse_deps(package_name: str): + """ + Find the packages that depend on the named one. + + :param package_name: Target package. + :return: List of packages that depend on this one. + """ + return [ + pkg.project_name for pkg in pkg_resources.WorkingSet() + if package_name in {req.project_name for req in pkg.requires()} + ] + + +if __name__ == '__main__': + print(find_reverse_deps(sys.argv[1])) diff --git a/stats.py b/stats.py index 6886eb23..6b5d8d61 100644 --- a/stats.py +++ b/stats.py @@ -5,7 +5,7 @@ import sys import tkinter import tkinter as tk from tkinter import ttk -from typing import TYPE_CHECKING, Any, AnyStr, Callable, Dict, List, NamedTuple, Optional, Sequence, cast +from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast import companion import EDMCLogging @@ -45,7 +45,7 @@ RANK_LINES_END = 9 POWERPLAY_LINES_START = 9 -def status(data: Dict[str, Any]) -> List[List[str]]: +def status(data: dict[str, Any]) -> list[list[str]]: """ Get the current status of the cmdr referred to by data. @@ -211,7 +211,7 @@ def status(data: Dict[str, Any]) -> List[List[str]]: return res -def export_status(data: Dict[str, Any], filename: AnyStr) -> None: +def export_status(data: dict[str, Any], filename: AnyStr) -> None: """ Export status data to a CSV file. @@ -236,21 +236,21 @@ class ShipRet(NamedTuple): value: str -def ships(companion_data: Dict[str, Any]) -> List[ShipRet]: +def ships(companion_data: dict[str, Any]) -> list[ShipRet]: """ Return a list of 5 tuples of ship information. :param data: [description] :return: A 5 tuple of strings containing: Ship ID, Ship Type Name (internal), Ship Name, System, Station, and Value """ - ships: List[Dict[str, Any]] = companion.listify(cast(list, companion_data.get('ships'))) + ships: list[dict[str, Any]] = companion.listify(cast(list, companion_data.get('ships'))) current = companion_data['commander'].get('currentShipId') if isinstance(current, int) and current < len(ships) and ships[current]: ships.insert(0, ships.pop(current)) # Put current ship first if not companion_data['commander'].get('docked'): - out: List[ShipRet] = [] + out: list[ShipRet] = [] # Set current system, not last docked out.append(ShipRet( id=str(ships[0]['id']), @@ -285,7 +285,7 @@ def ships(companion_data: Dict[str, Any]) -> List[ShipRet]: ] -def export_ships(companion_data: Dict[str, Any], filename: AnyStr) -> None: +def export_ships(companion_data: dict[str, Any], filename: AnyStr) -> None: """ Export the current ships to a CSV file. @@ -358,7 +358,7 @@ class StatsDialog(): class StatsResults(tk.Toplevel): """Status window.""" - def __init__(self, parent: tk.Tk, data: Dict[str, Any]) -> None: + def __init__(self, parent: tk.Tk, data: dict[str, Any]) -> None: tk.Toplevel.__init__(self, parent) self.parent = parent @@ -442,7 +442,9 @@ class StatsResults(tk.Toplevel): ): self.geometry(f"+{position.left}+{position.top}") - def addpage(self, parent, header: List[str] = None, align: Optional[str] = None) -> tk.Frame: + def addpage( + self, parent, header: list[str] | None = None, align: str | None = None + ) -> ttk.Frame: """ Add a page to the StatsResults screen. @@ -462,7 +464,7 @@ class StatsResults(tk.Toplevel): return page - def addpageheader(self, parent: tk.Frame, header: Sequence[str], align: Optional[str] = None) -> None: + def addpageheader(self, parent: ttk.Frame, header: Sequence[str], align: str | None = None) -> None: """ Add the column headers to the page, followed by a separator. @@ -478,7 +480,7 @@ class StatsResults(tk.Toplevel): self.addpagerow(parent, ['']) def addpagerow( - self, parent: tk.Frame, content: Sequence[str], align: Optional[str] = None, with_copy: bool = False + self, parent: ttk.Frame, content: Sequence[str], align: str | None = None, with_copy: bool = False ): """ Add a single row to parent. diff --git a/tests/config/_old_config.py b/tests/config/_old_config.py index b0952903..211c1e72 100644 --- a/tests/config/_old_config.py +++ b/tests/config/_old_config.py @@ -1,3 +1,4 @@ +# type: ignore import numbers import sys import warnings @@ -138,7 +139,7 @@ class OldConfig(): self.identifier = f'uk.org.marginal.{appname.lower()}' NSBundle.mainBundle().infoDictionary()['CFBundleIdentifier'] = self.identifier - self.default_journal_dir = join( + self.default_journal_dir: str | None = join( NSSearchPathForDirectoriesInDomains(NSApplicationSupportDirectory, NSUserDomainMask, True)[0], 'Frontier Developments', 'Elite Dangerous' @@ -222,13 +223,13 @@ class OldConfig(): journaldir = known_folder_path(FOLDERID_SavedGames) if journaldir: - self.default_journal_dir = join(journaldir, 'Frontier Developments', 'Elite Dangerous') + self.default_journal_dir: str | None = join(journaldir, 'Frontier Developments', 'Elite Dangerous') else: self.default_journal_dir = None self.identifier = applongname - self.hkey = HKEY() + self.hkey: ctypes.c_void_p | None = HKEY() disposition = DWORD() if RegCreateKeyEx( HKEY_CURRENT_USER, @@ -376,7 +377,7 @@ class OldConfig(): mkdir(self.plugin_dir) self.internal_plugin_dir = join(dirname(__file__), 'plugins') - self.default_journal_dir = None + self.default_journal_dir: str | None = None self.home = expanduser('~') self.respath = dirname(__file__) self.identifier = f'uk.org.marginal.{appname.lower()}' diff --git a/tests/config/test_config.py b/tests/config/test_config.py index b6637623..ad6bbd6f 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -88,7 +88,7 @@ class TestNewConfig: if sys.platform != 'linux': return - from config.linux import LinuxConfig + from config.linux import LinuxConfig # type: ignore if isinstance(config, LinuxConfig) and config.config is not None: config.config.read(config.filename) @@ -179,7 +179,7 @@ class TestOldNewConfig: if sys.platform != 'linux': return - from config.linux import LinuxConfig + from config.linux import LinuxConfig # type: ignore if isinstance(config, LinuxConfig) and config.config is not None: config.config.read(config.filename) diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index 72252e9f..c617d52c 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -3,11 +3,10 @@ import multiprocessing as mp import os import pathlib import sys +from typing import Generator import pytest -# Import as other names else they get picked up when used as fixtures -from _pytest import monkeypatch as _pytest_monkeypatch -from _pytest import tmpdir as _pytest_tmpdir +from pytest import MonkeyPatch, TempdirFactory, TempPathFactory from config import config from journal_lock import JournalLock, JournalLockResult @@ -117,11 +116,11 @@ class TestJournalLock: @pytest.fixture def mock_journaldir( - self, monkeypatch: _pytest_monkeypatch, - tmp_path_factory: _pytest_tmpdir.TempPathFactory - ) -> _pytest_tmpdir.TempPathFactory: + self, monkeypatch: MonkeyPatch, + tmp_path_factory: TempdirFactory + ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: str = None) -> str: + def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': return str(tmp_path_factory.getbasetemp()) @@ -136,11 +135,11 @@ class TestJournalLock: @pytest.fixture def mock_journaldir_changing( self, - monkeypatch: _pytest_monkeypatch, - tmp_path_factory: _pytest_tmpdir.TempPathFactory - ) -> _pytest_tmpdir.TempPathFactory: + monkeypatch: MonkeyPatch, + tmp_path_factory: TempdirFactory + ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: str = None) -> str: + def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': return tmp_path_factory.mktemp("changing") @@ -154,7 +153,7 @@ class TestJournalLock: ########################################################################### # Tests against JournalLock.__init__() - def test_journal_lock_init(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_journal_lock_init(self, mock_journaldir: TempPathFactory): """Test JournalLock instantiation.""" print(f'{type(mock_journaldir)=}') tmpdir = str(mock_journaldir.getbasetemp()) @@ -176,14 +175,14 @@ class TestJournalLock: jlock.set_path_from_journaldir() assert jlock.journal_dir_path is None - def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: TempPathFactory): """Test JournalLock.set_path_from_journaldir() with tmpdir.""" tmpdir = mock_journaldir jlock = JournalLock() # Check that an actual journaldir is handled correctly. - jlock.journal_dir = tmpdir + jlock.journal_dir = str(tmpdir) jlock.set_path_from_journaldir() assert isinstance(jlock.journal_dir_path, pathlib.Path) @@ -200,7 +199,7 @@ class TestJournalLock: locked = jlock.obtain_lock() assert locked == JournalLockResult.JOURNALDIR_IS_NONE - def test_obtain_lock_with_tmpdir(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_obtain_lock_with_tmpdir(self, mock_journaldir: TempPathFactory): """Test JournalLock.obtain_lock() with tmpdir.""" jlock = JournalLock() @@ -213,7 +212,7 @@ class TestJournalLock: assert jlock.release_lock() os.unlink(str(jlock.journal_dir_lockfile_name)) - def test_obtain_lock_with_tmpdir_ro(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_obtain_lock_with_tmpdir_ro(self, mock_journaldir: TempPathFactory): """Test JournalLock.obtain_lock() with read-only tmpdir.""" tmpdir = str(mock_journaldir.getbasetemp()) print(f'{tmpdir=}') @@ -280,7 +279,7 @@ class TestJournalLock: assert locked == JournalLockResult.JOURNALDIR_READONLY - def test_obtain_lock_already_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_obtain_lock_already_locked(self, mock_journaldir: TempPathFactory): """Test JournalLock.obtain_lock() with tmpdir.""" continue_q: mp.Queue = mp.Queue() exit_q: mp.Queue = mp.Queue() @@ -312,7 +311,7 @@ class TestJournalLock: ########################################################################### # Tests against JournalLock.release_lock() - def test_release_lock(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_release_lock(self, mock_journaldir: TempPathFactory): """Test JournalLock.release_lock().""" # First actually obtain the lock, and check it worked jlock = JournalLock() @@ -330,12 +329,12 @@ class TestJournalLock: # Cleanup, to avoid side-effect on other tests os.unlink(str(jlock.journal_dir_lockfile_name)) - def test_release_lock_not_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_release_lock_not_locked(self, mock_journaldir: TempPathFactory): """Test JournalLock.release_lock() when not locked.""" jlock = JournalLock() assert jlock.release_lock() - def test_release_lock_lie_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_release_lock_lie_locked(self, mock_journaldir: TempPathFactory): """Test JournalLock.release_lock() when not locked, but lie we are.""" jlock = JournalLock() jlock.locked = True @@ -345,7 +344,7 @@ class TestJournalLock: # Tests against JournalLock.update_lock() def test_update_lock( self, - mock_journaldir_changing: _pytest_tmpdir.TempPathFactory): + mock_journaldir_changing: TempPathFactory): """ Test JournalLock.update_lock(). @@ -373,7 +372,7 @@ class TestJournalLock: # And the old_journaldir's lockfile too os.unlink(str(old_journaldir_lockfile_name)) - def test_update_lock_same(self, mock_journaldir: _pytest_tmpdir.TempPathFactory): + def test_update_lock_same(self, mock_journaldir: TempPathFactory): """ Test JournalLock.update_lock(). diff --git a/theme.py b/theme.py index e9ace26f..b5d51484 100644 --- a/theme.py +++ b/theme.py @@ -131,14 +131,14 @@ class _Theme(object): THEME_TRANSPARENT = 2 def __init__(self) -> None: - self.active = None # Starts out with no theme + self.active: int | None = None # Starts out with no theme self.minwidth: Optional[int] = None self.widgets: Dict[tk.Widget | tk.BitmapImage, Set] = {} self.widgets_pair: List = [] self.defaults: Dict = {} self.current: Dict = {} - self.default_ui_scale = None # None == not yet known - self.startup_ui_scale = None + self.default_ui_scale: float | None = None # None == not yet known + self.startup_ui_scale: int | None = None def register(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901 # Note widget and children for later application of a theme. Note if diff --git a/timeout_session.py b/timeout_session.py index 6faf54bb..8a4656ce 100644 --- a/timeout_session.py +++ b/timeout_session.py @@ -25,7 +25,9 @@ class TimeoutAdapter(HTTPAdapter): return super().send(*args, **kwargs) -def new_session(timeout: int = REQUEST_TIMEOUT, session: requests.Session = None) -> requests.Session: +def new_session( + timeout: int = REQUEST_TIMEOUT, session: requests.Session | None = None +) -> requests.Session: """ Create a new requests.Session and override the default HTTPAdapter with a TimeoutAdapter. diff --git a/ttkHyperlinkLabel.py b/ttkHyperlinkLabel.py index 43071732..1d9749ac 100644 --- a/ttkHyperlinkLabel.py +++ b/ttkHyperlinkLabel.py @@ -19,7 +19,7 @@ import tkinter as tk import webbrowser from tkinter import font as tk_font from tkinter import ttk -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: def _(x: str) -> str: ... @@ -29,7 +29,7 @@ if TYPE_CHECKING: class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object): # type: ignore """Clickable label for HTTP links.""" - def __init__(self, master: Optional[tk.Tk] = None, **kw: Any) -> None: + def __init__(self, master: tk.Frame | None = None, **kw: Any) -> None: self.url = 'url' in kw and kw.pop('url') or None self.popup_copy = kw.pop('popup_copy', False) self.underline = kw.pop('underline', None) # override ttk.Label's underline