1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-13 07:47:14 +03:00

Merge branch 'develop' into feature/847/capi-fleetcarrier

This commit is contained in:
aussig 2022-12-24 08:21:18 +00:00
commit 41962663d8
45 changed files with 1235 additions and 1039 deletions

View File

@ -8,6 +8,7 @@ exclude =
venv/
.venv/
wix/
hotkey/darwin.py # FIXME: Check under macOS VM at some point
# Show exactly where in a line the error happened
#show-source = True

View File

@ -9,10 +9,15 @@ name: Push-Checks
on:
push:
branches: [ develop ]
# We'll catch issues on `develop` or any PR branch.
branches-ignore:
- 'main'
- 'stable'
- 'releases'
- 'beta'
jobs:
build:
push_checks:
runs-on: ubuntu-22.04
@ -59,3 +64,7 @@ jobs:
grep -E -z -Z '\.py$' | \
xargs -0 flake8 --count --statistics --extend-ignore D
#######################################################################
- name: mypy type checks
run: |
./scripts/mypy-all.sh --platform win32

View File

@ -1,5 +1,5 @@
[mypy]
follow_imports = skip
follow_imports = normal
ignore_missing_imports = True
scripts_are_modules = True
; Without this bare `mypy <file>` may get warnings for e.g.
@ -7,3 +7,4 @@ scripts_are_modules = True
; i.e. no typing info.
check_untyped_defs = True
; platform = darwin
explicit_package_bases = True

View File

@ -51,7 +51,7 @@ repos:
- id: mypy
# verbose: true
# log_file: 'pre-commit_mypy.log'
additional_dependencies: [ types-requests ]
additional_dependencies: [ types-pkg-resources, types-requests, types-urllib3 ]
# args: [ "--follow-imports", "skip", "--ignore-missing-imports", "--scripts-are-modules" ]
### # pydocstyle.exe <file>

View File

@ -28,6 +28,9 @@ if sys.platform == 'win32':
else:
raise AssertionError(f'Unsupported platform {sys.platform}')
# This added to make mypy happy
assert sys.platform == 'win32'
###########################################################################
###########################################################################

25
EDMC.py
View File

@ -71,7 +71,7 @@ def versioncmp(versionstring) -> List:
return list(map(int, versionstring.split('.')))
def deep_get(target: dict, *args: str, default=None) -> Any:
def deep_get(target: dict | companion.CAPIData, *args: str, default=None) -> Any:
"""
Walk into a dict and return the specified deep value.
@ -224,12 +224,15 @@ sys.path: {sys.path}'''
logger.debug(f'logdir = "{monitor.currentdir}"')
logfile = monitor.journal_newest_filename(monitor.currentdir)
if logfile is None:
raise ValueError("None from monitor.journal_newest_filename")
logger.debug(f'Using logfile "{logfile}"')
with open(logfile, 'r', encoding='utf-8') as loghandle:
with open(logfile, 'rb', 0) as loghandle:
for line in loghandle:
try:
monitor.parse_entry(line)
except Exception:
logger.debug(f'Invalid journal entry {line!r}')
@ -410,7 +413,23 @@ sys.path: {sys.path}'''
# Retry for shipyard
sleep(SERVER_RETRY)
new_data = companion.session.station()
companion.session.station(int(time()))
# Wait for the response
_capi_request_timeout = 60
try:
capi_response = companion.session.capi_response_queue.get(
block=True, timeout=_capi_request_timeout
)
except queue.Empty:
logger.error(f'CAPI requests timed out after {_capi_request_timeout} seconds')
sys.exit(EXIT_SERVER)
if isinstance(capi_response, companion.EDMCCAPIFailedRequest):
logger.error(f'Failed Request: {capi_response.message}')
sys.exit(EXIT_SERVER)
new_data = capi_response.capi_data
# might have undocked while we were waiting for retry in which case station data is unreliable
if new_data['commander'].get('docked') and \
deep_get(new_data, 'lastSystem', 'name') == monitor.system and \

View File

@ -145,7 +145,7 @@ class Logger:
logging.Logger instance.
"""
def __init__(self, logger_name: str, loglevel: int = _default_loglevel):
def __init__(self, logger_name: str, loglevel: int | str = _default_loglevel):
"""
Set up a `logging.Logger` with our preferred configuration.
@ -216,7 +216,7 @@ class Logger:
"""
return self.logger_channel
def set_channels_loglevel(self, level: int) -> None:
def set_channels_loglevel(self, level: int | str) -> None:
"""
Set the specified log level on the channels.
@ -226,7 +226,7 @@ class Logger:
self.logger_channel.setLevel(level)
self.logger_channel_rotating.setLevel(level)
def set_console_loglevel(self, level: int) -> None:
def set_console_loglevel(self, level: int | str) -> None:
"""
Set the specified log level on the console channel.
@ -541,7 +541,7 @@ def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin':
# Singleton
loglevel = config.get_str('loglevel')
loglevel: str | int = config.get_str('loglevel')
if not loglevel:
loglevel = logging.INFO

View File

@ -16,7 +16,7 @@ from builtins import object, str
from os import chdir, environ
from os.path import dirname, join
from time import localtime, strftime, time
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union
# Have this as early as possible for people running EDMarketConnector.exe
# from cmd.exe or a bat file or similar. Else they might not be in the correct
@ -594,7 +594,7 @@ class AppWindow(object):
# The type needs defining for adding the menu entry, but won't be
# properly set until later
self.updater: update.Updater = None
self.updater: update.Updater | None = None
self.menubar = tk.Menu()
if sys.platform == 'darwin':
@ -649,7 +649,9 @@ class AppWindow(object):
self.help_menu.add_command(command=self.help_general)
self.help_menu.add_command(command=self.help_privacy)
self.help_menu.add_command(command=self.help_releases)
self.help_menu.add_command(command=lambda: self.updater.check_for_updates())
if self.updater is not None:
self.help_menu.add_command(command=lambda: self.updater.check_for_updates())
self.help_menu.add_command(command=lambda: not self.HelpAbout.showing and self.HelpAbout(self.w))
self.menubar.add_cascade(menu=self.help_menu)
@ -917,7 +919,7 @@ class AppWindow(object):
def login(self):
"""Initiate CAPI/Frontier login and set other necessary state."""
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.auth', {})
if should_return:
@ -1011,8 +1013,7 @@ class AppWindow(object):
"""
logger.trace_if('capi.worker', 'Begin')
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.auth', {})
if should_return:
logger.warning('capi.auth has been disabled via killswitch. Returning.')
@ -1100,7 +1101,7 @@ class AppWindow(object):
"""
logger.trace_if('capi.worker', 'Begin')
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request.fleetcarrier', {})
if should_return:
@ -1311,7 +1312,7 @@ class AppWindow(object):
play_bad = True
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./market', {})
if should_return:
@ -1504,7 +1505,7 @@ class AppWindow(object):
config.set('cmdrs', config.get_list('cmdrs', default=[]) + [monitor.cmdr])
self.login()
if monitor.mode == 'CQC' and entry['event']:
if monitor.cmdr and monitor.mode == 'CQC' and entry['event']:
err = plug.notify_journal_entry_cqc(monitor.cmdr, monitor.is_beta, entry, monitor.state)
if err:
self.status['text'] = err
@ -1522,7 +1523,9 @@ class AppWindow(object):
# Disable WinSparkle automatic update checks, IFF configured to do so when in-game
if config.get_int('disable_autoappupdatecheckingame') and 1:
self.updater.set_automatic_updates_check(False)
if self.updater is not None:
self.updater.set_automatic_updates_check(False)
logger.info('Monitor: Disable WinSparkle automatic update checks')
# Can't start dashboard monitoring
@ -1534,12 +1537,16 @@ class AppWindow(object):
and config.get_int('output') & config.OUT_SHIP:
monitor.export_ship()
err = plug.notify_journal_entry(monitor.cmdr,
monitor.is_beta,
monitor.system,
monitor.station,
entry,
monitor.state)
if monitor.cmdr and monitor.system and monitor.station:
err = plug.notify_journal_entry(
monitor.cmdr,
monitor.is_beta,
monitor.system,
monitor.station,
entry,
monitor.state
)
if err:
self.status['text'] = err
if not config.get_int('hotkey_mute'):
@ -1568,7 +1575,7 @@ class AppWindow(object):
auto_update = True
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
if auto_update:
should_return, new_data = killswitch.check_killswitch('capi.auth', {})
@ -1583,7 +1590,9 @@ class AppWindow(object):
if entry['event'] == 'ShutDown':
# Enable WinSparkle automatic update checks
# NB: Do this blindly, in case option got changed whilst in-game
self.updater.set_automatic_updates_check(True)
if self.updater is not None:
self.updater.set_automatic_updates_check(True)
logger.info('Monitor: Enable WinSparkle automatic update checks')
def auth(self, event=None) -> None:
@ -1626,7 +1635,9 @@ class AppWindow(object):
entry = dashboard.status
# Currently we don't do anything with these events
err = plug.notify_dashboard_entry(monitor.cmdr, monitor.is_beta, entry)
if monitor.cmdr:
err = plug.notify_dashboard_entry(monitor.cmdr, monitor.is_beta, entry)
if err:
self.status['text'] = err
if not config.get_int('hotkey_mute'):
@ -1634,13 +1645,13 @@ class AppWindow(object):
def plugin_error(self, event=None) -> None:
"""Display asynchronous error from plugin."""
if plug.last_error.get('msg'):
self.status['text'] = plug.last_error['msg']
if plug.last_error.msg:
self.status['text'] = plug.last_error.msg
self.w.update_idletasks()
if not config.get_int('hotkey_mute'):
hotkeymgr.play_bad()
def shipyard_url(self, shipname: str) -> str:
def shipyard_url(self, shipname: str) -> str | None:
"""Despatch a ship URL to the configured handler."""
if not (loadout := monitor.ship()):
logger.warning('No ship loadout, aborting.')
@ -1667,11 +1678,11 @@ class AppWindow(object):
return f'file://localhost/{file_name}'
def system_url(self, system: str) -> str:
def system_url(self, system: str) -> str | None:
"""Despatch a system URL to the configured handler."""
return plug.invoke(config.get_str('system_provider'), 'EDSM', 'system_url', monitor.system)
def station_url(self, station: str) -> str:
def station_url(self, station: str) -> str | None:
"""Despatch a station URL to the configured handler."""
return plug.invoke(config.get_str('station_provider'), 'eddb', 'station_url', monitor.system, monitor.station)
@ -1694,10 +1705,11 @@ class AppWindow(object):
monitor.system and
tk.NORMAL or tk.DISABLED)
def ontop_changed(self, event=None) -> None:
"""Set main window 'on top' state as appropriate."""
config.set('always_ontop', self.always_ontop.get())
self.w.wm_attributes('-topmost', self.always_ontop.get())
if sys.platform == 'win32':
def ontop_changed(self, event=None) -> None:
"""Set main window 'on top' state as appropriate."""
config.set('always_ontop', self.always_ontop.get())
self.w.wm_attributes('-topmost', self.always_ontop.get())
def copy(self, event=None) -> None:
"""Copy system, and possible station, name to clipboard."""
@ -1748,7 +1760,7 @@ class AppWindow(object):
self.resizable(tk.FALSE, tk.FALSE)
frame = ttk.Frame(self)
frame = tk.Frame(self)
frame.grid(sticky=tk.NSEW)
row = 1
@ -1761,7 +1773,7 @@ class AppWindow(object):
############################################################
# version <link to changelog>
ttk.Label(frame).grid(row=row, column=0) # spacer
tk.Label(frame).grid(row=row, column=0) # spacer
row += 1
self.appversion_label = tk.Label(frame, text=appversion())
self.appversion_label.grid(row=row, column=0, sticky=tk.E)
@ -1837,13 +1849,14 @@ class AppWindow(object):
with open(f, 'wb') as h:
h.write(str(companion.session.capi_raw_data).encode(encoding='utf-8'))
def exit_tray(self, systray: 'SysTrayIcon') -> None:
"""Tray icon is shutting down."""
exit_thread = threading.Thread(
target=self.onexit,
daemon=True,
)
exit_thread.start()
if sys.platform == 'win32':
def exit_tray(self, systray: 'SysTrayIcon') -> None:
"""Tray icon is shutting down."""
exit_thread = threading.Thread(
target=self.onexit,
daemon=True,
)
exit_thread.start()
def onexit(self, event=None) -> None:
"""Application shutdown procedure."""
@ -1869,7 +1882,8 @@ class AppWindow(object):
# First so it doesn't interrupt us
logger.info('Closing update checker...')
self.updater.close()
if self.updater is not None:
self.updater.close()
# Earlier than anything else so plugin code can't interfere *and* it
# won't still be running in a manner that might rely on something
@ -2018,11 +2032,9 @@ def show_killswitch_poppup(root=None):
idx += 1
idx += 1
ok_button = tk.Button(frame, text="ok", command=tl.destroy)
ok_button = tk.Button(frame, text="Ok", command=tl.destroy)
ok_button.grid(columnspan=2, sticky=tk.EW)
theme.apply(tl)
# Run the app
if __name__ == "__main__": # noqa: C901
@ -2167,10 +2179,13 @@ sys.path: {sys.path}'''
if not ui_scale:
ui_scale = 100
config.set('ui_scale', ui_scale)
theme.default_ui_scale = root.tk.call('tk', 'scaling')
logger.trace_if('tk', f'Default tk scaling = {theme.default_ui_scale}')
theme.startup_ui_scale = ui_scale
root.tk.call('tk', 'scaling', theme.default_ui_scale * float(ui_scale) / 100.0)
if theme.default_ui_scale is not None:
root.tk.call('tk', 'scaling', theme.default_ui_scale * float(ui_scale) / 100.0)
app = AppWindow(root)
def messagebox_not_py3():

View File

@ -100,7 +100,7 @@ def addmodules(data): # noqa: C901, CCR001
if not data['lastStarport'].get('modules'):
return
outfile = 'outfitting.csv'
outfile = pathlib.Path('outfitting.csv')
modules = {}
fields = ('id', 'symbol', 'category', 'name', 'mount', 'guidance', 'ship', 'class', 'rating', 'entitlement')

View File

@ -329,7 +329,7 @@ class Auth(object):
logger.debug(f'Trying for "{self.cmdr}"')
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.auth', {})
if should_return:
@ -675,7 +675,7 @@ class Session(object):
:return: True if login succeeded, False if re-authorization initiated.
"""
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.auth', {})
if should_return:
@ -796,7 +796,7 @@ class Session(object):
"""
capi_data: CAPIData = CAPIData()
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request.' + capi_endpoint, {})
if should_return:

View File

@ -40,7 +40,7 @@ import sys
import traceback
import warnings
from abc import abstractmethod
from typing import Any, Callable, List, Optional, Type, TypeVar, Union
from typing import Any, Callable, Optional, Type, TypeVar
import semantic_version
@ -59,10 +59,10 @@ copyright = '© 2015-2019 Jonathan Harris, 2020-2022 EDCD'
update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml'
update_interval = 8*60*60
# Providers marked to be in debug mode. Generally this is expected to switch to sending data to a log file
debug_senders: List[str] = []
debug_senders: list[str] = []
# TRACE logging code that should actually be used. Means not spamming it
# *all* if only interested in some things.
trace_on: List[str] = []
trace_on: list[str] = []
capi_pretend_down: bool = False
capi_debug_access_token: Optional[str] = None
@ -293,7 +293,7 @@ class AbstractConfig(abc.ABC):
@staticmethod
def _suppress_call(
func: Callable[..., _T], exceptions: Union[Type[BaseException], List[Type[BaseException]]] = Exception,
func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception,
*args: Any, **kwargs: Any
) -> Optional[_T]:
if exceptions is None:
@ -307,7 +307,10 @@ class AbstractConfig(abc.ABC):
return None
def get(self, key: str, default: Union[list, str, bool, int] = None) -> Union[list, str, bool, int]:
def get(
self, key: str,
default: list | str | bool | int | None = None
) -> list | str | bool | int | None:
"""
Return the data for the requested key, or a default.
@ -334,7 +337,7 @@ class AbstractConfig(abc.ABC):
return default # type: ignore
@abstractmethod
def get_list(self, key: str, *, default: list = None) -> list:
def get_list(self, key: str, *, default: list | None = None) -> list:
"""
Return the list referred to by the given key if it exists, or the default.
@ -343,7 +346,7 @@ class AbstractConfig(abc.ABC):
raise NotImplementedError
@abstractmethod
def get_str(self, key: str, *, default: str = None) -> str:
def get_str(self, key: str, *, default: str | None = None) -> str:
"""
Return the string referred to by the given key if it exists, or the default.
@ -356,7 +359,7 @@ class AbstractConfig(abc.ABC):
raise NotImplementedError
@abstractmethod
def get_bool(self, key: str, *, default: bool = None) -> bool:
def get_bool(self, key: str, *, default: bool | None = None) -> bool:
"""
Return the bool referred to by the given key if it exists, or the default.
@ -396,7 +399,7 @@ class AbstractConfig(abc.ABC):
raise NotImplementedError
@abstractmethod
def set(self, key: str, val: Union[int, str, List[str], bool]) -> None:
def set(self, key: str, val: int | str | list[str] | bool) -> None:
"""
Set the given key's data to the given value.

View File

@ -3,7 +3,6 @@ import os
import pathlib
import sys
from configparser import ConfigParser
from typing import List, Optional, Union
from config import AbstractConfig, appname, logger
@ -18,7 +17,7 @@ class LinuxConfig(AbstractConfig):
__unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'}
__escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'}
def __init__(self, filename: Optional[str] = None) -> None:
def __init__(self, filename: str | None = None) -> None:
super().__init__()
# http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html
xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser()
@ -42,7 +41,7 @@ class LinuxConfig(AbstractConfig):
self.filename.parent.mkdir(exist_ok=True, parents=True)
self.config: Optional[ConfigParser] = ConfigParser(comment_prefixes=('#',), interpolation=None)
self.config: ConfigParser | None = ConfigParser(comment_prefixes=('#',), interpolation=None)
self.config.read(self.filename) # read() ignores files that dont exist
# Ensure that our section exists. This is here because configparser will happily create files for us, but it
@ -85,7 +84,7 @@ class LinuxConfig(AbstractConfig):
:param s: str - The string to unescape.
:return: str - The unescaped string.
"""
out: List[str] = []
out: list[str] = []
i = 0
while i < len(s):
c = s[i]
@ -107,7 +106,7 @@ class LinuxConfig(AbstractConfig):
return "".join(out)
def __raw_get(self, key: str) -> Optional[str]:
def __raw_get(self, key: str) -> str | None:
"""
Get a raw data value from the config file.
@ -119,7 +118,7 @@ class LinuxConfig(AbstractConfig):
return self.config[self.SECTION].get(key)
def get_str(self, key: str, *, default: str = None) -> str:
def get_str(self, key: str, *, default: str | None = None) -> str:
"""
Return the string referred to by the given key if it exists, or the default.
@ -134,7 +133,7 @@ class LinuxConfig(AbstractConfig):
return self.__unescape(data)
def get_list(self, key: str, *, default: list = None) -> list:
def get_list(self, key: str, *, default: list | None = None) -> list:
"""
Return the list referred to by the given key if it exists, or the default.
@ -168,7 +167,7 @@ class LinuxConfig(AbstractConfig):
except ValueError as e:
raise ValueError(f'requested {key=} as int cannot be converted to int') from e
def get_bool(self, key: str, *, default: bool = None) -> bool:
def get_bool(self, key: str, *, default: bool | None = None) -> bool:
"""
Return the bool referred to by the given key if it exists, or the default.
@ -183,7 +182,7 @@ class LinuxConfig(AbstractConfig):
return bool(int(data))
def set(self, key: str, val: Union[int, str, List[str]]) -> None:
def set(self, key: str, val: int | str | list[str]) -> None:
"""
Set the given key's data to the given value.
@ -192,7 +191,7 @@ class LinuxConfig(AbstractConfig):
if self.config is None:
raise ValueError('attempt to use a closed config')
to_set: Optional[str] = None
to_set: str | None = None
if isinstance(val, bool):
to_set = str(int(val))

View File

@ -8,7 +8,7 @@ import sys
import uuid
import winreg
from ctypes.wintypes import DWORD, HANDLE
from typing import List, Optional, Union
from typing import List, Literal, Optional, Union
from config import AbstractConfig, applongname, appname, logger, update_interval
@ -142,7 +142,7 @@ class WinConfig(AbstractConfig):
logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}')
return None
def get_str(self, key: str, *, default: str = None) -> str:
def get_str(self, key: str, *, default: str | None = None) -> str:
"""
Return the string referred to by the given key if it exists, or the default.
@ -157,7 +157,7 @@ class WinConfig(AbstractConfig):
return res
def get_list(self, key: str, *, default: list = None) -> list:
def get_list(self, key: str, *, default: list | None = None) -> list:
"""
Return the list referred to by the given key if it exists, or the default.
@ -187,7 +187,7 @@ class WinConfig(AbstractConfig):
return res
def get_bool(self, key: str, *, default: bool = None) -> bool:
def get_bool(self, key: str, *, default: bool | None = None) -> bool:
"""
Return the bool referred to by the given key if it exists, or the default.
@ -205,7 +205,8 @@ class WinConfig(AbstractConfig):
Implements :meth:`AbstractConfig.set`.
"""
reg_type = None
# These are the types that winreg.REG_* below resolve to.
reg_type: Literal[1] | Literal[4] | Literal[7]
if isinstance(val, str):
reg_type = winreg.REG_SZ
winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val)

View File

@ -167,7 +167,7 @@ class Dashboard(FileSystemEventHandler):
# Can get on_modified events when the file is emptied
self.process(event.src_path if not event.is_directory else None)
def process(self, logfile: str = None) -> None:
def process(self, logfile: str | None = None) -> None:
"""
Process the contents of current Status.json file.

View File

@ -191,17 +191,17 @@ that it's actually included in the installer.
Before you create a new install each time you should:
1. Ensure the data sourced from coriolis.io is up to date and works:
1. Update the `coriolis-data` repo. **NB: You will need 'npm' installed for
2. Update the `coriolis-data` repo. **NB: You will need 'npm' installed for
this.**
1. `cd coriolis-data`
1. `git pull`
1. `npm install` - to check it's worked.
1. Run `coriolis.py` to update `modules.p` and `ships.p`. **NB: The
submodule might have been updated by a GitHub workflow/PR/merge, so
be sure to perform this step for every build.**
1. XXX: Test ?
1. `git commit` the changes to the repo and the `.p` files.
1. Ensure translations are up to date, see [Translations.md](Translations.md).
1. `cd coriolis-data`
2. `git pull`
3. `npm install` - to check it's worked.
3. Run `coriolis-update-files.py` to update `modules.p` and `ships.p`. **NB:
The submodule might have been updated by a GitHub workflow/PR/merge, so
be sure to perform this step for every build.**
4. XXX: Test ?
5. `git commit` the changes to the repo and the `.p` files.
6. Ensure translations are up to date, see [Translations.md](Translations.md).
# Preparing to Package

View File

@ -27,7 +27,7 @@ class ClickCounter:
def __init__(self) -> None:
# Be sure to use names that wont collide in our config variables
self.click_count: Optional[tk.StringVar] = tk.StringVar(value=str(config.get_int('click_counter_count')))
self.click_count = tk.StringVar(value=str(config.get_int('click_counter_count')))
logger.info("ClickCounter instantiated")
def on_load(self) -> str:
@ -99,8 +99,8 @@ class ClickCounter:
)
button.grid(row=current_row)
current_row += 1
nb.Label(frame, text="Count:").grid(row=current_row, sticky=tk.W)
nb.Label(frame, textvariable=self.click_count).grid(row=current_row, column=1)
tk.Label(frame, text="Count:").grid(row=current_row, sticky=tk.W)
tk.Label(frame, textvariable=self.click_count).grid(row=current_row, column=1)
return frame

View File

@ -13,19 +13,6 @@ from SubA import SubA
from config import appname, appversion, config
# For compatibility with pre-5.0.0
if not hasattr(config, 'get_int'):
config.get_int = config.getint
if not hasattr(config, 'get_str'):
config.get_str = config.get
if not hasattr(config, 'get_bool'):
config.get_bool = lambda key: bool(config.getint(key))
if not hasattr(config, 'get_list'):
config.get_list = config.get
# This could also be returned from plugin_start3()
plugin_name = os.path.basename(os.path.dirname(__file__))

View File

@ -81,7 +81,7 @@ def export(data, filename=None) -> None: # noqa: C901, CCR001
if not v:
continue
module: __Module = outfitting.lookup(v['module'], ship_map)
module: __Module | None = outfitting.lookup(v['module'], ship_map)
if not module:
continue

703
hotkey.py
View File

@ -1,703 +0,0 @@
"""Handle keyboard input for manual update triggering."""
# -*- coding: utf-8 -*-
import abc
import pathlib
import sys
import tkinter as tk
from abc import abstractmethod
from typing import Optional, Tuple, Union
from config import config
from EDMCLogging import get_main_logger
logger = get_main_logger()
class AbstractHotkeyMgr(abc.ABC):
"""Abstract root class of all platforms specific HotKeyMgr."""
@abstractmethod
def register(self, root, keycode, modifiers) -> None:
"""Register the hotkey handler."""
pass
@abstractmethod
def unregister(self) -> None:
"""Unregister the hotkey handling."""
pass
@abstractmethod
def play_good(self) -> None:
"""Play the 'good' sound."""
pass
@abstractmethod
def play_bad(self) -> None:
"""Play the 'bad' sound."""
pass
if sys.platform == 'darwin':
import objc
from AppKit import (
NSAlternateKeyMask, NSApplication, NSBeep, NSClearLineFunctionKey, NSCommandKeyMask, NSControlKeyMask,
NSDeleteFunctionKey, NSDeviceIndependentModifierFlagsMask, NSEvent, NSF1FunctionKey, NSF35FunctionKey,
NSFlagsChanged, NSKeyDown, NSKeyDownMask, NSKeyUp, NSNumericPadKeyMask, NSShiftKeyMask, NSSound, NSWorkspace
)
class MacHotkeyMgr(AbstractHotkeyMgr):
"""Hot key management."""
POLL = 250
# https://developer.apple.com/library/mac/documentation/Cocoa/Reference/ApplicationKit/Classes/NSEvent_Class/#//apple_ref/doc/constant_group/Function_Key_Unicodes
DISPLAY = {
0x03: u'', 0x09: u'', 0xd: u'', 0x19: u'', 0x1b: u'esc', 0x20: u'', 0x7f: u'',
0xf700: u'', 0xf701: u'', 0xf702: u'', 0xf703: u'',
0xf727: u'Ins',
0xf728: u'', 0xf729: u'', 0xf72a: u'Fn', 0xf72b: u'',
0xf72c: u'', 0xf72d: u'', 0xf72e: u'PrtScr', 0xf72f: u'ScrollLock',
0xf730: u'Pause', 0xf731: u'SysReq', 0xf732: u'Break', 0xf733: u'Reset',
0xf739: u'',
}
(ACQUIRE_INACTIVE, ACQUIRE_ACTIVE, ACQUIRE_NEW) = range(3)
def __init__(self):
self.MODIFIERMASK = NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask \
| NSNumericPadKeyMask
self.root = None
self.keycode = 0
self.modifiers = 0
self.activated = False
self.observer = None
self.acquire_key = 0
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
self.tkProcessKeyEvent_old = None
self.snd_good = NSSound.alloc().initWithContentsOfFile_byReference_(
pathlib.Path(config.respath_path) / 'snd_good.wav', False
)
self.snd_bad = NSSound.alloc().initWithContentsOfFile_byReference_(
pathlib.Path(config.respath_path) / 'snd_bad.wav', False
)
def register(self, root: tk.Tk, keycode, modifiers) -> None:
"""
Register current hotkey for monitoring.
:param root: parent window.
:param keycode: Key to monitor.
:param modifiers: Any modifiers to take into account.
"""
self.root = root
self.keycode = keycode
self.modifiers = modifiers
self.activated = False
if keycode:
if not self.observer:
self.root.after_idle(self._observe)
self.root.after(MacHotkeyMgr.POLL, self._poll)
# Monkey-patch tk (tkMacOSXKeyEvent.c)
if not self.tkProcessKeyEvent_old:
sel = b'tkProcessKeyEvent:'
cls = NSApplication.sharedApplication().class__() # type: ignore
self.tkProcessKeyEvent_old = NSApplication.sharedApplication().methodForSelector_(sel) # type: ignore
newmethod = objc.selector( # type: ignore
self.tkProcessKeyEvent,
selector=self.tkProcessKeyEvent_old.selector,
signature=self.tkProcessKeyEvent_old.signature
)
objc.classAddMethod(cls, sel, newmethod) # type: ignore
def tkProcessKeyEvent(self, cls, the_event): # noqa: N802
"""
Monkey-patch tk (tkMacOSXKeyEvent.c).
- workaround crash on OSX 10.9 & 10.10 on seeing a composing character
- notice when modifier key state changes
- keep a copy of NSEvent.charactersIgnoringModifiers, which is what we need for the hotkey
(Would like to use a decorator but need to ensure the application is created before this is installed)
:param cls: ???
:param the_event: tk event
:return: ???
"""
if self.acquire_state:
if the_event.type() == NSFlagsChanged:
self.acquire_key = the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask
self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW
# suppress the event by not chaining the old function
return the_event
elif the_event.type() in (NSKeyDown, NSKeyUp):
c = the_event.charactersIgnoringModifiers()
self.acquire_key = (c and ord(c[0]) or 0) | \
(the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask)
self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW
# suppress the event by not chaining the old function
return the_event
# replace empty characters with charactersIgnoringModifiers to avoid crash
elif the_event.type() in (NSKeyDown, NSKeyUp) and not the_event.characters():
the_event = NSEvent.keyEventWithType_location_modifierFlags_timestamp_windowNumber_context_characters_charactersIgnoringModifiers_isARepeat_keyCode_( # noqa: E501
# noqa: E501
the_event.type(),
the_event.locationInWindow(),
the_event.modifierFlags(),
the_event.timestamp(),
the_event.windowNumber(),
the_event.context(),
the_event.charactersIgnoringModifiers(),
the_event.charactersIgnoringModifiers(),
the_event.isARepeat(),
the_event.keyCode()
)
return self.tkProcessKeyEvent_old(cls, the_event)
def _observe(self):
# Must be called after root.mainloop() so that the app's message loop has been created
self.observer = NSEvent.addGlobalMonitorForEventsMatchingMask_handler_(NSKeyDownMask, self._handler)
def _poll(self):
if config.shutting_down:
return
# No way of signalling to Tkinter from within the callback handler block that doesn't
# cause Python to crash, so poll.
if self.activated:
self.activated = False
self.root.event_generate('<<Invoke>>', when="tail")
if self.keycode or self.modifiers:
self.root.after(MacHotkeyMgr.POLL, self._poll)
def unregister(self) -> None:
"""Remove hotkey registration."""
self.keycode = None
self.modifiers = None
if sys.platform == 'darwin': # noqa: C901
@objc.callbackFor(NSEvent.addGlobalMonitorForEventsMatchingMask_handler_)
def _handler(self, event) -> None:
# use event.charactersIgnoringModifiers to handle composing characters like Alt-e
if ((event.modifierFlags() & self.MODIFIERMASK) == self.modifiers
and ord(event.charactersIgnoringModifiers()[0]) == self.keycode):
if config.get_int('hotkey_always'):
self.activated = True
else: # Only trigger if game client is front process
front = NSWorkspace.sharedWorkspace().frontmostApplication()
if front and front.bundleIdentifier() == 'uk.co.frontier.EliteDangerous':
self.activated = True
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after_idle(self._acquire_poll)
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
def _acquire_poll(self) -> None:
"""Perform a poll of current hotkey state."""
if config.shutting_down:
return
# No way of signalling to Tkinter from within the monkey-patched event handler that doesn't
# cause Python to crash, so poll.
if self.acquire_state:
if self.acquire_state == MacHotkeyMgr.ACQUIRE_NEW:
# Abuse tkEvent's keycode field to hold our acquired key & modifier
self.root.event_generate('<KeyPress>', keycode=self.acquire_key)
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after(50, self._acquire_poll)
def fromevent(self, event) -> Optional[Union[bool, Tuple]]:
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
(keycode, modifiers) = (event.keycode & 0xffff, event.keycode & 0xffff0000) # Set by _acquire_poll()
if (keycode
and not (modifiers & (NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask))):
if keycode == 0x1b: # Esc = retain previous
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return False
# BkSp, Del, Clear = clear hotkey
elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]:
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
# don't allow keys needed for typing in System Map
elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a:
NSBeep()
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
return (keycode, modifiers)
def display(self, keycode, modifiers) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
text = ''
if modifiers & NSControlKeyMask:
text += u''
if modifiers & NSAlternateKeyMask:
text += u''
if modifiers & NSShiftKeyMask:
text += u''
if modifiers & NSCommandKeyMask:
text += u''
if (modifiers & NSNumericPadKeyMask) and keycode <= 0x7f:
text += u''
if not keycode:
pass
elif ord(NSF1FunctionKey) <= keycode <= ord(NSF35FunctionKey):
text += f'F{keycode + 1 - ord(NSF1FunctionKey)}'
elif keycode in MacHotkeyMgr.DISPLAY: # specials
text += MacHotkeyMgr.DISPLAY[keycode]
elif keycode < 0x20: # control keys
text += chr(keycode + 0x40)
elif keycode < 0xf700: # key char
text += chr(keycode).upper()
else:
text += u''
return text
def play_good(self):
"""Play the 'good' sound."""
self.snd_good.play()
def play_bad(self):
"""Play the 'bad' sound."""
self.snd_bad.play()
if sys.platform == 'win32':
import atexit
import ctypes
import threading
import winsound
from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD
RegisterHotKey = ctypes.windll.user32.RegisterHotKey
UnregisterHotKey = ctypes.windll.user32.UnregisterHotKey
MOD_ALT = 0x0001
MOD_CONTROL = 0x0002
MOD_SHIFT = 0x0004
MOD_WIN = 0x0008
MOD_NOREPEAT = 0x4000
GetMessage = ctypes.windll.user32.GetMessageW
TranslateMessage = ctypes.windll.user32.TranslateMessage
DispatchMessage = ctypes.windll.user32.DispatchMessageW
PostThreadMessage = ctypes.windll.user32.PostThreadMessageW
WM_QUIT = 0x0012
WM_HOTKEY = 0x0312
WM_APP = 0x8000
WM_SND_GOOD = WM_APP + 1
WM_SND_BAD = WM_APP + 2
GetKeyState = ctypes.windll.user32.GetKeyState
MapVirtualKey = ctypes.windll.user32.MapVirtualKeyW
VK_BACK = 0x08
VK_CLEAR = 0x0c
VK_RETURN = 0x0d
VK_SHIFT = 0x10
VK_CONTROL = 0x11
VK_MENU = 0x12
VK_CAPITAL = 0x14
VK_MODECHANGE = 0x1f
VK_ESCAPE = 0x1b
VK_SPACE = 0x20
VK_DELETE = 0x2e
VK_LWIN = 0x5b
VK_RWIN = 0x5c
VK_NUMPAD0 = 0x60
VK_DIVIDE = 0x6f
VK_F1 = 0x70
VK_F24 = 0x87
VK_OEM_MINUS = 0xbd
VK_NUMLOCK = 0x90
VK_SCROLL = 0x91
VK_PROCESSKEY = 0xe5
VK_OEM_CLEAR = 0xfe
GetForegroundWindow = ctypes.windll.user32.GetForegroundWindow
GetWindowText = ctypes.windll.user32.GetWindowTextW
GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int]
GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW
def window_title(h) -> str:
"""
Determine the title for a window.
:param h: Window handle.
:return: Window title.
"""
if h:
title_length = GetWindowTextLength(h) + 1
buf = ctypes.create_unicode_buffer(title_length)
if GetWindowText(h, buf, title_length):
return buf.value
return ''
class MOUSEINPUT(ctypes.Structure):
"""Mouse Input structure."""
_fields_ = [
('dx', LONG),
('dy', LONG),
('mouseData', DWORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ctypes.POINTER(ULONG))
]
class KEYBDINPUT(ctypes.Structure):
"""Keyboard Input structure."""
_fields_ = [
('wVk', WORD),
('wScan', WORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ctypes.POINTER(ULONG))
]
class HARDWAREINPUT(ctypes.Structure):
"""Hardware Input structure."""
_fields_ = [
('uMsg', DWORD),
('wParamL', WORD),
('wParamH', WORD)
]
class INPUTUNION(ctypes.Union):
"""Input union."""
_fields_ = [
('mi', MOUSEINPUT),
('ki', KEYBDINPUT),
('hi', HARDWAREINPUT)
]
class INPUT(ctypes.Structure):
"""Input structure."""
_fields_ = [
('type', DWORD),
('union', INPUTUNION)
]
SendInput = ctypes.windll.user32.SendInput
SendInput.argtypes = [ctypes.c_uint, ctypes.POINTER(INPUT), ctypes.c_int]
INPUT_MOUSE = 0
INPUT_KEYBOARD = 1
INPUT_HARDWARE = 2
class WindowsHotkeyMgr(AbstractHotkeyMgr):
"""Hot key management."""
# https://msdn.microsoft.com/en-us/library/windows/desktop/dd375731%28v=vs.85%29.aspx
# Limit ourselves to symbols in Windows 7 Segoe UI
DISPLAY = {
0x03: 'Break', 0x08: 'Bksp', 0x09: u'', 0x0c: 'Clear', 0x0d: u'', 0x13: 'Pause',
0x14: u'', 0x1b: 'Esc',
0x20: u'', 0x21: 'PgUp', 0x22: 'PgDn', 0x23: 'End', 0x24: 'Home',
0x25: u'', 0x26: u'', 0x27: u'', 0x28: u'',
0x2c: 'PrtScn', 0x2d: 'Ins', 0x2e: 'Del', 0x2f: 'Help',
0x5d: u'', 0x5f: u'',
0x90: u'', 0x91: 'ScrLk',
0xa6: u'', 0xa7: u'', 0xa9: u'', 0xab: u'', 0xac: u'', 0xb4: u'',
}
def __init__(self) -> None:
self.root: tk.Tk = None # type: ignore
self.thread: threading.Thread = None # type: ignore
with open(pathlib.Path(config.respath) / 'snd_good.wav', 'rb') as sg:
self.snd_good = sg.read()
with open(pathlib.Path(config.respath) / 'snd_bad.wav', 'rb') as sb:
self.snd_bad = sb.read()
atexit.register(self.unregister)
def register(self, root: tk.Tk, keycode, modifiers) -> None:
"""Register the hotkey handler."""
self.root = root
if self.thread:
logger.debug('Was already registered, unregistering...')
self.unregister()
if keycode or modifiers:
logger.debug('Creating thread worker...')
self.thread = threading.Thread(
target=self.worker,
name=f'Hotkey "{keycode}:{modifiers}"',
args=(keycode, modifiers)
)
self.thread.daemon = True
logger.debug('Starting thread worker...')
self.thread.start()
logger.debug('Done.')
def unregister(self) -> None:
"""Unregister the hotkey handling."""
thread = self.thread
if thread:
logger.debug('Thread is/was running')
self.thread = None # type: ignore
logger.debug('Telling thread WM_QUIT')
PostThreadMessage(thread.ident, WM_QUIT, 0, 0)
logger.debug('Joining thread')
thread.join() # Wait for it to unregister hotkey and quit
else:
logger.debug('No thread')
logger.debug('Done.')
def worker(self, keycode, modifiers) -> None: # noqa: CCR001
"""Handle hotkeys."""
logger.debug('Begin...')
# Hotkey must be registered by the thread that handles it
if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode):
logger.debug("We're not the right thread?")
self.thread = None # type: ignore
return
fake = INPUT(INPUT_KEYBOARD, INPUTUNION(ki=KEYBDINPUT(keycode, keycode, 0, 0, None)))
msg = MSG()
logger.debug('Entering GetMessage() loop...')
while GetMessage(ctypes.byref(msg), None, 0, 0) != 0:
logger.debug('Got message')
if msg.message == WM_HOTKEY:
logger.debug('WM_HOTKEY')
if (
config.get_int('hotkey_always')
or window_title(GetForegroundWindow()).startswith('Elite - Dangerous')
):
if not config.shutting_down:
logger.debug('Sending event <<Invoke>>')
self.root.event_generate('<<Invoke>>', when="tail")
else:
logger.debug('Passing key on')
UnregisterHotKey(None, 1)
SendInput(1, fake, ctypes.sizeof(INPUT))
if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode):
logger.debug("We aren't registered for this ?")
break
elif msg.message == WM_SND_GOOD:
logger.debug('WM_SND_GOOD')
winsound.PlaySound(self.snd_good, winsound.SND_MEMORY) # synchronous
elif msg.message == WM_SND_BAD:
logger.debug('WM_SND_BAD')
winsound.PlaySound(self.snd_bad, winsound.SND_MEMORY) # synchronous
else:
logger.debug('Something else')
TranslateMessage(ctypes.byref(msg))
DispatchMessage(ctypes.byref(msg))
logger.debug('Exited GetMessage() loop.')
UnregisterHotKey(None, 1)
self.thread = None # type: ignore
logger.debug('Done.')
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
pass
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
pass
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed.
event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter
by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
modifiers = ((GetKeyState(VK_MENU) & 0x8000) and MOD_ALT) \
| ((GetKeyState(VK_CONTROL) & 0x8000) and MOD_CONTROL) \
| ((GetKeyState(VK_SHIFT) & 0x8000) and MOD_SHIFT) \
| ((GetKeyState(VK_LWIN) & 0x8000) and MOD_WIN) \
| ((GetKeyState(VK_RWIN) & 0x8000) and MOD_WIN)
keycode = event.keycode
if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]:
return (0, modifiers)
if not modifiers:
if keycode == VK_ESCAPE: # Esc = retain previous
return False
elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey
return None
elif keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord(
'Z'): # don't allow keys needed for typing in System Map
winsound.MessageBeep()
return None
elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY]
or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys
return (0, modifiers)
# See if the keycode is usable and available
if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode):
UnregisterHotKey(None, 2)
return (keycode, modifiers)
else:
winsound.MessageBeep()
return None
def display(self, keycode, modifiers) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
text = ''
if modifiers & MOD_WIN:
text += u'❖+'
if modifiers & MOD_CONTROL:
text += u'Ctrl+'
if modifiers & MOD_ALT:
text += u'Alt+'
if modifiers & MOD_SHIFT:
text += u'⇧+'
if VK_NUMPAD0 <= keycode <= VK_DIVIDE:
text += u''
if not keycode:
pass
elif VK_F1 <= keycode <= VK_F24:
text += f'F{keycode + 1 - VK_F1}'
elif keycode in WindowsHotkeyMgr.DISPLAY: # specials
text += WindowsHotkeyMgr.DISPLAY[keycode]
else:
c = MapVirtualKey(keycode, 2) # printable ?
if not c: # oops not printable
text += u''
elif c < 0x20: # control keys
text += chr(c + 0x40)
else:
text += chr(c).upper()
return text
def play_good(self) -> None:
"""Play the 'good' sound."""
if self.thread:
PostThreadMessage(self.thread.ident, WM_SND_GOOD, 0, 0)
def play_bad(self) -> None:
"""Play the 'bad' sound."""
if self.thread:
PostThreadMessage(self.thread.ident, WM_SND_BAD, 0, 0)
class LinuxHotKeyMgr(AbstractHotkeyMgr):
"""
Hot key management.
Not actually implemented on Linux. It's a no-op instead.
"""
def register(self, root, keycode, modifiers) -> None:
"""Register the hotkey handler."""
pass
def unregister(self) -> None:
"""Unregister the hotkey handling."""
pass
def play_good(self) -> None:
"""Play the 'good' sound."""
pass
def play_bad(self) -> None:
"""Play the 'bad' sound."""
pass
def get_hotkeymgr() -> AbstractHotkeyMgr:
"""
Determine platform-specific HotkeyMgr.
:param args:
:param kwargs:
:return: Appropriate class instance.
:raises ValueError: If unsupported platform.
"""
if sys.platform == 'darwin':
return MacHotkeyMgr()
elif sys.platform == 'win32':
return WindowsHotkeyMgr()
elif sys.platform == 'linux':
return LinuxHotKeyMgr()
else:
raise ValueError(f'Unknown platform: {sys.platform}')
# singleton
hotkeymgr = get_hotkeymgr()

95
hotkey/__init__.py Normal file
View File

@ -0,0 +1,95 @@
"""Handle keyboard input for manual update triggering."""
# -*- coding: utf-8 -*-
import abc
import sys
from abc import abstractmethod
from typing import Optional, Tuple, Union
class AbstractHotkeyMgr(abc.ABC):
"""Abstract root class of all platforms specific HotKeyMgr."""
@abstractmethod
def register(self, root, keycode, modifiers) -> None:
"""Register the hotkey handler."""
pass
@abstractmethod
def unregister(self) -> None:
"""Unregister the hotkey handling."""
pass
@abstractmethod
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
pass
@abstractmethod
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
pass
@abstractmethod
def fromevent(self, event) -> Optional[Union[bool, Tuple]]:
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed.
event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter
by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
pass
@abstractmethod
def display(self, keycode: int, modifiers: int) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
pass
@abstractmethod
def play_good(self) -> None:
"""Play the 'good' sound."""
pass
@abstractmethod
def play_bad(self) -> None:
"""Play the 'bad' sound."""
pass
def get_hotkeymgr() -> AbstractHotkeyMgr:
"""
Determine platform-specific HotkeyMgr.
:param args:
:param kwargs:
:return: Appropriate class instance.
:raises ValueError: If unsupported platform.
"""
if sys.platform == 'darwin':
from hotkey.darwin import MacHotkeyMgr
return MacHotkeyMgr()
elif sys.platform == 'win32':
from hotkey.windows import WindowsHotkeyMgr
return WindowsHotkeyMgr()
elif sys.platform == 'linux':
from hotkey.linux import LinuxHotKeyMgr
return LinuxHotKeyMgr()
else:
raise ValueError(f'Unknown platform: {sys.platform}')
# singleton
hotkeymgr = get_hotkeymgr()

274
hotkey/darwin.py Normal file
View File

@ -0,0 +1,274 @@
"""darwin/macOS implementation of hotkey.AbstractHotkeyMgr."""
import pathlib
import sys
import tkinter as tk
from typing import Callable, Optional, Tuple, Union
assert sys.platform == 'darwin'
import objc
from AppKit import (
NSAlternateKeyMask, NSApplication, NSBeep, NSClearLineFunctionKey, NSCommandKeyMask, NSControlKeyMask,
NSDeleteFunctionKey, NSDeviceIndependentModifierFlagsMask, NSEvent, NSF1FunctionKey, NSF35FunctionKey,
NSFlagsChanged, NSKeyDown, NSKeyDownMask, NSKeyUp, NSNumericPadKeyMask, NSShiftKeyMask, NSSound, NSWorkspace
)
from config import config
from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr
logger = get_main_logger()
class MacHotkeyMgr(AbstractHotkeyMgr):
"""Hot key management."""
POLL = 250
# https://developer.apple.com/library/mac/documentation/Cocoa/Reference/ApplicationKit/Classes/NSEvent_Class/#//apple_ref/doc/constant_group/Function_Key_Unicodes
DISPLAY = {
0x03: u'', 0x09: u'', 0xd: u'', 0x19: u'', 0x1b: u'esc', 0x20: u'', 0x7f: u'',
0xf700: u'', 0xf701: u'', 0xf702: u'', 0xf703: u'',
0xf727: u'Ins',
0xf728: u'', 0xf729: u'', 0xf72a: u'Fn', 0xf72b: u'',
0xf72c: u'', 0xf72d: u'', 0xf72e: u'PrtScr', 0xf72f: u'ScrollLock',
0xf730: u'Pause', 0xf731: u'SysReq', 0xf732: u'Break', 0xf733: u'Reset',
0xf739: u'',
}
(ACQUIRE_INACTIVE, ACQUIRE_ACTIVE, ACQUIRE_NEW) = range(3)
def __init__(self):
self.MODIFIERMASK = NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask \
| NSNumericPadKeyMask
self.root: tk.Tk
self.keycode = 0
self.modifiers = 0
self.activated = False
self.observer = None
self.acquire_key = 0
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
self.tkProcessKeyEvent_old: Callable
self.snd_good = NSSound.alloc().initWithContentsOfFile_byReference_(
pathlib.Path(config.respath_path) / 'snd_good.wav', False
)
self.snd_bad = NSSound.alloc().initWithContentsOfFile_byReference_(
pathlib.Path(config.respath_path) / 'snd_bad.wav', False
)
def register(self, root: tk.Tk, keycode: int, modifiers: int) -> None:
"""
Register current hotkey for monitoring.
:param root: parent window.
:param keycode: Key to monitor.
:param modifiers: Any modifiers to take into account.
"""
self.root = root
self.keycode = keycode
self.modifiers = modifiers
self.activated = False
if keycode:
if not self.observer:
self.root.after_idle(self._observe)
self.root.after(MacHotkeyMgr.POLL, self._poll)
# Monkey-patch tk (tkMacOSXKeyEvent.c)
if not callable(self.tkProcessKeyEvent_old):
sel = b'tkProcessKeyEvent:'
cls = NSApplication.sharedApplication().class__() # type: ignore
self.tkProcessKeyEvent_old = NSApplication.sharedApplication().methodForSelector_(sel) # type: ignore
newmethod = objc.selector( # type: ignore
self.tkProcessKeyEvent,
selector=self.tkProcessKeyEvent_old.selector,
signature=self.tkProcessKeyEvent_old.signature
)
objc.classAddMethod(cls, sel, newmethod) # type: ignore
def tkProcessKeyEvent(self, cls, the_event): # noqa: N802
"""
Monkey-patch tk (tkMacOSXKeyEvent.c).
- workaround crash on OSX 10.9 & 10.10 on seeing a composing character
- notice when modifier key state changes
- keep a copy of NSEvent.charactersIgnoringModifiers, which is what we need for the hotkey
(Would like to use a decorator but need to ensure the application is created before this is installed)
:param cls: ???
:param the_event: tk event
:return: ???
"""
if self.acquire_state:
if the_event.type() == NSFlagsChanged:
self.acquire_key = the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask
self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW
# suppress the event by not chaining the old function
return the_event
elif the_event.type() in (NSKeyDown, NSKeyUp):
c = the_event.charactersIgnoringModifiers()
self.acquire_key = (c and ord(c[0]) or 0) | \
(the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask)
self.acquire_state = MacHotkeyMgr.ACQUIRE_NEW
# suppress the event by not chaining the old function
return the_event
# replace empty characters with charactersIgnoringModifiers to avoid crash
elif the_event.type() in (NSKeyDown, NSKeyUp) and not the_event.characters():
the_event = NSEvent.keyEventWithType_location_modifierFlags_timestamp_windowNumber_context_characters_charactersIgnoringModifiers_isARepeat_keyCode_( # noqa: E501
# noqa: E501
the_event.type(),
the_event.locationInWindow(),
the_event.modifierFlags(),
the_event.timestamp(),
the_event.windowNumber(),
the_event.context(),
the_event.charactersIgnoringModifiers(),
the_event.charactersIgnoringModifiers(),
the_event.isARepeat(),
the_event.keyCode()
)
return self.tkProcessKeyEvent_old(cls, the_event)
def _observe(self):
# Must be called after root.mainloop() so that the app's message loop has been created
self.observer = NSEvent.addGlobalMonitorForEventsMatchingMask_handler_(NSKeyDownMask, self._handler)
def _poll(self):
if config.shutting_down:
return
# No way of signalling to Tkinter from within the callback handler block that doesn't
# cause Python to crash, so poll.
if self.activated:
self.activated = False
self.root.event_generate('<<Invoke>>', when="tail")
if self.keycode or self.modifiers:
self.root.after(MacHotkeyMgr.POLL, self._poll)
def unregister(self) -> None:
"""Remove hotkey registration."""
self.keycode = 0
self.modifiers = 0
@objc.callbackFor(NSEvent.addGlobalMonitorForEventsMatchingMask_handler_)
def _handler(self, event) -> None:
# use event.charactersIgnoringModifiers to handle composing characters like Alt-e
if (
(event.modifierFlags() & self.MODIFIERMASK) == self.modifiers
and ord(event.charactersIgnoringModifiers()[0]) == self.keycode
):
if config.get_int('hotkey_always'):
self.activated = True
else: # Only trigger if game client is front process
front = NSWorkspace.sharedWorkspace().frontmostApplication()
if front and front.bundleIdentifier() == 'uk.co.frontier.EliteDangerous':
self.activated = True
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after_idle(self._acquire_poll)
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
def _acquire_poll(self) -> None:
"""Perform a poll of current hotkey state."""
if config.shutting_down:
return
# No way of signalling to Tkinter from within the monkey-patched event handler that doesn't
# cause Python to crash, so poll.
if self.acquire_state:
if self.acquire_state == MacHotkeyMgr.ACQUIRE_NEW:
# Abuse tkEvent's keycode field to hold our acquired key & modifier
self.root.event_generate('<KeyPress>', keycode=self.acquire_key)
self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE
self.root.after(50, self._acquire_poll)
def fromevent(self, event) -> Optional[Union[bool, Tuple]]:
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
(keycode, modifiers) = (event.keycode & 0xffff, event.keycode & 0xffff0000) # Set by _acquire_poll()
if (
keycode
and not (modifiers & (NSShiftKeyMask | NSControlKeyMask | NSAlternateKeyMask | NSCommandKeyMask))
):
if keycode == 0x1b: # Esc = retain previous
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return False
# BkSp, Del, Clear = clear hotkey
elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]:
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
# don't allow keys needed for typing in System Map
elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a:
NSBeep()
self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE
return None
return (keycode, modifiers)
def display(self, keycode, modifiers) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
text = ''
if modifiers & NSControlKeyMask:
text += u''
if modifiers & NSAlternateKeyMask:
text += u''
if modifiers & NSShiftKeyMask:
text += u''
if modifiers & NSCommandKeyMask:
text += u''
if (modifiers & NSNumericPadKeyMask) and keycode <= 0x7f:
text += u''
if not keycode:
pass
elif ord(NSF1FunctionKey) <= keycode <= ord(NSF35FunctionKey):
text += f'F{keycode + 1 - ord(NSF1FunctionKey)}'
elif keycode in MacHotkeyMgr.DISPLAY: # specials
text += MacHotkeyMgr.DISPLAY[keycode]
elif keycode < 0x20: # control keys
text += chr(keycode + 0x40)
elif keycode < 0xf700: # key char
text += chr(keycode).upper()
else:
text += u''
return text
def play_good(self):
"""Play the 'good' sound."""
self.snd_good.play()
def play_bad(self):
"""Play the 'bad' sound."""
self.snd_bad.play()

64
hotkey/linux.py Normal file
View File

@ -0,0 +1,64 @@
"""Linux implementation of hotkey.AbstractHotkeyMgr."""
import sys
from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr
assert sys.platform == 'linux'
logger = get_main_logger()
class LinuxHotKeyMgr(AbstractHotkeyMgr):
"""
Hot key management.
Not actually implemented on Linux. It's a no-op instead.
"""
def register(self, root, keycode, modifiers) -> None:
"""Register the hotkey handler."""
pass
def unregister(self) -> None:
"""Unregister the hotkey handling."""
pass
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
pass
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
pass
def fromevent(self, event) -> bool | tuple | None:
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed.
event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter
by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
pass
def display(self, keycode: int, modifiers: int) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
return "Unsupported on linux"
def play_good(self) -> None:
"""Play the 'good' sound."""
pass
def play_bad(self) -> None:
"""Play the 'bad' sound."""
pass

370
hotkey/windows.py Normal file
View File

@ -0,0 +1,370 @@
"""Windows implementation of hotkey.AbstractHotkeyMgr."""
import atexit
import ctypes
import pathlib
import sys
import threading
import tkinter as tk
import winsound
from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD
from typing import Optional, Tuple, Union
from config import config
from EDMCLogging import get_main_logger
from hotkey import AbstractHotkeyMgr
assert sys.platform == 'win32'
logger = get_main_logger()
RegisterHotKey = ctypes.windll.user32.RegisterHotKey
UnregisterHotKey = ctypes.windll.user32.UnregisterHotKey
MOD_ALT = 0x0001
MOD_CONTROL = 0x0002
MOD_SHIFT = 0x0004
MOD_WIN = 0x0008
MOD_NOREPEAT = 0x4000
GetMessage = ctypes.windll.user32.GetMessageW
TranslateMessage = ctypes.windll.user32.TranslateMessage
DispatchMessage = ctypes.windll.user32.DispatchMessageW
PostThreadMessage = ctypes.windll.user32.PostThreadMessageW
WM_QUIT = 0x0012
WM_HOTKEY = 0x0312
WM_APP = 0x8000
WM_SND_GOOD = WM_APP + 1
WM_SND_BAD = WM_APP + 2
GetKeyState = ctypes.windll.user32.GetKeyState
MapVirtualKey = ctypes.windll.user32.MapVirtualKeyW
VK_BACK = 0x08
VK_CLEAR = 0x0c
VK_RETURN = 0x0d
VK_SHIFT = 0x10
VK_CONTROL = 0x11
VK_MENU = 0x12
VK_CAPITAL = 0x14
VK_MODECHANGE = 0x1f
VK_ESCAPE = 0x1b
VK_SPACE = 0x20
VK_DELETE = 0x2e
VK_LWIN = 0x5b
VK_RWIN = 0x5c
VK_NUMPAD0 = 0x60
VK_DIVIDE = 0x6f
VK_F1 = 0x70
VK_F24 = 0x87
VK_OEM_MINUS = 0xbd
VK_NUMLOCK = 0x90
VK_SCROLL = 0x91
VK_PROCESSKEY = 0xe5
VK_OEM_CLEAR = 0xfe
GetForegroundWindow = ctypes.windll.user32.GetForegroundWindow
GetWindowText = ctypes.windll.user32.GetWindowTextW
GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int]
GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW
def window_title(h) -> str:
"""
Determine the title for a window.
:param h: Window handle.
:return: Window title.
"""
if h:
title_length = GetWindowTextLength(h) + 1
buf = ctypes.create_unicode_buffer(title_length)
if GetWindowText(h, buf, title_length):
return buf.value
return ''
class MOUSEINPUT(ctypes.Structure):
"""Mouse Input structure."""
_fields_ = [
('dx', LONG),
('dy', LONG),
('mouseData', DWORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ctypes.POINTER(ULONG))
]
class KEYBDINPUT(ctypes.Structure):
"""Keyboard Input structure."""
_fields_ = [
('wVk', WORD),
('wScan', WORD),
('dwFlags', DWORD),
('time', DWORD),
('dwExtraInfo', ctypes.POINTER(ULONG))
]
class HARDWAREINPUT(ctypes.Structure):
"""Hardware Input structure."""
_fields_ = [
('uMsg', DWORD),
('wParamL', WORD),
('wParamH', WORD)
]
class INPUTUNION(ctypes.Union):
"""Input union."""
_fields_ = [
('mi', MOUSEINPUT),
('ki', KEYBDINPUT),
('hi', HARDWAREINPUT)
]
class INPUT(ctypes.Structure):
"""Input structure."""
_fields_ = [
('type', DWORD),
('union', INPUTUNION)
]
SendInput = ctypes.windll.user32.SendInput
SendInput.argtypes = [ctypes.c_uint, ctypes.POINTER(INPUT), ctypes.c_int]
INPUT_MOUSE = 0
INPUT_KEYBOARD = 1
INPUT_HARDWARE = 2
class WindowsHotkeyMgr(AbstractHotkeyMgr):
"""Hot key management."""
# https://msdn.microsoft.com/en-us/library/windows/desktop/dd375731%28v=vs.85%29.aspx
# Limit ourselves to symbols in Windows 7 Segoe UI
DISPLAY = {
0x03: 'Break', 0x08: 'Bksp', 0x09: '', 0x0c: 'Clear', 0x0d: '', 0x13: 'Pause',
0x14: '', 0x1b: 'Esc',
0x20: '', 0x21: 'PgUp', 0x22: 'PgDn', 0x23: 'End', 0x24: 'Home',
0x25: '', 0x26: '', 0x27: '', 0x28: '',
0x2c: 'PrtScn', 0x2d: 'Ins', 0x2e: 'Del', 0x2f: 'Help',
0x5d: '', 0x5f: '',
0x90: '', 0x91: 'ScrLk',
0xa6: '', 0xa7: '', 0xa9: '', 0xab: '', 0xac: '', 0xb4: '',
}
def __init__(self) -> None:
self.root: tk.Tk = None # type: ignore
self.thread: threading.Thread = None # type: ignore
with open(pathlib.Path(config.respath) / 'snd_good.wav', 'rb') as sg:
self.snd_good = sg.read()
with open(pathlib.Path(config.respath) / 'snd_bad.wav', 'rb') as sb:
self.snd_bad = sb.read()
atexit.register(self.unregister)
def register(self, root: tk.Tk, keycode, modifiers) -> None:
"""Register the hotkey handler."""
self.root = root
if self.thread:
logger.debug('Was already registered, unregistering...')
self.unregister()
if keycode or modifiers:
logger.debug('Creating thread worker...')
self.thread = threading.Thread(
target=self.worker,
name=f'Hotkey "{keycode}:{modifiers}"',
args=(keycode, modifiers)
)
self.thread.daemon = True
logger.debug('Starting thread worker...')
self.thread.start()
logger.debug('Done.')
def unregister(self) -> None:
"""Unregister the hotkey handling."""
thread = self.thread
if thread:
logger.debug('Thread is/was running')
self.thread = None # type: ignore
logger.debug('Telling thread WM_QUIT')
PostThreadMessage(thread.ident, WM_QUIT, 0, 0)
logger.debug('Joining thread')
thread.join() # Wait for it to unregister hotkey and quit
else:
logger.debug('No thread')
logger.debug('Done.')
def worker(self, keycode, modifiers) -> None: # noqa: CCR001
"""Handle hotkeys."""
logger.debug('Begin...')
# Hotkey must be registered by the thread that handles it
if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode):
logger.debug("We're not the right thread?")
self.thread = None # type: ignore
return
fake = INPUT(INPUT_KEYBOARD, INPUTUNION(ki=KEYBDINPUT(keycode, keycode, 0, 0, None)))
msg = MSG()
logger.debug('Entering GetMessage() loop...')
while GetMessage(ctypes.byref(msg), None, 0, 0) != 0:
logger.debug('Got message')
if msg.message == WM_HOTKEY:
logger.debug('WM_HOTKEY')
if (
config.get_int('hotkey_always')
or window_title(GetForegroundWindow()).startswith('Elite - Dangerous')
):
if not config.shutting_down:
logger.debug('Sending event <<Invoke>>')
self.root.event_generate('<<Invoke>>', when="tail")
else:
logger.debug('Passing key on')
UnregisterHotKey(None, 1)
SendInput(1, fake, ctypes.sizeof(INPUT))
if not RegisterHotKey(None, 1, modifiers | MOD_NOREPEAT, keycode):
logger.debug("We aren't registered for this ?")
break
elif msg.message == WM_SND_GOOD:
logger.debug('WM_SND_GOOD')
winsound.PlaySound(self.snd_good, winsound.SND_MEMORY) # synchronous
elif msg.message == WM_SND_BAD:
logger.debug('WM_SND_BAD')
winsound.PlaySound(self.snd_bad, winsound.SND_MEMORY) # synchronous
else:
logger.debug('Something else')
TranslateMessage(ctypes.byref(msg))
DispatchMessage(ctypes.byref(msg))
logger.debug('Exited GetMessage() loop.')
UnregisterHotKey(None, 1)
self.thread = None # type: ignore
logger.debug('Done.')
def acquire_start(self) -> None:
"""Start acquiring hotkey state via polling."""
pass
def acquire_stop(self) -> None:
"""Stop acquiring hotkey state."""
pass
def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001
"""
Return configuration (keycode, modifiers) or None=clear or False=retain previous.
event.state is a pain - it shows the state of the modifiers *before* a modifier key was pressed.
event.state *does* differentiate between left and right Ctrl and Alt and between Return and Enter
by putting KF_EXTENDED in bit 18, but RegisterHotKey doesn't differentiate.
:param event: tk event ?
:return: False to retain previous, None to not use, else (keycode, modifiers)
"""
modifiers = ((GetKeyState(VK_MENU) & 0x8000) and MOD_ALT) \
| ((GetKeyState(VK_CONTROL) & 0x8000) and MOD_CONTROL) \
| ((GetKeyState(VK_SHIFT) & 0x8000) and MOD_SHIFT) \
| ((GetKeyState(VK_LWIN) & 0x8000) and MOD_WIN) \
| ((GetKeyState(VK_RWIN) & 0x8000) and MOD_WIN)
keycode = event.keycode
if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]:
return (0, modifiers)
if not modifiers:
if keycode == VK_ESCAPE: # Esc = retain previous
return False
elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey
return None
elif (
keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z')
): # don't allow keys needed for typing in System Map
winsound.MessageBeep()
return None
elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY]
or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys
return (0, modifiers)
# See if the keycode is usable and available
if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode):
UnregisterHotKey(None, 2)
return (keycode, modifiers)
else:
winsound.MessageBeep()
return None
def display(self, keycode, modifiers) -> str:
"""
Return displayable form of given hotkey + modifiers.
:param keycode:
:param modifiers:
:return: string form
"""
text = ''
if modifiers & MOD_WIN:
text += '❖+'
if modifiers & MOD_CONTROL:
text += 'Ctrl+'
if modifiers & MOD_ALT:
text += 'Alt+'
if modifiers & MOD_SHIFT:
text += '⇧+'
if VK_NUMPAD0 <= keycode <= VK_DIVIDE:
text += ''
if not keycode:
pass
elif VK_F1 <= keycode <= VK_F24:
text += f'F{keycode + 1 - VK_F1}'
elif keycode in WindowsHotkeyMgr.DISPLAY: # specials
text += WindowsHotkeyMgr.DISPLAY[keycode]
else:
c = MapVirtualKey(keycode, 2) # printable ?
if not c: # oops not printable
text += ''
elif c < 0x20: # control keys
text += chr(c + 0x40)
else:
text += chr(c).upper()
return text
def play_good(self) -> None:
"""Play the 'good' sound."""
if self.thread:
PostThreadMessage(self.thread.ident, WM_SND_GOOD, 0, 0)
def play_bad(self) -> None:
"""Play the 'bad' sound."""
if self.thread:
PostThreadMessage(self.thread.ident, WM_SND_BAD, 0, 0)

View File

@ -33,7 +33,7 @@ class JournalLock:
def __init__(self) -> None:
"""Initialise where the journal directory and lock file are."""
self.journal_dir: str = config.get_str('journaldir') or config.default_journal_dir
self.journal_dir: str | None = config.get_str('journaldir') or config.default_journal_dir
self.journal_dir_path: Optional[pathlib.Path] = None
self.set_path_from_journaldir()
self.journal_dir_lockfile_name: Optional[pathlib.Path] = None

View File

@ -83,7 +83,7 @@ class _Translations:
self.translations = {None: {}}
builtins.__dict__['_'] = lambda x: str(x).replace(r'\"', '"').replace('{CR}', '\n')
def install(self, lang: str = None) -> None: # noqa: CCR001
def install(self, lang: str | None = None) -> None: # noqa: CCR001
"""
Install the translation function to the _ builtin.
@ -250,7 +250,7 @@ class _Locale:
self.float_formatter.setMinimumFractionDigits_(5)
self.float_formatter.setMaximumFractionDigits_(5)
def stringFromNumber(self, number: Union[float, int], decimals: int = None) -> str: # noqa: N802
def stringFromNumber(self, number: Union[float, int], decimals: int | None = None) -> str: # noqa: N802
warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.'))
return self.string_from_number(number, decimals) # type: ignore

View File

@ -58,7 +58,7 @@ class Notebook(ttk.Notebook):
class Frame(sys.platform == 'darwin' and tk.Frame or ttk.Frame): # type: ignore
"""Custom t(t)k.Frame class to fix some display issues."""
def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Notebook | None = None, **kw):
if sys.platform == 'darwin':
kw['background'] = kw.pop('background', PAGEBG)
tk.Frame.__init__(self, master, **kw)
@ -76,7 +76,8 @@ class Label(tk.Label):
"""Custom tk.Label class to fix some display issues."""
def __init__(self, master: Optional[ttk.Frame] = None, **kw):
if sys.platform in ['darwin', 'win32']:
# This format chosen over `sys.platform in (...)` as mypy and friends dont understand that
if sys.platform == 'darwin' or sys.platform == 'win32':
kw['foreground'] = kw.pop('foreground', PAGEFG)
kw['background'] = kw.pop('background', PAGEBG)
else:

View File

@ -93,13 +93,13 @@ def lookup(module, ship_map, entitled=False) -> Optional[dict]: # noqa: C901, C
# Countermeasures - e.g. Hpt_PlasmaPointDefence_Turret_Tiny
elif name[0] == 'hpt' and name[1] in countermeasure_map:
new['category'] = 'utility'
new['name'], new['rating'] = countermeasure_map[len(name) > 4 and (name[1], name[4]) or name[1]]
new['name'], new['rating'] = countermeasure_map[name[1]]
new['class'] = weaponclass_map[name[-1]]
# Utility - e.g. Hpt_CargoScanner_Size0_Class1
elif name[0] == 'hpt' and name[1] in utility_map:
new['category'] = 'utility'
new['name'] = utility_map[len(name) > 4 and (name[1], name[4]) or name[1]]
new['name'] = utility_map[name[1]]
if not name[2].startswith('size') or not name[3].startswith('class'):
raise AssertionError(f'{module["id"]}: Unknown class/rating "{name[2]}/{name[3]}"')

15
plug.py
View File

@ -7,7 +7,8 @@ import os
import sys
import tkinter as tk
from builtins import object, str
from typing import Any, Callable, List, Mapping, MutableMapping, Optional, Tuple
from tkinter import ttk
from typing import Any, Callable, List, Mapping, MutableMapping, Optional
import companion
import myNotebook as nb # noqa: N813
@ -26,7 +27,7 @@ class LastError:
"""Holds the last plugin error."""
msg: Optional[str]
root: tk.Frame
root: tk.Tk
def __init__(self) -> None:
self.msg = None
@ -118,7 +119,7 @@ class Plugin(object):
return None
def get_prefs(self, parent: tk.Frame, cmdr: str, is_beta: bool) -> Optional[tk.Frame]:
def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Optional[tk.Frame]:
"""
If the plugin provides a prefs frame, create and return it.
@ -140,7 +141,7 @@ class Plugin(object):
return None
def load_plugins(master: tk.Frame) -> None: # noqa: CCR001
def load_plugins(master: tk.Tk) -> None: # noqa: CCR001
"""Find and load all plugins."""
last_error.root = master
@ -198,7 +199,7 @@ def provides(fn_name: str) -> List[str]:
def invoke(
plugin_name: str, fallback: str, fn_name: str, *args: Tuple
plugin_name: str, fallback: str | None, fn_name: str, *args: Any
) -> Optional[str]:
"""
Invoke a function on a named plugin.
@ -248,7 +249,7 @@ def notify_stop() -> Optional[str]:
return error
def notify_prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None:
def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None:
"""
Notify plugins that the Cmdr was changed while the settings dialog is open.
@ -265,7 +266,7 @@ def notify_prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_prefs_changed(cmdr: str, is_beta: bool) -> None:
def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None:
"""
Notify plugins that the settings dialog has been closed.

View File

@ -26,6 +26,7 @@ import gzip
import io
import json
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Union
import myNotebook as nb # noqa: N813 # its not my fault.
@ -79,7 +80,7 @@ def plugin_start3(path: str) -> str:
return 'Coriolis'
def plugin_prefs(parent: tk.Widget, cmdr: str, is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
"""Set up plugin preferences."""
PADX = 10 # noqa: N806
@ -126,7 +127,7 @@ def plugin_prefs(parent: tk.Widget, cmdr: str, is_beta: bool) -> tk.Frame:
return conf_frame
def prefs_changed(cmdr: str, is_beta: bool) -> None:
def prefs_changed(cmdr: str | None, is_beta: bool) -> None:
"""Update URLs."""
global normal_url, beta_url, override_mode
normal_url = normal_textvar.get()

View File

@ -34,7 +34,7 @@ from collections import OrderedDict
from platform import system
from textwrap import dedent
from threading import Lock
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, MutableMapping, Optional
from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping, Optional
from typing import OrderedDict as OrderedDictT
from typing import Tuple, Union
@ -91,13 +91,13 @@ class This:
# Avoid duplicates
self.marketId: Optional[str] = None
self.commodities: Optional[List[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, List[str]]] = None
self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None
self.commodities: Optional[list[OrderedDictT[str, Any]]] = None
self.outfitting: Optional[Tuple[bool, list[str]]] = None
self.shipyard: Optional[Tuple[bool, list[Mapping[str, Any]]]] = None
self.fcmaterials_marketid: int = 0
self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials: Optional[list[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi_marketid: int = 0
self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None
self.fcmaterials_capi: Optional[list[OrderedDictT[str, Any]]] = None
# For the tkinter parent window, so we can call update_idletasks()
self.parent: tk.Tk
@ -379,7 +379,7 @@ class EDDNSender:
:param text: The status text to be set/logged.
"""
if os.getenv('EDMC_NO_UI'):
logger.INFO(text)
logger.info(text)
return
self.eddn.parent.children['status']['text'] = text
@ -403,7 +403,7 @@ class EDDNSender:
"""
logger.trace_if("plugin.eddn.send", "Sending message")
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg))
if should_return:
@ -615,7 +615,7 @@ class EDDN:
self.sender = EDDNSender(self, self.eddn_url)
self.fss_signals: List[Mapping[str, Any]] = []
self.fss_signals: list[Mapping[str, Any]] = []
def close(self):
"""Close down the EDDN class instance."""
@ -639,8 +639,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./market', {})
if should_return:
logger.warning("capi.request./market has been disabled by killswitch. Returning.")
@ -657,7 +656,7 @@ class EDDN:
modules,
ships
)
commodities: List[OrderedDictT[str, Any]] = []
commodities: list[OrderedDictT[str, Any]] = []
for commodity in data['lastStarport'].get('commodities') or []:
# Check 'marketable' and 'not prohibited'
if (category_map.get(commodity['categoryname'], True)
@ -719,7 +718,7 @@ class EDDN:
# Send any FCMaterials.json-equivalent 'orders' as well
self.export_capi_fcmaterials(data, is_beta, horizons)
def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[Dict, Dict]:
def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]:
"""
Produce a sanity-checked version of ships and modules from CAPI data.
@ -730,7 +729,7 @@ class EDDN:
:param data: The raw CAPI data.
:return: Sanity-checked data.
"""
modules: Dict[str, Any] = data['lastStarport'].get('modules')
modules: dict[str, Any] = data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict):
if modules is None:
logger.debug('modules was None. FC or Damaged Station?')
@ -747,7 +746,7 @@ class EDDN:
# Set a safe value
modules = {}
ships: Dict[str, Any] = data['lastStarport'].get('ships')
ships: dict[str, Any] = data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict):
if ships is None:
logger.debug('ships was None')
@ -773,8 +772,7 @@ class EDDN:
:param is_beta: whether or not we're currently in beta mode
"""
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -801,7 +799,7 @@ class EDDN:
modules.values()
)
outfitting: List[str] = sorted(
outfitting: list[str] = sorted(
self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search
)
@ -842,8 +840,7 @@ class EDDN:
:param is_beta: whether or not we are in beta mode
"""
should_return: bool
new_data: Dict[str, Any] = {}
new_data: dict[str, Any]
should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {})
if should_return:
logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.")
@ -862,7 +859,7 @@ class EDDN:
ships
)
shipyard: List[Mapping[str, Any]] = sorted(
shipyard: list[Mapping[str, Any]] = sorted(
itertools.chain(
(ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()),
(ship['name'].lower() for ship in ships['unavailable_list'] or {}),
@ -905,8 +902,8 @@ class EDDN:
:param is_beta: whether or not we're in beta mode
:param entry: the journal entry containing the commodities data
"""
items: List[Mapping[str, Any]] = entry.get('Items') or []
commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([
items: list[Mapping[str, Any]] = entry.get('Items') or []
commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([
('name', self.canonicalise(commodity['Name'])),
('meanPrice', commodity['MeanPrice']),
('buyPrice', commodity['BuyPrice']),
@ -953,11 +950,11 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: The relevant journal entry
"""
modules: List[Mapping[str, Any]] = entry.get('Items', [])
modules: list[Mapping[str, Any]] = entry.get('Items', [])
horizons: bool = entry.get('Horizons', False)
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
# for module in modules if module['Name'] != 'int_planetapproachsuite'])
outfitting: List[str] = sorted(
outfitting: list[str] = sorted(
self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in
filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules)
)
@ -992,7 +989,7 @@ class EDDN:
:param is_beta: Whether or not we're in beta mode
:param entry: the relevant journal entry
"""
ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
ships: list[Mapping[str, Any]] = entry.get('PriceList') or []
horizons: bool = entry.get('Horizons', False)
shipyard = sorted(ship['ShipType'] for ship in ships)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
@ -1834,7 +1831,7 @@ class EDDN:
#######################################################################
# Build basis of message
msg: Dict = {
msg: dict = {
'$schemaRef': f'https://eddn.edcd.io/schemas/fsssignaldiscovered/1{"/test" if is_beta else ""}',
'message': {
"event": "FSSSignalDiscovered",
@ -2176,9 +2173,6 @@ def journal_entry( # noqa: C901, CCR001
:param state: `dict` - Current `monitor.state` data.
:return: `str` - Error message, or `None` if no errors.
"""
should_return: bool
new_data: Dict[str, Any] = {}
should_return, new_data = killswitch.check_killswitch('plugins.eddn.journal', entry)
if should_return:
plug.show_error(_('EDDN journal handler disabled. See Log.')) # LANG: Killswitch disabled EDDN
@ -2585,7 +2579,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST
return economies_colony or modules_horizons or ship_horizons
def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None:
def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None:
"""
Process Status.json data to track things like current Body.

View File

@ -38,12 +38,14 @@ from datetime import datetime, timedelta, timezone
from queue import Queue
from threading import Thread
from time import sleep
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast
import requests
import killswitch
import monitor
import myNotebook
import myNotebook as nb # noqa: N813
import plug
from companion import CAPIData
@ -82,8 +84,8 @@ class This:
self.session: requests.Session = requests.Session()
self.session.headers['User-Agent'] = user_agent
self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread
self.discarded_events: Set[str] = [] # List discarded events from EDSM
self.lastlookup: requests.Response # Result of last system lookup
self.discarded_events: Set[str] = set() # List discarded events from EDSM
self.lastlookup: Dict[str, Any] # Result of last system lookup
# Game state
self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew
@ -91,13 +93,13 @@ class This:
self.newgame: bool = False # starting up - batch initial burst of events
self.newgame_docked: bool = False # starting up while docked
self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan
self.system_link: tk.Widget = None
self.system: tk.Tk = None
self.system_address: Optional[int] = None # Frontier SystemAddress
self.system_population: Optional[int] = None
self.station_link: tk.Widget = None
self.station: Optional[str] = None
self.station_marketid: Optional[int] = None # Frontier MarketID
self.system_link: tk.Widget | None = None
self.system: tk.Tk | None = None
self.system_address: int | None = None # Frontier SystemAddress
self.system_population: int | None = None
self.station_link: tk.Widget | None = None
self.station: str | None = None
self.station_marketid: int | None = None # Frontier MarketID
self.on_foot = False
self._IMG_KNOWN = None
@ -107,19 +109,19 @@ class This:
self.thread: Optional[threading.Thread] = None
self.log = None
self.log_button = None
self.log: tk.IntVar | None = None
self.log_button: ttk.Checkbutton | None = None
self.label = None
self.label: tk.Widget | None = None
self.cmdr_label = None
self.cmdr_text = None
self.cmdr_label: myNotebook.Label | None = None
self.cmdr_text: myNotebook.Label | None = None
self.user_label = None
self.user = None
self.user_label: myNotebook.Label | None = None
self.user: myNotebook.Entry | None = None
self.apikey_label = None
self.apikey = None
self.apikey_label: myNotebook.Label | None = None
self.apikey: myNotebook.Entry | None = None
this = This()
@ -267,7 +269,7 @@ def plugin_stop() -> None:
logger.debug('Done.')
def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame:
"""
Plugin preferences setup hook.
@ -300,7 +302,8 @@ def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame:
frame, text=_('Send flight log and Cmdr status to EDSM'), variable=this.log, command=prefsvarchanged
)
this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
if this.log_button:
this.log_button.grid(columnspan=2, padx=BUTTONX, pady=(5, 0), sticky=tk.W)
nb.Label(frame).grid(sticky=tk.W) # big spacer
# Section heading in settings
@ -315,61 +318,77 @@ def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame:
cur_row = 10
this.label.grid(columnspan=2, padx=PADX, sticky=tk.W)
if this.label:
this.label.grid(columnspan=2, padx=PADX, sticky=tk.W)
# LANG: Game Commander name label in EDSM settings
this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window
this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.cmdr_text = nb.Label(frame)
this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W)
if this.cmdr_label and this.cmdr_text:
# LANG: Game Commander name label in EDSM settings
this.cmdr_label = nb.Label(frame, text=_('Cmdr')) # Main window
this.cmdr_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.cmdr_text = nb.Label(frame)
this.cmdr_text.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.W)
cur_row += 1
# LANG: EDSM Commander name label in EDSM settings
this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting
this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.user = nb.Entry(frame)
this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
if this.user_label and this.label:
# LANG: EDSM Commander name label in EDSM settings
this.user_label = nb.Label(frame, text=_('Commander Name')) # EDSM setting
this.user_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.user = nb.Entry(frame)
this.user.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
cur_row += 1
# LANG: EDSM API key label
this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting
this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.apikey = nb.Entry(frame)
this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
if this.apikey_label and this.apikey:
# LANG: EDSM API key label
this.apikey_label = nb.Label(frame, text=_('API Key')) # EDSM setting
this.apikey_label.grid(row=cur_row, padx=PADX, sticky=tk.W)
this.apikey = nb.Entry(frame)
this.apikey.grid(row=cur_row, column=1, padx=PADX, pady=PADY, sticky=tk.EW)
prefs_cmdr_changed(cmdr, is_beta)
return frame
def prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None:
def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001
"""
Handle the Commander name changing whilst Settings was open.
:param cmdr: The new current Commander name.
:param is_beta: Whether game beta was detected.
"""
this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED
this.user['state'] = tk.NORMAL
this.user.delete(0, tk.END)
this.apikey['state'] = tk.NORMAL
this.apikey.delete(0, tk.END)
if this.log_button:
this.log_button['state'] = tk.NORMAL if cmdr and not is_beta else tk.DISABLED
if this.user:
this.user['state'] = tk.NORMAL
this.user.delete(0, tk.END)
if this.apikey:
this.apikey['state'] = tk.NORMAL
this.apikey.delete(0, tk.END)
if cmdr:
this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}'
if this.cmdr_text:
this.cmdr_text['text'] = f'{cmdr}{" [Beta]" if is_beta else ""}'
cred = credentials(cmdr)
if cred:
this.user.insert(0, cred[0])
this.apikey.insert(0, cred[1])
if this.user:
this.user.insert(0, cred[0])
if this.apikey:
this.apikey.insert(0, cred[1])
else:
# LANG: We have no data on the current commander
this.cmdr_text['text'] = _('None')
if this.cmdr_text:
# LANG: We have no data on the current commander
this.cmdr_text['text'] = _('None')
to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED
if cmdr and not is_beta and this.log.get():
if cmdr and not is_beta and this.log and this.log.get():
to_set = tk.NORMAL
set_prefs_ui_states(to_set)
@ -378,7 +397,7 @@ def prefs_cmdr_changed(cmdr: str, is_beta: bool) -> None:
def prefsvarchanged() -> None:
"""Handle the 'Send data to EDSM' tickbox changing state."""
to_set = tk.DISABLED
if this.log.get():
if this.log and this.log.get() and this.log_button:
to_set = this.log_button['state']
set_prefs_ui_states(to_set)
@ -390,13 +409,17 @@ def set_prefs_ui_states(state: str) -> None:
:param state: the state to set each entry to
"""
this.label['state'] = state
this.cmdr_label['state'] = state
this.cmdr_text['state'] = state
this.user_label['state'] = state
this.user['state'] = state
this.apikey_label['state'] = state
this.apikey['state'] = state
if (
this.label and this.cmdr_label and this.cmdr_text and this.user_label and this.user
and this.apikey_label and this.apikey
):
this.label['state'] = state
this.cmdr_label['state'] = state
this.cmdr_text['state'] = state
this.user_label['state'] = state
this.user['state'] = state
this.apikey_label['state'] = state
this.apikey['state'] = state
def prefs_changed(cmdr: str, is_beta: bool) -> None:
@ -406,7 +429,8 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
:param cmdr: Name of Commander.
:param is_beta: Whether game beta was detected.
"""
config.set('edsm_out', this.log.get())
if this.log:
config.set('edsm_out', this.log.get())
if cmdr and not is_beta:
# TODO: remove this when config is rewritten.
@ -414,17 +438,18 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
usernames: List[str] = config.get_list('edsm_usernames', default=[])
apikeys: List[str] = config.get_list('edsm_apikeys', default=[])
if cmdr in cmdrs:
idx = cmdrs.index(cmdr)
usernames.extend([''] * (1 + idx - len(usernames)))
usernames[idx] = this.user.get().strip()
apikeys.extend([''] * (1 + idx - len(apikeys)))
apikeys[idx] = this.apikey.get().strip()
if this.user and this.apikey:
if cmdr in cmdrs:
idx = cmdrs.index(cmdr)
usernames.extend([''] * (1 + idx - len(usernames)))
usernames[idx] = this.user.get().strip()
apikeys.extend([''] * (1 + idx - len(apikeys)))
apikeys[idx] = this.apikey.get().strip()
else:
config.set('edsm_cmdrs', cmdrs + [cmdr])
usernames.append(this.user.get().strip())
apikeys.append(this.apikey.get().strip())
else:
config.set('edsm_cmdrs', cmdrs + [cmdr])
usernames.append(this.user.get().strip())
apikeys.append(this.apikey.get().strip())
config.set('edsm_usernames', usernames)
config.set('edsm_apikeys', apikeys)
@ -479,9 +504,6 @@ def journal_entry( # noqa: C901, CCR001
:param state: `monitor.state`
:return: None if no error, else an error string.
"""
should_return: bool
new_entry: Dict[str, Any] = {}
should_return, new_entry = killswitch.check_killswitch('plugins.edsm.journal', entry, logger)
if should_return:
# LANG: EDSM plugin - Journal handling disabled by killswitch
@ -548,12 +570,13 @@ entry: {entry!r}'''
else:
to_set = ''
this.station_link['text'] = to_set
this.station_link['url'] = station_url(str(this.system), str(this.station))
this.station_link.update_idletasks()
if this.station_link:
this.station_link['text'] = to_set
this.station_link['url'] = station_url(str(this.system), str(this.station))
this.station_link.update_idletasks()
# Update display of 'EDSM Status' image
if this.system_link['text'] != system:
if this.system_link and this.system_link['text'] != system:
this.system_link['text'] = system if system else ''
this.system_link['image'] = ''
this.system_link.update_idletasks()
@ -642,7 +665,7 @@ Queueing: {entry!r}'''
# Update system data
def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]:
def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001
"""
Process new CAPI data.
@ -666,27 +689,29 @@ def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]:
# TODO: Fire off the EDSM API call to trigger the callback for the icons
if config.get_str('system_provider') == 'EDSM':
this.system_link['text'] = this.system
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.system_link.update_idletasks()
if this.system_link:
this.system_link['text'] = this.system
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.system_link.update_idletasks()
if config.get_str('station_provider') == 'EDSM':
if data['commander']['docked'] or this.on_foot and this.station:
this.station_link['text'] = this.station
if this.station_link:
if data['commander']['docked'] or this.on_foot and this.station:
this.station_link['text'] = this.station
elif data['lastStarport']['name'] and data['lastStarport']['name'] != "":
this.station_link['text'] = STATION_UNDOCKED
elif data['lastStarport']['name'] and data['lastStarport']['name'] != "":
this.station_link['text'] = STATION_UNDOCKED
else:
this.station_link['text'] = ''
else:
this.station_link['text'] = ''
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.station_link.update_idletasks()
this.station_link.update_idletasks()
if not this.system_link['text']:
if this.system_link and not this.system_link['text']:
this.system_link['text'] = system
this.system_link['image'] = ''
this.system_link.update_idletasks()
@ -762,12 +787,12 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
retrying = 0
while retrying < 3:
should_skip: bool
new_item: Dict[str, Any] = {}
if item is None:
item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {}))
should_skip, new_item = killswitch.check_killswitch(
'plugins.edsm.worker',
item if item is not None else cast(Tuple[str, Mapping[str, Any]], ("", {})),
item,
logger
)
@ -801,9 +826,6 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
# drop events if required by killswitch
new_pending = []
for e in pending:
skip: bool
new: Dict[str, Any] = {}
skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger)
if skip:
continue
@ -847,8 +869,12 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
if any(p for p in pending if p['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked')):
data_elided = data.copy()
data_elided['apiKey'] = '<elided>'
data_elided['message'] = data_elided['message'].decode('utf-8')
data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8')
if isinstance(data_elided['message'], bytes):
data_elided['message'] = data_elided['message'].decode('utf-8')
if isinstance(data_elided['commanderName'], bytes):
data_elided['commanderName'] = data_elided['commanderName'].decode('utf-8')
logger.trace_if(
'journal.locations',
"pending has at least one of ('CarrierJump', 'FSDJump', 'Location', 'Docked')"
@ -867,11 +893,11 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
'journal.locations', f'Overall POST data (elided) is:\n{json.dumps(data_elided, indent=2)}'
)
r = this.session.post(TARGET_URL, data=data, timeout=_TIMEOUT)
logger.trace_if('plugin.edsm.api', f'API response content: {r.content!r}')
r.raise_for_status()
response = this.session.post(TARGET_URL, data=data, timeout=_TIMEOUT)
logger.trace_if('plugin.edsm.api', f'API response content: {response.content!r}')
response.raise_for_status()
reply = r.json()
reply = response.json()
msg_num = reply['msgnum']
msg = reply['msg']
# 1xx = OK
@ -902,7 +928,7 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
# Update main window's system status
this.lastlookup = r
# calls update_status in main thread
if not config.shutting_down:
if not config.shutting_down and this.system_link is not None:
this.system_link.event_generate('<<EDSMStatus>>', when="tail")
if r['msgnum'] // 100 != 1: # type: ignore
@ -1005,18 +1031,19 @@ def update_status(event=None) -> None:
# msgnum: 1xx = OK, 2xx = fatal error, 3xx = error, 4xx = ignorable errors.
def edsm_notify_system(reply: Mapping[str, Any]) -> None:
"""Update the image next to the system link."""
if not reply:
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM"))
if this.system_link is not None:
if not reply:
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error connecting to EDSM API
plug.show_error(_("Error: Can't connect to EDSM"))
elif reply['msgnum'] // 100 not in (1, 4):
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg']))
elif reply['msgnum'] // 100 not in (1, 4):
this.system_link['image'] = this._IMG_ERROR
# LANG: EDSM Plugin - Error message from EDSM API
plug.show_error(_('Error: EDSM {MSG}').format(MSG=reply['msg']))
elif reply.get('systemCreated'):
this.system_link['image'] = this._IMG_NEW
elif reply.get('systemCreated'):
this.system_link['image'] = this._IMG_NEW
else:
this.system_link['image'] = this._IMG_KNOWN
else:
this.system_link['image'] = this._IMG_KNOWN

View File

@ -30,6 +30,7 @@ from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from operator import itemgetter
from threading import Lock, Thread
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import OrderedDict as OrderedDictT
from typing import Sequence, Union, cast
@ -130,7 +131,7 @@ class This:
self.log_button: nb.Checkbutton
self.label: HyperlinkLabel
self.apikey: nb.Entry
self.apikey_label: HyperlinkLabel
self.apikey_label: tk.Label
self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque)
self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events
@ -235,7 +236,7 @@ def plugin_stop() -> None:
logger.debug('Done.')
def plugin_prefs(parent: tk.Tk, cmdr: str, is_beta: bool) -> tk.Frame:
def plugin_prefs(parent: ttk.Notebook, cmdr: str, is_beta: bool) -> tk.Frame:
"""Plugin Preferences UI hook."""
x_padding = 10
x_button_padding = 12 # indent Checkbuttons and Radiobuttons
@ -1539,9 +1540,6 @@ def new_add_event(
def clean_event_list(event_list: List[Event]) -> List[Event]:
"""Check for killswitched events and remove or modify them as requested."""
out = []
bad: bool
new_event: Dict[str, Any] = {}
for e in event_list:
bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{e.name}', e.data, logger)
if bad:

View File

@ -19,7 +19,6 @@ from EDMCLogging import edmclogger, get_main_logger
from hotkey import hotkeymgr
from l10n import Translations
from monitor import monitor
from myNotebook import Notebook
from theme import theme
from ttkHyperlinkLabel import HyperlinkLabel
@ -279,7 +278,7 @@ class PreferencesDialog(tk.Toplevel):
frame = ttk.Frame(self)
frame.grid(sticky=tk.NSEW)
notebook = nb.Notebook(frame)
notebook: ttk.Notebook = nb.Notebook(frame)
notebook.bind('<<NotebookTabChanged>>', self.tabchanged) # Recompute on tab change
self.PADX = 10
@ -333,7 +332,7 @@ class PreferencesDialog(tk.Toplevel):
):
self.geometry(f"+{position.left}+{position.top}")
def __setup_output_tab(self, root_notebook: nb.Notebook) -> None:
def __setup_output_tab(self, root_notebook: ttk.Notebook) -> None:
output_frame = nb.Frame(root_notebook)
output_frame.columnconfigure(0, weight=1)
@ -418,13 +417,13 @@ class PreferencesDialog(tk.Toplevel):
# LANG: Label for 'Output' Settings/Preferences tab
root_notebook.add(output_frame, text=_('Output')) # Tab heading in settings
def __setup_plugin_tabs(self, notebook: Notebook) -> None:
def __setup_plugin_tabs(self, notebook: ttk.Notebook) -> None:
for plugin in plug.PLUGINS:
plugin_frame = plugin.get_prefs(notebook, monitor.cmdr, monitor.is_beta)
if plugin_frame:
notebook.add(plugin_frame, text=plugin.name)
def __setup_config_tab(self, notebook: Notebook) -> None: # noqa: CCR001
def __setup_config_tab(self, notebook: ttk.Notebook) -> None: # noqa: CCR001
config_frame = nb.Frame(notebook)
config_frame.columnconfigure(1, weight=1)
row = AutoInc(start=1)
@ -675,7 +674,7 @@ class PreferencesDialog(tk.Toplevel):
# LANG: Label for 'Configuration' tab in Settings
notebook.add(config_frame, text=_('Configuration'))
def __setup_privacy_tab(self, notebook: Notebook) -> None:
def __setup_privacy_tab(self, notebook: ttk.Notebook) -> None:
frame = nb.Frame(notebook)
self.hide_multicrew_captain = tk.BooleanVar(value=config.get_bool('hide_multicrew_captain', default=False))
self.hide_private_group = tk.BooleanVar(value=config.get_bool('hide_private_group', default=False))
@ -697,7 +696,7 @@ class PreferencesDialog(tk.Toplevel):
notebook.add(frame, text=_('Privacy')) # LANG: Preferences privacy tab title
def __setup_appearance_tab(self, notebook: Notebook) -> None:
def __setup_appearance_tab(self, notebook: ttk.Notebook) -> None:
self.languages = Translations.available_names()
# Appearance theme and language setting
# LANG: The system default language choice in Settings > Appearance
@ -887,7 +886,7 @@ class PreferencesDialog(tk.Toplevel):
# LANG: Label for Settings > Appearance tab
notebook.add(appearance_frame, text=_('Appearance')) # Tab heading in settings
def __setup_plugin_tab(self, notebook: Notebook) -> None: # noqa: CCR001
def __setup_plugin_tab(self, notebook: ttk.Notebook) -> None: # noqa: CCR001
# Plugin settings and info
plugins_frame = nb.Frame(notebook)
plugins_frame.columnconfigure(0, weight=1)
@ -1188,8 +1187,8 @@ class PreferencesDialog(tk.Toplevel):
:return: "break" as a literal, to halt processing
"""
good = hotkeymgr.fromevent(event)
if good:
(hotkey_code, hotkey_mods) = good
if good and isinstance(good, tuple):
hotkey_code, hotkey_mods = good
event.widget.delete(0, tk.END)
event.widget.insert(0, hotkeymgr.display(hotkey_code, hotkey_mods))
if hotkey_code:

View File

@ -121,9 +121,13 @@ elif (config.auth_force_edmc_protocol
and not is_wine
and not config.auth_force_localserver
)):
# This could be false if you use auth_force_edmc_protocol, but then you get to keep the pieces
assert sys.platform == 'win32'
# spell-checker: words HBRUSH HICON WPARAM wstring WNDCLASS HMENU HGLOBAL
from ctypes import windll # type: ignore
from ctypes import POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at
from ctypes import ( # type: ignore
POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at
)
from ctypes.wintypes import (
ATOM, BOOL, DWORD, HBRUSH, HGLOBAL, HICON, HINSTANCE, HMENU, HWND, INT, LPARAM, LPCWSTR, LPVOID, LPWSTR, MSG,
UINT, WPARAM

View File

@ -13,8 +13,8 @@ flake8-annotations-coverage==0.0.6
flake8-cognitive-complexity==0.1.0
flake8-comprehensions==3.10.1
flake8-docstrings==1.6.0
isort==5.11.3
flake8-isort==5.0.3
isort==5.11.4
flake8-isort==6.0.0
flake8-json==21.7.0
flake8-noqa==1.3.0
flake8-polyfill==1.0.2
@ -23,7 +23,8 @@ flake8-use-fstring==1.4
mypy==0.991
pep8-naming==0.13.3
safety==2.3.5
types-requests==2.28.11.6
types-requests==2.28.11.7
types-pkg-resources==0.1.3
# Code formatting tools
autopep8==2.0.1
@ -45,8 +46,8 @@ py2exe==0.13.0.0; sys_platform == 'win32'
# Testing
pytest==7.2.0
pytest-cov==4.0.0 # Pytest code coverage support
coverage[toml]==6.5.0 # pytest-cov dep. This is here to ensure that it includes TOML support for pyproject.toml configs
coverage-conditional-plugin==0.7.0
coverage[toml]==7.0.0 # pytest-cov dep. This is here to ensure that it includes TOML support for pyproject.toml configs
coverage-conditional-plugin==0.8.0
# For manipulating folder permissions and the like.
pywin32==305; sys_platform == 'win32'

6
scripts/mypy-all.sh Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
#
# Run mypy checks against all the relevant files
# We assume that all `.py` files in git should be checked, and *only* those.
mypy $@ $(git ls-tree --full-tree -r --name-only HEAD | grep -E '\.py$')

22
scripts/pip_rev_deps.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Find the reverse dependencies of a package according to pip."""
import sys
import pkg_resources
def find_reverse_deps(package_name: str):
"""
Find the packages that depend on the named one.
:param package_name: Target package.
:return: List of packages that depend on this one.
"""
return [
pkg.project_name for pkg in pkg_resources.WorkingSet()
if package_name in {req.project_name for req in pkg.requires()}
]
if __name__ == '__main__':
print(find_reverse_deps(sys.argv[1]))

View File

@ -5,7 +5,7 @@ import sys
import tkinter
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any, AnyStr, Callable, Dict, List, NamedTuple, Optional, Sequence, cast
from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast
import companion
import EDMCLogging
@ -45,7 +45,7 @@ RANK_LINES_END = 9
POWERPLAY_LINES_START = 9
def status(data: Dict[str, Any]) -> List[List[str]]:
def status(data: dict[str, Any]) -> list[list[str]]:
"""
Get the current status of the cmdr referred to by data.
@ -211,7 +211,7 @@ def status(data: Dict[str, Any]) -> List[List[str]]:
return res
def export_status(data: Dict[str, Any], filename: AnyStr) -> None:
def export_status(data: dict[str, Any], filename: AnyStr) -> None:
"""
Export status data to a CSV file.
@ -236,21 +236,21 @@ class ShipRet(NamedTuple):
value: str
def ships(companion_data: Dict[str, Any]) -> List[ShipRet]:
def ships(companion_data: dict[str, Any]) -> list[ShipRet]:
"""
Return a list of 5 tuples of ship information.
:param data: [description]
:return: A 5 tuple of strings containing: Ship ID, Ship Type Name (internal), Ship Name, System, Station, and Value
"""
ships: List[Dict[str, Any]] = companion.listify(cast(list, companion_data.get('ships')))
ships: list[dict[str, Any]] = companion.listify(cast(list, companion_data.get('ships')))
current = companion_data['commander'].get('currentShipId')
if isinstance(current, int) and current < len(ships) and ships[current]:
ships.insert(0, ships.pop(current)) # Put current ship first
if not companion_data['commander'].get('docked'):
out: List[ShipRet] = []
out: list[ShipRet] = []
# Set current system, not last docked
out.append(ShipRet(
id=str(ships[0]['id']),
@ -285,7 +285,7 @@ def ships(companion_data: Dict[str, Any]) -> List[ShipRet]:
]
def export_ships(companion_data: Dict[str, Any], filename: AnyStr) -> None:
def export_ships(companion_data: dict[str, Any], filename: AnyStr) -> None:
"""
Export the current ships to a CSV file.
@ -358,7 +358,7 @@ class StatsDialog():
class StatsResults(tk.Toplevel):
"""Status window."""
def __init__(self, parent: tk.Tk, data: Dict[str, Any]) -> None:
def __init__(self, parent: tk.Tk, data: dict[str, Any]) -> None:
tk.Toplevel.__init__(self, parent)
self.parent = parent
@ -442,7 +442,9 @@ class StatsResults(tk.Toplevel):
):
self.geometry(f"+{position.left}+{position.top}")
def addpage(self, parent, header: List[str] = None, align: Optional[str] = None) -> tk.Frame:
def addpage(
self, parent, header: list[str] | None = None, align: str | None = None
) -> ttk.Frame:
"""
Add a page to the StatsResults screen.
@ -462,7 +464,7 @@ class StatsResults(tk.Toplevel):
return page
def addpageheader(self, parent: tk.Frame, header: Sequence[str], align: Optional[str] = None) -> None:
def addpageheader(self, parent: ttk.Frame, header: Sequence[str], align: str | None = None) -> None:
"""
Add the column headers to the page, followed by a separator.
@ -478,7 +480,7 @@ class StatsResults(tk.Toplevel):
self.addpagerow(parent, [''])
def addpagerow(
self, parent: tk.Frame, content: Sequence[str], align: Optional[str] = None, with_copy: bool = False
self, parent: ttk.Frame, content: Sequence[str], align: str | None = None, with_copy: bool = False
):
"""
Add a single row to parent.

View File

@ -1,3 +1,4 @@
# type: ignore
import numbers
import sys
import warnings
@ -138,7 +139,7 @@ class OldConfig():
self.identifier = f'uk.org.marginal.{appname.lower()}'
NSBundle.mainBundle().infoDictionary()['CFBundleIdentifier'] = self.identifier
self.default_journal_dir = join(
self.default_journal_dir: str | None = join(
NSSearchPathForDirectoriesInDomains(NSApplicationSupportDirectory, NSUserDomainMask, True)[0],
'Frontier Developments',
'Elite Dangerous'
@ -222,13 +223,13 @@ class OldConfig():
journaldir = known_folder_path(FOLDERID_SavedGames)
if journaldir:
self.default_journal_dir = join(journaldir, 'Frontier Developments', 'Elite Dangerous')
self.default_journal_dir: str | None = join(journaldir, 'Frontier Developments', 'Elite Dangerous')
else:
self.default_journal_dir = None
self.identifier = applongname
self.hkey = HKEY()
self.hkey: ctypes.c_void_p | None = HKEY()
disposition = DWORD()
if RegCreateKeyEx(
HKEY_CURRENT_USER,
@ -376,7 +377,7 @@ class OldConfig():
mkdir(self.plugin_dir)
self.internal_plugin_dir = join(dirname(__file__), 'plugins')
self.default_journal_dir = None
self.default_journal_dir: str | None = None
self.home = expanduser('~')
self.respath = dirname(__file__)
self.identifier = f'uk.org.marginal.{appname.lower()}'

View File

@ -88,7 +88,7 @@ class TestNewConfig:
if sys.platform != 'linux':
return
from config.linux import LinuxConfig
from config.linux import LinuxConfig # type: ignore
if isinstance(config, LinuxConfig) and config.config is not None:
config.config.read(config.filename)
@ -179,7 +179,7 @@ class TestOldNewConfig:
if sys.platform != 'linux':
return
from config.linux import LinuxConfig
from config.linux import LinuxConfig # type: ignore
if isinstance(config, LinuxConfig) and config.config is not None:
config.config.read(config.filename)

View File

@ -3,11 +3,10 @@ import multiprocessing as mp
import os
import pathlib
import sys
from typing import Generator
import pytest
# Import as other names else they get picked up when used as fixtures
from _pytest import monkeypatch as _pytest_monkeypatch
from _pytest import tmpdir as _pytest_tmpdir
from pytest import MonkeyPatch, TempdirFactory, TempPathFactory
from config import config
from journal_lock import JournalLock, JournalLockResult
@ -117,11 +116,11 @@ class TestJournalLock:
@pytest.fixture
def mock_journaldir(
self, monkeypatch: _pytest_monkeypatch,
tmp_path_factory: _pytest_tmpdir.TempPathFactory
) -> _pytest_tmpdir.TempPathFactory:
self, monkeypatch: MonkeyPatch,
tmp_path_factory: TempdirFactory
) -> Generator:
"""Fixture for mocking config.get_str('journaldir')."""
def get_str(key: str, *, default: str = None) -> str:
def get_str(key: str, *, default: str | None = None) -> str:
"""Mock config.*Config get_str to provide fake journaldir."""
if key == 'journaldir':
return str(tmp_path_factory.getbasetemp())
@ -136,11 +135,11 @@ class TestJournalLock:
@pytest.fixture
def mock_journaldir_changing(
self,
monkeypatch: _pytest_monkeypatch,
tmp_path_factory: _pytest_tmpdir.TempPathFactory
) -> _pytest_tmpdir.TempPathFactory:
monkeypatch: MonkeyPatch,
tmp_path_factory: TempdirFactory
) -> Generator:
"""Fixture for mocking config.get_str('journaldir')."""
def get_str(key: str, *, default: str = None) -> str:
def get_str(key: str, *, default: str | None = None) -> str:
"""Mock config.*Config get_str to provide fake journaldir."""
if key == 'journaldir':
return tmp_path_factory.mktemp("changing")
@ -154,7 +153,7 @@ class TestJournalLock:
###########################################################################
# Tests against JournalLock.__init__()
def test_journal_lock_init(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_journal_lock_init(self, mock_journaldir: TempPathFactory):
"""Test JournalLock instantiation."""
print(f'{type(mock_journaldir)=}')
tmpdir = str(mock_journaldir.getbasetemp())
@ -176,14 +175,14 @@ class TestJournalLock:
jlock.set_path_from_journaldir()
assert jlock.journal_dir_path is None
def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.set_path_from_journaldir() with tmpdir."""
tmpdir = mock_journaldir
jlock = JournalLock()
# Check that an actual journaldir is handled correctly.
jlock.journal_dir = tmpdir
jlock.journal_dir = str(tmpdir)
jlock.set_path_from_journaldir()
assert isinstance(jlock.journal_dir_path, pathlib.Path)
@ -200,7 +199,7 @@ class TestJournalLock:
locked = jlock.obtain_lock()
assert locked == JournalLockResult.JOURNALDIR_IS_NONE
def test_obtain_lock_with_tmpdir(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_obtain_lock_with_tmpdir(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.obtain_lock() with tmpdir."""
jlock = JournalLock()
@ -213,7 +212,7 @@ class TestJournalLock:
assert jlock.release_lock()
os.unlink(str(jlock.journal_dir_lockfile_name))
def test_obtain_lock_with_tmpdir_ro(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_obtain_lock_with_tmpdir_ro(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.obtain_lock() with read-only tmpdir."""
tmpdir = str(mock_journaldir.getbasetemp())
print(f'{tmpdir=}')
@ -280,7 +279,7 @@ class TestJournalLock:
assert locked == JournalLockResult.JOURNALDIR_READONLY
def test_obtain_lock_already_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_obtain_lock_already_locked(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.obtain_lock() with tmpdir."""
continue_q: mp.Queue = mp.Queue()
exit_q: mp.Queue = mp.Queue()
@ -312,7 +311,7 @@ class TestJournalLock:
###########################################################################
# Tests against JournalLock.release_lock()
def test_release_lock(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_release_lock(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.release_lock()."""
# First actually obtain the lock, and check it worked
jlock = JournalLock()
@ -330,12 +329,12 @@ class TestJournalLock:
# Cleanup, to avoid side-effect on other tests
os.unlink(str(jlock.journal_dir_lockfile_name))
def test_release_lock_not_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_release_lock_not_locked(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.release_lock() when not locked."""
jlock = JournalLock()
assert jlock.release_lock()
def test_release_lock_lie_locked(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_release_lock_lie_locked(self, mock_journaldir: TempPathFactory):
"""Test JournalLock.release_lock() when not locked, but lie we are."""
jlock = JournalLock()
jlock.locked = True
@ -345,7 +344,7 @@ class TestJournalLock:
# Tests against JournalLock.update_lock()
def test_update_lock(
self,
mock_journaldir_changing: _pytest_tmpdir.TempPathFactory):
mock_journaldir_changing: TempPathFactory):
"""
Test JournalLock.update_lock().
@ -373,7 +372,7 @@ class TestJournalLock:
# And the old_journaldir's lockfile too
os.unlink(str(old_journaldir_lockfile_name))
def test_update_lock_same(self, mock_journaldir: _pytest_tmpdir.TempPathFactory):
def test_update_lock_same(self, mock_journaldir: TempPathFactory):
"""
Test JournalLock.update_lock().

View File

@ -131,14 +131,14 @@ class _Theme(object):
THEME_TRANSPARENT = 2
def __init__(self) -> None:
self.active = None # Starts out with no theme
self.active: int | None = None # Starts out with no theme
self.minwidth: Optional[int] = None
self.widgets: Dict[tk.Widget | tk.BitmapImage, Set] = {}
self.widgets_pair: List = []
self.defaults: Dict = {}
self.current: Dict = {}
self.default_ui_scale = None # None == not yet known
self.startup_ui_scale = None
self.default_ui_scale: float | None = None # None == not yet known
self.startup_ui_scale: int | None = None
def register(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901
# Note widget and children for later application of a theme. Note if

View File

@ -25,7 +25,9 @@ class TimeoutAdapter(HTTPAdapter):
return super().send(*args, **kwargs)
def new_session(timeout: int = REQUEST_TIMEOUT, session: requests.Session = None) -> requests.Session:
def new_session(
timeout: int = REQUEST_TIMEOUT, session: requests.Session | None = None
) -> requests.Session:
"""
Create a new requests.Session and override the default HTTPAdapter with a TimeoutAdapter.

View File

@ -19,7 +19,7 @@ import tkinter as tk
import webbrowser
from tkinter import font as tk_font
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
def _(x: str) -> str: ...
@ -29,7 +29,7 @@ if TYPE_CHECKING:
class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object): # type: ignore
"""Clickable label for HTTP links."""
def __init__(self, master: Optional[tk.Tk] = None, **kw: Any) -> None:
def __init__(self, master: tk.Frame | None = None, **kw: Any) -> None:
self.url = 'url' in kw and kw.pop('url') or None
self.popup_copy = kw.pop('popup_copy', False)
self.underline = kw.pop('underline', None) # override ttk.Label's underline