1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-13 07:47:14 +03:00

made types consistent and generic

This commit is contained in:
A_D 2021-08-17 19:32:18 +02:00
parent 1e71955375
commit f62f1ee97b
No known key found for this signature in database
GPG Key ID: 4BE9EB7DF45076C4
3 changed files with 12 additions and 9 deletions

View File

@ -5,7 +5,7 @@ import threading
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Sequence,
Tuple, TypedDict, Union, cast
Tuple, TypedDict, TypeVar, Union, cast
)
import requests
@ -23,6 +23,8 @@ CURRENT_KILLSWITCH_VERSION = 2
UPDATABLE_DATA = Union[Mapping, Sequence]
_current_version: semantic_version.Version = config.appversion_nobuild()
T = TypeVar('T', bound=UPDATABLE_DATA)
class SingleKill(NamedTuple):
"""A single KillSwitch. Possibly with additional rules."""
@ -38,7 +40,7 @@ class SingleKill(NamedTuple):
"""Return whether or not this SingleKill can apply rules to a dict to make it safe to use."""
return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields))
def apply_rules(self, target: UPDATABLE_DATA) -> UPDATABLE_DATA:
def apply_rules(self, target: T) -> T:
"""
Apply the rules this SingleKill instance has to make some data okay to send.
@ -245,8 +247,8 @@ class KillSwitchSet:
return [k for k in self.kill_switches if version in k.version]
def check_killswitch(
self, name: str, data: UPDATABLE_DATA, log=logger, version=_current_version
) -> Tuple[bool, UPDATABLE_DATA]:
self, name: str, data: T, log=logger, version=_current_version
) -> Tuple[bool, T]:
"""
Check whether or not a killswitch is enabled. If it is, apply rules if any.
@ -277,7 +279,7 @@ class KillSwitchSet:
log.info('Rules applied successfully, allowing execution to continue')
return False, new_data
def check_multiple_killswitches(self, data: UPDATABLE_DATA, *names: str, log=logger, version=_current_version):
def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> Tuple[bool, T]:
"""
Check multiple killswitches in order.
@ -479,12 +481,12 @@ def get_disabled(id: str, *, version: semantic_version.Version = _current_versio
return active.get_disabled(id, version=version)
def check_killswitch(name: str, data: UPDATABLE_DATA, log=logger) -> Tuple[bool, UPDATABLE_DATA]:
def check_killswitch(name: str, data: T, log=logger) -> Tuple[bool, T]:
"""Query the global KillSwitchSet#check_killswitch method."""
return active.check_killswitch(name, data, log)
def check_multiple_killswitches(data: UPDATABLE_DATA, *names: str, log=logger) -> tuple[bool, UPDATABLE_DATA]:
def check_multiple_killswitches(data: T, *names: str, log=logger) -> tuple[bool, T]:
"""Query the global KillSwitchSet#check_multiple method."""
return active.check_multiple_killswitches(data, *names, log=log)

View File

@ -407,7 +407,8 @@ def journal_entry( # noqa: C901, CCR001
if should_return:
return
entry = cast(MutableMapping[str, Any], new_entry)
entry = new_entry
this.on_foot = state['OnFoot']
if entry['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked'):

View File

@ -350,7 +350,7 @@ def journal_entry( # noqa: C901, CCR001
# this can and WILL break state, but if we're concerned about it sending bad data, we'd disable globally anyway
return ''
entry = cast(Dict[str, Any], new_entry)
entry = new_entry
this.on_foot = state['OnFoot']
event_name: str = entry['event']
this.cmdr = cmdr