""" killswitch.py - Fetch kill switches from EDMC Repo. Copyright (c) EDCD, All Rights Reserved Licensed under the GNU General Public License. See LICENSE file. """ from __future__ import annotations import json import threading from copy import deepcopy from typing import ( TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, MutableSequence, NamedTuple, Sequence, TypedDict, TypeVar, cast, Union ) import requests import semantic_version from semantic_version.base import Version import config import EDMCLogging logger = EDMCLogging.get_main_logger() OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json' DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json' CURRENT_KILLSWITCH_VERSION = 2 UPDATABLE_DATA = Union[Mapping, Sequence] # Have to keep old-style _current_version: semantic_version.Version = config.appversion_nobuild() T = TypeVar('T', bound=UPDATABLE_DATA) class SingleKill(NamedTuple): """A single KillSwitch. Possibly with additional rules.""" match: str reason: str redact_fields: list[str] | None = None delete_fields: list[str] | None = None set_fields: dict[str, Any] | None = None @property def has_rules(self) -> bool: """Return whether this SingleKill can apply rules to a dict to make it safe to use.""" return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields)) def apply_rules(self, target: T) -> T: """ Apply the rules this SingleKill instance has to make some data okay to send. Note that this MODIFIES DATA IN PLACE. :param target: data to apply a rule to :raises: Any and all exceptions _deep_apply and _apply can raise. """ for key, value in (self.set_fields if self .set_fields is not None else {}).items(): _deep_apply(target, key, value) for key in (self.redact_fields if self.redact_fields is not None else []): _deep_apply(target, key, "REDACTED") for key in (self.delete_fields if self.delete_fields is not None else []): _deep_apply(target, key, delete=True) return target def _apply(target: UPDATABLE_DATA, key: str, to_set: Any = None, delete: bool = False): """ Set or delete the given target key on the given target. :param target: The thing to set data on :param key: the key or index to set the data to :param to_set: the data to set, if any, defaults to None :param delete: whether or not to delete the key or index, defaults to False :raises ValueError: when an unexpected target type is passed :raises IndexError: when an invalid index is set or deleted """ if isinstance(target, MutableMapping): if delete: target.pop(key, None) else: target[key] = to_set elif isinstance(target, MutableSequence): idx = _get_int(key) if idx is None: raise ValueError(f'Cannot use string {key!r} as int for index into Sequence') if delete and len(target) > 0: length = len(target) if idx in range(-length, length): target.pop(idx) elif len(target) == idx: target.append(to_set) else: target[idx] = to_set # this can raise, that's fine else: raise ValueError(f'Dont know how to apply data to {type(target)} {target!r}') def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False): # noqa: CCR001 # Recursive silliness. """ Set the given path to the given value, if it exists. if the path has dots (ascii period -- '.'), it will be successively split if possible for deeper indices into target :param target: the dict to modify :param to_set: the data to set, defaults to None :param delete: whether or not to delete the key rather than set it :raises IndexError: when an invalid index is traversed into :raises KeyError: when an invalid key is traversed into """ current = target key: str = "" while '.' in path: if path in current: # it exists on this level, dont go further break if isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()): # there is a dotted key in here that can be used for this # if there's a dotted key in here (must be a mapping), use that if we can keys = current.keys() for k in filter(lambda x: '.' in x, keys): if path.startswith(k): key = k path = path.removeprefix(k) # we assume that the `.` here is for "accessing" the next key. if path[0] == '.': path = path[1:] if len(path) == 0: path = key break else: key, _, path = path.partition('.') if isinstance(current, Mapping): current = current[key] # type: ignore # I really don't know at this point what you want from me mypy. elif isinstance(current, Sequence): target_idx = _get_int(key) # mypy is broken. doesn't like := here. if target_idx is not None: current = current[target_idx] else: raise ValueError(f'Cannot index sequence with non-int key {key!r}') else: raise ValueError(f'Dont know how to index a {type(current)} ({current!r})') _apply(current, path, to_set, delete) def _get_int(s: str) -> int | None: try: return int(s) except ValueError: return None class KillSwitches(NamedTuple): """One version's set of kill switches.""" version: semantic_version.SimpleSpec kills: dict[str, SingleKill] @staticmethod def from_dict(data: KillSwitchSetJSON) -> KillSwitches: """Create a KillSwitches instance from a dictionary.""" ks = {} for match, ks_data in data['kills'].items(): ks[match] = SingleKill( match=match, reason=ks_data['reason'], redact_fields=ks_data.get('redact_fields'), set_fields=ks_data.get('set_fields'), delete_fields=ks_data.get('delete_fields') ) return KillSwitches(version=semantic_version.SimpleSpec(data['version']), kills=ks) class DisabledResult(NamedTuple): """DisabledResult is the result returned from various is_disabled calls.""" disabled: bool kill: SingleKill | None @property def reason(self) -> str: """Reason provided for why this killswitch exists.""" return self.kill.reason if self.kill is not None else "" def has_kill(self) -> bool: """Return whether this DisabledResult has a Kill associated with it.""" return self.kill is not None def has_rules(self) -> bool: """Return whether the kill on this Result contains rules.""" # HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always # be non-None at the point it is evaluated return self.has_kill() and self.kill.has_rules # type: ignore class KillSwitchSet: """Queryable set of kill switches.""" def __init__(self, kill_switches: list[KillSwitches]) -> None: self.kill_switches = kill_switches def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult: """ Return whether the given feature ID is disabled by a killswitch for the given version. :param id: The feature ID to check :param version: The version to check killswitches for, defaults to the current EDMC version :return: a namedtuple indicating status and reason, if any """ if isinstance(version, str): version = semantic_version.Version.coerce(version) for ks in self.kill_switches: if version not in ks.version: continue return DisabledResult(id in ks.kills, ks.kills.get(id, None)) return DisabledResult(False, None) def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool: """Return whether a given feature ID is disabled for the given version.""" return self.get_disabled(id, version=version).disabled def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str: """Return a reason for why the given id is disabled for the given version, if any.""" return self.get_disabled(id, version=version).reason def kills_for_version(self, version: semantic_version.Version = _current_version) -> list[KillSwitches]: """ Get all killswitch entries that apply to the given version. :param version: the version to check against, defaults to the current EDMC version :return: the matching kill switches """ return [k for k in self.kill_switches if version in k.version] def check_killswitch( self, name: str, data: T, log=logger, version=_current_version ) -> tuple[bool, T]: """ Check whether a killswitch is enabled. If it is, apply rules if any. :param name: The killswitch to check :param data: The data to modify if needed :return: A two tuple consisting of: A bool indicating if the caller should return, and either the original data or a *COPY* that has been modified by rules """ res = self.get_disabled(name, version=version) if not res.disabled: return False, data log.info(f'Killswitch {name} is enabled. Checking if rules exist to make use safe') if not res.has_rules(): logger.info('No rules exist. Stopping processing') return True, data if TYPE_CHECKING: # pyright, mypy, please -_- assert res.kill is not None try: new_data = res.kill.apply_rules(deepcopy(data)) except Exception as e: log.exception(f'Exception occurred while attempting to apply rules! bailing out! {e=}') return True, data log.info('Rules applied successfully, allowing execution to continue') return False, new_data def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> tuple[bool, T]: """ Check multiple killswitches in order. Note that the names are applied in the order passed, and that the first true return from check_killswitch causes this to return :param data: the data to update :param log: the logger to use, defaults to the standard EDMC main logger :return: A two tuple of bool and updated data, where the bool is true when the caller _should_ halt processing """ for name in names: should_return, data = self.check_killswitch(name=name, data=data, log=log, version=version) if should_return: return True, data return False, data def __str__(self) -> str: """Return a string representation of KillSwitchSet.""" return f'KillSwitchSet: {str(self.kill_switches)}' def __repr__(self) -> str: """Return __repr__ for KillSwitchSet.""" return f'KillSwitchSet(kill_switches={self.kill_switches!r})' class BaseSingleKillSwitch(TypedDict): # noqa: D101 reason: str class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False): # noqa: D101 redact_fields: list[str] # set fields to "REDACTED" delete_fields: list[str] # remove fields entirely set_fields: dict[str, Any] # set fields to given data class KillSwitchSetJSON(TypedDict): # noqa: D101 version: str kills: dict[str, SingleKillSwitchJSON] class KillSwitchJSONFile(TypedDict): # noqa: D101 version: int last_updated: str kill_switches: list[KillSwitchSetJSON] def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> KillSwitchJSONFile | None: """ Fetch the JSON representation of our kill switches. :param target: the URL to fetch the kill switch list from, defaults to DEFAULT_KILLSWITCH_URL :return: a list of dicts containing kill switch data, or None """ logger.info("Attempting to fetch kill switches") if target.startswith('file:'): target = target.replace('file:', '') try: with open(target) as t: return json.load(t) except FileNotFoundError: logger.warning(f"No such file '{target}'") return None try: data = requests.get(target, timeout=10).json() except ValueError as e: logger.warning(f"Failed to get kill switches, data was invalid: {e}") return None except requests.exceptions.RequestException as requ_err: logger.warning(f"unable to connect to {target!r}: {requ_err}") return None return data class _KillSwitchV1(TypedDict): version: str kills: dict[str, str] class _KillSwitchJSONFileV1(TypedDict): version: int last_updated: str kill_switches: list[_KillSwitchV1] def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile: version = data['version'] if version == CURRENT_KILLSWITCH_VERSION: return data if version == 1: logger.info('Got an old version killswitch file (v1) upgrading!') to_return: KillSwitchJSONFile = deepcopy(data) data_v1 = cast(_KillSwitchJSONFileV1, data) to_return['kill_switches'] = [ cast(KillSwitchSetJSON, { # I need to cheat here a touch. It is this I promise 'version': d['version'], 'kills': { match: {'reason': reason} for match, reason in d['kills'].items() } }) for d in data_v1['kill_switches'] ] to_return['version'] = CURRENT_KILLSWITCH_VERSION return to_return raise ValueError(f'Unknown Killswitch version {data["version"]}') def parse_kill_switches(data: KillSwitchJSONFile) -> list[KillSwitches]: """ Parse kill switch dict to List of KillSwitches. :param data: dict containing raw killswitch data :return: a list of all provided killswitches """ data = _upgrade_kill_switch_dict(data) last_updated = data['last_updated'] ks_version = data['version'] logger.info(f'Kill switches last updated {last_updated}') if ks_version != CURRENT_KILLSWITCH_VERSION: logger.warning(f'Unknown killswitch version {ks_version} (expected {CURRENT_KILLSWITCH_VERSION}). Bailing out') return [] kill_switches = data['kill_switches'] out = [] for idx, ks_data in enumerate(kill_switches): try: ks = KillSwitches.from_dict(ks_data) out.append(ks) except Exception as e: logger.exception(f'Could not parse killswitch idx {idx}: {e}') return out def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: str | None = None) -> KillSwitchSet | None: """ Get a kill switch set object. :param target: the URL to fetch the killswitch JSON from, defaults to DEFAULT_KILLSWITCH_URL :return: the KillSwitchSet for the URL, or None if there was an error """ if (data := fetch_kill_switches(target)) is None: if fallback is not None: logger.warning('could not get killswitches, trying fallback') data = fetch_kill_switches(fallback) if data is None: logger.warning('Could not get killswitches.') return None return KillSwitchSet(parse_kill_switches(data)) def get_kill_switches_thread( target, callback: Callable[[KillSwitchSet | None], None], fallback: str | None = None, ) -> None: """ Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available. :param target: Target killswitch file :param callback: The callback to pass the newly created KillSwitchSet :param fallback: Fallback killswitch file, if any, defaults to None """ def make_request(): callback(get_kill_switches(target, fallback=fallback)) threading.Thread(target=make_request, daemon=True).start() active: KillSwitchSet = KillSwitchSet([]) def setup_main_list(filename: str | None): """ Set up the global set of kill switches for querying. Plugins should NOT call this EVER. """ if filename is None: filename = DEFAULT_KILLSWITCH_URL if (data := get_kill_switches(filename, OLD_KILLSWITCH_URL)) is None: logger.warning("Unable to fetch kill switches. Setting global set to an empty set") return global active active = data logger.trace(f'{len(active.kill_switches)} Active Killswitches:') for v in active.kill_switches: logger.trace(v) def get_disabled(id: str, *, version: semantic_version.Version = _current_version) -> DisabledResult: """ Query the global KillSwitchSet for whether or not a given ID is disabled. See KillSwitchSet#is_disabled for more information """ return active.get_disabled(id, version=version) def check_killswitch(name: str, data: T, log=logger) -> tuple[bool, T]: """Query the global KillSwitchSet#check_killswitch method.""" return active.check_killswitch(name, data, log) def check_multiple_killswitches(data: T, *names: str, log=logger) -> tuple[bool, T]: """Query the global KillSwitchSet#check_multiple method.""" return active.check_multiple_killswitches(data, *names, log=log) def is_disabled(id: str, *, version: semantic_version.Version = _current_version) -> bool: """Query the global KillSwitchSet#is_disabled method.""" return active.is_disabled(id, version=version) def get_reason(id: str, *, version: semantic_version.Version = _current_version) -> str: """Query the global KillSwitchSet#get_reason method.""" return active.get_reason(id, version=version) def kills_for_version(version: semantic_version.Version = _current_version) -> list[KillSwitches]: """Query the global KillSwitchSet for kills matching a particular version.""" return active.kills_for_version(version)