1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-15 00:30:33 +03:00

Merge pull request #2100 from HullSeals/enhancement/2051/main_files

[2051] Additional Core Files
This commit is contained in:
Phoebe 2023-11-22 16:31:30 +01:00 committed by GitHub
commit 467779e3ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 447 additions and 423 deletions

71
EDMC.py
View File

@ -1,6 +1,11 @@
#!/usr/bin/env python3
"""Command-line interface. Requires prior setup through the GUI."""
"""
EDMC.py - Command-line interface. Requires prior setup through the GUI.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import argparse
import json
@ -8,24 +13,22 @@ import locale
import os
import queue
import sys
from os.path import getmtime
from time import sleep, time
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any
# isort: off
os.environ["EDMC_NO_UI"] = "1"
# See EDMCLogging.py docs.
# workaround for https://github.com/EDCD/EDMarketConnector/issues/568
from EDMCLogging import edmclogger, logger, logging
if TYPE_CHECKING:
from logging import TRACE # type: ignore # noqa: F401 # needed to make mypy happy
edmclogger.set_channels_loglevel(logging.INFO)
# isort: on
import collate
import commodity
import companion
@ -45,6 +48,8 @@ sys.path.append(config.internal_plugin_dir)
# The sys.path.append has to be after `import sys` and `from config import config`
# isort: off
import eddn # noqa: E402
# isort: on
@ -66,7 +71,7 @@ EXIT_SUCCESS, EXIT_SERVER, EXIT_CREDENTIALS, EXIT_VERIFICATION, EXIT_LAGGING, EX
EXIT_JOURNAL_READ_ERR, EXIT_COMMANDER_UNKNOWN = range(9)
def versioncmp(versionstring) -> List:
def versioncmp(versionstring) -> list:
"""Quick and dirty version comparison assuming "strict" numeric only version numbers."""
return list(map(int, versionstring.split('.')))
@ -98,9 +103,7 @@ def deep_get(target: dict | companion.CAPIData, *args: str, default=None) -> Any
res = current.get(arg)
if res is None:
return default
current = res
return current
@ -155,16 +158,15 @@ def main(): # noqa: C901, CCR001
args = parser.parse_args()
if args.version:
updater = Updater(provider='internal')
newversion: Optional[EDMCVersion] = updater.check_appcast()
updater = Updater()
newversion: EDMCVersion | None = updater.check_appcast()
if newversion:
print(f'{appversion()} ({newversion.title!r} is available)')
else:
print(appversion())
return
level_to_set: Optional[int] = None
level_to_set: int | None = None
if args.trace or args.trace_on:
level_to_set = logging.TRACE # type: ignore # it exists
logger.info('Setting TRACE level debugging due to either --trace or a --trace-on')
@ -185,10 +187,10 @@ def main(): # noqa: C901, CCR001
logger.debug(f'Startup v{appversion()} : Running on Python v{sys.version}')
logger.debug(f'''Platform: {sys.platform}
argv[0]: {sys.argv[0]}
exec_prefix: {sys.exec_prefix}
executable: {sys.executable}
sys.path: {sys.path}'''
argv[0]: {sys.argv[0]}
exec_prefix: {sys.exec_prefix}
executable: {sys.executable}
sys.path: {sys.path}'''
)
if args.trace_on and len(args.trace_on) > 0:
import config as conf_module
@ -207,12 +209,14 @@ sys.path: {sys.path}'''
# system, chances are its the current locale, and not utf-8. Otherwise if it was copied, its probably
# utf8. Either way, try the system FIRST because reading something like cp1251 in UTF-8 results in garbage
# but the reverse results in an exception.
json_file = os.path.abspath(args.j)
try:
data = json.load(open(args.j))
with open(json_file) as file_handle:
data = json.load(file_handle)
except UnicodeDecodeError:
data = json.load(open(args.j, encoding='utf-8'))
config.set('querytime', int(getmtime(args.j)))
with open(json_file, encoding='utf-8') as file_handle:
data = json.load(file_handle)
config.set('querytime', int(os.path.getmtime(args.j)))
else:
# Get state from latest Journal file
@ -255,10 +259,8 @@ sys.path: {sys.path}'''
for idx, cmdr in enumerate(cmdrs):
if cmdr.lower() == args.p.lower():
break
else:
raise companion.CredentialsError()
companion.session.login(cmdrs[idx], monitor.is_beta)
else:
@ -292,9 +294,7 @@ sys.path: {sys.path}'''
logger.trace_if('capi.worker', f'Failed Request: {capi_response.message}')
if capi_response.exception:
raise capi_response.exception
else:
raise ValueError(capi_response.message)
raise ValueError(capi_response.message)
logger.trace_if('capi.worker', 'Answer is not a Failure')
if not isinstance(capi_response, companion.EDMCCAPIResponse):
@ -366,20 +366,19 @@ sys.path: {sys.path}'''
else:
print(deep_get(data, 'lastSystem', 'name', default='Unknown'))
if (args.m or args.o or args.s or args.n or args.j):
if args.m or args.o or args.s or args.n or args.j:
if not data['commander'].get('docked'):
logger.error("Can't use -m, -o, -s, -n or -j because you're not currently docked!")
return
elif not deep_get(data, 'lastStarport', 'name'):
if not deep_get(data, 'lastStarport', 'name'):
logger.error("No data['lastStarport']['name'] from CAPI")
sys.exit(EXIT_LAGGING)
# Ignore possibly missing shipyard info
elif not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')):
if not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')):
logger.error("No commodities or outfitting (modules) in CAPI data")
return
else:
return
@ -410,8 +409,8 @@ sys.path: {sys.path}'''
else:
logger.error("Station doesn't supply outfitting")
if (args.s or args.n) and not args.j and not \
data['lastStarport'].get('ships') and data['lastStarport']['services'].get('shipyard'):
if ((args.s or args.n) and not args.j and not data['lastStarport'].get('ships')
and data['lastStarport']['services'].get('shipyard')):
# Retry for shipyard
sleep(SERVER_RETRY)
@ -462,14 +461,14 @@ sys.path: {sys.path}'''
except Exception:
logger.exception('Failed to send data to EDDN')
except companion.ServerError:
logger.exception('Frontier CAPI Server returned an error')
sys.exit(EXIT_SERVER)
except companion.ServerConnectionError:
logger.exception('Exception while contacting server')
sys.exit(EXIT_SERVER)
except companion.ServerError:
logger.exception('Frontier CAPI Server returned an error')
sys.exit(EXIT_SERVER)
except companion.CredentialsError:
logger.error('Frontier CAPI Server: Invalid Credentials')
sys.exit(EXIT_CREDENTIALS)

View File

@ -34,6 +34,7 @@ To utilise logging in a 'found' (third-party) plugin, include this:
# See, plug.py:load_plugins()
logger = logging.getLogger(f'{appname}.{plugin_name}')
"""
from __future__ import annotations
import inspect
import logging
@ -48,7 +49,7 @@ from sys import _getframe as getframe
from threading import get_native_id as thread_native_id
from time import gmtime
from traceback import print_exc
from typing import TYPE_CHECKING, Tuple, cast
from typing import TYPE_CHECKING, cast
import config as config_mod
from config import appcmdname, appname, config
@ -122,7 +123,6 @@ if TYPE_CHECKING:
def trace(self, message, *args, **kwargs) -> None:
"""See implementation above."""
...
def trace_if(self, condition: str, message, *args, **kwargs) -> None:
"""
@ -130,7 +130,6 @@ if TYPE_CHECKING:
See implementation above.
"""
...
class Logger:
@ -183,18 +182,12 @@ class Logger:
# rotated versions.
# This is {logger_name} so that EDMC.py logs to a different file.
logfile_rotating = pathlib.Path(tempfile.gettempdir())
logfile_rotating = logfile_rotating / f'{appname}'
logfile_rotating /= f'{appname}'
logfile_rotating.mkdir(exist_ok=True)
logfile_rotating = logfile_rotating / f'{logger_name}-debug.log'
logfile_rotating /= f'{logger_name}-debug.log'
self.logger_channel_rotating = logging.handlers.RotatingFileHandler(
logfile_rotating,
mode='a',
maxBytes=1024 * 1024, # 1MiB
backupCount=10,
encoding='utf-8',
delay=False
)
self.logger_channel_rotating = logging.handlers.RotatingFileHandler(logfile_rotating, maxBytes=1024 * 1024,
backupCount=10, encoding='utf-8')
# Yes, we always want these rotated files to be at TRACE level
self.logger_channel_rotating.setLevel(logging.TRACE) # type: ignore
self.logger_channel_rotating.setFormatter(self.logger_formatter)
@ -325,7 +318,7 @@ class EDMCContextFilter(logging.Filter):
return True
@classmethod
def caller_attributes(cls, module_name: str = '') -> Tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible
def caller_attributes(cls, module_name: str = '') -> tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible
"""
Determine extra or changed fields for the caller.
@ -535,9 +528,8 @@ def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin':
if not os.getenv("EDMC_NO_UI"):
# GUI app being run
return cast('LoggerMixin', logging.getLogger(appname))
else:
# Must be the CLI
return cast('LoggerMixin', logging.getLogger(appcmdname))
# Must be the CLI
return cast('LoggerMixin', logging.getLogger(appcmdname))
# Singleton

View File

@ -1,6 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Entry point for the main GUI application."""
"""
EDMarketConnector.py - Entry point for the GUI.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import argparse
@ -9,14 +13,14 @@ import locale
import pathlib
import queue
import re
import subprocess
import sys
import threading
import webbrowser
from builtins import object, str
from os import chdir, environ
from os.path import dirname, join
from os import chdir, environ, path
from time import localtime, strftime, time
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Literal
from constants import applongname, appname, protocolhandler_redirect
# Have this as early as possible for people running EDMarketConnector.exe
# from cmd.exe or a bat file or similar. Else they might not be in the correct
@ -24,17 +28,16 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union
if getattr(sys, 'frozen', False):
# Under py2exe sys.path[0] is the executable name
if sys.platform == 'win32':
chdir(dirname(sys.path[0]))
chdir(path.dirname(sys.path[0]))
# Allow executable to be invoked from any cwd
environ['TCL_LIBRARY'] = join(dirname(sys.path[0]), 'lib', 'tcl')
environ['TK_LIBRARY'] = join(dirname(sys.path[0]), 'lib', 'tk')
environ['TCL_LIBRARY'] = path.join(path.dirname(sys.path[0]), 'lib', 'tcl')
environ['TK_LIBRARY'] = path.join(path.dirname(sys.path[0]), 'lib', 'tk')
else:
# We still want to *try* to have CWD be where the main script is, even if
# not frozen.
chdir(pathlib.Path(__file__).parent)
from constants import applongname, appname, protocolhandler_redirect
# config will now cause an appname logger to be set up, so we need the
# console redirect before this
@ -46,7 +49,8 @@ if __name__ == '__main__':
import tempfile
# unbuffered not allowed for text in python3, so use `1 for line buffering
sys.stdout = sys.stderr = open(join(tempfile.gettempdir(), f'{appname}.log'), mode='wt', buffering=1)
log_file_path = path.join(tempfile.gettempdir(), f'{appname}.log')
sys.stdout = sys.stderr = open(log_file_path, mode='wt', buffering=1) # Do NOT use WITH here.
# TODO: Test: Make *sure* this redirect is working, else py2exe is going to cause an exit popup
@ -87,6 +91,11 @@ if __name__ == '__main__': # noqa: C901
help='Suppress the popup from when the application detects another instance already running',
action='store_true'
)
parser.add_argument('--start_min',
help="Start the application minimized",
action="store_true"
)
###########################################################################
###########################################################################
@ -175,7 +184,7 @@ if __name__ == '__main__': # noqa: C901
)
###########################################################################
args = parser.parse_args()
args: argparse.Namespace = parser.parse_args()
if args.capi_pretend_down:
import config as conf_module
@ -187,7 +196,7 @@ if __name__ == '__main__': # noqa: C901
with open(conf_module.config.app_dir_path / 'access_token.txt', 'r') as at:
conf_module.capi_debug_access_token = at.readline().strip()
level_to_set: Optional[int] = None
level_to_set: int | None = None
if args.trace or args.trace_on:
level_to_set = logging.TRACE # type: ignore # it exists
logger.info('Setting TRACE level debugging due to either --trace or a --trace-on')
@ -216,7 +225,7 @@ if __name__ == '__main__': # noqa: C901
else:
print("--force-edmc-protocol is only valid on Windows")
parser.print_help()
exit(1)
sys.exit(1)
if args.debug_sender and len(args.debug_sender) > 0:
import config as conf_module
@ -237,7 +246,7 @@ if __name__ == '__main__': # noqa: C901
logger.info(f'marked {d} for TRACE')
def handle_edmc_callback_or_foregrounding() -> None: # noqa: CCR001
"""Handle any edmc:// auth callback, else foreground existing window."""
"""Handle any edmc:// auth callback, else foreground an existing window."""
logger.trace_if('frontier-auth.windows', 'Begin...')
if sys.platform == 'win32':
@ -245,40 +254,40 @@ if __name__ == '__main__': # noqa: C901
# If *this* instance hasn't locked, then another already has and we
# now need to do the edmc:// checks for auth callback
if locked != JournalLockResult.LOCKED:
import ctypes
from ctypes import windll, c_int, create_unicode_buffer, WINFUNCTYPE
from ctypes.wintypes import BOOL, HWND, INT, LPARAM, LPCWSTR, LPWSTR
EnumWindows = ctypes.windll.user32.EnumWindows # noqa: N806
GetClassName = ctypes.windll.user32.GetClassNameW # noqa: N806
GetClassName.argtypes = [HWND, LPWSTR, ctypes.c_int]
GetWindowText = ctypes.windll.user32.GetWindowTextW # noqa: N806
GetWindowText.argtypes = [HWND, LPWSTR, ctypes.c_int]
GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW # noqa: N806
GetProcessHandleFromHwnd = ctypes.windll.oleacc.GetProcessHandleFromHwnd # noqa: N806
EnumWindows = windll.user32.EnumWindows # noqa: N806
GetClassName = windll.user32.GetClassNameW # noqa: N806
GetClassName.argtypes = [HWND, LPWSTR, c_int]
GetWindowText = windll.user32.GetWindowTextW # noqa: N806
GetWindowText.argtypes = [HWND, LPWSTR, c_int]
GetWindowTextLength = windll.user32.GetWindowTextLengthW # noqa: N806
GetProcessHandleFromHwnd = windll.oleacc.GetProcessHandleFromHwnd # noqa: N806
SW_RESTORE = 9 # noqa: N806
SetForegroundWindow = ctypes.windll.user32.SetForegroundWindow # noqa: N806
ShowWindow = ctypes.windll.user32.ShowWindow # noqa: N806
ShowWindowAsync = ctypes.windll.user32.ShowWindowAsync # noqa: N806
SetForegroundWindow = windll.user32.SetForegroundWindow # noqa: N806
ShowWindow = windll.user32.ShowWindow # noqa: N806
ShowWindowAsync = windll.user32.ShowWindowAsync # noqa: N806
COINIT_MULTITHREADED = 0 # noqa: N806,F841
COINIT_APARTMENTTHREADED = 0x2 # noqa: N806
COINIT_DISABLE_OLE1DDE = 0x4 # noqa: N806
CoInitializeEx = ctypes.windll.ole32.CoInitializeEx # noqa: N806
CoInitializeEx = windll.ole32.CoInitializeEx # noqa: N806
ShellExecute = ctypes.windll.shell32.ShellExecuteW # noqa: N806
ShellExecute = windll.shell32.ShellExecuteW # noqa: N806
ShellExecute.argtypes = [HWND, LPCWSTR, LPCWSTR, LPCWSTR, LPCWSTR, INT]
def window_title(h: int) -> Optional[str]:
def window_title(h: int) -> str | None:
if h:
text_length = GetWindowTextLength(h) + 1
buf = ctypes.create_unicode_buffer(text_length)
buf = create_unicode_buffer(text_length)
if GetWindowText(h, buf, text_length):
return buf.value
return None
@ctypes.WINFUNCTYPE(BOOL, HWND, LPARAM)
@WINFUNCTYPE(BOOL, HWND, LPARAM)
def enumwindowsproc(window_handle, l_param): # noqa: CCR001
"""
Determine if any window for the Application exists.
@ -286,15 +295,16 @@ if __name__ == '__main__': # noqa: C901
Called for each found window by EnumWindows().
When a match is found we check if we're being invoked as the
edmc://auth handler. If so we send the message to the existing
process/window. If not we'll raise that existing window to the
edmc://auth handler. If so we send the message to the existing
process/window. If not we'll raise that existing window to the
foreground.
:param window_handle: Window to check.
:param l_param: The second parameter to the EnumWindows() call.
:return: False if we found a match, else True to continue iteration
"""
# class name limited to 256 - https://msdn.microsoft.com/en-us/library/windows/desktop/ms633576
cls = ctypes.create_unicode_buffer(257)
cls = create_unicode_buffer(257)
# This conditional is exploded to make debugging slightly easier
if GetClassName(window_handle, cls, 257):
if cls.value == 'TkTopLevel':
@ -323,13 +333,8 @@ if __name__ == '__main__': # noqa: C901
# Ref: <https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-enumwindows>
EnumWindows(enumwindowsproc, 0)
return
def already_running_popup():
"""Create the "already running" popup."""
import tkinter as tk
from tkinter import ttk
# Check for CL arg that suppresses this popup.
if args.suppress_dupe_process_popup:
sys.exit(0)
@ -339,8 +344,7 @@ if __name__ == '__main__': # noqa: C901
frame = tk.Frame(root)
frame.grid(row=1, column=0, sticky=tk.NSEW)
label = tk.Label(frame)
label['text'] = 'An EDMarketConnector.exe process was already running, exiting.'
label = tk.Label(frame, text='An EDMarketConnector.exe process was already running, exiting.')
label.grid(row=1, column=0, sticky=tk.NSEW)
button = ttk.Button(frame, text='OK', command=lambda: sys.exit(0))
@ -373,41 +377,23 @@ if __name__ == '__main__': # noqa: C901
git_branch = ""
try:
import subprocess
git_cmd = subprocess.Popen('git branch --show-current'.split(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
out, err = git_cmd.communicate()
git_branch = out.decode().rstrip('\n')
git_branch = out.decode().strip()
except Exception:
pass
if (
(
git_branch == 'develop'
or (
git_branch == '' and '-alpha0' in str(appversion())
)
) and (
(
sys.platform == 'linux'
and environ.get('USER') is not None
and environ['USER'] not in ['ad', 'athan']
)
or (
sys.platform == 'win32'
and environ.get('USERNAME') is not None
and environ['USERNAME'] not in ['Athan']
)
git_branch == 'develop'
or (
git_branch == '' and '-alpha0' in str(appversion())
)
):
print("Why are you running the develop branch if you're not a developer?")
print("Please check https://github.com/EDCD/EDMarketConnector/wiki/Running-from-source#running-from-source")
print("You probably want the 'stable' branch.")
print("\n\rIf Athanasius or A_D asked you to run this, tell them about this message.")
sys.exit(-1)
print("You're running in a DEVELOPMENT branch build. You might encounter bugs!")
# See EDMCLogging.py docs.
@ -427,8 +413,7 @@ import tkinter as tk
import tkinter.filedialog
import tkinter.font
import tkinter.messagebox
from tkinter import ttk, constants as tkc
from tkinter import ttk
import commodity
import plug
import prefs
@ -462,7 +447,7 @@ SHIPYARD_HTML_TEMPLATE = """
"""
class AppWindow(object):
class AppWindow:
"""Define the main application window."""
_CAPI_RESPONSE_TK_EVENT_NAME = '<<CAPIResponse>>'
@ -509,7 +494,7 @@ class AppWindow(object):
else:
self.w.tk.call('wm', 'iconphoto', self.w, '-default',
tk.PhotoImage(file=join(config.respath_path, 'io.edcd.EDMarketConnector.png')))
tk.PhotoImage(file=path.join(config.respath_path, 'io.edcd.EDMarketConnector.png')))
# TODO: Export to files and merge from them in future ?
self.theme_icon = tk.PhotoImage(
@ -637,7 +622,7 @@ class AppWindow(object):
self.updater = update.Updater(tkroot=self.w, provider='external')
else:
self.updater = update.Updater(tkroot=self.w, provider='internal')
self.updater = update.Updater(tkroot=self.w)
self.updater.check_for_updates() # Sparkle / WinSparkle does this automatically for packaged apps
if sys.platform == 'darwin':
@ -680,14 +665,14 @@ class AppWindow(object):
self.w.protocol("WM_DELETE_WINDOW", self.w.withdraw) # close button shouldn't quit app
self.w.resizable(tk.FALSE, tk.FALSE) # Can't be only resizable on one axis
else:
self.file_menu = self.view_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore
self.file_menu = self.view_menu = tk.Menu(self.menubar, tearoff=tk.FALSE)
self.file_menu.add_command(command=lambda: stats.StatsDialog(self.w, self.status))
self.file_menu.add_command(command=self.save_raw)
self.file_menu.add_command(command=lambda: prefs.PreferencesDialog(self.w, self.postprefs))
self.file_menu.add_separator()
self.file_menu.add_command(command=self.onexit)
self.menubar.add_cascade(menu=self.file_menu)
self.edit_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore
self.edit_menu = tk.Menu(self.menubar, tearoff=tk.FALSE)
self.edit_menu.add_command(accelerator='Ctrl+C', state=tk.DISABLED, command=self.copy)
self.menubar.add_cascade(menu=self.edit_menu)
self.help_menu = tk.Menu(self.menubar, tearoff=tk.FALSE) # type: ignore
@ -734,7 +719,7 @@ class AppWindow(object):
anchor=tk.W, compound=tk.LEFT
)
theme_titlebar.grid(columnspan=3, padx=2, sticky=tk.NSEW)
self.drag_offset: Tuple[Optional[int], Optional[int]] = (None, None)
self.drag_offset: tuple[int | None, int | None] = (None, None)
theme_titlebar.bind('<Button-1>', self.drag_start)
theme_titlebar.bind('<B1-Motion>', self.drag_continue)
theme_titlebar.bind('<ButtonRelease-1>', self.drag_end)
@ -808,7 +793,7 @@ class AppWindow(object):
self.w.bind('<KP_Enter>', self.capi_request_data)
self.w.bind_all('<<Invoke>>', self.capi_request_data) # Ask for CAPI queries to be performed
self.w.bind_all(self._CAPI_RESPONSE_TK_EVENT_NAME, self.capi_handle_response)
self.w.bind_all('<<JournalEvent>>', self.journal_event) # Journal monitoring
self.w.bind_all('<<JournalEvent>>', self.journal_event) # type: ignore # Journal monitoring
self.w.bind_all('<<DashboardEvent>>', self.dashboard_event) # Dashboard monitoring
self.w.bind_all('<<PluginError>>', self.plugin_error) # Statusbar
self.w.bind_all('<<CompanionAuthEvent>>', self.auth) # cAPI auth
@ -826,6 +811,12 @@ class AppWindow(object):
config.delete('logdir', suppress=True)
self.postprefs(False) # Companion login happens in callback from monitor
self.toggle_suit_row(visible=False)
if args.start_min:
logger.warning("Trying to start minimized")
if root.overrideredirect():
self.oniconify()
else:
self.w.wm_iconify()
def update_suit_text(self) -> None:
"""Update the suit text for current type and loadout."""
@ -834,13 +825,14 @@ class AppWindow(object):
self.suit['text'] = ''
return
if (suit := monitor.state.get('SuitCurrent')) is None:
suit = monitor.state.get('SuitCurrent')
if suit is None:
self.suit['text'] = f'<{_("Unknown")}>' # LANG: Unknown suit
return
suitname = suit['edmcName']
if (suitloadout := monitor.state.get('SuitLoadoutCurrent')) is None:
suitloadout = monitor.state.get('SuitLoadoutCurrent')
if suitloadout is None:
self.suit['text'] = ''
return
@ -849,31 +841,18 @@ class AppWindow(object):
def suit_show_if_set(self) -> None:
"""Show UI Suit row if we have data, else hide."""
if self.suit['text'] != '':
self.toggle_suit_row(visible=True)
self.toggle_suit_row(self.suit['text'] != '')
else:
self.toggle_suit_row(visible=False)
def toggle_suit_row(self, visible: Optional[bool] = None) -> None:
def toggle_suit_row(self, visible: bool | None = None) -> None:
"""
Toggle the visibility of the 'Suit' row.
:param visible: Force visibility to this.
"""
if visible is True:
self.suit_shown = False
elif visible is False:
self.suit_shown = True
self.suit_shown = not visible
if not self.suit_shown:
if sys.platform != 'win32':
pady = 2
else:
pady = 0
pady = 2 if sys.platform != 'win32' else 0
self.suit_label.grid(row=self.suit_grid_row, column=0, sticky=tk.W, padx=self.PADX, pady=pady)
self.suit.grid(row=self.suit_grid_row, column=1, sticky=tk.EW, padx=self.PADX, pady=pady)
@ -1020,38 +999,49 @@ class AppWindow(object):
:return: True if all OK, else False to trigger play_bad in caller.
"""
if config.get_int('output') & (config.OUT_STATION_ANY):
if not data['commander'].get('docked') and not monitor.state['OnFoot']:
if not self.status['text']:
# Signal as error because the user might actually be docked
# but the server hosting the Companion API hasn't caught up
# LANG: Player is not docked at a station, when we expect them to be
self.status['text'] = _("You're not docked at a station!")
return False
output_flags = config.get_int('output')
is_docked = data['commander'].get('docked')
has_commodities = data['lastStarport'].get('commodities')
has_modules = data['lastStarport'].get('modules')
commodities_flag = config.OUT_MKT_CSV | config.OUT_MKT_TD
if output_flags & config.OUT_STATION_ANY:
if not is_docked and not monitor.state['OnFoot']:
# Signal as error because the user might actually be docked
# but the server hosting the Companion API hasn't caught up
# LANG: Player is not docked at a station, when we expect them to be
self._handle_status(_("You're not docked at a station!"))
return False
# Ignore possibly missing shipyard info
elif (config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA) \
and not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')):
if not self.status['text']:
# LANG: Status - Either no market or no modules data for station from Frontier CAPI
self.status['text'] = _("Station doesn't have anything!")
if output_flags & config.OUT_EDDN_SEND_STATION_DATA and not (has_commodities or has_modules):
# LANG: Status - Either no market or no modules data for station from Frontier CAPI
self._handle_status(_("Station doesn't have anything!"))
elif not data['lastStarport'].get('commodities'):
if not self.status['text']:
# LANG: Status - No station market data from Frontier CAPI
self.status['text'] = _("Station doesn't have a market!")
elif not has_commodities:
# LANG: Status - No station market data from Frontier CAPI
self._handle_status(_("Station doesn't have a market!"))
elif config.get_int('output') & (config.OUT_MKT_CSV | config.OUT_MKT_TD):
# Fixup anomalies in the commodity data
elif output_flags & commodities_flag:
# Fixup anomalies in the comodity data
fixed = companion.fixup(data)
if config.get_int('output') & config.OUT_MKT_CSV:
if output_flags & config.OUT_MKT_CSV:
commodity.export(fixed, COMMODITY_CSV)
if config.get_int('output') & config.OUT_MKT_TD:
if output_flags & config.OUT_MKT_TD:
td.export(fixed)
return True
def _handle_status(self, message: str) -> None:
"""
Set the status label text if it's not already set.
:param message: Status message to display.
"""
if not self.status['text']:
self.status['text'] = message
def capi_request_data(self, event=None) -> None: # noqa: CCR001
"""
Perform CAPI data retrieval and associated actions.
@ -1118,13 +1108,13 @@ class AppWindow(object):
return
if not companion.session.retrying:
if time() < self.capi_query_holdoff_time: # Was invoked by key while in cooldown
if play_sound and (self.capi_query_holdoff_time - time()) < companion.capi_query_cooldown * 0.75:
if time() < self.capi_query_holdoff_time:
# Invoked by key while in cooldown
time_remaining = self.capi_query_holdoff_time - time()
if play_sound and time_remaining < companion.capi_query_cooldown * 0.75:
self.status['text'] = ''
hotkeymgr.play_bad() # Don't play sound in first few seconds to prevent repeats
return
hotkeymgr.play_bad()
return
elif play_sound:
hotkeymgr.play_good()
@ -1137,6 +1127,7 @@ class AppWindow(object):
logger.trace_if('capi.worker', 'Requesting full station data')
config.set('querytime', query_time)
logger.trace_if('capi.worker', 'Calling companion.session.station')
companion.session.station(
query_time=query_time, tk_response_event=self._CAPI_RESPONSE_TK_EVENT_NAME,
play_sound=play_sound
@ -1192,12 +1183,16 @@ class AppWindow(object):
)
def capi_handle_response(self, event=None): # noqa: C901, CCR001
"""Handle the resulting data from a CAPI query."""
"""
Handle the resulting data from a CAPI query.
:param event: generated event details.
"""
logger.trace_if('capi.worker', 'Handling response')
play_bad: bool = False
err: Optional[str] = None
err: str | None = None
capi_response: Union[companion.EDMCCAPIFailedRequest, companion.EDMCCAPIResponse]
capi_response: companion.EDMCCAPIFailedRequest | companion.EDMCCAPIResponse
try:
logger.trace_if('capi.worker', 'Pulling answer off queue')
capi_response = companion.session.capi_response_queue.get(block=False)
@ -1206,8 +1201,7 @@ class AppWindow(object):
if capi_response.exception:
raise capi_response.exception
else:
raise ValueError(capi_response.message)
raise ValueError(capi_response.message)
logger.trace_if('capi.worker', 'Answer is not a Failure')
if not isinstance(capi_response, companion.EDMCCAPIResponse):
@ -1256,8 +1250,8 @@ class AppWindow(object):
err = self.status['text'] = _("Where are you?!") # Shouldn't happen
elif (
not capi_response.capi_data.get('ship', {}).get('name')
or not capi_response.capi_data.get('ship', {}).get('modules')
not capi_response.capi_data.get('ship', {}).get('name')
or not capi_response.capi_data.get('ship', {}).get('modules')
):
# LANG: We don't know what ship the commander is in, when we should
err = self.status['text'] = _("What are you flying?!") # Shouldn't happen
@ -1268,8 +1262,8 @@ class AppWindow(object):
raise companion.CmdrError()
elif (
capi_response.auto_update and not monitor.state['OnFoot']
and not capi_response.capi_data['commander'].get('docked')
capi_response.auto_update and not monitor.state['OnFoot']
and not capi_response.capi_data['commander'].get('docked')
):
# auto update is only when just docked
logger.warning(f"{capi_response.auto_update!r} and not {monitor.state['OnFoot']!r} and "
@ -1289,7 +1283,7 @@ class AppWindow(object):
f"{monitor.state['OnFoot']!r} and {monitor.state['StationName']!r}")
raise companion.ServerLagging()
elif capi_response.capi_data['commander']['docked'] and monitor.state['StationName'] is None:
if capi_response.capi_data['commander']['docked'] and monitor.state['StationName'] is None:
# Likely (re-)Embarked on ship docked at an EDO settlement.
# Both Disembark and Embark have `"Onstation": false` in Journal.
# So there's nothing to tell us which settlement we're (still,
@ -1311,8 +1305,8 @@ class AppWindow(object):
raise companion.ServerLagging()
elif (
not monitor.state['OnFoot']
and capi_response.capi_data['ship']['name'].lower() != monitor.state['ShipType']
not monitor.state['OnFoot']
and capi_response.capi_data['ship']['name'].lower() != monitor.state['ShipType']
):
# CAPI ship type must match
logger.warning(f"not {monitor.state['OnFoot']!r} and "
@ -1385,9 +1379,12 @@ class AppWindow(object):
# TODO: Set status text
return
except companion.ServerConnectionError:
except companion.ServerConnectionError as comp_err:
# LANG: Frontier CAPI server error when fetching data
self.status['text'] = _('Frontier CAPI server error')
logger.warning(f'Exception while contacting server: {comp_err}')
err = self.status['text'] = str(comp_err)
play_bad = True
except companion.CredentialsRequireRefresh:
# We need to 'close' the auth else it'll see STATE_OK and think login() isn't needed
@ -1425,11 +1422,6 @@ class AppWindow(object):
companion.session.invalidate()
self.login()
except companion.ServerConnectionError as e: # TODO: unreachable (subclass of ServerLagging -- move to above)
logger.warning(f'Exception while contacting server: {e}')
err = self.status['text'] = str(e)
play_bad = True
except Exception as e: # Including CredentialsError, ServerError
logger.debug('"other" exception', exc_info=e)
err = self.status['text'] = str(e)
@ -1448,7 +1440,7 @@ class AppWindow(object):
self.cooldown()
logger.trace_if('capi.worker', '...done')
def journal_event(self, event): # noqa: C901, CCR001 # Currently not easily broken up.
def journal_event(self, event: str): # noqa: C901, CCR001 # Currently not easily broken up.
"""
Handle a Journal event passed through event queue from monitor.py.
@ -1576,7 +1568,7 @@ class AppWindow(object):
logger.info('StartUp or LoadGame event')
# Disable WinSparkle automatic update checks, IFF configured to do so when in-game
if config.get_int('disable_autoappupdatecheckingame') and 1:
if config.get_int('disable_autoappupdatecheckingame'):
if self.updater is not None:
self.updater.set_automatic_updates_check(False)
@ -1721,27 +1713,27 @@ class AppWindow(object):
# Avoid file length limits if possible
provider = config.get_str('shipyard_provider', default='EDSY')
target = plug.invoke(provider, 'EDSY', 'shipyard_url', loadout, monitor.is_beta)
file_name = join(config.app_dir_path, "last_shipyard.html")
file_name = path.join(config.app_dir_path, "last_shipyard.html")
with open(file_name, 'w') as f:
print(SHIPYARD_HTML_TEMPLATE.format(
f.write(SHIPYARD_HTML_TEMPLATE.format(
link=html.escape(str(target)),
provider_name=html.escape(str(provider)),
ship_name=html.escape(str(shipname))
), file=f)
))
return f'file://localhost/{file_name}'
def system_url(self, system: str) -> str | None:
"""Despatch a system URL to the configured handler."""
return plug.invoke(
config.get_str('system_provider'), 'EDSM', 'system_url', monitor.state['SystemName']
config.get_str('system_provider', default='EDSM'), 'EDSM', 'system_url', monitor.state['SystemName']
)
def station_url(self, station: str) -> str | None:
"""Despatch a station URL to the configured handler."""
return plug.invoke(
config.get_str('station_provider'), 'EDSM', 'station_url',
config.get_str('station_provider', default='EDSM'), 'EDSM', 'station_url',
monitor.state['SystemName'], monitor.state['StationName']
)
@ -1749,12 +1741,10 @@ class AppWindow(object):
"""Display and update the cooldown timer for 'Update' button."""
if time() < self.capi_query_holdoff_time:
# Update button in main window
self.button['text'] = self.theme_button['text'] \
= _('cooldown {SS}s').format( # LANG: Cooldown on 'Update' button
SS=int(self.capi_query_holdoff_time - time())
)
cooldown_time = int(self.capi_query_holdoff_time - time())
# LANG: Cooldown on 'Update' button
self.button['text'] = self.theme_button['text'] = _('cooldown {SS}s').format(SS=cooldown_time)
self.w.after(1000, self.cooldown)
else:
self.button['text'] = self.theme_button['text'] = _('Update') # LANG: Update button in main window
self.button['state'] = self.theme_button['state'] = (
@ -1775,11 +1765,10 @@ class AppWindow(object):
def copy(self, event=None) -> None:
"""Copy system, and possible station, name to clipboard."""
if monitor.state['SystemName']:
clipboard_text = f"{monitor.state['SystemName']},{monitor.state['StationName']}" if monitor.state[
'StationName'] else monitor.state['SystemName']
self.w.clipboard_clear()
self.w.clipboard_append(
f"{monitor.state['SystemName']},{monitor.state['StationName']}" if monitor.state['StationName']
else monitor.state['SystemName']
)
self.w.clipboard_append(clipboard_text)
def help_general(self, event=None) -> None:
"""Open Wiki Help page in browser."""
@ -1805,9 +1794,14 @@ class AppWindow(object):
class HelpAbout(tk.Toplevel):
"""The applications Help > About popup."""
showing = False
showing: bool = False
def __init__(self, parent: tk.Tk):
def __init__(self, parent: tk.Tk) -> None:
"""
Initialize the HelpAbout popup.
:param parent: The parent Tk window.
"""
if self.__class__.showing:
return
@ -1848,11 +1842,11 @@ class AppWindow(object):
# version <link to changelog>
tk.Label(frame).grid(row=row, column=0) # spacer
row += 1
self.appversion_label = tk.Text(frame, height=1, width=len(str(appversion())), wrap=tkc.NONE, bd=0)
self.appversion_label = tk.Text(frame, height=1, width=len(str(appversion())), wrap=tk.NONE, bd=0)
self.appversion_label.insert("1.0", str(appversion()))
self.appversion_label.tag_configure("center", justify="center")
self.appversion_label.tag_add("center", "1.0", "end")
self.appversion_label.config(state=tkc.DISABLED, bg=frame.cget("background"), font="TkDefaultFont")
self.appversion_label.config(state=tk.DISABLED, bg=frame.cget("background"), font="TkDefaultFont")
self.appversion_label.grid(row=row, column=0, sticky=tk.E)
# LANG: Help > Release Notes
self.appversion = HyperlinkLabel(frame, compound=tk.RIGHT, text=_('Release Notes'),
@ -2068,7 +2062,7 @@ Locale LC_TIME: {locale.getlocale(locale.LC_TIME)}'''
)
def setup_killswitches(filename: Optional[str]):
def setup_killswitches(filename: str | None):
"""Download and setup the main killswitch list."""
logger.debug('fetching killswitches...')
if filename is not None:
@ -2130,7 +2124,7 @@ sys.path: {sys.path}'''
config.delete('font_size', suppress=True)
config.set('ui_scale', 100) # 100% is the default here
config.delete('geometry', suppress=True) # unset is recreated by other code
config.delete('geometry', suppress=True) # unset is recreated by other code
logger.info('reset theme, transparency, font, font size, ui scale, and ui geometry to default.')
@ -2167,12 +2161,12 @@ sys.path: {sys.path}'''
# <https://en.wikipedia.org/wiki/Windows_10_version_history#Version_1903_(May_2019_Update)>
# Windows 19, 1903 was build 18362
if (
sys.platform != 'win32'
or (
windows_ver.major == 10
and windows_ver.build >= 18362
)
or windows_ver.major > 10 # Paranoid future check
sys.platform != 'win32'
or (
windows_ver.major == 10
and windows_ver.build >= 18362
)
or windows_ver.major > 10 # Paranoid future check
):
# Set that same language, but utf8 encoding (it was probably cp1252
# or equivalent for other languages).
@ -2209,10 +2203,10 @@ sys.path: {sys.path}'''
# logger.debug('Test from __main__')
# test_logging()
class A(object):
class A:
"""Simple top-level class."""
class B(object):
class B:
"""Simple second-level class."""
def __init__(self):
@ -2268,7 +2262,7 @@ sys.path: {sys.path}'''
def messagebox_not_py3():
"""Display message about plugins not updated for Python 3.x."""
plugins_not_py3_last = config.get_int('plugins_not_py3_last', default=0)
if (plugins_not_py3_last + 86400) < int(time()) and len(plug.PLUGINS_not_py3):
if (plugins_not_py3_last + 86400) < int(time()) and plug.PLUGINS_not_py3:
# LANG: Popup-text about 'active' plugins without Python 3.x support
popup_text = _(
"One or more of your enabled plugins do not yet have support for Python 3.x. Please see the "
@ -2302,9 +2296,11 @@ sys.path: {sys.path}'''
ui_transparency = 100
root.wm_attributes('-alpha', ui_transparency / 100)
# Display message box about plugins without Python 3.x support
root.after(0, messagebox_not_py3)
# Show warning popup for killswitches matching current version
root.after(1, show_killswitch_poppup, root)
# Start the main event loop
root.mainloop()
logger.info('Exiting')

View File

@ -1,32 +1,32 @@
"""Handle the game Status.json file."""
"""
dashboard.py - Handle the game Status.json file.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json
import pathlib
import sys
import time
import tkinter as tk
from calendar import timegm
from os.path import getsize, isdir, isfile
from typing import Any, Dict, Optional, cast
from os.path import getsize, isdir, isfile, join
from typing import Any, cast
from watchdog.observers.api import BaseObserver
from config import config
from EDMCLogging import get_main_logger
logger = get_main_logger()
if sys.platform == 'darwin':
if sys.platform in ('darwin', 'win32'):
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
elif sys.platform == 'win32':
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
else:
# Linux's inotify doesn't work over CIFS or NFS, so poll
FileSystemEventHandler = object # dummy
class FileSystemEventHandler: # type: ignore
"""Dummy class to represent a file system event handler on platforms other than macOS and Windows."""
class Dashboard(FileSystemEventHandler):
@ -39,9 +39,9 @@ class Dashboard(FileSystemEventHandler):
self.session_start: int = int(time.time())
self.root: tk.Tk = None # type: ignore
self.currentdir: str = None # type: ignore # The actual logdir that we're monitoring
self.observer: Optional[Observer] = None # type: ignore
self.observer: Observer | None = None # type: ignore
self.observed = None # a watchdog ObservedWatch, or None if polling
self.status: Dict[str, Any] = {} # Current status for communicating status back to main thread
self.status: dict[str, Any] = {} # Current status for communicating status back to main thread
def start(self, root: tk.Tk, started: int) -> bool:
"""
@ -56,10 +56,8 @@ class Dashboard(FileSystemEventHandler):
self.session_start = started
logdir = config.get_str('journaldir', default=config.default_journal_dir)
if logdir == '':
logdir = config.default_journal_dir
if not logdir or not isdir(logdir):
logdir = logdir or config.default_journal_dir
if not isdir(logdir):
logger.info(f"No logdir, or it isn't a directory: {logdir=}")
self.stop()
return False
@ -74,7 +72,7 @@ class Dashboard(FileSystemEventHandler):
# File system events are unreliable/non-existent over network drives on Linux.
# We can't easily tell whether a path points to a network drive, so assume
# any non-standard logdir might be on a network drive and poll instead.
if not (sys.platform != 'win32') and not self.observer:
if sys.platform == 'win32' and not self.observer:
logger.debug('Setting up observer...')
self.observer = Observer()
self.observer.daemon = True
@ -87,7 +85,7 @@ class Dashboard(FileSystemEventHandler):
self.observer = None # type: ignore
logger.debug('Done')
if not self.observed and not (sys.platform != 'win32'):
if not self.observed and sys.platform == 'win32':
logger.debug('Starting observer...')
self.observed = cast(BaseObserver, self.observer).schedule(self, self.currentdir)
logger.debug('Done')
@ -178,22 +176,17 @@ class Dashboard(FileSystemEventHandler):
"""
if config.shutting_down:
return
try:
with (pathlib.Path(self.currentdir) / 'Status.json').open('rb') as h:
status_json_path = join(self.currentdir, 'Status.json')
with open(status_json_path, 'rb') as h:
data = h.read().strip()
if data: # Can be empty if polling while the file is being re-written
entry = json.loads(data)
# Status file is shared between beta and live. So filter out status not in this game session.
if (
timegm(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ')) >= self.session_start
and self.status != entry
):
# Status file is shared between beta and live. Filter out status not in this game session.
entry_timestamp = timegm(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ'))
if entry_timestamp >= self.session_start and self.status != entry:
self.status = entry
self.root.event_generate('<<DashboardEvent>>', when="tail")
except Exception:
logger.exception('Processing Status.json')

View File

@ -1,18 +1,22 @@
"""Fetch kill switches from EDMC Repo."""
"""
killswitch.py - Fetch kill switches from EDMC Repo.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json
import threading
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Sequence,
Tuple, TypedDict, TypeVar, Union, cast
TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, MutableSequence, NamedTuple, Sequence,
TypedDict, TypeVar, cast, Union
)
import requests
import semantic_version
from semantic_version.base import Version
import config
import EDMCLogging
@ -21,7 +25,7 @@ logger = EDMCLogging.get_main_logger()
OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json'
DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json'
CURRENT_KILLSWITCH_VERSION = 2
UPDATABLE_DATA = Union[Mapping, Sequence]
UPDATABLE_DATA = Union[Mapping, Sequence] # Have to keep old-style
_current_version: semantic_version.Version = config.appversion_nobuild()
T = TypeVar('T', bound=UPDATABLE_DATA)
@ -32,13 +36,13 @@ class SingleKill(NamedTuple):
match: str
reason: str
redact_fields: Optional[List[str]] = None
delete_fields: Optional[List[str]] = None
set_fields: Optional[Dict[str, Any]] = None
redact_fields: list[str] | None = None
delete_fields: list[str] | None = None
set_fields: dict[str, Any] | None = None
@property
def has_rules(self) -> bool:
"""Return whether or not this SingleKill can apply rules to a dict to make it safe to use."""
"""Return whether this SingleKill can apply rules to a dict to make it safe to use."""
return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields))
def apply_rules(self, target: T) -> T:
@ -119,9 +123,9 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
# it exists on this level, dont go further
break
elif isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()):
if isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()):
# there is a dotted key in here that can be used for this
# if theres a dotted key in here (must be a mapping), use that if we can
# if there's a dotted key in here (must be a mapping), use that if we can
keys = current.keys()
for k in filter(lambda x: '.' in x, keys):
@ -140,7 +144,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
key, _, path = path.partition('.')
if isinstance(current, Mapping):
current = current[key] # type: ignore # I really dont know at this point what you want from me mypy.
current = current[key] # type: ignore # I really don't know at this point what you want from me mypy.
elif isinstance(current, Sequence):
target_idx = _get_int(key) # mypy is broken. doesn't like := here.
@ -155,7 +159,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
_apply(current, path, to_set, delete)
def _get_int(s: str) -> Optional[int]:
def _get_int(s: str) -> int | None:
try:
return int(s)
except ValueError:
@ -166,7 +170,7 @@ class KillSwitches(NamedTuple):
"""One version's set of kill switches."""
version: semantic_version.SimpleSpec
kills: Dict[str, SingleKill]
kills: dict[str, SingleKill]
@staticmethod
def from_dict(data: KillSwitchSetJSON) -> KillSwitches:
@ -189,7 +193,7 @@ class DisabledResult(NamedTuple):
"""DisabledResult is the result returned from various is_disabled calls."""
disabled: bool
kill: Optional[SingleKill]
kill: SingleKill | None
@property
def reason(self) -> str:
@ -197,11 +201,11 @@ class DisabledResult(NamedTuple):
return self.kill.reason if self.kill is not None else ""
def has_kill(self) -> bool:
"""Return whether or not this DisabledResult has a Kill associated with it."""
"""Return whether this DisabledResult has a Kill associated with it."""
return self.kill is not None
def has_rules(self) -> bool:
"""Return whether or not the kill on this Result contains rules."""
"""Return whether the kill on this Result contains rules."""
# HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always
# be non-None at the point it is evaluated
return self.has_kill() and self.kill.has_rules # type: ignore
@ -210,12 +214,12 @@ class DisabledResult(NamedTuple):
class KillSwitchSet:
"""Queryable set of kill switches."""
def __init__(self, kill_switches: List[KillSwitches]) -> None:
def __init__(self, kill_switches: list[KillSwitches]) -> None:
self.kill_switches = kill_switches
def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult:
"""
Return whether or not the given feature ID is disabled by a killswitch for the given version.
Return whether the given feature ID is disabled by a killswitch for the given version.
:param id: The feature ID to check
:param version: The version to check killswitches for, defaults to the
@ -234,14 +238,14 @@ class KillSwitchSet:
return DisabledResult(False, None)
def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool:
"""Return whether or not a given feature ID is disabled for the given version."""
"""Return whether a given feature ID is disabled for the given version."""
return self.get_disabled(id, version=version).disabled
def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str:
"""Return a reason for why the given id is disabled for the given version, if any."""
return self.get_disabled(id, version=version).reason
def kills_for_version(self, version: semantic_version.Version = _current_version) -> List[KillSwitches]:
def kills_for_version(self, version: semantic_version.Version = _current_version) -> list[KillSwitches]:
"""
Get all killswitch entries that apply to the given version.
@ -252,9 +256,9 @@ class KillSwitchSet:
def check_killswitch(
self, name: str, data: T, log=logger, version=_current_version
) -> Tuple[bool, T]:
) -> tuple[bool, T]:
"""
Check whether or not a killswitch is enabled. If it is, apply rules if any.
Check whether a killswitch is enabled. If it is, apply rules if any.
:param name: The killswitch to check
:param data: The data to modify if needed
@ -283,7 +287,7 @@ class KillSwitchSet:
log.info('Rules applied successfully, allowing execution to continue')
return False, new_data
def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> Tuple[bool, T]:
def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> tuple[bool, T]:
"""
Check multiple killswitches in order.
@ -323,16 +327,16 @@ class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False): # noqa: D101
class KillSwitchSetJSON(TypedDict): # noqa: D101
version: str
kills: Dict[str, SingleKillSwitchJSON]
kills: dict[str, SingleKillSwitchJSON]
class KillSwitchJSONFile(TypedDict): # noqa: D101
version: int
last_updated: str
kill_switches: List[KillSwitchSetJSON]
kill_switches: list[KillSwitchSetJSON]
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSONFile]:
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> KillSwitchJSONFile | None:
"""
Fetch the JSON representation of our kill switches.
@ -343,7 +347,7 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO
if target.startswith('file:'):
target = target.replace('file:', '')
try:
with open(target, 'r') as t:
with open(target) as t:
return json.load(t)
except FileNotFoundError:
@ -366,13 +370,13 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO
class _KillSwitchV1(TypedDict):
version: str
kills: Dict[str, str]
kills: dict[str, str]
class _KillSwitchJSONFileV1(TypedDict):
version: int
last_updated: str
kill_switches: List[_KillSwitchV1]
kill_switches: list[_KillSwitchV1]
def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
@ -402,7 +406,7 @@ def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
raise ValueError(f'Unknown Killswitch version {data["version"]}')
def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]:
def parse_kill_switches(data: KillSwitchJSONFile) -> list[KillSwitches]:
"""
Parse kill switch dict to List of KillSwitches.
@ -431,7 +435,7 @@ def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]:
return out
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = None) -> Optional[KillSwitchSet]:
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: str | None = None) -> KillSwitchSet | None:
"""
Get a kill switch set object.
@ -451,7 +455,7 @@ def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = N
def get_kill_switches_thread(
target, callback: Callable[[Optional[KillSwitchSet]], None], fallback: Optional[str] = None,
target, callback: Callable[[KillSwitchSet | None], None], fallback: str | None = None,
) -> None:
"""
Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available.
@ -469,7 +473,7 @@ def get_kill_switches_thread(
active: KillSwitchSet = KillSwitchSet([])
def setup_main_list(filename: Optional[str]):
def setup_main_list(filename: str | None):
"""
Set up the global set of kill switches for querying.
@ -498,7 +502,7 @@ def get_disabled(id: str, *, version: semantic_version.Version = _current_versio
return active.get_disabled(id, version=version)
def check_killswitch(name: str, data: T, log=logger) -> Tuple[bool, T]:
def check_killswitch(name: str, data: T, log=logger) -> tuple[bool, T]:
"""Query the global KillSwitchSet#check_killswitch method."""
return active.check_killswitch(name, data, log)
@ -518,6 +522,6 @@ def get_reason(id: str, *, version: semantic_version.Version = _current_version)
return active.get_reason(id, version=version)
def kills_for_version(version: semantic_version.Version = _current_version) -> List[KillSwitches]:
def kills_for_version(version: semantic_version.Version = _current_version) -> list[KillSwitches]:
"""Query the global KillSwitchSet for kills matching a particular version."""
return active.kills_for_version(version)

136
l10n.py
View File

@ -1,18 +1,27 @@
#!/usr/bin/env python3
"""Localization with gettext is a pain on non-Unix systems. Use OSX-style strings files instead."""
"""
l10n.py - Localize using OSX-Style Strings.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
Localization with gettext is a pain on non-Unix systems.
"""
from __future__ import annotations
import builtins
import locale
import numbers
import os
import pathlib
import re
import sys
import warnings
from collections import OrderedDict
from contextlib import suppress
from os.path import basename, dirname, isdir, isfile, join
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, TextIO, Union, cast
from os import pardir, listdir, sep, makedirs
from os.path import basename, dirname, isdir, isfile, join, abspath, exists
from typing import TYPE_CHECKING, Iterable, TextIO, cast
from config import config
from EDMCLogging import get_main_logger
if TYPE_CHECKING:
def _(x: str) -> str: ...
@ -20,22 +29,16 @@ if TYPE_CHECKING:
# Note that this is also done in EDMarketConnector.py, and thus removing this here may not have a desired effect
try:
locale.setlocale(locale.LC_ALL, '')
except Exception:
# Locale env variables incorrect or locale package not installed/configured on Linux, mysterious reasons on Windows
print("Can't set locale!")
from config import config
from EDMCLogging import get_main_logger
logger = get_main_logger()
# Language name
LANGUAGE_ID = '!Language'
LOCALISATION_DIR = 'L10n'
if sys.platform == 'darwin':
from Foundation import ( # type: ignore # exists on Darwin
NSLocale, NSNumberFormatter, NSNumberFormatterDecimalStyle
@ -68,11 +71,11 @@ class _Translations:
FALLBACK = 'en' # strings in this code are in English
FALLBACK_NAME = 'English'
TRANS_RE = re.compile(r'\s*"((?:[^"]|(?:\"))+)"\s*=\s*"((?:[^"]|(?:\"))+)"\s*;\s*$')
TRANS_RE = re.compile(r'\s*"((?:[^"]|\")+)"\s*=\s*"((?:[^"]|\")+)"\s*;\s*$')
COMMENT_RE = re.compile(r'\s*/\*.*\*/\s*$')
def __init__(self) -> None:
self.translations: Dict[Optional[str], Dict[str, str]] = {None: {}}
self.translations: dict[str | None, dict[str, str]] = {None: {}}
def install_dummy(self) -> None:
"""
@ -112,7 +115,7 @@ class _Translations:
return
self.translations = {None: self.contents(cast(str, lang))}
for plugin in os.listdir(config.plugin_dir_path):
for plugin in listdir(config.plugin_dir_path):
plugin_path = join(config.plugin_dir_path, plugin, LOCALISATION_DIR)
if isdir(plugin_path):
try:
@ -126,7 +129,7 @@ class _Translations:
builtins.__dict__['_'] = self.translate
def contents(self, lang: str, plugin_path: Optional[str] = None) -> Dict[str, str]:
def contents(self, lang: str, plugin_path: str | None = None) -> dict[str, str]:
"""Load all the translations from a translation file."""
assert lang in self.available()
translations = {}
@ -151,7 +154,7 @@ class _Translations:
return translations
def translate(self, x: str, context: Optional[str] = None) -> str:
def translate(self, x: str, context: str | None = None) -> str:
"""
Translate the given string to the current lang.
@ -161,7 +164,7 @@ class _Translations:
"""
if context:
# TODO: There is probably a better way to go about this now.
context = context[len(config.plugin_dir)+1:].split(os.sep)[0]
context = context[len(config.plugin_dir)+1:].split(sep)[0]
if self.translations[None] and context not in self.translations:
logger.debug(f'No translations for {context!r}')
@ -172,23 +175,23 @@ class _Translations:
return self.translations[None].get(x) or str(x).replace(r'\"', '"').replace('{CR}', '\n')
def available(self) -> Set[str]:
def available(self) -> set[str]:
"""Return a list of available language codes."""
path = self.respath()
if getattr(sys, 'frozen', False) and sys.platform == 'darwin':
available = {
x[:-len('.lproj')] for x in os.listdir(path)
x[:-len('.lproj')] for x in listdir(path)
if x.endswith('.lproj') and isfile(join(x, 'Localizable.strings'))
}
else:
available = {x[:-len('.strings')] for x in os.listdir(path) if x.endswith('.strings')}
available = {x[:-len('.strings')] for x in listdir(path) if x.endswith('.strings')}
return available
def available_names(self) -> Dict[Optional[str], str]:
def available_names(self) -> dict[str | None, str]:
"""Available language names by code."""
names: Dict[Optional[str], str] = OrderedDict([
names: dict[str | None, str] = OrderedDict([
# LANG: The system default language choice in Settings > Appearance
(None, _('Default')), # Appearance theme and language setting
])
@ -200,20 +203,20 @@ class _Translations:
return names
def respath(self) -> pathlib.Path:
def respath(self) -> str:
"""Path to localisation files."""
if getattr(sys, 'frozen', False):
if sys.platform == 'darwin':
return (pathlib.Path(sys.executable).parents[0] / os.pardir / 'Resources').resolve()
return abspath(join(dirname(sys.executable), pardir, 'Resources'))
return pathlib.Path(dirname(sys.executable)) / LOCALISATION_DIR
return abspath(join(dirname(sys.executable), LOCALISATION_DIR))
elif __file__:
return pathlib.Path(__file__).parents[0] / LOCALISATION_DIR
if __file__:
return abspath(join(dirname(__file__), LOCALISATION_DIR))
return pathlib.Path(LOCALISATION_DIR)
return abspath(LOCALISATION_DIR)
def file(self, lang: str, plugin_path: Optional[str] = None) -> Optional[TextIO]:
def file(self, lang: str, plugin_path: str | None = None) -> TextIO | None:
"""
Open the given lang file for reading.
@ -222,20 +225,21 @@ class _Translations:
:return: the opened file (Note: This should be closed when done)
"""
if plugin_path:
f = pathlib.Path(plugin_path) / f'{lang}.strings'
if not f.exists():
file_path = join(plugin_path, f'{lang}.strings')
if not exists(file_path):
return None
try:
return f.open('r', encoding='utf-8')
return open(file_path, encoding='utf-8')
except OSError:
logger.exception(f'could not open {f}')
logger.exception(f'could not open {file_path}')
elif getattr(sys, 'frozen', False) and sys.platform == 'darwin':
return (self.respath() / f'{lang}.lproj' / 'Localizable.strings').open('r', encoding='utf-16')
res_path = join(self.respath(), f'{lang}.lproj', 'Localizable.strings')
return open(res_path, encoding='utf-16')
return (self.respath() / f'{lang}.strings').open('r', encoding='utf-8')
res_path = join(self.respath(), f'{lang}.strings')
return open(res_path, encoding='utf-8')
class _Locale:
@ -250,11 +254,11 @@ class _Locale:
self.float_formatter.setMinimumFractionDigits_(5)
self.float_formatter.setMaximumFractionDigits_(5)
def stringFromNumber(self, number: Union[float, int], decimals: int | None = None) -> str: # noqa: N802
def stringFromNumber(self, number: float | int, decimals: int | None = None) -> str: # noqa: N802
warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.'))
return self.string_from_number(number, decimals) # type: ignore
def numberFromString(self, string: str) -> Union[int, float, None]: # noqa: N802
def numberFromString(self, string: str) -> int | float | None: # noqa: N802
warnings.warn(DeprecationWarning('use _Locale.number_from_string instead.'))
return self.number_from_string(string)
@ -262,7 +266,7 @@ class _Locale:
warnings.warn(DeprecationWarning('use _Locale.preferred_languages instead.'))
return self.preferred_languages()
def string_from_number(self, number: Union[float, int], decimals: int = 5) -> str:
def string_from_number(self, number: float | int, decimals: int = 5) -> str:
"""
Convert a number to a string.
@ -285,11 +289,9 @@ class _Locale:
if not decimals and isinstance(number, numbers.Integral):
return locale.format_string('%d', number, True)
return locale.format_string('%.*f', (decimals, number), True)
else:
return locale.format_string('%.*f', (decimals, number), True)
def number_from_string(self, string: str) -> Union[int, float, None]:
def number_from_string(self, string: str) -> int | float | None:
"""
Convert a string to a number using the system locale.
@ -308,7 +310,17 @@ class _Locale:
return None
def preferred_languages(self) -> Iterable[str]: # noqa: CCR001
def wszarray_to_list(self, array):
offset = 0
while offset < len(array):
sz = ctypes.wstring_at(ctypes.addressof(array) + offset * 2) # type: ignore
if sz:
yield sz
offset += len(sz) + 1
else:
break
def preferred_languages(self) -> Iterable[str]:
"""
Return a list of preferred language codes.
@ -326,32 +338,21 @@ class _Locale:
elif sys.platform != 'win32':
# POSIX
lang = locale.getlocale()[0]
languages = lang and [lang.replace('_', '-')] or []
languages = [lang.replace('_', '-')] if lang else []
else:
def wszarray_to_list(array):
offset = 0
while offset < len(array):
sz = ctypes.wstring_at(ctypes.addressof(array) + offset*2)
if sz:
yield sz
offset += len(sz)+1
else:
break
num = ctypes.c_ulong()
size = ctypes.c_ulong(0)
languages = []
if GetUserPreferredUILanguages(
MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size)
MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size)
) and size.value:
buf = ctypes.create_unicode_buffer(size.value)
if GetUserPreferredUILanguages(
MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size)
MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size)
):
languages = wszarray_to_list(buf)
languages = self.wszarray_to_list(buf)
# HACK: <n/a> | 2021-12-11: OneSky calls "Chinese Simplified" "zh-Hans"
# in the name of the file, but that will be zh-CN in terms of
@ -369,14 +370,13 @@ Translations = _Translations()
# generate template strings file - like xgettext
# parsing is limited - only single ' or " delimited strings, and only one string per line
if __name__ == "__main__":
import re
regexp = re.compile(r'''_\([ur]?(['"])(((?<!\\)\\\1|.)+?)\1\)[^#]*(#.+)?''') # match a single line python literal
seen: Dict[str, str] = {}
seen: dict[str, str] = {}
for f in (
sorted(x for x in os.listdir('.') if x.endswith('.py')) +
sorted(join('plugins', x) for x in (os.listdir('plugins') if isdir('plugins') else []) if x.endswith('.py'))
sorted(x for x in listdir('.') if x.endswith('.py')) +
sorted(join('plugins', x) for x in (listdir('plugins') if isdir('plugins') else []) if x.endswith('.py'))
):
with open(f, 'r', encoding='utf-8') as h:
with open(f, encoding='utf-8') as h:
lineno = 0
for line in h:
lineno += 1
@ -386,9 +386,9 @@ if __name__ == "__main__":
(match.group(4) and (match.group(4)[1:].strip()) + '. ' or '') + f'[{basename(f)}]'
)
if seen:
target_path = pathlib.Path(LOCALISATION_DIR) / 'en.template.new'
target_path.parent.mkdir(exist_ok=True)
with target_path.open('w', encoding='utf-8') as target_file:
target_path = join(LOCALISATION_DIR, 'en.template.new')
makedirs(dirname(target_path), exist_ok=True)
with open(target_path, 'w', encoding='utf-8') as target_file:
target_file.write(f'/* Language name */\n"{LANGUAGE_ID}" = "English";\n\n')
for thing in sorted(seen, key=str.lower):
if seen[thing]:

View File

@ -1,13 +1,17 @@
"""Export ship loadout in Companion API json format."""
"""
loadout.py - Export ship loadout in Companion API json format.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json
import os
import pathlib
import re
import time
from os import listdir
from os.path import join
from typing import Optional
import companion
import util_ships
from config import config
@ -16,7 +20,7 @@ from EDMCLogging import get_main_logger
logger = get_main_logger()
def export(data: companion.CAPIData, requested_filename: Optional[str] = None) -> None:
def export(data: companion.CAPIData, requested_filename: str | None = None) -> None:
"""
Write Ship Loadout in Companion API JSON format.
@ -32,15 +36,14 @@ def export(data: companion.CAPIData, requested_filename: Optional[str] = None) -
with open(requested_filename, 'wt') as h:
h.write(string)
return
elif not requested_filename:
if not requested_filename:
logger.error(f"{requested_filename=} is not valid")
return
# Look for last ship of this type
ship = util_ships.ship_file_name(data['ship'].get('shipName'), data['ship']['name'])
regexp = re.compile(re.escape(ship) + r'\.\d\d\d\d\-\d\d\-\d\dT\d\d\.\d\d\.\d\d\.txt')
oldfiles = sorted([x for x in os.listdir(config.get_str('outdir')) if regexp.match(x)])
regexp = re.compile(re.escape(ship) + r'\.\d\d\d\d-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt')
oldfiles = sorted([x for x in listdir(config.get_str('outdir')) if regexp.match(x)])
if oldfiles:
with open(join(config.get_str('outdir'), oldfiles[-1]), 'rU') as h:
if h.read() == string:
@ -50,10 +53,9 @@ def export(data: companion.CAPIData, requested_filename: Optional[str] = None) -
# Write
with open(
pathlib.Path(config.get_str('outdir')) / pathlib.Path(
ship + '.' + time.strftime('%Y-%m-%dT%H.%M.%S', time.localtime(query_time)) + '.txt'
),
'wt'
) as h:
output_directory = config.get_str('outdir')
ship_time = time.strftime('%Y-%m-%dT%H.%M.%S', time.localtime(query_time))
file_path = join(output_directory, f"{ship}.{ship_time}.txt")
with open(file_path, 'wt') as h:
h.write(string)

View File

@ -1,12 +1,17 @@
"""protocol handler for cAPI authorisation."""
# spell-checker: words ntdll GURL alloc wfile instantiatable pyright
"""
protocol.py - Protocol Handler for cAPI Auth.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import os
import sys
import threading
import urllib.error
import urllib.parse
import urllib.request
from typing import TYPE_CHECKING, Optional, Type
from urllib import parse
from typing import TYPE_CHECKING, Type
from config import config
from constants import appname, protocolhandler_redirect
@ -34,7 +39,7 @@ class GenericProtocolHandler:
def __init__(self) -> None:
self.redirect = protocolhandler_redirect # Base redirection URL
self.master: 'tkinter.Tk' = None # type: ignore
self.lastpayload: Optional[str] = None
self.lastpayload: str | None = None
def start(self, master: 'tkinter.Tk') -> None:
"""Start Protocol Handler."""
@ -42,7 +47,6 @@ class GenericProtocolHandler:
def close(self) -> None:
"""Stop / Close Protocol Handler."""
pass
def event(self, url: str) -> None:
"""Generate an auth event."""
@ -75,7 +79,7 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i
def start(self, master: 'tkinter.Tk') -> None:
"""Start Protocol Handler."""
GenericProtocolHandler.start(self, master)
self.lasturl: Optional[str] = None
self.lasturl: str | None = None
self.eventhandler = EventHandler.alloc().init()
def poll(self) -> None:
@ -106,12 +110,11 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i
def handleEvent_withReplyEvent_(self, event, replyEvent) -> None: # noqa: N802 N803 # Required to override
"""Actual event handling from NSAppleEventManager."""
protocolhandler.lasturl = urllib.parse.unquote( # noqa: F821: type: ignore # It's going to be a DPH in
# this code
protocolhandler.lasturl = parse.unquote(
event.paramDescriptorForKeyword_(keyDirectObject).stringValue()
).strip()
protocolhandler.master.after(DarwinProtocolHandler.POLL, protocolhandler.poll) # noqa: F821 # type: ignore
protocolhandler.master.after(DarwinProtocolHandler.POLL, protocolhandler.poll)
elif (config.auth_force_edmc_protocol
@ -124,9 +127,8 @@ elif (config.auth_force_edmc_protocol
# This could be false if you use auth_force_edmc_protocol, but then you get to keep the pieces
assert sys.platform == 'win32'
# spell-checker: words HBRUSH HICON WPARAM wstring WNDCLASS HMENU HGLOBAL
from ctypes import windll # type: ignore
from ctypes import ( # type: ignore
POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at
windll, POINTER, WINFUNCTYPE, Structure, byref, c_long, c_void_p, create_unicode_buffer, wstring_at
)
from ctypes.wintypes import (
ATOM, BOOL, DWORD, HBRUSH, HGLOBAL, HICON, HINSTANCE, HMENU, HWND, INT, LPARAM, LPCWSTR, LPMSG, LPVOID, LPWSTR,
@ -271,7 +273,7 @@ elif (config.auth_force_edmc_protocol
def __init__(self) -> None:
super().__init__()
self.thread: Optional[threading.Thread] = None
self.thread: threading.Thread | None = None
def start(self, master: 'tkinter.Tk') -> None:
"""Start the DDE thread."""
@ -342,7 +344,7 @@ elif (config.auth_force_edmc_protocol
if args.lower().startswith('open("') and args.endswith('")'):
logger.trace_if('frontier-auth.windows', f'args are: {args}')
url = urllib.parse.unquote(args[6:-2]).strip()
url = parse.unquote(args[6:-2]).strip()
if url.startswith(self.redirect):
logger.debug(f'Message starts with {self.redirect}')
self.event(url)
@ -381,7 +383,7 @@ else: # Linux / Run from source
if not os.getenv("EDMC_NO_UI"):
logger.info(f'Web server listening on {self.redirect}')
self.thread: Optional[threading.Thread] = None
self.thread: threading.Thread | None = None
def start(self, master: 'tkinter.Tk') -> None:
"""Start the HTTP server thread."""
@ -420,15 +422,14 @@ else: # Linux / Run from source
def parse(self) -> bool:
"""Parse a request."""
logger.trace_if('frontier-auth.http', f'Got message on path: {self.path}')
url = urllib.parse.unquote(self.path)
url = parse.unquote(self.path)
if url.startswith('/auth'):
logger.debug('Request starts with /auth, sending to protocolhandler.event()')
protocolhandler.event(url) # noqa: F821
protocolhandler.event(url)
self.send_response(200)
return True
else:
self.send_response(404) # Not found
return False
self.send_response(404) # Not found
return False
def do_HEAD(self) -> None: # noqa: N802 # Required to override
"""Handle HEAD Request."""
@ -440,14 +441,37 @@ else: # Linux / Run from source
if self.parse():
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><title>Authentication successful</title></head>'.encode('utf-8'))
self.wfile.write('<body><p>Authentication successful</p></body>'.encode('utf-8'))
self.wfile.write(self._generate_auth_response().encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
def log_request(self, code: int | str = '-', size: int | str = '-') -> None:
"""Override to prevent logging."""
pass
def _generate_auth_response(self) -> str:
"""
Generate the authentication response HTML.
:return: The HTML content of the authentication response.
"""
return (
'<html>'
'<head>'
'<title>Authentication successful - Elite: Dangerous</title>'
'<style>'
'body { background-color: #000; color: #fff; font-family: "Helvetica Neue", Arial, sans-serif; }'
'h1 { text-align: center; margin-top: 100px; }'
'p { text-align: center; }'
'</style>'
'</head>'
'<body>'
'<h1>Authentication successful</h1>'
'<p>Thank you for authenticating.</p>'
'<p>Please close this browser tab now.</p>'
'</body>'
'</html>'
)
def get_handler_impl() -> Type[GenericProtocolHandler]:
@ -459,14 +483,13 @@ def get_handler_impl() -> Type[GenericProtocolHandler]:
if sys.platform == 'darwin' and getattr(sys, 'frozen', False):
return DarwinProtocolHandler # pyright: reportUnboundVariable=false
elif (
if (
(sys.platform == 'win32' and config.auth_force_edmc_protocol)
or (getattr(sys, 'frozen', False) and not is_wine and not config.auth_force_localserver)
):
return WindowsProtocolHandler
else:
return LinuxProtocolHandler
return LinuxProtocolHandler
# *late init* singleton

View File

@ -1,5 +1,9 @@
"""
A clickable ttk label for HTTP links.
ttkHyperlinkLabel.py - Clickable ttk labels.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
In addition to standard ttk.Label arguments, takes the following arguments:
url: The URL as a string that the user will be sent to on clicking on
@ -14,6 +18,8 @@ In addition to standard ttk.Label arguments, takes the following arguments:
May be imported by plugins
"""
from __future__ import annotations
import sys
import tkinter as tk
import webbrowser
@ -26,14 +32,22 @@ if TYPE_CHECKING:
# FIXME: Split this into multi-file module to separate the platforms
class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object): # type: ignore
class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label): # type: ignore
"""Clickable label for HTTP links."""
def __init__(self, master: tk.Frame | None = None, **kw: Any) -> None:
self.url = 'url' in kw and kw.pop('url') or None
"""
Initialize the HyperlinkLabel.
:param master: The master widget.
:param kw: Additional keyword arguments.
"""
self.font_u: tk_font.Font
self.font_n = None
self.url = kw.pop('url', None)
self.popup_copy = kw.pop('popup_copy', False)
self.underline = kw.pop('underline', None) # override ttk.Label's underline
self.foreground = kw.get('foreground') or 'blue'
self.foreground = kw.get('foreground', 'blue')
self.disabledforeground = kw.pop('disabledforeground', ttk.Style().lookup(
'TLabel', 'foreground', ('disabled',))) # ttk.Label doesn't support disabledforeground option
@ -44,11 +58,11 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object)
tk.Label.__init__(self, master, **kw)
else:
ttk.Label.__init__(self, master, **kw) # type: ignore
ttk.Label.__init__(self, master, **kw)
self.bind('<Button-1>', self._click)
self.menu = tk.Menu(None, tearoff=tk.FALSE)
self.menu = tk.Menu(tearoff=tk.FALSE)
# LANG: Label for 'Copy' as in 'Copy and Paste'
self.menu.add_command(label=_('Copy'), command=self.copy) # As in Copy and Paste
self.bind(sys.platform == 'darwin' and '<Button-2>' or '<Button-3>', self._contextmenu)
@ -74,29 +88,30 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object)
setattr(self, thing, kw[thing])
# Emulate disabledforeground option for ttk.Label
if kw.get('state') == tk.DISABLED:
if 'foreground' not in kw:
if 'state' in kw:
state = kw['state']
if state == tk.DISABLED and 'foreground' not in kw:
kw['foreground'] = self.disabledforeground
elif 'state' in kw:
if 'foreground' not in kw:
elif state != tk.DISABLED and 'foreground' not in kw:
kw['foreground'] = self.foreground
if 'font' in kw:
self.font_n = kw['font']
self.font_u = tk_font.Font(font=self.font_n)
self.font_u.configure(underline=True)
kw['font'] = self.underline is True and self.font_u or self.font_n
kw['font'] = self.font_u if self.underline is True else self.font_n
if 'cursor' not in kw:
if (kw['state'] if 'state' in kw else str(self['state'])) == tk.DISABLED:
state = kw.get('state', str(self['state']))
if state == tk.DISABLED:
kw['cursor'] = 'arrow' # System default
elif self.url and (kw['text'] if 'text' in kw else self['text']):
kw['cursor'] = sys.platform == 'darwin' and 'pointinghand' or 'hand2'
kw['cursor'] = 'pointinghand' if sys.platform == 'darwin' else 'hand2'
else:
kw['cursor'] = (sys.platform == 'darwin' and 'notallowed') or (
sys.platform == 'win32' and 'no') or 'circle'
kw['cursor'] = 'notallowed' if sys.platform == 'darwin' else (
'no' if sys.platform == 'win32' else 'circle')
return super(HyperlinkLabel, self).configure(cnf, **kw)
return super().configure(cnf, **kw)
def __setitem__(self, key: str, value: Any) -> None:
"""
@ -105,15 +120,15 @@ class HyperlinkLabel(sys.platform == 'darwin' and tk.Label or ttk.Label, object)
:param key: option name
:param value: option value
"""
self.configure(None, **{key: value})
self.configure(**{key: value})
def _enter(self, event: tk.Event) -> None:
if self.url and self.underline is not False and str(self['state']) != tk.DISABLED:
super(HyperlinkLabel, self).configure(font=self.font_u)
super().configure(font=self.font_u)
def _leave(self, event: tk.Event) -> None:
if not self.underline:
super(HyperlinkLabel, self).configure(font=self.font_n)
super().configure(font=self.font_n)
def _click(self, event: tk.Event) -> None:
if self.url and self['text'] and str(self['state']) != tk.DISABLED: