1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-13 07:47:14 +03:00

[2051] Dashboard, Killswitch, l10n

This commit is contained in:
David Sangrey 2023-11-17 19:21:48 -05:00
parent 95c73179c6
commit 45a38ccdaa
No known key found for this signature in database
GPG Key ID: 3AEADBB0186884BC
3 changed files with 133 additions and 137 deletions

View File

@ -1,32 +1,31 @@
"""Handle the game Status.json file.""" """
dashboard.py - Handle the game Status.json file.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json import json
import pathlib
import sys import sys
import time import time
import tkinter as tk import tkinter as tk
from calendar import timegm from os.path import getsize, isdir, isfile, join
from os.path import getsize, isdir, isfile from typing import Any, cast
from typing import Any, Dict, Optional, cast
from watchdog.observers.api import BaseObserver from watchdog.observers.api import BaseObserver
from config import config from config import config
from EDMCLogging import get_main_logger from EDMCLogging import get_main_logger
logger = get_main_logger() logger = get_main_logger()
if sys.platform == 'darwin': if sys.platform in ('darwin', 'win32'):
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer from watchdog.observers import Observer
elif sys.platform == 'win32':
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
else: else:
# Linux's inotify doesn't work over CIFS or NFS, so poll # Linux's inotify doesn't work over CIFS or NFS, so poll
FileSystemEventHandler = object # dummy class FileSystemEventHandler: # type: ignore
"""Dummy class to represent a file system event handler on platforms other than macOS and Windows."""
class Dashboard(FileSystemEventHandler): class Dashboard(FileSystemEventHandler):
@ -39,9 +38,9 @@ class Dashboard(FileSystemEventHandler):
self.session_start: int = int(time.time()) self.session_start: int = int(time.time())
self.root: tk.Tk = None # type: ignore self.root: tk.Tk = None # type: ignore
self.currentdir: str = None # type: ignore # The actual logdir that we're monitoring self.currentdir: str = None # type: ignore # The actual logdir that we're monitoring
self.observer: Optional[Observer] = None # type: ignore self.observer: Observer | None = None # type: ignore
self.observed = None # a watchdog ObservedWatch, or None if polling self.observed = None # a watchdog ObservedWatch, or None if polling
self.status: Dict[str, Any] = {} # Current status for communicating status back to main thread self.status: dict[str, Any] = {} # Current status for communicating status back to main thread
def start(self, root: tk.Tk, started: int) -> bool: def start(self, root: tk.Tk, started: int) -> bool:
""" """
@ -56,10 +55,8 @@ class Dashboard(FileSystemEventHandler):
self.session_start = started self.session_start = started
logdir = config.get_str('journaldir', default=config.default_journal_dir) logdir = config.get_str('journaldir', default=config.default_journal_dir)
if logdir == '': logdir = logdir or config.default_journal_dir
logdir = config.default_journal_dir if not isdir(logdir):
if not logdir or not isdir(logdir):
logger.info(f"No logdir, or it isn't a directory: {logdir=}") logger.info(f"No logdir, or it isn't a directory: {logdir=}")
self.stop() self.stop()
return False return False
@ -74,7 +71,7 @@ class Dashboard(FileSystemEventHandler):
# File system events are unreliable/non-existent over network drives on Linux. # File system events are unreliable/non-existent over network drives on Linux.
# We can't easily tell whether a path points to a network drive, so assume # We can't easily tell whether a path points to a network drive, so assume
# any non-standard logdir might be on a network drive and poll instead. # any non-standard logdir might be on a network drive and poll instead.
if not (sys.platform != 'win32') and not self.observer: if sys.platform == 'win32' and not self.observer:
logger.debug('Setting up observer...') logger.debug('Setting up observer...')
self.observer = Observer() self.observer = Observer()
self.observer.daemon = True self.observer.daemon = True
@ -87,7 +84,7 @@ class Dashboard(FileSystemEventHandler):
self.observer = None # type: ignore self.observer = None # type: ignore
logger.debug('Done') logger.debug('Done')
if not self.observed and not (sys.platform != 'win32'): if not self.observed and sys.platform == 'win32':
logger.debug('Starting observer...') logger.debug('Starting observer...')
self.observed = cast(BaseObserver, self.observer).schedule(self, self.currentdir) self.observed = cast(BaseObserver, self.observer).schedule(self, self.currentdir)
logger.debug('Done') logger.debug('Done')
@ -178,22 +175,17 @@ class Dashboard(FileSystemEventHandler):
""" """
if config.shutting_down: if config.shutting_down:
return return
try: try:
with (pathlib.Path(self.currentdir) / 'Status.json').open('rb') as h: status_json_path = join(self.currentdir, 'Status.json')
with open(status_json_path, 'rb') as h:
data = h.read().strip() data = h.read().strip()
if data: # Can be empty if polling while the file is being re-written if data: # Can be empty if polling while the file is being re-written
entry = json.loads(data) entry = json.loads(data)
# Status file is shared between beta and live. Filter out status not in this game session.
# Status file is shared between beta and live. So filter out status not in this game session. entry_timestamp = time.mktime(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ'))
if ( if entry_timestamp >= self.session_start and self.status != entry:
timegm(time.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ')) >= self.session_start
and self.status != entry
):
self.status = entry self.status = entry
self.root.event_generate('<<DashboardEvent>>', when="tail") self.root.event_generate('<<DashboardEvent>>', when="tail")
except Exception: except Exception:
logger.exception('Processing Status.json') logger.exception('Processing Status.json')

View File

@ -1,18 +1,22 @@
"""Fetch kill switches from EDMC Repo.""" """
killswitch.py - Fetch kill switches from EDMC Repo.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations from __future__ import annotations
import json import json
import threading import threading
from copy import deepcopy from copy import deepcopy
from typing import ( from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Sequence, TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, MutableSequence, NamedTuple, Sequence,
Tuple, TypedDict, TypeVar, Union, cast TypedDict, TypeVar, cast, Union
) )
import requests import requests
import semantic_version import semantic_version
from semantic_version.base import Version from semantic_version.base import Version
import config import config
import EDMCLogging import EDMCLogging
@ -21,7 +25,7 @@ logger = EDMCLogging.get_main_logger()
OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json' OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json'
DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json' DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json'
CURRENT_KILLSWITCH_VERSION = 2 CURRENT_KILLSWITCH_VERSION = 2
UPDATABLE_DATA = Union[Mapping, Sequence] UPDATABLE_DATA = Union[Mapping, Sequence] # Have to keep old-style
_current_version: semantic_version.Version = config.appversion_nobuild() _current_version: semantic_version.Version = config.appversion_nobuild()
T = TypeVar('T', bound=UPDATABLE_DATA) T = TypeVar('T', bound=UPDATABLE_DATA)
@ -32,13 +36,13 @@ class SingleKill(NamedTuple):
match: str match: str
reason: str reason: str
redact_fields: Optional[List[str]] = None redact_fields: list[str] | None = None
delete_fields: Optional[List[str]] = None delete_fields: list[str] | None = None
set_fields: Optional[Dict[str, Any]] = None set_fields: dict[str, Any] | None = None
@property @property
def has_rules(self) -> bool: def has_rules(self) -> bool:
"""Return whether or not this SingleKill can apply rules to a dict to make it safe to use.""" """Return whether this SingleKill can apply rules to a dict to make it safe to use."""
return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields)) return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields))
def apply_rules(self, target: T) -> T: def apply_rules(self, target: T) -> T:
@ -119,9 +123,9 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
# it exists on this level, dont go further # it exists on this level, dont go further
break break
elif isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()): if isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()):
# there is a dotted key in here that can be used for this # there is a dotted key in here that can be used for this
# if theres a dotted key in here (must be a mapping), use that if we can # if there's a dotted key in here (must be a mapping), use that if we can
keys = current.keys() keys = current.keys()
for k in filter(lambda x: '.' in x, keys): for k in filter(lambda x: '.' in x, keys):
@ -140,7 +144,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
key, _, path = path.partition('.') key, _, path = path.partition('.')
if isinstance(current, Mapping): if isinstance(current, Mapping):
current = current[key] # type: ignore # I really dont know at this point what you want from me mypy. current = current[key] # type: ignore # I really don't know at this point what you want from me mypy.
elif isinstance(current, Sequence): elif isinstance(current, Sequence):
target_idx = _get_int(key) # mypy is broken. doesn't like := here. target_idx = _get_int(key) # mypy is broken. doesn't like := here.
@ -155,7 +159,7 @@ def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):
_apply(current, path, to_set, delete) _apply(current, path, to_set, delete)
def _get_int(s: str) -> Optional[int]: def _get_int(s: str) -> int | None:
try: try:
return int(s) return int(s)
except ValueError: except ValueError:
@ -166,7 +170,7 @@ class KillSwitches(NamedTuple):
"""One version's set of kill switches.""" """One version's set of kill switches."""
version: semantic_version.SimpleSpec version: semantic_version.SimpleSpec
kills: Dict[str, SingleKill] kills: dict[str, SingleKill]
@staticmethod @staticmethod
def from_dict(data: KillSwitchSetJSON) -> KillSwitches: def from_dict(data: KillSwitchSetJSON) -> KillSwitches:
@ -189,7 +193,7 @@ class DisabledResult(NamedTuple):
"""DisabledResult is the result returned from various is_disabled calls.""" """DisabledResult is the result returned from various is_disabled calls."""
disabled: bool disabled: bool
kill: Optional[SingleKill] kill: SingleKill | None
@property @property
def reason(self) -> str: def reason(self) -> str:
@ -197,11 +201,11 @@ class DisabledResult(NamedTuple):
return self.kill.reason if self.kill is not None else "" return self.kill.reason if self.kill is not None else ""
def has_kill(self) -> bool: def has_kill(self) -> bool:
"""Return whether or not this DisabledResult has a Kill associated with it.""" """Return whether this DisabledResult has a Kill associated with it."""
return self.kill is not None return self.kill is not None
def has_rules(self) -> bool: def has_rules(self) -> bool:
"""Return whether or not the kill on this Result contains rules.""" """Return whether the kill on this Result contains rules."""
# HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always # HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always
# be non-None at the point it is evaluated # be non-None at the point it is evaluated
return self.has_kill() and self.kill.has_rules # type: ignore return self.has_kill() and self.kill.has_rules # type: ignore
@ -210,12 +214,12 @@ class DisabledResult(NamedTuple):
class KillSwitchSet: class KillSwitchSet:
"""Queryable set of kill switches.""" """Queryable set of kill switches."""
def __init__(self, kill_switches: List[KillSwitches]) -> None: def __init__(self, kill_switches: list[KillSwitches]) -> None:
self.kill_switches = kill_switches self.kill_switches = kill_switches
def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult: def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult:
""" """
Return whether or not the given feature ID is disabled by a killswitch for the given version. Return whether the given feature ID is disabled by a killswitch for the given version.
:param id: The feature ID to check :param id: The feature ID to check
:param version: The version to check killswitches for, defaults to the :param version: The version to check killswitches for, defaults to the
@ -234,14 +238,14 @@ class KillSwitchSet:
return DisabledResult(False, None) return DisabledResult(False, None)
def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool: def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool:
"""Return whether or not a given feature ID is disabled for the given version.""" """Return whether a given feature ID is disabled for the given version."""
return self.get_disabled(id, version=version).disabled return self.get_disabled(id, version=version).disabled
def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str: def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str:
"""Return a reason for why the given id is disabled for the given version, if any.""" """Return a reason for why the given id is disabled for the given version, if any."""
return self.get_disabled(id, version=version).reason return self.get_disabled(id, version=version).reason
def kills_for_version(self, version: semantic_version.Version = _current_version) -> List[KillSwitches]: def kills_for_version(self, version: semantic_version.Version = _current_version) -> list[KillSwitches]:
""" """
Get all killswitch entries that apply to the given version. Get all killswitch entries that apply to the given version.
@ -252,9 +256,9 @@ class KillSwitchSet:
def check_killswitch( def check_killswitch(
self, name: str, data: T, log=logger, version=_current_version self, name: str, data: T, log=logger, version=_current_version
) -> Tuple[bool, T]: ) -> tuple[bool, T]:
""" """
Check whether or not a killswitch is enabled. If it is, apply rules if any. Check whether a killswitch is enabled. If it is, apply rules if any.
:param name: The killswitch to check :param name: The killswitch to check
:param data: The data to modify if needed :param data: The data to modify if needed
@ -283,7 +287,7 @@ class KillSwitchSet:
log.info('Rules applied successfully, allowing execution to continue') log.info('Rules applied successfully, allowing execution to continue')
return False, new_data return False, new_data
def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> Tuple[bool, T]: def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> tuple[bool, T]:
""" """
Check multiple killswitches in order. Check multiple killswitches in order.
@ -323,16 +327,16 @@ class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False): # noqa: D101
class KillSwitchSetJSON(TypedDict): # noqa: D101 class KillSwitchSetJSON(TypedDict): # noqa: D101
version: str version: str
kills: Dict[str, SingleKillSwitchJSON] kills: dict[str, SingleKillSwitchJSON]
class KillSwitchJSONFile(TypedDict): # noqa: D101 class KillSwitchJSONFile(TypedDict): # noqa: D101
version: int version: int
last_updated: str last_updated: str
kill_switches: List[KillSwitchSetJSON] kill_switches: list[KillSwitchSetJSON]
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSONFile]: def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> KillSwitchJSONFile | None:
""" """
Fetch the JSON representation of our kill switches. Fetch the JSON representation of our kill switches.
@ -343,7 +347,7 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO
if target.startswith('file:'): if target.startswith('file:'):
target = target.replace('file:', '') target = target.replace('file:', '')
try: try:
with open(target, 'r') as t: with open(target) as t:
return json.load(t) return json.load(t)
except FileNotFoundError: except FileNotFoundError:
@ -366,13 +370,13 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSO
class _KillSwitchV1(TypedDict): class _KillSwitchV1(TypedDict):
version: str version: str
kills: Dict[str, str] kills: dict[str, str]
class _KillSwitchJSONFileV1(TypedDict): class _KillSwitchJSONFileV1(TypedDict):
version: int version: int
last_updated: str last_updated: str
kill_switches: List[_KillSwitchV1] kill_switches: list[_KillSwitchV1]
def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile: def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
@ -402,7 +406,7 @@ def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
raise ValueError(f'Unknown Killswitch version {data["version"]}') raise ValueError(f'Unknown Killswitch version {data["version"]}')
def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]: def parse_kill_switches(data: KillSwitchJSONFile) -> list[KillSwitches]:
""" """
Parse kill switch dict to List of KillSwitches. Parse kill switch dict to List of KillSwitches.
@ -431,7 +435,7 @@ def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]:
return out return out
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = None) -> Optional[KillSwitchSet]: def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: str | None = None) -> KillSwitchSet | None:
""" """
Get a kill switch set object. Get a kill switch set object.
@ -451,7 +455,7 @@ def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = N
def get_kill_switches_thread( def get_kill_switches_thread(
target, callback: Callable[[Optional[KillSwitchSet]], None], fallback: Optional[str] = None, target, callback: Callable[[KillSwitchSet | None], None], fallback: str | None = None,
) -> None: ) -> None:
""" """
Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available. Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available.
@ -469,7 +473,7 @@ def get_kill_switches_thread(
active: KillSwitchSet = KillSwitchSet([]) active: KillSwitchSet = KillSwitchSet([])
def setup_main_list(filename: Optional[str]): def setup_main_list(filename: str | None):
""" """
Set up the global set of kill switches for querying. Set up the global set of kill switches for querying.
@ -498,7 +502,7 @@ def get_disabled(id: str, *, version: semantic_version.Version = _current_versio
return active.get_disabled(id, version=version) return active.get_disabled(id, version=version)
def check_killswitch(name: str, data: T, log=logger) -> Tuple[bool, T]: def check_killswitch(name: str, data: T, log=logger) -> tuple[bool, T]:
"""Query the global KillSwitchSet#check_killswitch method.""" """Query the global KillSwitchSet#check_killswitch method."""
return active.check_killswitch(name, data, log) return active.check_killswitch(name, data, log)
@ -518,6 +522,6 @@ def get_reason(id: str, *, version: semantic_version.Version = _current_version)
return active.get_reason(id, version=version) return active.get_reason(id, version=version)
def kills_for_version(version: semantic_version.Version = _current_version) -> List[KillSwitches]: def kills_for_version(version: semantic_version.Version = _current_version) -> list[KillSwitches]:
"""Query the global KillSwitchSet for kills matching a particular version.""" """Query the global KillSwitchSet for kills matching a particular version."""
return active.kills_for_version(version) return active.kills_for_version(version)

136
l10n.py
View File

@ -1,18 +1,27 @@
#!/usr/bin/env python3 """
"""Localization with gettext is a pain on non-Unix systems. Use OSX-style strings files instead.""" l10n.py - Localize using OSX-Style Strings.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
Localization with gettext is a pain on non-Unix systems.
"""
from __future__ import annotations
import builtins import builtins
import locale import locale
import numbers import numbers
import os
import pathlib
import re import re
import sys import sys
import warnings import warnings
from collections import OrderedDict from collections import OrderedDict
from contextlib import suppress from contextlib import suppress
from os.path import basename, dirname, isdir, isfile, join from os import pardir, listdir, sep, makedirs
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, TextIO, Union, cast from os.path import basename, dirname, isdir, isfile, join, abspath, exists
from typing import TYPE_CHECKING, Iterable, TextIO, cast
from config import config
from EDMCLogging import get_main_logger
if TYPE_CHECKING: if TYPE_CHECKING:
def _(x: str) -> str: ... def _(x: str) -> str: ...
@ -20,22 +29,16 @@ if TYPE_CHECKING:
# Note that this is also done in EDMarketConnector.py, and thus removing this here may not have a desired effect # Note that this is also done in EDMarketConnector.py, and thus removing this here may not have a desired effect
try: try:
locale.setlocale(locale.LC_ALL, '') locale.setlocale(locale.LC_ALL, '')
except Exception: except Exception:
# Locale env variables incorrect or locale package not installed/configured on Linux, mysterious reasons on Windows # Locale env variables incorrect or locale package not installed/configured on Linux, mysterious reasons on Windows
print("Can't set locale!") print("Can't set locale!")
from config import config
from EDMCLogging import get_main_logger
logger = get_main_logger() logger = get_main_logger()
# Language name # Language name
LANGUAGE_ID = '!Language' LANGUAGE_ID = '!Language'
LOCALISATION_DIR = 'L10n' LOCALISATION_DIR = 'L10n'
if sys.platform == 'darwin': if sys.platform == 'darwin':
from Foundation import ( # type: ignore # exists on Darwin from Foundation import ( # type: ignore # exists on Darwin
NSLocale, NSNumberFormatter, NSNumberFormatterDecimalStyle NSLocale, NSNumberFormatter, NSNumberFormatterDecimalStyle
@ -68,11 +71,11 @@ class _Translations:
FALLBACK = 'en' # strings in this code are in English FALLBACK = 'en' # strings in this code are in English
FALLBACK_NAME = 'English' FALLBACK_NAME = 'English'
TRANS_RE = re.compile(r'\s*"((?:[^"]|(?:\"))+)"\s*=\s*"((?:[^"]|(?:\"))+)"\s*;\s*$') TRANS_RE = re.compile(r'\s*"((?:[^"]|\")+)"\s*=\s*"((?:[^"]|\")+)"\s*;\s*$')
COMMENT_RE = re.compile(r'\s*/\*.*\*/\s*$') COMMENT_RE = re.compile(r'\s*/\*.*\*/\s*$')
def __init__(self) -> None: def __init__(self) -> None:
self.translations: Dict[Optional[str], Dict[str, str]] = {None: {}} self.translations: dict[str | None, dict[str, str]] = {None: {}}
def install_dummy(self) -> None: def install_dummy(self) -> None:
""" """
@ -112,7 +115,7 @@ class _Translations:
return return
self.translations = {None: self.contents(cast(str, lang))} self.translations = {None: self.contents(cast(str, lang))}
for plugin in os.listdir(config.plugin_dir_path): for plugin in listdir(config.plugin_dir_path):
plugin_path = join(config.plugin_dir_path, plugin, LOCALISATION_DIR) plugin_path = join(config.plugin_dir_path, plugin, LOCALISATION_DIR)
if isdir(plugin_path): if isdir(plugin_path):
try: try:
@ -126,7 +129,7 @@ class _Translations:
builtins.__dict__['_'] = self.translate builtins.__dict__['_'] = self.translate
def contents(self, lang: str, plugin_path: Optional[str] = None) -> Dict[str, str]: def contents(self, lang: str, plugin_path: str | None = None) -> dict[str, str]:
"""Load all the translations from a translation file.""" """Load all the translations from a translation file."""
assert lang in self.available() assert lang in self.available()
translations = {} translations = {}
@ -151,7 +154,7 @@ class _Translations:
return translations return translations
def translate(self, x: str, context: Optional[str] = None) -> str: def translate(self, x: str, context: str | None = None) -> str:
""" """
Translate the given string to the current lang. Translate the given string to the current lang.
@ -161,7 +164,7 @@ class _Translations:
""" """
if context: if context:
# TODO: There is probably a better way to go about this now. # TODO: There is probably a better way to go about this now.
context = context[len(config.plugin_dir)+1:].split(os.sep)[0] context = context[len(config.plugin_dir)+1:].split(sep)[0]
if self.translations[None] and context not in self.translations: if self.translations[None] and context not in self.translations:
logger.debug(f'No translations for {context!r}') logger.debug(f'No translations for {context!r}')
@ -172,23 +175,23 @@ class _Translations:
return self.translations[None].get(x) or str(x).replace(r'\"', '"').replace('{CR}', '\n') return self.translations[None].get(x) or str(x).replace(r'\"', '"').replace('{CR}', '\n')
def available(self) -> Set[str]: def available(self) -> set[str]:
"""Return a list of available language codes.""" """Return a list of available language codes."""
path = self.respath() path = self.respath()
if getattr(sys, 'frozen', False) and sys.platform == 'darwin': if getattr(sys, 'frozen', False) and sys.platform == 'darwin':
available = { available = {
x[:-len('.lproj')] for x in os.listdir(path) x[:-len('.lproj')] for x in listdir(path)
if x.endswith('.lproj') and isfile(join(x, 'Localizable.strings')) if x.endswith('.lproj') and isfile(join(x, 'Localizable.strings'))
} }
else: else:
available = {x[:-len('.strings')] for x in os.listdir(path) if x.endswith('.strings')} available = {x[:-len('.strings')] for x in listdir(path) if x.endswith('.strings')}
return available return available
def available_names(self) -> Dict[Optional[str], str]: def available_names(self) -> dict[str | None, str]:
"""Available language names by code.""" """Available language names by code."""
names: Dict[Optional[str], str] = OrderedDict([ names: dict[str | None, str] = OrderedDict([
# LANG: The system default language choice in Settings > Appearance # LANG: The system default language choice in Settings > Appearance
(None, _('Default')), # Appearance theme and language setting (None, _('Default')), # Appearance theme and language setting
]) ])
@ -200,20 +203,20 @@ class _Translations:
return names return names
def respath(self) -> pathlib.Path: def respath(self) -> str:
"""Path to localisation files.""" """Path to localisation files."""
if getattr(sys, 'frozen', False): if getattr(sys, 'frozen', False):
if sys.platform == 'darwin': if sys.platform == 'darwin':
return (pathlib.Path(sys.executable).parents[0] / os.pardir / 'Resources').resolve() return abspath(join(dirname(sys.executable), pardir, 'Resources'))
return pathlib.Path(dirname(sys.executable)) / LOCALISATION_DIR return abspath(join(dirname(sys.executable), LOCALISATION_DIR))
elif __file__: if __file__:
return pathlib.Path(__file__).parents[0] / LOCALISATION_DIR return abspath(join(dirname(__file__), LOCALISATION_DIR))
return pathlib.Path(LOCALISATION_DIR) return abspath(LOCALISATION_DIR)
def file(self, lang: str, plugin_path: Optional[str] = None) -> Optional[TextIO]: def file(self, lang: str, plugin_path: str | None = None) -> TextIO | None:
""" """
Open the given lang file for reading. Open the given lang file for reading.
@ -222,20 +225,21 @@ class _Translations:
:return: the opened file (Note: This should be closed when done) :return: the opened file (Note: This should be closed when done)
""" """
if plugin_path: if plugin_path:
f = pathlib.Path(plugin_path) / f'{lang}.strings' file_path = join(plugin_path, f'{lang}.strings')
if not f.exists(): if not exists(file_path):
return None return None
try: try:
return f.open('r', encoding='utf-8') return open(file_path, encoding='utf-8')
except OSError: except OSError:
logger.exception(f'could not open {f}') logger.exception(f'could not open {file_path}')
elif getattr(sys, 'frozen', False) and sys.platform == 'darwin': elif getattr(sys, 'frozen', False) and sys.platform == 'darwin':
return (self.respath() / f'{lang}.lproj' / 'Localizable.strings').open('r', encoding='utf-16') res_path = join(self.respath(), f'{lang}.lproj', 'Localizable.strings')
return open(res_path, encoding='utf-16')
return (self.respath() / f'{lang}.strings').open('r', encoding='utf-8') res_path = join(self.respath(), f'{lang}.strings')
return open(res_path, encoding='utf-8')
class _Locale: class _Locale:
@ -250,11 +254,11 @@ class _Locale:
self.float_formatter.setMinimumFractionDigits_(5) self.float_formatter.setMinimumFractionDigits_(5)
self.float_formatter.setMaximumFractionDigits_(5) self.float_formatter.setMaximumFractionDigits_(5)
def stringFromNumber(self, number: Union[float, int], decimals: int | None = None) -> str: # noqa: N802 def stringFromNumber(self, number: float | int, decimals: int | None = None) -> str: # noqa: N802
warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.')) warnings.warn(DeprecationWarning('use _Locale.string_from_number instead.'))
return self.string_from_number(number, decimals) # type: ignore return self.string_from_number(number, decimals) # type: ignore
def numberFromString(self, string: str) -> Union[int, float, None]: # noqa: N802 def numberFromString(self, string: str) -> int | float | None: # noqa: N802
warnings.warn(DeprecationWarning('use _Locale.number_from_string instead.')) warnings.warn(DeprecationWarning('use _Locale.number_from_string instead.'))
return self.number_from_string(string) return self.number_from_string(string)
@ -262,7 +266,7 @@ class _Locale:
warnings.warn(DeprecationWarning('use _Locale.preferred_languages instead.')) warnings.warn(DeprecationWarning('use _Locale.preferred_languages instead.'))
return self.preferred_languages() return self.preferred_languages()
def string_from_number(self, number: Union[float, int], decimals: int = 5) -> str: def string_from_number(self, number: float | int, decimals: int = 5) -> str:
""" """
Convert a number to a string. Convert a number to a string.
@ -285,11 +289,9 @@ class _Locale:
if not decimals and isinstance(number, numbers.Integral): if not decimals and isinstance(number, numbers.Integral):
return locale.format_string('%d', number, True) return locale.format_string('%d', number, True)
return locale.format_string('%.*f', (decimals, number), True)
else: def number_from_string(self, string: str) -> int | float | None:
return locale.format_string('%.*f', (decimals, number), True)
def number_from_string(self, string: str) -> Union[int, float, None]:
""" """
Convert a string to a number using the system locale. Convert a string to a number using the system locale.
@ -308,7 +310,17 @@ class _Locale:
return None return None
def preferred_languages(self) -> Iterable[str]: # noqa: CCR001 def wszarray_to_list(self, array):
offset = 0
while offset < len(array):
sz = ctypes.wstring_at(ctypes.addressof(array) + offset * 2) # type: ignore
if sz:
yield sz
offset += len(sz) + 1
else:
break
def preferred_languages(self) -> Iterable[str]:
""" """
Return a list of preferred language codes. Return a list of preferred language codes.
@ -326,32 +338,21 @@ class _Locale:
elif sys.platform != 'win32': elif sys.platform != 'win32':
# POSIX # POSIX
lang = locale.getlocale()[0] lang = locale.getlocale()[0]
languages = lang and [lang.replace('_', '-')] or [] languages = [lang.replace('_', '-')] if lang else []
else: else:
def wszarray_to_list(array):
offset = 0
while offset < len(array):
sz = ctypes.wstring_at(ctypes.addressof(array) + offset*2)
if sz:
yield sz
offset += len(sz)+1
else:
break
num = ctypes.c_ulong() num = ctypes.c_ulong()
size = ctypes.c_ulong(0) size = ctypes.c_ulong(0)
languages = [] languages = []
if GetUserPreferredUILanguages( if GetUserPreferredUILanguages(
MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size) MUI_LANGUAGE_NAME, ctypes.byref(num), None, ctypes.byref(size)
) and size.value: ) and size.value:
buf = ctypes.create_unicode_buffer(size.value) buf = ctypes.create_unicode_buffer(size.value)
if GetUserPreferredUILanguages( if GetUserPreferredUILanguages(
MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size) MUI_LANGUAGE_NAME, ctypes.byref(num), ctypes.byref(buf), ctypes.byref(size)
): ):
languages = wszarray_to_list(buf) languages = self.wszarray_to_list(buf)
# HACK: <n/a> | 2021-12-11: OneSky calls "Chinese Simplified" "zh-Hans" # HACK: <n/a> | 2021-12-11: OneSky calls "Chinese Simplified" "zh-Hans"
# in the name of the file, but that will be zh-CN in terms of # in the name of the file, but that will be zh-CN in terms of
@ -369,14 +370,13 @@ Translations = _Translations()
# generate template strings file - like xgettext # generate template strings file - like xgettext
# parsing is limited - only single ' or " delimited strings, and only one string per line # parsing is limited - only single ' or " delimited strings, and only one string per line
if __name__ == "__main__": if __name__ == "__main__":
import re
regexp = re.compile(r'''_\([ur]?(['"])(((?<!\\)\\\1|.)+?)\1\)[^#]*(#.+)?''') # match a single line python literal regexp = re.compile(r'''_\([ur]?(['"])(((?<!\\)\\\1|.)+?)\1\)[^#]*(#.+)?''') # match a single line python literal
seen: Dict[str, str] = {} seen: dict[str, str] = {}
for f in ( for f in (
sorted(x for x in os.listdir('.') if x.endswith('.py')) + sorted(x for x in listdir('.') if x.endswith('.py')) +
sorted(join('plugins', x) for x in (os.listdir('plugins') if isdir('plugins') else []) if x.endswith('.py')) sorted(join('plugins', x) for x in (listdir('plugins') if isdir('plugins') else []) if x.endswith('.py'))
): ):
with open(f, 'r', encoding='utf-8') as h: with open(f, encoding='utf-8') as h:
lineno = 0 lineno = 0
for line in h: for line in h:
lineno += 1 lineno += 1
@ -386,9 +386,9 @@ if __name__ == "__main__":
(match.group(4) and (match.group(4)[1:].strip()) + '. ' or '') + f'[{basename(f)}]' (match.group(4) and (match.group(4)[1:].strip()) + '. ' or '') + f'[{basename(f)}]'
) )
if seen: if seen:
target_path = pathlib.Path(LOCALISATION_DIR) / 'en.template.new' target_path = join(LOCALISATION_DIR, 'en.template.new')
target_path.parent.mkdir(exist_ok=True) makedirs(dirname(target_path), exist_ok=True)
with target_path.open('w', encoding='utf-8') as target_file: with open(target_path, 'w', encoding='utf-8') as target_file:
target_file.write(f'/* Language name */\n"{LANGUAGE_ID}" = "English";\n\n') target_file.write(f'/* Language name */\n"{LANGUAGE_ID}" = "English";\n\n')
for thing in sorted(seen, key=str.lower): for thing in sorted(seen, key=str.lower):
if seen[thing]: if seen[thing]: