diff --git a/companion.py b/companion.py index 36fb62cb..d1a35c6d 100644 --- a/companion.py +++ b/companion.py @@ -7,8 +7,10 @@ protocol used for the callback. """ import base64 +import collections import csv import hashlib +import json import numbers import os import random @@ -20,7 +22,7 @@ from email.utils import parsedate # TODO: see https://github.com/EDCD/EDMarketConnector/issues/569 from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it from os.path import join -from typing import TYPE_CHECKING, Any, Dict, List, NewType, Union +from typing import TYPE_CHECKING, Any, Dict, List, Union, cast import requests @@ -31,11 +33,16 @@ from protocol import protocolhandler logger = get_main_logger() if TYPE_CHECKING: - _ = lambda x: x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist + def _(x): return x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist + + UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally +else: + UserDict = collections.UserDict # type: ignore # Otherwise simply use the actual class + # Define custom type for the dicts that hold CAPI data -CAPIData = NewType('CAPIData', Dict) +# CAPIData = NewType('CAPIData', Dict) holdoff = 60 # be nice timeout = 10 # requests timeout @@ -117,6 +124,52 @@ ship_map = { } +class CAPIData(UserDict): + """CAPI Response.""" + + def __init__(self, data: Union[str, Dict[str, Any], 'CAPIData', None] = None) -> None: + if data is None: + super().__init__() + elif isinstance(data, str): + super().__init__(json.loads(data)) + else: + super().__init__(data) + + self.original_data = self.data.copy() # Just in case + + self.check_modules_ships() + + def check_modules_ships(self) -> None: + modules: Dict[str, Any] = self.data['lastStarport'].get('modules') + if modules is None or not isinstance(modules, dict): + if modules is None: + logger.debug('modules was None. FC or Damaged Station?') + + elif isinstance(modules, list): + if len(modules) == 0: + logger.debug('modules is empty list. Damaged Station?') + + else: + logger.error(f'modules is non-empty list: {modules!r}') + + else: + logger.error(f'modules was not None, a list, or a dict! type: {type(modules)}, content: {modules}') + + # Set a safe value + self.data['lastStarport']['modules'] = modules = {} + + ships: Dict[str, Any] = self.data['lastStarport'].get('ships') + if ships is None or not isinstance(ships, dict): + if ships is None: + logger.debug('ships was None') + + else: + logger.error(f'ships was neither None nor a Dict! type: {type(ships)}, content: {ships}') + + # Set a safe value + self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []} + + def listify(thing: Union[List, Dict]) -> List: """ Convert actual JSON array or int-indexed dict into a Python list. @@ -231,7 +284,7 @@ class Auth(object): logger.debug(f'Trying for "{self.cmdr}"') self.verifier = None - cmdrs = config.get('cmdrs') + cmdrs = cast(List[str], config.get('cmdrs')) logger.debug(f'Cmdrs: {cmdrs}') idx = cmdrs.index(self.cmdr) @@ -321,7 +374,7 @@ class Auth(object): data = r.json() if r.status_code == requests.codes.ok: logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"') - cmdrs = config.get('cmdrs') + cmdrs = cast(List[str], config.get('cmdrs')) idx = cmdrs.index(self.cmdr) tokens = config.get('fdev_apikeys') or [] tokens = tokens + [''] * (len(cmdrs) - len(tokens)) @@ -350,9 +403,9 @@ class Auth(object): def invalidate(cmdr: str) -> None: """Invalidate Refresh Token for specified Commander.""" logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"') - cmdrs = config.get('cmdrs') + cmdrs = cast(List[str], config.get('cmdrs')) idx = cmdrs.index(cmdr) - tokens = config.get('fdev_apikeys') or [] + tokens = cast(List[str], config.get('fdev_apikeys') or []) tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens[idx] = '' config.set('fdev_apikeys', tokens) @@ -499,7 +552,7 @@ class Session(object): try: r.raise_for_status() # Typically 403 "Forbidden" on token expiry - data = r.json() # May also fail here if token expired since response is empty + data = CAPIData(r.json()) # May also fail here if token expired since response is empty except (requests.HTTPError, ValueError) as e: logger.exception('Frontier CAPI Auth: GET ') @@ -670,14 +723,14 @@ def fixup(data: CAPIData) -> CAPIData: # noqa: C901, CCR001 # Can't be usefully datacopy = data.copy() datacopy['lastStarport'] = data['lastStarport'].copy() datacopy['lastStarport']['commodities'] = commodities - return CAPIData(datacopy) + return datacopy def ship(data: CAPIData) -> CAPIData: """Construct a subset of the received data describing the current ship.""" def filter_ship(d: CAPIData) -> CAPIData: """Filter provided ship data.""" - filtered: CAPIData = CAPIData({}) + filtered = CAPIData() for k, v in d.items(): if not v: pass # just skip empty fields for brevity