diff --git a/EDMC.py b/EDMC.py index f56f7ead..df203008 100755 --- a/EDMC.py +++ b/EDMC.py @@ -1,6 +1,11 @@ -#!/usr/bin/env python3 -"""Command-line interface. Requires prior setup through the GUI.""" +""" +EDMC.py - Command-line interface. Requires prior setup through the GUI. +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations import argparse import json @@ -8,24 +13,22 @@ import locale import os import queue import sys -from os.path import getmtime from time import sleep, time -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, Any # isort: off - os.environ["EDMC_NO_UI"] = "1" # See EDMCLogging.py docs. # workaround for https://github.com/EDCD/EDMarketConnector/issues/568 from EDMCLogging import edmclogger, logger, logging + if TYPE_CHECKING: from logging import TRACE # type: ignore # noqa: F401 # needed to make mypy happy edmclogger.set_channels_loglevel(logging.INFO) # isort: on - import collate import commodity import companion @@ -45,6 +48,8 @@ sys.path.append(config.internal_plugin_dir) # The sys.path.append has to be after `import sys` and `from config import config` # isort: off import eddn # noqa: E402 + + # isort: on @@ -66,7 +71,7 @@ EXIT_SUCCESS, EXIT_SERVER, EXIT_CREDENTIALS, EXIT_VERIFICATION, EXIT_LAGGING, EX EXIT_JOURNAL_READ_ERR, EXIT_COMMANDER_UNKNOWN = range(9) -def versioncmp(versionstring) -> List: +def versioncmp(versionstring) -> list: """Quick and dirty version comparison assuming "strict" numeric only version numbers.""" return list(map(int, versionstring.split('.'))) @@ -98,9 +103,7 @@ def deep_get(target: dict | companion.CAPIData, *args: str, default=None) -> Any res = current.get(arg) if res is None: return default - current = res - return current @@ -155,16 +158,15 @@ def main(): # noqa: C901, CCR001 args = parser.parse_args() if args.version: - updater = Updater(provider='internal') - newversion: Optional[EDMCVersion] = updater.check_appcast() + updater = Updater() + newversion: EDMCVersion | None = updater.check_appcast() if newversion: print(f'{appversion()} ({newversion.title!r} is available)') else: print(appversion()) - return - level_to_set: Optional[int] = None + level_to_set: int | None = None if args.trace or args.trace_on: level_to_set = logging.TRACE # type: ignore # it exists logger.info('Setting TRACE level debugging due to either --trace or a --trace-on') @@ -185,10 +187,10 @@ def main(): # noqa: C901, CCR001 logger.debug(f'Startup v{appversion()} : Running on Python v{sys.version}') logger.debug(f'''Platform: {sys.platform} -argv[0]: {sys.argv[0]} -exec_prefix: {sys.exec_prefix} -executable: {sys.executable} -sys.path: {sys.path}''' + argv[0]: {sys.argv[0]} + exec_prefix: {sys.exec_prefix} + executable: {sys.executable} + sys.path: {sys.path}''' ) if args.trace_on and len(args.trace_on) > 0: import config as conf_module @@ -207,12 +209,14 @@ sys.path: {sys.path}''' # system, chances are its the current locale, and not utf-8. Otherwise if it was copied, its probably # utf8. Either way, try the system FIRST because reading something like cp1251 in UTF-8 results in garbage # but the reverse results in an exception. + json_file = os.path.abspath(args.j) try: - data = json.load(open(args.j)) + with open(json_file) as file_handle: + data = json.load(file_handle) except UnicodeDecodeError: - data = json.load(open(args.j, encoding='utf-8')) - - config.set('querytime', int(getmtime(args.j))) + with open(json_file, encoding='utf-8') as file_handle: + data = json.load(file_handle) + config.set('querytime', int(os.path.getmtime(args.j))) else: # Get state from latest Journal file @@ -255,10 +259,8 @@ sys.path: {sys.path}''' for idx, cmdr in enumerate(cmdrs): if cmdr.lower() == args.p.lower(): break - else: raise companion.CredentialsError() - companion.session.login(cmdrs[idx], monitor.is_beta) else: @@ -292,9 +294,7 @@ sys.path: {sys.path}''' logger.trace_if('capi.worker', f'Failed Request: {capi_response.message}') if capi_response.exception: raise capi_response.exception - - else: - raise ValueError(capi_response.message) + raise ValueError(capi_response.message) logger.trace_if('capi.worker', 'Answer is not a Failure') if not isinstance(capi_response, companion.EDMCCAPIResponse): @@ -366,20 +366,19 @@ sys.path: {sys.path}''' else: print(deep_get(data, 'lastSystem', 'name', default='Unknown')) - if (args.m or args.o or args.s or args.n or args.j): + if args.m or args.o or args.s or args.n or args.j: if not data['commander'].get('docked'): logger.error("Can't use -m, -o, -s, -n or -j because you're not currently docked!") return - elif not deep_get(data, 'lastStarport', 'name'): + if not deep_get(data, 'lastStarport', 'name'): logger.error("No data['lastStarport']['name'] from CAPI") sys.exit(EXIT_LAGGING) # Ignore possibly missing shipyard info - elif not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')): + if not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')): logger.error("No commodities or outfitting (modules) in CAPI data") return - else: return @@ -410,8 +409,8 @@ sys.path: {sys.path}''' else: logger.error("Station doesn't supply outfitting") - if (args.s or args.n) and not args.j and not \ - data['lastStarport'].get('ships') and data['lastStarport']['services'].get('shipyard'): + if ((args.s or args.n) and not args.j and not data['lastStarport'].get('ships') + and data['lastStarport']['services'].get('shipyard')): # Retry for shipyard sleep(SERVER_RETRY) @@ -462,14 +461,14 @@ sys.path: {sys.path}''' except Exception: logger.exception('Failed to send data to EDDN') - except companion.ServerError: - logger.exception('Frontier CAPI Server returned an error') - sys.exit(EXIT_SERVER) - except companion.ServerConnectionError: logger.exception('Exception while contacting server') sys.exit(EXIT_SERVER) + except companion.ServerError: + logger.exception('Frontier CAPI Server returned an error') + sys.exit(EXIT_SERVER) + except companion.CredentialsError: logger.error('Frontier CAPI Server: Invalid Credentials') sys.exit(EXIT_CREDENTIALS) diff --git a/EDMCLogging.py b/EDMCLogging.py index e698b35b..74b439f9 100644 --- a/EDMCLogging.py +++ b/EDMCLogging.py @@ -34,6 +34,7 @@ To utilise logging in a 'found' (third-party) plugin, include this: # See, plug.py:load_plugins() logger = logging.getLogger(f'{appname}.{plugin_name}') """ +from __future__ import annotations import inspect import logging @@ -48,7 +49,7 @@ from sys import _getframe as getframe from threading import get_native_id as thread_native_id from time import gmtime from traceback import print_exc -from typing import TYPE_CHECKING, Tuple, cast +from typing import TYPE_CHECKING, cast import config as config_mod from config import appcmdname, appname, config @@ -122,7 +123,6 @@ if TYPE_CHECKING: def trace(self, message, *args, **kwargs) -> None: """See implementation above.""" - ... def trace_if(self, condition: str, message, *args, **kwargs) -> None: """ @@ -130,7 +130,6 @@ if TYPE_CHECKING: See implementation above. """ - ... class Logger: @@ -183,18 +182,12 @@ class Logger: # rotated versions. # This is {logger_name} so that EDMC.py logs to a different file. logfile_rotating = pathlib.Path(tempfile.gettempdir()) - logfile_rotating = logfile_rotating / f'{appname}' + logfile_rotating /= f'{appname}' logfile_rotating.mkdir(exist_ok=True) - logfile_rotating = logfile_rotating / f'{logger_name}-debug.log' + logfile_rotating /= f'{logger_name}-debug.log' - self.logger_channel_rotating = logging.handlers.RotatingFileHandler( - logfile_rotating, - mode='a', - maxBytes=1024 * 1024, # 1MiB - backupCount=10, - encoding='utf-8', - delay=False - ) + self.logger_channel_rotating = logging.handlers.RotatingFileHandler(logfile_rotating, maxBytes=1024 * 1024, + backupCount=10, encoding='utf-8') # Yes, we always want these rotated files to be at TRACE level self.logger_channel_rotating.setLevel(logging.TRACE) # type: ignore self.logger_channel_rotating.setFormatter(self.logger_formatter) @@ -325,7 +318,7 @@ class EDMCContextFilter(logging.Filter): return True @classmethod - def caller_attributes(cls, module_name: str = '') -> Tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible + def caller_attributes(cls, module_name: str = '') -> tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible """ Determine extra or changed fields for the caller. @@ -535,9 +528,8 @@ def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin': if not os.getenv("EDMC_NO_UI"): # GUI app being run return cast('LoggerMixin', logging.getLogger(appname)) - else: - # Must be the CLI - return cast('LoggerMixin', logging.getLogger(appcmdname)) + # Must be the CLI + return cast('LoggerMixin', logging.getLogger(appcmdname)) # Singleton diff --git a/EDMarketConnector.py b/EDMarketConnector.py index 0b457c9c..8be745f2 100755 --- a/EDMarketConnector.py +++ b/EDMarketConnector.py @@ -1,6 +1,10 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""Entry point for the main GUI application.""" +""" +EDMarketConnector.py - Entry point for the GUI. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" from __future__ import annotations import argparse @@ -9,14 +13,14 @@ import locale import pathlib import queue import re +import subprocess import sys import threading import webbrowser -from builtins import object, str -from os import chdir, environ -from os.path import dirname, join +from os import chdir, environ, path from time import localtime, strftime, time -from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Literal +from constants import applongname, appname, protocolhandler_redirect # Have this as early as possible for people running EDMarketConnector.exe # from cmd.exe or a bat file or similar. Else they might not be in the correct @@ -24,17 +28,16 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union if getattr(sys, 'frozen', False): # Under py2exe sys.path[0] is the executable name if sys.platform == 'win32': - chdir(dirname(sys.path[0])) + chdir(path.dirname(sys.path[0])) # Allow executable to be invoked from any cwd - environ['TCL_LIBRARY'] = join(dirname(sys.path[0]), 'lib', 'tcl') - environ['TK_LIBRARY'] = join(dirname(sys.path[0]), 'lib', 'tk') + environ['TCL_LIBRARY'] = path.join(path.dirname(sys.path[0]), 'lib', 'tcl') + environ['TK_LIBRARY'] = path.join(path.dirname(sys.path[0]), 'lib', 'tk') else: # We still want to *try* to have CWD be where the main script is, even if # not frozen. chdir(pathlib.Path(__file__).parent) -from constants import applongname, appname, protocolhandler_redirect # config will now cause an appname logger to be set up, so we need the # console redirect before this @@ -46,7 +49,8 @@ if __name__ == '__main__': import tempfile # unbuffered not allowed for text in python3, so use `1 for line buffering - sys.stdout = sys.stderr = open(join(tempfile.gettempdir(), f'{appname}.log'), mode='wt', buffering=1) + log_file_path = path.join(tempfile.gettempdir(), f'{appname}.log') + sys.stdout = sys.stderr = open(log_file_path, mode='wt', buffering=1) # Do NOT use WITH here. # TODO: Test: Make *sure* this redirect is working, else py2exe is going to cause an exit popup @@ -87,6 +91,11 @@ if __name__ == '__main__': # noqa: C901 help='Suppress the popup from when the application detects another instance already running', action='store_true' ) + + parser.add_argument('--start_min', + help="Start the application minimized", + action="store_true" + ) ########################################################################### ########################################################################### @@ -175,7 +184,7 @@ if __name__ == '__main__': # noqa: C901 ) ########################################################################### - args = parser.parse_args() + args: argparse.Namespace = parser.parse_args() if args.capi_pretend_down: import config as conf_module @@ -187,7 +196,7 @@ if __name__ == '__main__': # noqa: C901 with open(conf_module.config.app_dir_path / 'access_token.txt', 'r') as at: conf_module.capi_debug_access_token = at.readline().strip() - level_to_set: Optional[int] = None + level_to_set: int | None = None if args.trace or args.trace_on: level_to_set = logging.TRACE # type: ignore # it exists logger.info('Setting TRACE level debugging due to either --trace or a --trace-on') @@ -216,7 +225,7 @@ if __name__ == '__main__': # noqa: C901 else: print("--force-edmc-protocol is only valid on Windows") parser.print_help() - exit(1) + sys.exit(1) if args.debug_sender and len(args.debug_sender) > 0: import config as conf_module @@ -237,7 +246,7 @@ if __name__ == '__main__': # noqa: C901 logger.info(f'marked {d} for TRACE') def handle_edmc_callback_or_foregrounding() -> None: # noqa: CCR001 - """Handle any edmc:// auth callback, else foreground existing window.""" + """Handle any edmc:// auth callback, else foreground an existing window.""" logger.trace_if('frontier-auth.windows', 'Begin...') if sys.platform == 'win32': @@ -245,40 +254,40 @@ if __name__ == '__main__': # noqa: C901 # If *this* instance hasn't locked, then another already has and we # now need to do the edmc:// checks for auth callback if locked != JournalLockResult.LOCKED: - import ctypes + from ctypes import windll, c_int, create_unicode_buffer, WINFUNCTYPE from ctypes.wintypes import BOOL, HWND, INT, LPARAM, LPCWSTR, LPWSTR - EnumWindows = ctypes.windll.user32.EnumWindows # noqa: N806 - GetClassName = ctypes.windll.user32.GetClassNameW # noqa: N806 - GetClassName.argtypes = [HWND, LPWSTR, ctypes.c_int] - GetWindowText = ctypes.windll.user32.GetWindowTextW # noqa: N806 - GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int] - GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW # noqa: N806 - GetProcessHandleFromHwnd = ctypes.windll.oleacc.GetProcessHandleFromHwnd # noqa: N806 + EnumWindows = windll.user32.EnumWindows # noqa: N806 + GetClassName = windll.user32.GetClassNameW # noqa: N806 + GetClassName.argtypes = [HWND, LPWSTR, c_int] + GetWindowText = windll.user32.GetWindowTextW # noqa: N806 + GetWindowText.argtypes = [HWND, LPWSTR, c_int] + GetWindowTextLength = windll.user32.GetWindowTextLengthW # noqa: N806 + GetProcessHandleFromHwnd = windll.oleacc.GetProcessHandleFromHwnd # noqa: N806 SW_RESTORE = 9 # noqa: N806 - SetForegroundWindow = ctypes.windll.user32.SetForegroundWindow # noqa: N806 - ShowWindow = ctypes.windll.user32.ShowWindow # noqa: N806 - ShowWindowAsync = ctypes.windll.user32.ShowWindowAsync # noqa: N806 + SetForegroundWindow = windll.user32.SetForegroundWindow # noqa: N806 + ShowWindow = windll.user32.ShowWindow # noqa: N806 + ShowWindowAsync = windll.user32.ShowWindowAsync # noqa: N806 COINIT_MULTITHREADED = 0 # noqa: N806,F841 COINIT_APARTMENTTHREADED = 0x2 # noqa: N806 COINIT_DISABLE_OLE1DDE = 0x4 # noqa: N806 - CoInitializeEx = ctypes.windll.ole32.CoInitializeEx # noqa: N806 + CoInitializeEx = windll.ole32.CoInitializeEx # noqa: N806 - ShellExecute = ctypes.windll.shell32.ShellExecuteW # noqa: N806 + ShellExecute = windll.shell32.ShellExecuteW # noqa: N806 ShellExecute.argtypes = [HWND, LPCWSTR, LPCWSTR, LPCWSTR, LPCWSTR, INT] - def window_title(h: int) -> Optional[str]: + def window_title(h: int) -> str | None: if h: text_length = GetWindowTextLength(h) + 1 - buf = ctypes.create_unicode_buffer(text_length) + buf = create_unicode_buffer(text_length) if GetWindowText(h, buf, text_length): return buf.value return None - @ctypes.WINFUNCTYPE(BOOL, HWND, LPARAM) + @WINFUNCTYPE(BOOL, HWND, LPARAM) def enumwindowsproc(window_handle, l_param): # noqa: CCR001 """ Determine if any window for the Application exists. @@ -286,15 +295,16 @@ if __name__ == '__main__': # noqa: C901 Called for each found window by EnumWindows(). When a match is found we check if we're being invoked as the - edmc://auth handler. If so we send the message to the existing - process/window. If not we'll raise that existing window to the + edmc://auth handler. If so we send the message to the existing + process/window. If not we'll raise that existing window to the foreground. + :param window_handle: Window to check. :param l_param: The second parameter to the EnumWindows() call. :return: False if we found a match, else True to continue iteration """ # class name limited to 256 - https://msdn.microsoft.com/en-us/library/windows/desktop/ms633576 - cls = ctypes.create_unicode_buffer(257) + cls = create_unicode_buffer(257) # This conditional is exploded to make debugging slightly easier if GetClassName(window_handle, cls, 257): if cls.value == 'TkTopLevel': @@ -323,13 +333,8 @@ if __name__ == '__main__': # noqa: C901 # Ref: EnumWindows(enumwindowsproc, 0) - return - def already_running_popup(): """Create the "already running" popup.""" - import tkinter as tk - from tkinter import ttk - # Check for CL arg that suppresses this popup. if args.suppress_dupe_process_popup: sys.exit(0) @@ -339,8 +344,7 @@ if __name__ == '__main__': # noqa: C901 frame = tk.Frame(root) frame.grid(row=1, column=0, sticky=tk.NSEW) - label = tk.Label(frame) - label['text'] = 'An EDMarketConnector.exe process was already running, exiting.' + label = tk.Label(frame, text='An EDMarketConnector.exe process was already running, exiting.') label.grid(row=1, column=0, sticky=tk.NSEW) button = ttk.Button(frame, text='OK', command=lambda: sys.exit(0)) @@ -373,41 +377,23 @@ if __name__ == '__main__': # noqa: C901 git_branch = "" try: - import subprocess git_cmd = subprocess.Popen('git branch --show-current'.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) out, err = git_cmd.communicate() - git_branch = out.decode().rstrip('\n') + git_branch = out.decode().strip() except Exception: pass if ( - ( - git_branch == 'develop' - or ( - git_branch == '' and '-alpha0' in str(appversion()) - ) - ) and ( - ( - sys.platform == 'linux' - and environ.get('USER') is not None - and environ['USER'] not in ['ad', 'athan'] - ) - or ( - sys.platform == 'win32' - and environ.get('USERNAME') is not None - and environ['USERNAME'] not in ['Athan'] - ) + git_branch == 'develop' + or ( + git_branch == '' and '-alpha0' in str(appversion()) ) ): - print("Why are you running the develop branch if you're not a developer?") - print("Please check https://github.com/EDCD/EDMarketConnector/wiki/Running-from-source#running-from-source") - print("You probably want the 'stable' branch.") - print("\n\rIf Athanasius or A_D asked you to run this, tell them about this message.") - sys.exit(-1) + print("You're running in a DEVELOPMENT branch build. You might encounter bugs!") # See EDMCLogging.py docs. @@ -427,8 +413,7 @@ import tkinter as tk import tkinter.filedialog import tkinter.font import tkinter.messagebox -from tkinter import ttk, constants as tkc - +from tkinter import ttk import commodity import plug import prefs @@ -462,7 +447,7 @@ SHIPYARD_HTML_TEMPLATE = """ """ -class AppWindow(object): +class AppWindow: """Define the main application window.""" _CAPI_RESPONSE_TK_EVENT_NAME = '<>' @@ -509,7 +494,7 @@ class AppWindow(object): else: self.w.tk.call('wm', 'iconphoto', self.w, '-default', - tk.PhotoImage(file=join(config.respath_path, 'io.edcd.EDMarketConnector.png'))) + tk.PhotoImage(file=path.join(config.respath_path, 'io.edcd.EDMarketConnector.png'))) # TODO: Export to files and merge from them in future ? self.theme_icon = tk.PhotoImage( @@ -637,7 +622,7 @@ class AppWindow(object): self.updater = update.Updater(tkroot=self.w, provider='external') else: - self.updater = update.Updater(tkroot=self.w, provider='internal') + self.updater = update.Updater(tkroot=self.w) self.updater.check_for_updates() # Sparkle / WinSparkle does this automatically for packaged apps if sys.platform == 'darwin': @@ -680,14 +665,14 @@ class AppWindow(object): self.w.protocol("WM_DELETE_WINDOW", self.w.withdraw) # close button shouldn't quit app self.w.resizable(tk.FALSE, tk.FALSE) # Can't be only resizable on one axis else: - self.file_menu = self.view_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore + self.file_menu = self.view_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) self.file_menu.add_command(command=lambda: stats.StatsDialog(self.w, self.status)) self.file_menu.add_command(command=self.save_raw) self.file_menu.add_command(command=lambda: prefs.PreferencesDialog(self.w, self.postprefs)) self.file_menu.add_separator() self.file_menu.add_command(command=self.onexit) self.menubar.add_cascade(menu=self.file_menu) - self.edit_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore + self.edit_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) self.edit_menu.add_command(accelerator='Ctrl+C', state=tk.DISABLED, command=self.copy) self.menubar.add_cascade(menu=self.edit_menu) self.help_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore @@ -734,7 +719,7 @@ class AppWindow(object): anchor=tk.W, compound=tk.LEFT ) theme_titlebar.grid(columnspan=3, padx=2, sticky=tk.NSEW) - self.drag_offset: Tuple[Optional[int], Optional[int]] = (None, None) + self.drag_offset: tuple[int | None, int | None] = (None, None) theme_titlebar.bind('', self.drag_start) theme_titlebar.bind('', self.drag_continue) theme_titlebar.bind('', self.drag_end) @@ -808,7 +793,7 @@ class AppWindow(object): self.w.bind('', self.capi_request_data) self.w.bind_all('<>', self.capi_request_data) # Ask for CAPI queries to be performed self.w.bind_all(self._CAPI_RESPONSE_TK_EVENT_NAME, self.capi_handle_response) - self.w.bind_all('<>', self.journal_event) # Journal monitoring + self.w.bind_all('<>', self.journal_event) # type: ignore # Journal monitoring self.w.bind_all('<>', self.dashboard_event) # Dashboard monitoring self.w.bind_all('<>', self.plugin_error) # Statusbar self.w.bind_all('<>', self.auth) # cAPI auth @@ -826,6 +811,12 @@ class AppWindow(object): config.delete('logdir', suppress=True) self.postprefs(False) # Companion login happens in callback from monitor self.toggle_suit_row(visible=False) + if args.start_min: + logger.warning("Trying to start minimized") + if root.overrideredirect(): + self.oniconify() + else: + self.w.wm_iconify() def update_suit_text(self) -> None: """Update the suit text for current type and loadout.""" @@ -834,13 +825,14 @@ class AppWindow(object): self.suit['text'] = '' return - if (suit := monitor.state.get('SuitCurrent')) is None: + suit = monitor.state.get('SuitCurrent') + if suit is None: self.suit['text'] = f'<{_("Unknown")}>' # LANG: Unknown suit return suitname = suit['edmcName'] - - if (suitloadout := monitor.state.get('SuitLoadoutCurrent')) is None: + suitloadout = monitor.state.get('SuitLoadoutCurrent') + if suitloadout is None: self.suit['text'] = '' return @@ -849,31 +841,18 @@ class AppWindow(object): def suit_show_if_set(self) -> None: """Show UI Suit row if we have data, else hide.""" - if self.suit['text'] != '': - self.toggle_suit_row(visible=True) + self.toggle_suit_row(self.suit['text'] != '') - else: - self.toggle_suit_row(visible=False) - - def toggle_suit_row(self, visible: Optional[bool] = None) -> None: + def toggle_suit_row(self, visible: bool | None = None) -> None: """ Toggle the visibility of the 'Suit' row. :param visible: Force visibility to this. """ - if visible is True: - self.suit_shown = False - - elif visible is False: - self.suit_shown = True + self.suit_shown = not visible if not self.suit_shown: - if sys.platform != 'win32': - pady = 2 - - else: - - pady = 0 + pady = 2 if sys.platform != 'win32' else 0 self.suit_label.grid(row=self.suit_grid_row, column=0, sticky=tk.W, padx=self.PADX, pady=pady) self.suit.grid(row=self.suit_grid_row, column=1, sticky=tk.EW, padx=self.PADX, pady=pady) @@ -1020,38 +999,49 @@ class AppWindow(object): :return: True if all OK, else False to trigger play_bad in caller. """ - if config.get_int('output') & (config.OUT_STATION_ANY): - if not data['commander'].get('docked') and not monitor.state['OnFoot']: - if not self.status['text']: - # Signal as error because the user might actually be docked - # but the server hosting the Companion API hasn't caught up - # LANG: Player is not docked at a station, when we expect them to be - self.status['text'] = _("You're not docked at a station!") - return False + output_flags = config.get_int('output') + is_docked = data['commander'].get('docked') + has_commodities = data['lastStarport'].get('commodities') + has_modules = data['lastStarport'].get('modules') + commodities_flag = config.OUT_MKT_CSV | config.OUT_MKT_TD + + if output_flags & config.OUT_STATION_ANY: + if not is_docked and not monitor.state['OnFoot']: + # Signal as error because the user might actually be docked + # but the server hosting the Companion API hasn't caught up + # LANG: Player is not docked at a station, when we expect them to be + self._handle_status(_("You're not docked at a station!")) + return False # Ignore possibly missing shipyard info - elif (config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA) \ - and not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')): - if not self.status['text']: - # LANG: Status - Either no market or no modules data for station from Frontier CAPI - self.status['text'] = _("Station doesn't have anything!") + if output_flags & config.OUT_EDDN_SEND_STATION_DATA and not (has_commodities or has_modules): + # LANG: Status - Either no market or no modules data for station from Frontier CAPI + self._handle_status(_("Station doesn't have anything!")) - elif not data['lastStarport'].get('commodities'): - if not self.status['text']: - # LANG: Status - No station market data from Frontier CAPI - self.status['text'] = _("Station doesn't have a market!") + elif not has_commodities: + # LANG: Status - No station market data from Frontier CAPI + self._handle_status(_("Station doesn't have a market!")) - elif config.get_int('output') & (config.OUT_MKT_CSV | config.OUT_MKT_TD): - # Fixup anomalies in the commodity data + elif output_flags & commodities_flag: + # Fixup anomalies in the comodity data fixed = companion.fixup(data) - if config.get_int('output') & config.OUT_MKT_CSV: + if output_flags & config.OUT_MKT_CSV: commodity.export(fixed, COMMODITY_CSV) - if config.get_int('output') & config.OUT_MKT_TD: + if output_flags & config.OUT_MKT_TD: td.export(fixed) return True + def _handle_status(self, message: str) -> None: + """ + Set the status label text if it's not already set. + + :param message: Status message to display. + """ + if not self.status['text']: + self.status['text'] = message + def capi_request_data(self, event=None) -> None: # noqa: CCR001 """ Perform CAPI data retrieval and associated actions. @@ -1118,13 +1108,13 @@ class AppWindow(object): return if not companion.session.retrying: - if time() < self.capi_query_holdoff_time: # Was invoked by key while in cooldown - if play_sound and (self.capi_query_holdoff_time - time()) < companion.capi_query_cooldown * 0.75: + if time() < self.capi_query_holdoff_time: + # Invoked by key while in cooldown + time_remaining = self.capi_query_holdoff_time - time() + if play_sound and time_remaining < companion.capi_query_cooldown * 0.75: self.status['text'] = '' - hotkeymgr.play_bad() # Don't play sound in first few seconds to prevent repeats - - return - + hotkeymgr.play_bad() + return elif play_sound: hotkeymgr.play_good() @@ -1137,6 +1127,7 @@ class AppWindow(object): logger.trace_if('capi.worker', 'Requesting full station data') config.set('querytime', query_time) logger.trace_if('capi.worker', 'Calling companion.session.station') + companion.session.station( query_time=query_time, tk_response_event=self._CAPI_RESPONSE_TK_EVENT_NAME, play_sound=play_sound @@ -1192,12 +1183,16 @@ class AppWindow(object): ) def capi_handle_response(self, event=None): # noqa: C901, CCR001 - """Handle the resulting data from a CAPI query.""" + """ + Handle the resulting data from a CAPI query. + + :param event: generated event details. + """ logger.trace_if('capi.worker', 'Handling response') play_bad: bool = False - err: Optional[str] = None + err: str | None = None - capi_response: Union[companion.EDMCCAPIFailedRequest, companion.EDMCCAPIResponse] + capi_response: companion.EDMCCAPIFailedRequest | companion.EDMCCAPIResponse try: logger.trace_if('capi.worker', 'Pulling answer off queue') capi_response = companion.session.capi_response_queue.get(block=False) @@ -1206,8 +1201,7 @@ class AppWindow(object): if capi_response.exception: raise capi_response.exception - else: - raise ValueError(capi_response.message) + raise ValueError(capi_response.message) logger.trace_if('capi.worker', 'Answer is not a Failure') if not isinstance(capi_response, companion.EDMCCAPIResponse): @@ -1256,8 +1250,8 @@ class AppWindow(object): err = self.status['text'] = _("Where are you?!") # Shouldn't happen elif ( - not capi_response.capi_data.get('ship', {}).get('name') - or not capi_response.capi_data.get('ship', {}).get('modules') + not capi_response.capi_data.get('ship', {}).get('name') + or not capi_response.capi_data.get('ship', {}).get('modules') ): # LANG: We don't know what ship the commander is in, when we should err = self.status['text'] = _("What are you flying?!") # Shouldn't happen @@ -1268,8 +1262,8 @@ class AppWindow(object): raise companion.CmdrError() elif ( - capi_response.auto_update and not monitor.state['OnFoot'] - and not capi_response.capi_data['commander'].get('docked') + capi_response.auto_update and not monitor.state['OnFoot'] + and not capi_response.capi_data['commander'].get('docked') ): # auto update is only when just docked logger.warning(f"{capi_response.auto_update!r} and not {monitor.state['OnFoot']!r} and " @@ -1289,7 +1283,7 @@ class AppWindow(object): f"{monitor.state['OnFoot']!r} and {monitor.state['StationName']!r}") raise companion.ServerLagging() - elif capi_response.capi_data['commander']['docked'] and monitor.state['StationName'] is None: + if capi_response.capi_data['commander']['docked'] and monitor.state['StationName'] is None: # Likely (re-)Embarked on ship docked at an EDO settlement. # Both Disembark and Embark have `"Onstation": false` in Journal. # So there's nothing to tell us which settlement we're (still, @@ -1311,8 +1305,8 @@ class AppWindow(object): raise companion.ServerLagging() elif ( - not monitor.state['OnFoot'] - and capi_response.capi_data['ship']['name'].lower() != monitor.state['ShipType'] + not monitor.state['OnFoot'] + and capi_response.capi_data['ship']['name'].lower() != monitor.state['ShipType'] ): # CAPI ship type must match logger.warning(f"not {monitor.state['OnFoot']!r} and " @@ -1385,9 +1379,12 @@ class AppWindow(object): # TODO: Set status text return - except companion.ServerConnectionError: + except companion.ServerConnectionError as comp_err: # LANG: Frontier CAPI server error when fetching data self.status['text'] = _('Frontier CAPI server error') + logger.warning(f'Exception while contacting server: {comp_err}') + err = self.status['text'] = str(comp_err) + play_bad = True except companion.CredentialsRequireRefresh: # We need to 'close' the auth else it'll see STATE_OK and think login() isn't needed @@ -1425,11 +1422,6 @@ class AppWindow(object): companion.session.invalidate() self.login() - except companion.ServerConnectionError as e: # TODO: unreachable (subclass of ServerLagging -- move to above) - logger.warning(f'Exception while contacting server: {e}') - err = self.status['text'] = str(e) - play_bad = True - except Exception as e: # Including CredentialsError, ServerError logger.debug('"other" exception', exc_info=e) err = self.status['text'] = str(e) @@ -1448,7 +1440,7 @@ class AppWindow(object): self.cooldown() logger.trace_if('capi.worker', '...done') - def journal_event(self, event): # noqa: C901, CCR001 # Currently not easily broken up. + def journal_event(self, event: str): # noqa: C901, CCR001 # Currently not easily broken up. """ Handle a Journal event passed through event queue from monitor.py. @@ -1576,7 +1568,7 @@ class AppWindow(object): logger.info('StartUp or LoadGame event') # Disable WinSparkle automatic update checks, IFF configured to do so when in-game - if config.get_int('disable_autoappupdatecheckingame') and 1: + if config.get_int('disable_autoappupdatecheckingame'): if self.updater is not None: self.updater.set_automatic_updates_check(False) @@ -1721,27 +1713,27 @@ class AppWindow(object): # Avoid file length limits if possible provider = config.get_str('shipyard_provider', default='EDSY') target = plug.invoke(provider, 'EDSY', 'shipyard_url', loadout, monitor.is_beta) - file_name = join(config.app_dir_path, "last_shipyard.html") + file_name = path.join(config.app_dir_path, "last_shipyard.html") with open(file_name, 'w') as f: - print(SHIPYARD_HTML_TEMPLATE.format( + f.write(SHIPYARD_HTML_TEMPLATE.format( link=html.escape(str(target)), provider_name=html.escape(str(provider)), ship_name=html.escape(str(shipname)) - ), file=f) + )) return f'file://localhost/{file_name}' def system_url(self, system: str) -> str | None: """Despatch a system URL to the configured handler.""" return plug.invoke( - config.get_str('system_provider'), 'EDSM', 'system_url', monitor.state['SystemName'] + config.get_str('system_provider', default='EDSM'), 'EDSM', 'system_url', monitor.state['SystemName'] ) def station_url(self, station: str) -> str | None: """Despatch a station URL to the configured handler.""" return plug.invoke( - config.get_str('station_provider'), 'EDSM', 'station_url', + config.get_str('station_provider', default='EDSM'), 'EDSM', 'station_url', monitor.state['SystemName'], monitor.state['StationName'] ) @@ -1749,12 +1741,10 @@ class AppWindow(object): """Display and update the cooldown timer for 'Update' button.""" if time() < self.capi_query_holdoff_time: # Update button in main window - self.button['text'] = self.theme_button['text'] \ - = _('cooldown {SS}s').format( # LANG: Cooldown on 'Update' button - SS=int(self.capi_query_holdoff_time - time()) - ) + cooldown_time = int(self.capi_query_holdoff_time - time()) + # LANG: Cooldown on 'Update' button + self.button['text'] = self.theme_button['text'] = _('cooldown {SS}s').format(SS=cooldown_time) self.w.after(1000, self.cooldown) - else: self.button['text'] = self.theme_button['text'] = _('Update') # LANG: Update button in main window self.button['state'] = self.theme_button['state'] = ( @@ -1775,11 +1765,10 @@ class AppWindow(object): def copy(self, event=None) -> None: """Copy system, and possible station, name to clipboard.""" if monitor.state['SystemName']: + clipboard_text = f"{monitor.state['SystemName']},{monitor.state['StationName']}" if monitor.state[ + 'StationName'] else monitor.state['SystemName'] self.w.clipboard_clear() - self.w.clipboard_append( - f"{monitor.state['SystemName']},{monitor.state['StationName']}" if monitor.state['StationName'] - else monitor.state['SystemName'] - ) + self.w.clipboard_append(clipboard_text) def help_general(self, event=None) -> None: """Open Wiki Help page in browser.""" @@ -1805,9 +1794,14 @@ class AppWindow(object): class HelpAbout(tk.Toplevel): """The applications Help > About popup.""" - showing = False + showing: bool = False - def __init__(self, parent: tk.Tk): + def __init__(self, parent: tk.Tk) -> None: + """ + Initialize the HelpAbout popup. + + :param parent: The parent Tk window. + """ if self.__class__.showing: return @@ -1848,11 +1842,11 @@ class AppWindow(object): # version tk.Label(frame).grid(row=row, column=0) # spacer row += 1 - self.appversion_label = tk.Text(frame, height=1, width=len(str(appversion())), wrap=tkc.NONE, bd=0) + self.appversion_label = tk.Text(frame, height=1, width=len(str(appversion())), wrap=tk.NONE, bd=0) self.appversion_label.insert("1.0", str(appversion())) self.appversion_label.tag_configure("center", justify="center") self.appversion_label.tag_add("center", "1.0", "end") - self.appversion_label.config(state=tkc.DISABLED, bg=frame.cget("background"), font="TkDefaultFont") + self.appversion_label.config(state=tk.DISABLED, bg=frame.cget("background"), font="TkDefaultFont") self.appversion_label.grid(row=row, column=0, sticky=tk.E) # LANG: Help > Release Notes self.appversion = HyperlinkLabel(frame, compound=tk.RIGHT, text=_('Release Notes'), @@ -2068,7 +2062,7 @@ Locale LC_TIME: {locale.getlocale(locale.LC_TIME)}''' ) -def setup_killswitches(filename: Optional[str]): +def setup_killswitches(filename: str | None): """Download and setup the main killswitch list.""" logger.debug('fetching killswitches...') if filename is not None: @@ -2130,7 +2124,7 @@ sys.path: {sys.path}''' config.delete('font_size', suppress=True) config.set('ui_scale', 100) # 100% is the default here - config.delete('geometry', suppress=True) # unset is recreated by other code + config.delete('geometry', suppress=True) # unset is recreated by other code logger.info('reset theme, transparency, font, font size, ui scale, and ui geometry to default.') @@ -2167,12 +2161,12 @@ sys.path: {sys.path}''' # # Windows 19, 1903 was build 18362 if ( - sys.platform != 'win32' - or ( - windows_ver.major == 10 - and windows_ver.build >= 18362 - ) - or windows_ver.major > 10 # Paranoid future check + sys.platform != 'win32' + or ( + windows_ver.major == 10 + and windows_ver.build >= 18362 + ) + or windows_ver.major > 10 # Paranoid future check ): # Set that same language, but utf8 encoding (it was probably cp1252 # or equivalent for other languages). @@ -2209,10 +2203,10 @@ sys.path: {sys.path}''' # logger.debug('Test from __main__') # test_logging() - class A(object): + class A: """Simple top-level class.""" - class B(object): + class B: """Simple second-level class.""" def __init__(self): @@ -2268,7 +2262,7 @@ sys.path: {sys.path}''' def messagebox_not_py3(): """Display message about plugins not updated for Python 3.x.""" plugins_not_py3_last = config.get_int('plugins_not_py3_last', default=0) - if (plugins_not_py3_last + 86400) < int(time()) and len(plug.PLUGINS_not_py3): + if (plugins_not_py3_last + 86400) < int(time()) and plug.PLUGINS_not_py3: # LANG: Popup-text about 'active' plugins without Python 3.x support popup_text = _( "One or more of your enabled plugins do not yet have support for Python 3.x. Please see the " @@ -2302,9 +2296,11 @@ sys.path: {sys.path}''' ui_transparency = 100 root.wm_attributes('-alpha', ui_transparency / 100) - + # Display message box about plugins without Python 3.x support root.after(0, messagebox_not_py3) + # Show warning popup for killswitches matching current version root.after(1, show_killswitch_poppup, root) + # Start the main event loop root.mainloop() logger.info('Exiting') diff --git a/dashboard.py b/dashboard.py index 715fd230..1f95a943 100644 --- a/dashboard.py +++ b/dashboard.py @@ -1,32 +1,32 @@ -"""Handle the game Status.json file.""" +""" +dashboard.py - Handle the game Status.json file. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations import json -import pathlib import sys import time import tkinter as tk from calendar import timegm -from os.path import getsize, isdir, isfile -from typing import Any, Dict, Optional, cast - +from os.path import getsize, isdir, isfile, join +from typing import Any, cast from watchdog.observers.api import BaseObserver - from config import config from EDMCLogging import get_main_logger logger = get_main_logger() -if sys.platform == 'darwin': +if sys.platform in ('darwin', 'win32'): from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer - -elif sys.platform == 'win32': - from watchdog.events import FileSystemEventHandler - from watchdog.observers import Observer - else: # Linux's inotify doesn't work over CIFS or NFS, so poll - FileSystemEventHandler = object # dummy + class FileSystemEventHandler: # type: ignore + """Dummy class to represent a file system event handler on platforms other than macOS and Windows.""" class Dashboard(FileSystemEventHandler): @@ -39,9 +39,9 @@ class Dashboard(FileSystemEventHandler): self.session_start: int = int(time.time()) self.root: tk.Tk = None # type: ignore self.currentdir: str = None # type: ignore # The actual logdir that we're monitoring - self.observer: Optional[Observer] = None # type: ignore + self.observer: Observer | None = None # type: ignore self.observed = None # a watchdog ObservedWatch, or None if polling - self.status: Dict[str, Any] = {} # Current status for communicating status back to main thread + self.status: dict[str, Any] = {} # Current status for communicating status back to main thread def start(self, root: tk.Tk, started: int) -> bool: """ @@ -56,10 +56,8 @@ class Dashboard(FileSystemEventHandler): self.session_start = started logdir = config.get_str('journaldir', default=config.default_journal_dir) - if logdir == '': - logdir = config.default_journal_dir - - if not logdir or not isdir(logdir): + logdir = logdir or config.default_journal_dir + if not isdir(logdir): logger.info(f"No logdir, or it isn't a directory: {logdir=}") self.stop() return False @@ -74,7 +72,7 @@ class Dashboard(FileSystemEventHandler): # File system events are unreliable/non-existent over network drives on Linux. # We can't easily tell whether a path points to a network drive, so assume # any non-standard logdir might be on a network drive and poll instead. - if not (sys.platform != 'win32') and not self.observer: + if sys.platform == 'win32' and not self.observer: logger.debug('Setting up observer...') self.observer = Observer() self.observer.daemon = True @@ -87,7 +85,7 @@ class Dashboard(FileSystemEventHandler): self.observer = None # type: ignore logger.debug('Done') - if not self.observed and not (sys.platform != 'win32'): + if not self.observed and sys.platform == 'win32': logger.debug('Starting observer...') self.observed = cast(BaseObserver, self.observer).schedule(self, self.currentdir) logger.debug('Done') @@ -178,22 +176,17 @@ class Dashboard(FileSystemEventHandler): """ if config.shutting_down: return - try: - with (pathlib.Path(self.currentdir) / 'Status.json').open('rb') as h: + status_json_path = join(self.currentdir, 'Status.json') + with open(status_json_path, 'rb') as h: data = h.read().strip() - if data: # Can be empty if polling while the file is being re-written entry = json.loads(data) - - # Status file is shared between beta and live. So filter out status not in this game session. - if ( - timegm(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ')) >= self.session_start - and self.status != entry - ): + # Status file is shared between beta and live. Filter out status not in this game session. + entry_timestamp = timegm(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ')) + if entry_timestamp >= self.session_start and self.status != entry: self.status = entry self.root.event_generate('<>', when="tail") - except Exception: logger.exception('Processing Status.json') diff --git a/killswitch.py b/killswitch.py index 42d02844..9cc407a6 100644 --- a/killswitch.py +++ b/killswitch.py @@ -1,18 +1,22 @@ -"""Fetch kill switches from EDMC Repo.""" +""" +killswitch.py - Fetch kill switches from EDMC Repo. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" from __future__ import annotations import json import threading from copy import deepcopy from typing import ( - TYPE_CHECKING, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Sequence, - Tuple, TypedDict, TypeVar, Union, cast + TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, MutableSequence, NamedTuple, Sequence, + TypedDict, TypeVar, cast, Union ) - import requests import semantic_version from semantic_version.base import Version - import config import EDMCLogging @@ -21,7 +25,7 @@ logger = EDMCLogging.get_main_logger() OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json' DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json' CURRENT_KILLSWITCH_VERSION = 2 -UPDATABLE_DATA = Union[Mapping, Sequence] +UPDATABLE_DATA = Union[Mapping, Sequence] # Have to keep old-style _current_version: semantic_version.Version = config.appversion_nobuild() T = TypeVar('T', bound=UPDATABLE_DATA) @@ -32,13 +36,13 @@ class SingleKill(NamedTuple): match: str reason: str - redact_fields: Optional[List[str]] = None - delete_fields: Optional[List[str]] = None - set_fields: Optional[Dict[str, Any]] = None + redact_fields: list[str] | None = None + delete_fields: list[str] | None = None + set_fields: dict[str, Any] | None = None @property def has_rules(self) -> bool: - """Return whether or not this SingleKill can apply rules to a dict to make it safe to use.""" + """Return whether this SingleKill can apply rules to a dict to make it safe to use.""" return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields)) def apply_rules(self, target: T) -> T: @@ -119,9 +123,9 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False): # it exists on this level, dont go further break - elif isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()): + if isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()): # there is a dotted key in here that can be used for this - # if theres a dotted key in here (must be a mapping), use that if we can + # if there's a dotted key in here (must be a mapping), use that if we can keys = current.keys() for k in filter(lambda x: '.' in x, keys): @@ -140,7 +144,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False): key, _, path = path.partition('.') if isinstance(current, Mapping): - current = current[key] # type: ignore # I really dont know at this point what you want from me mypy. + current = current[key] # type: ignore # I really don't know at this point what you want from me mypy. elif isinstance(current, Sequence): target_idx = _get_int(key) # mypy is broken. doesn't like := here. @@ -155,7 +159,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False): _apply(current, path, to_set, delete) -def _get_int(s: str) -> Optional[int]: +def _get_int(s: str) -> int | None: try: return int(s) except ValueError: @@ -166,7 +170,7 @@ class KillSwitches(NamedTuple): """One version's set of kill switches.""" version: semantic_version.SimpleSpec - kills: Dict[str, SingleKill] + kills: dict[str, SingleKill] @staticmethod def from_dict(data: KillSwitchSetJSON) -> KillSwitches: @@ -189,7 +193,7 @@ class DisabledResult(NamedTuple): """DisabledResult is the result returned from various is_disabled calls.""" disabled: bool - kill: Optional[SingleKill] + kill: SingleKill | None @property def reason(self) -> str: @@ -197,11 +201,11 @@ class DisabledResult(NamedTuple): return self.kill.reason if self.kill is not None else "" def has_kill(self) -> bool: - """Return whether or not this DisabledResult has a Kill associated with it.""" + """Return whether this DisabledResult has a Kill associated with it.""" return self.kill is not None def has_rules(self) -> bool: - """Return whether or not the kill on this Result contains rules.""" + """Return whether the kill on this Result contains rules.""" # HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always # be non-None at the point it is evaluated return self.has_kill() and self.kill.has_rules # type: ignore @@ -210,12 +214,12 @@ class DisabledResult(NamedTuple): class KillSwitchSet: """Queryable set of kill switches.""" - def __init__(self, kill_switches: List[KillSwitches]) -> None: + def __init__(self, kill_switches: list[KillSwitches]) -> None: self.kill_switches = kill_switches def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult: """ - Return whether or not the given feature ID is disabled by a killswitch for the given version. + Return whether the given feature ID is disabled by a killswitch for the given version. :param id: The feature ID to check :param version: The version to check killswitches for, defaults to the @@ -234,14 +238,14 @@ class KillSwitchSet: return DisabledResult(False, None) def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool: - """Return whether or not a given feature ID is disabled for the given version.""" + """Return whether a given feature ID is disabled for the given version.""" return self.get_disabled(id, version=version).disabled def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str: """Return a reason for why the given id is disabled for the given version, if any.""" return self.get_disabled(id, version=version).reason - def kills_for_version(self, version: semantic_version.Version = _current_version) -> List[KillSwitches]: + def kills_for_version(self, version: semantic_version.Version = _current_version) -> list[KillSwitches]: """ Get all killswitch entries that apply to the given version. @@ -252,9 +256,9 @@ class KillSwitchSet: def check_killswitch( self, name: str, data: T, log=logger, version=_current_version - ) -> Tuple[bool, T]: + ) -> tuple[bool, T]: """ - Check whether or not a killswitch is enabled. If it is, apply rules if any. + Check whether a killswitch is enabled. If it is, apply rules if any. :param name: The killswitch to check :param data: The data to modify if needed @@ -283,7 +287,7 @@ class KillSwitchSet: log.info('Rules applied successfully, allowing execution to continue') return False, new_data - def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> Tuple[bool, T]: + def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> tuple[bool, T]: """ Check multiple killswitches in order. @@ -323,16 +327,16 @@ class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False): # noqa: D101 class KillSwitchSetJSON(TypedDict): # noqa: D101 version: str - kills: Dict[str, SingleKillSwitchJSON] + kills: dict[str, SingleKillSwitchJSON] class KillSwitchJSONFile(TypedDict): # noqa: D101 version: int last_updated: str - kill_switches: List[KillSwitchSetJSON] + kill_switches: list[KillSwitchSetJSON] -def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSONFile]: +def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> KillSwitchJSONFile | None: """ Fetch the JSON representation of our kill switches. @@ -343,7 +347,7 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO if target.startswith('file:'): target = target.replace('file:', '') try: - with open(target, 'r') as t: + with open(target) as t: return json.load(t) except FileNotFoundError: @@ -366,13 +370,13 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO class _KillSwitchV1(TypedDict): version: str - kills: Dict[str, str] + kills: dict[str, str] class _KillSwitchJSONFileV1(TypedDict): version: int last_updated: str - kill_switches: List[_KillSwitchV1] + kill_switches: list[_KillSwitchV1] def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile: @@ -402,7 +406,7 @@ def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile: raise ValueError(f'Unknown Killswitch version {data["version"]}') -def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]: +def parse_kill_switches(data: KillSwitchJSONFile) -> list[KillSwitches]: """ Parse kill switch dict to List of KillSwitches. @@ -431,7 +435,7 @@ def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]: return out -def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = None) -> Optional[KillSwitchSet]: +def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: str | None = None) -> KillSwitchSet | None: """ Get a kill switch set object. @@ -451,7 +455,7 @@ def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = N def get_kill_switches_thread( - target, callback: Callable[[Optional[KillSwitchSet]], None], fallback: Optional[str] = None, + target, callback: Callable[[KillSwitchSet | None], None], fallback: str | None = None, ) -> None: """ Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available. @@ -469,7 +473,7 @@ def get_kill_switches_thread( active: KillSwitchSet = KillSwitchSet([]) -def setup_main_list(filename: Optional[str]): +def setup_main_list(filename: str | None): """ Set up the global set of kill switches for querying. @@ -498,7 +502,7 @@ def get_disabled(id: str, *, version: semantic_version.Version = _current_versio return active.get_disabled(id, version=version) -def check_killswitch(name: str, data: T, log=logger) -> Tuple[bool, T]: +def check_killswitch(name: str, data: T, log=logger) -> tuple[bool, T]: """Query the global KillSwitchSet#check_killswitch method.""" return active.check_killswitch(name, data, log) @@ -518,6 +522,6 @@ def get_reason(id: str, *, version: semantic_version.Version = _current_version) return active.get_reason(id, version=version) -def kills_for_version(version: semantic_version.Version = _current_version) -> List[KillSwitches]: +def kills_for_version(version: semantic_version.Version = _current_version) -> list[KillSwitches]: """Query the global KillSwitchSet for kills matching a particular version.""" return active.kills_for_version(version) diff --git a/l10n.py b/l10n.py index e2c1143f..fc144b77 100755 --- a/l10n.py +++ b/l10n.py @@ -1,18 +1,27 @@ -#!/usr/bin/env python3 -"""Localization with gettext is a pain on non-Unix systems. Use OSX-style strings files instead.""" +""" +l10n.py - Localize using OSX-Style Strings. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. + +Localization with gettext is a pain on non-Unix systems. +""" +from __future__ import annotations import builtins import locale import numbers -import os -import pathlib import re import sys import warnings from collections import OrderedDict from contextlib import suppress -from os.path import basename, dirname, isdir, isfile, join -from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, TextIO, Union, cast +from os import pardir, listdir, sep, makedirs +from os.path import basename, dirname, isdir, isfile, join, abspath, exists +from typing import TYPE_CHECKING, Iterable, TextIO, cast +from config import config +from EDMCLogging import get_main_logger if TYPE_CHECKING: def _(x: str) -> str: ... @@ -20,22 +29,16 @@ if TYPE_CHECKING: # Note that this is also done in EDMarketConnector.py, and thus removing this here may not have a desired effect try: locale.setlocale(locale.LC_ALL, '') - except Exception: # Locale env variables incorrect or locale package not installed/configured on Linux, mysterious reasons on Windows print("Can't set locale!") -from config import config -from EDMCLogging import get_main_logger - logger = get_main_logger() - # Language name LANGUAGE_ID = '!Language' LOCALISATION_DIR = 'L10n' - if sys.platform == 'darwin': from Foundation import ( # type: ignore # exists on Darwin NSLocale, NSNumberFormatter, NSNumberFormatterDecimalStyle @@ -68,11 +71,11 @@ class _Translations: FALLBACK = 'en' # strings in this code are in English FALLBACK_NAME = 'English' - TRANS_RE = re.compile(r'\s*"((?:[^"]|(?:\"))+)"\s*=\s*"((?:[^"]|(?:\"))+)"\s*;\s*$') + TRANS_RE = re.compile(r'\s*"((?:[^"]|\")+)"\s*=\s*"((?:[^"]|\")+)"\s*;\s*$') COMMENT_RE = re.compile(r'\s*/\*.*\*/\s*$') def __init__(self) -> None: - self.translations: Dict[Optional[str], Dict[str, str]] = {None: {}} + self.translations: dict[str | None, dict[str, str]] = {None: {}} def install_dummy(self) -> None: """ @@ -112,7 +115,7 @@ class _Translations: return self.translations = {None: self.contents(cast(str, lang))} - for plugin in os.listdir(config.plugin_dir_path): + for plugin in listdir(config.plugin_dir_path): plugin_path = join(config.plugin_dir_path, plugin, LOCALISATION_DIR) if isdir(plugin_path): try: @@ -126,7 +129,7 @@ class _Translations: builtins.__dict__['_'] = self.translate - def contents(self, lang: str, plugin_path: Optional[str] = None) -> Dict[str, str]: + def contents(self, lang: str, plugin_path: str | None = None) -> dict[str, str]: """Load all the translations from a translation file.""" assert lang in self.available() translations = {} @@ -151,7 +154,7 @@ class _Translations: return translations - def translate(self, x: str, context: Optional[str] = None) -> str: + def translate(self, x: str, context: str | None = None) -> str: """ Translate the given string to the current lang. @@ -161,7 +164,7 @@ class _Translations: """ if context: # TODO: There is probably a better way to go about this now. - context = context[len(config.plugin_dir)+1:].split(os.sep)[0] + context = context[len(config.plugin_dir)+1:].split(sep)[0] if self.translations[None] and context not in self.translations: logger.debug(f'No translations for {context!r}') @@ -172,23 +175,23 @@ class _Translations: return self.translations[None].get(x) or str(x).replace(r'\"', '"').replace('{CR}', '\n') - def available(self) -> Set[str]: + def available(self) -> set[str]: """Return a list of available language codes.""" path = self.respath() if getattr(sys, 'frozen', False) and sys.platform == 'darwin': available = { - x[:-len('.lproj')] for x in os.listdir(path) + x[:-len('.lproj')] for x in listdir(path) if x.endswith('.lproj') and isfile(join(x, 'Localizable.strings')) } else: - available = {x[:-len('.strings')] for x in os.listdir(path) if x.endswith('.strings')} + available = {x[:-len('.strings')] for x in listdir(path) if x.endswith('.strings')} return available - def available_names(self) -> Dict[Optional[str], str]: + def available_names(self) -> dict[str | None, str]: """Available language names by code.""" - names: Dict[Optional[str], str] = OrderedDict([ + names: dict[str | None, str] = OrderedDict([ # LANG: The system default language choice in Settings > Appearance (None, _('Default')), # Appearance theme and language setting ]) @@ -200,20 +203,20 @@ class _Translations: return names - def respath(self) -> pathlib.Path: + def respath(self) -> str: """Path to localisation files.""" if getattr(sys, 'frozen', False): if sys.platform == 'darwin': - return (pathlib.Path(sys.executable).parents[0] / os.pardir / 'Resources').resolve() + return abspath(join(dirname(sys.executable), pardir, 'Resources')) - return pathlib.Path(dirname(sys.executable)) / LOCALISATION_DIR + return abspath(join(dirname(sys.executable), LOCALISATION_DIR)) - elif __file__: - return pathlib.Path(__file__).parents[0] / LOCALISATION_DIR + if __file__: + return abspath(join(dirname(__file__), LOCALISATION_DIR)) - return pathlib.Path(LOCALISATION_DIR) + return abspath(LOCALISATION_DIR) - def file(self, lang: str, plugin_path: Optional[str] = None) -> Optional[TextIO]: + def file(self, lang: str, plugin_path: str | None = None) -> TextIO | None: """ Open the given lang file for reading. @@ -222,20 +225,21 @@ class _Translations: :return: the opened file (Note: This should be closed when done) """ if plugin_path: - f = pathlib.Path(plugin_path) / f'{lang}.strings' - if not f.exists(): + file_path = join(plugin_path, f'{lang}.strings') + if not exists(file_path): return None try: - return f.open('r', encoding='utf-8') - + return open(file_path, encoding='utf-8') except OSError: - logger.exception(f'could not open {f}') + logger.exception(f'could not open {file_path}') elif getattr(sys, 'frozen', False) and sys.platform == 'darwin': - return (self.respath() / f'{lang}.lproj' / 'Localizable.strings').open('r', encoding='utf-16') + res_path = join(self.respath(), f'{lang}.lproj', 'Localizable.strings') + return open(res_path, encoding='utf-16') - return (self.respath() / f'{lang}.strings').open('r', encoding='utf-8') + res_path = join(self.respath(), f'{lang}.strings') + return open(res_path, encoding='utf-8') class _Locale: @@ -250,11 +254,11 @@ class _Locale: self.float_formatter.setMinimumFractionDigits_(5) self.float_formatter.setMaximumFractionDigits_(5) - def stringFromNumber(self, number: Union[float, int], decimals: int | None = None) -> str: # noqa: N802 + def stringFromNumber(self, number: float | int, decimals: int | None = None) -> str: # noqa: N802 warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.')) return self.string_from_number(number, decimals) # type: ignore - def numberFromString(self, string: str) -> Union[int, float, None]: # noqa: N802 + def numberFromString(self, string: str) -> int | float | None: # noqa: N802 warnings.warn(DeprecationWarning('use _Locale.number_from_string instead.')) return self.number_from_string(string) @@ -262,7 +266,7 @@ class _Locale: warnings.warn(DeprecationWarning('use _Locale.preferred_languages instead.')) return self.preferred_languages() - def string_from_number(self, number: Union[float, int], decimals: int = 5) -> str: + def string_from_number(self, number: float | int, decimals: int = 5) -> str: """ Convert a number to a string. @@ -285,11 +289,9 @@ class _Locale: if not decimals and isinstance(number, numbers.Integral): return locale.format_string('%d', number, True) + return locale.format_string('%.*f', (decimals, number), True) - else: - return locale.format_string('%.*f', (decimals, number), True) - - def number_from_string(self, string: str) -> Union[int, float, None]: + def number_from_string(self, string: str) -> int | float | None: """ Convert a string to a number using the system locale. @@ -308,7 +310,17 @@ class _Locale: return None - def preferred_languages(self) -> Iterable[str]: # noqa: CCR001 + def wszarray_to_list(self, array): + offset = 0 + while offset < len(array): + sz = ctypes.wstring_at(ctypes.addressof(array) + offset * 2) # type: ignore + if sz: + yield sz + offset += len(sz) + 1 + else: + break + + def preferred_languages(self) -> Iterable[str]: """ Return a list of preferred language codes. @@ -326,32 +338,21 @@ class _Locale: elif sys.platform != 'win32': # POSIX lang = locale.getlocale()[0] - languages = lang and [lang.replace('_', '-')] or [] + languages = [lang.replace('_', '-')] if lang else [] else: - def wszarray_to_list(array): - offset = 0 - while offset < len(array): - sz = ctypes.wstring_at(ctypes.addressof(array) + offset*2) - if sz: - yield sz - offset += len(sz)+1 - - else: - break - num = ctypes.c_ulong() size = ctypes.c_ulong(0) languages = [] if GetUserPreferredUILanguages( - MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size) + MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size) ) and size.value: buf = ctypes.create_unicode_buffer(size.value) if GetUserPreferredUILanguages( - MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size) + MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size) ): - languages = wszarray_to_list(buf) + languages = self.wszarray_to_list(buf) # HACK: | 2021-12-11: OneSky calls "Chinese Simplified" "zh-Hans" # in the name of the file, but that will be zh-CN in terms of @@ -369,14 +370,13 @@ Translations = _Translations() # generate template strings file - like xgettext # parsing is limited - only single ' or " delimited strings, and only one string per line if __name__ == "__main__": - import re regexp = re.compile(r'''_\([ur]?(['"])(((? None: +def export(data: companion.CAPIData, requested_filename: str | None = None) -> None: """ Write Ship Loadout in Companion API JSON format. @@ -32,15 +36,14 @@ def export(data: companion.CAPIData, requested_filename: Optional[str] = None) - with open(requested_filename, 'wt') as h: h.write(string) return - - elif not requested_filename: + if not requested_filename: logger.error(f"{requested_filename=} is not valid") return # Look for last ship of this type ship = util_ships.ship_file_name(data['ship'].get('shipName'), data['ship']['name']) - regexp = re.compile(re.escape(ship) + r'\.\d\d\d\d\-\d\d\-\d\dT\d\d\.\d\d\.\d\d\.txt') - oldfiles = sorted([x for x in os.listdir(config.get_str('outdir')) if regexp.match(x)]) + regexp = re.compile(re.escape(ship) + r'\.\d\d\d\d-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt') + oldfiles = sorted([x for x in listdir(config.get_str('outdir')) if regexp.match(x)]) if oldfiles: with open(join(config.get_str('outdir'), oldfiles[-1]), 'rU') as h: if h.read() == string: @@ -50,10 +53,9 @@ def export(data: companion.CAPIData, requested_filename: Optional[str] = None) - # Write - with open( - pathlib.Path(config.get_str('outdir')) / pathlib.Path( - ship + '.' + time.strftime('%Y-%m-%dT%H.%M.%S', time.localtime(query_time)) + '.txt' - ), - 'wt' - ) as h: + output_directory = config.get_str('outdir') + ship_time = time.strftime('%Y-%m-%dT%H.%M.%S', time.localtime(query_time)) + file_path = join(output_directory, f"{ship}.{ship_time}.txt") + + with open(file_path, 'wt') as h: h.write(string) diff --git a/protocol.py b/protocol.py index cf3a9f75..f3b956d2 100644 --- a/protocol.py +++ b/protocol.py @@ -1,12 +1,17 @@ -"""protocol handler for cAPI authorisation.""" -# spell-checker: words ntdll GURL alloc wfile instantiatable pyright +""" +protocol.py - Protocol Handler for cAPI Auth. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations + import os import sys import threading -import urllib.error -import urllib.parse -import urllib.request -from typing import TYPE_CHECKING, Optional, Type +from urllib import parse +from typing import TYPE_CHECKING, Type from config import config from constants import appname, protocolhandler_redirect @@ -34,7 +39,7 @@ class GenericProtocolHandler: def __init__(self) -> None: self.redirect = protocolhandler_redirect # Base redirection URL self.master: 'tkinter.Tk' = None # type: ignore - self.lastpayload: Optional[str] = None + self.lastpayload: str | None = None def start(self, master: 'tkinter.Tk') -> None: """Start Protocol Handler.""" @@ -42,7 +47,6 @@ class GenericProtocolHandler: def close(self) -> None: """Stop / Close Protocol Handler.""" - pass def event(self, url: str) -> None: """Generate an auth event.""" @@ -75,7 +79,7 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i def start(self, master: 'tkinter.Tk') -> None: """Start Protocol Handler.""" GenericProtocolHandler.start(self, master) - self.lasturl: Optional[str] = None + self.lasturl: str | None = None self.eventhandler = EventHandler.alloc().init() def poll(self) -> None: @@ -106,12 +110,11 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i def handleEvent_withReplyEvent_(self, event, replyEvent) -> None: # noqa: N802 N803 # Required to override """Actual event handling from NSAppleEventManager.""" - protocolhandler.lasturl = urllib.parse.unquote( # noqa: F821: type: ignore # It's going to be a DPH in - # this code + protocolhandler.lasturl = parse.unquote( event.paramDescriptorForKeyword_(keyDirectObject).stringValue() ).strip() - protocolhandler.master.after(DarwinProtocolHandler.POLL, protocolhandler.poll) # noqa: F821 # type: ignore + protocolhandler.master.after(DarwinProtocolHandler.POLL, protocolhandler.poll) elif (config.auth_force_edmc_protocol @@ -124,9 +127,8 @@ elif (config.auth_force_edmc_protocol # This could be false if you use auth_force_edmc_protocol, but then you get to keep the pieces assert sys.platform == 'win32' # spell-checker: words HBRUSH HICON WPARAM wstring WNDCLASS HMENU HGLOBAL - from ctypes import windll # type: ignore from ctypes import ( # type: ignore - POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at + windll, POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at ) from ctypes.wintypes import ( ATOM, BOOL, DWORD, HBRUSH, HGLOBAL, HICON, HINSTANCE, HMENU, HWND, INT, LPARAM, LPCWSTR, LPMSG, LPVOID, LPWSTR, @@ -271,7 +273,7 @@ elif (config.auth_force_edmc_protocol def __init__(self) -> None: super().__init__() - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None def start(self, master: 'tkinter.Tk') -> None: """Start the DDE thread.""" @@ -342,7 +344,7 @@ elif (config.auth_force_edmc_protocol if args.lower().startswith('open("') and args.endswith('")'): logger.trace_if('frontier-auth.windows', f'args are: {args}') - url = urllib.parse.unquote(args[6:-2]).strip() + url = parse.unquote(args[6:-2]).strip() if url.startswith(self.redirect): logger.debug(f'Message starts with {self.redirect}') self.event(url) @@ -381,7 +383,7 @@ else: # Linux / Run from source if not os.getenv("EDMC_NO_UI"): logger.info(f'Web server listening on {self.redirect}') - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None def start(self, master: 'tkinter.Tk') -> None: """Start the HTTP server thread.""" @@ -420,15 +422,14 @@ else: # Linux / Run from source def parse(self) -> bool: """Parse a request.""" logger.trace_if('frontier-auth.http', f'Got message on path: {self.path}') - url = urllib.parse.unquote(self.path) + url = parse.unquote(self.path) if url.startswith('/auth'): logger.debug('Request starts with /auth, sending to protocolhandler.event()') - protocolhandler.event(url) # noqa: F821 + protocolhandler.event(url) self.send_response(200) return True - else: - self.send_response(404) # Not found - return False + self.send_response(404) # Not found + return False def do_HEAD(self) -> None: # noqa: N802 # Required to override """Handle HEAD Request.""" @@ -440,14 +441,37 @@ else: # Linux / Run from source if self.parse(): self.send_header('Content-Type', 'text/html') self.end_headers() - self.wfile.write('Authentication successful'.encode('utf-8')) - self.wfile.write('

Authentication successful

'.encode('utf-8')) + self.wfile.write(self._generate_auth_response().encode('utf-8')) else: + self.send_response(404) self.end_headers() def log_request(self, code: int | str = '-', size: int | str = '-') -> None: """Override to prevent logging.""" - pass + + def _generate_auth_response(self) -> str: + """ + Generate the authentication response HTML. + + :return: The HTML content of the authentication response. + """ + return ( + '' + '' + 'Authentication successful - Elite: Dangerous' + '' + '' + '' + '

Authentication successful

' + '

Thank you for authenticating.

' + '

Please close this browser tab now.

' + '' + '' + ) def get_handler_impl() -> Type[GenericProtocolHandler]: @@ -459,14 +483,13 @@ def get_handler_impl() -> Type[GenericProtocolHandler]: if sys.platform == 'darwin' and getattr(sys, 'frozen', False): return DarwinProtocolHandler # pyright: reportUnboundVariable=false - elif ( + if ( (sys.platform == 'win32' and config.auth_force_edmc_protocol) or (getattr(sys, 'frozen', False) and not is_wine and not config.auth_force_localserver) ): return WindowsProtocolHandler - else: - return LinuxProtocolHandler + return LinuxProtocolHandler # *late init* singleton diff --git a/ttkHyperlinkLabel.py b/ttkHyperlinkLabel.py index 1d9749ac..24470a93 100644 --- a/ttkHyperlinkLabel.py +++ b/ttkHyperlinkLabel.py @@ -1,5 +1,9 @@ """ -A clickable ttk label for HTTP links. +ttkHyperlinkLabel.py - Clickable ttk labels. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. In addition to standard ttk.Label arguments, takes the following arguments: url: The URL as a string that the user will be sent to on clicking on @@ -14,6 +18,8 @@ In addition to standard ttk.Label arguments, takes the following arguments: May be imported by plugins """ +from __future__ import annotations + import sys import tkinter as tk import webbrowser @@ -26,14 +32,22 @@ if TYPE_CHECKING: # FIXME: Split this into multi-file module to separate the platforms -class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object): # type: ignore +class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label): # type: ignore """Clickable label for HTTP links.""" def __init__(self, master: tk.Frame | None = None, **kw: Any) -> None: - self.url = 'url' in kw and kw.pop('url') or None + """ + Initialize the HyperlinkLabel. + + :param master: The master widget. + :param kw: Additional keyword arguments. + """ + self.font_u: tk_font.Font + self.font_n = None + self.url = kw.pop('url', None) self.popup_copy = kw.pop('popup_copy', False) self.underline = kw.pop('underline', None) # override ttk.Label's underline - self.foreground = kw.get('foreground') or 'blue' + self.foreground = kw.get('foreground', 'blue') self.disabledforeground = kw.pop('disabledforeground', ttk.Style().lookup( 'TLabel', 'foreground', ('disabled',))) # ttk.Label doesn't support disabledforeground option @@ -44,11 +58,11 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object) tk.Label.__init__(self, master, **kw) else: - ttk.Label.__init__(self, master, **kw) # type: ignore + ttk.Label.__init__(self, master, **kw) self.bind('', self._click) - self.menu = tk.Menu(None, tearoff=tk.FALSE) + self.menu = tk.Menu(tearoff=tk.FALSE) # LANG: Label for 'Copy' as in 'Copy and Paste' self.menu.add_command(label=_('Copy'), command=self.copy) # As in Copy and Paste self.bind(sys.platform == 'darwin' and '' or '', self._contextmenu) @@ -74,29 +88,30 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object) setattr(self, thing, kw[thing]) # Emulate disabledforeground option for ttk.Label - if kw.get('state') == tk.DISABLED: - if 'foreground' not in kw: + if 'state' in kw: + state = kw['state'] + if state == tk.DISABLED and 'foreground' not in kw: kw['foreground'] = self.disabledforeground - elif 'state' in kw: - if 'foreground' not in kw: + elif state != tk.DISABLED and 'foreground' not in kw: kw['foreground'] = self.foreground if 'font' in kw: self.font_n = kw['font'] self.font_u = tk_font.Font(font=self.font_n) self.font_u.configure(underline=True) - kw['font'] = self.underline is True and self.font_u or self.font_n + kw['font'] = self.font_u if self.underline is True else self.font_n if 'cursor' not in kw: - if (kw['state'] if 'state' in kw else str(self['state'])) == tk.DISABLED: + state = kw.get('state', str(self['state'])) + if state == tk.DISABLED: kw['cursor'] = 'arrow' # System default elif self.url and (kw['text'] if 'text' in kw else self['text']): - kw['cursor'] = sys.platform == 'darwin' and 'pointinghand' or 'hand2' + kw['cursor'] = 'pointinghand' if sys.platform == 'darwin' else 'hand2' else: - kw['cursor'] = (sys.platform == 'darwin' and 'notallowed') or ( - sys.platform == 'win32' and 'no') or 'circle' + kw['cursor'] = 'notallowed' if sys.platform == 'darwin' else ( + 'no' if sys.platform == 'win32' else 'circle') - return super(HyperlinkLabel, self).configure(cnf, **kw) + return super().configure(cnf, **kw) def __setitem__(self, key: str, value: Any) -> None: """ @@ -105,15 +120,15 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object) :param key: option name :param value: option value """ - self.configure(None, **{key: value}) + self.configure(**{key: value}) def _enter(self, event: tk.Event) -> None: if self.url and self.underline is not False and str(self['state']) != tk.DISABLED: - super(HyperlinkLabel, self).configure(font=self.font_u) + super().configure(font=self.font_u) def _leave(self, event: tk.Event) -> None: if not self.underline: - super(HyperlinkLabel, self).configure(font=self.font_n) + super().configure(font=self.font_n) def _click(self, event: tk.Event) -> None: if self.url and self['text'] and str(self['state']) != tk.DISABLED: