1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-05-30 23:29:30 +03:00

Cleared final mypy errors in inara

This commit is contained in:
A_D 2021-04-06 17:13:04 +02:00 committed by Athanasius
parent 34d4e72bc4
commit d0bc006f9f

View File

@ -1,13 +1,13 @@
"""Inara Sync."""
import dataclasses
import json
import threading
import time
import tkinter as tk
from collections import OrderedDict, defaultdict, deque
from operator import itemgetter
from threading import Lock, Thread
from typing import TYPE_CHECKING, Any, AnyStr, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
from typing import OrderedDict as OrderedDictT
from typing import Sequence, Union, cast
@ -34,6 +34,26 @@ FAKE = ('CQC', 'Training', 'Destination') # Fake systems that shouldn't be sent
CREDIT_RATIO = 1.05 # Update credits if they change by 5% over the course of a session
# These need to be defined above This
class Credentials(NamedTuple):
"""Credentials holds the set of credentials required to identify an inara API payload to inara."""
cmdr: Optional[str]
fid: Optional[str]
api_key: str
EVENT_DATA = Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]
class Event(NamedTuple):
"""Event represents an event for the Inara API."""
name: str
timestamp: str
data: EVENT_DATA
class This:
"""Holds module globals."""
@ -44,8 +64,6 @@ class This:
self.lastship = None # eventData from the last addCommanderShip or setCommanderShip event
# Cached Cmdr state
self.events: List[OrderedDictT[str, Any]] = [] # Unsent events
self.event_lock = Lock
self.cmdr: Optional[str] = None
self.FID: Optional[str] = None # Frontier ID
self.multicrew: bool = False # don't send captain's ship info to Inara while on a crew
@ -65,11 +83,11 @@ class This:
self.timer_run = True
# Main window clicks
self.system_link = None
self.system = None
self.system_address = None
self.system_population = None
self.station_link = None
self.system_link: tk.Widget = None # type: ignore
self.system: Optional[str] = None # type: ignore
self.system_address: Optional[str] = None # type: ignore
self.system_population: Optional[int] = None
self.station_link: tk.Widget = None # type: ignore
self.station = None
self.station_marketid = None
@ -80,45 +98,8 @@ class This:
self.apikey: nb.Entry
self.apikey_label: HyperlinkLabel
this = This()
# last time we updated, if unset in config this is 0, which means an instant update
LAST_UPDATE_CONF_KEY = 'inara_last_update'
EVENT_COLLECT_TIME = 31 # Minimum time to take collecting events before requesting a send
WORKER_WAIT_TIME = 35 # Minimum time for worker to wait between sends
STATION_UNDOCKED: str = '×' # "Station" name to display when not docked = U+00D7
class Credentials(NamedTuple):
"""Credentials holds the set of credentials required to identify an inara API payload to inara."""
cmdr: Optional[str]
fid: Optional[str]
api_key: str
EVENT_DATA = Union[Mapping[AnyStr, Any], Sequence[Mapping[AnyStr, Any]]]
class Event(NamedTuple):
"""Event represents an event for the Inara API."""
name: str
timestamp: str
data: EVENT_DATA
@dataclasses.dataclass
class NewThis:
"""
NewThis is where the plugin stores all of its data.
It is named NewThis as it is currently being migrated to. Once migration is complete it will be renamed to This.
"""
events: Dict[Credentials, Deque[Event]] = dataclasses.field(default_factory=lambda: defaultdict(deque))
event_lock: Lock = dataclasses.field(default_factory=Lock) # protects events, for use when rewriting events
self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque)
self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events
def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]) -> None:
"""
@ -135,7 +116,15 @@ class NewThis:
self.events[key].extend(filter(predicate, tmp))
new_this = NewThis()
this = This()
# last time we updated, if unset in config this is 0, which means an instant update
LAST_UPDATE_CONF_KEY = 'inara_last_update'
EVENT_COLLECT_TIME = 31 # Minimum time to take collecting events before requesting a send
WORKER_WAIT_TIME = 35 # Minimum time for worker to wait between sends
STATION_UNDOCKED: str = '×' # "Station" name to display when not docked = U+00D7
TARGET_URL = 'https://inara.cz/inapi/v1/'
@ -768,7 +757,7 @@ def journal_entry( # noqa: C901, CCR001
if this.fleet != fleet:
this.fleet = fleet
new_this.filter_events(current_creds, lambda e: e.name != 'setCommanderShip')
this.filter_events(current_creds, lambda e: e.name != 'setCommanderShip')
# this.events = [x for x in this.events if x['eventName'] != 'setCommanderShip'] # Remove any unsent
for ship in this.fleet:
@ -780,9 +769,11 @@ def journal_entry( # noqa: C901, CCR001
if this.loadout != loadout:
this.loadout = loadout
new_this.filter_events(
this.filter_events(
current_creds,
lambda e: e.name != 'setCommanderShipLoadout' or e.data['shipGameID'] != this.loadout['shipGameID']
lambda e: (
e.name != 'setCommanderShipLoadout'
or cast(dict, e.data)['shipGameID'] != cast(dict, this.loadout)['shipGameID'])
)
new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout)
@ -820,7 +811,7 @@ def journal_entry( # noqa: C901, CCR001
# Only send on change
this.storedmodules = modules
# Remove any unsent
new_this.filter_events(current_creds, lambda e: e.name != 'setCommanderStorageModules')
this.filter_events(current_creds, lambda e: e.name != 'setCommanderStorageModules')
# this.events = list(filter(lambda e: e['eventName'] != 'setCommanderStorageModules', this.events))
new_add_event('setCommanderStorageModules', entry['timestamp'], this.storedmodules)
@ -978,7 +969,7 @@ def journal_entry( # noqa: C901, CCR001
# Community Goals
if event_name == 'CommunityGoal':
# Remove any unsent
new_this.filter_events(
this.filter_events(
current_creds, lambda e: e.name not in ('setCommunityGoal', 'setCommanderCommunityGoalProgress')
)
@ -1109,7 +1100,7 @@ def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001
if config.get_int('inara_out') and not is_beta and not this.multicrew and credentials(this.cmdr):
if not (CREDIT_RATIO > this.lastcredits / data['commander']['credits'] > 1/CREDIT_RATIO):
new_this.filter_events(
this.filter_events(
Credentials(this.cmdr, this.FID, str(credentials(this.cmdr))),
lambda e: e.name != 'setCommanderCredits'
)
@ -1223,8 +1214,8 @@ def new_add_event(
key = Credentials(str(cmdr), str(fid), api_key) # this fails type checking due to `this` weirdness, hence str()
with new_this.event_lock:
new_this.events[key].append(Event(name, timestamp, data))
with this.event_lock:
this.events[key].append(Event(name, timestamp, data))
def new_worker():
@ -1272,8 +1263,8 @@ def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]:
:return: the frozen event list
"""
out: Dict[Credentials, List[Event]] = {}
with new_this.event_lock:
for key, events in new_this.events.items():
with this.event_lock:
for key, events in this.events.items():
out[key] = list(events)
if clear:
events.clear()