1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-06-10 12:22:27 +03:00

Added CAPIData class

Currently it is simply a dict subclass that does some of the cleanup
required to make everything happy. In future it will be changed to be a
NamedTuple or similar that can be typed
This commit is contained in:
A_D 2020-09-23 15:06:16 +02:00 committed by Athanasius
parent 9f45960fe3
commit fbd924d5aa

View File

@ -7,8 +7,10 @@ protocol used for the callback.
""" """
import base64 import base64
import collections
import csv import csv
import hashlib import hashlib
import json
import numbers import numbers
import os import os
import random import random
@ -20,7 +22,7 @@ from email.utils import parsedate
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569 # TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
from os.path import join from os.path import join
from typing import TYPE_CHECKING, Any, Dict, List, NewType, Union from typing import TYPE_CHECKING, Any, Dict, List, Union, cast
import requests import requests
@ -31,11 +33,16 @@ from protocol import protocolhandler
logger = get_main_logger() logger = get_main_logger()
if TYPE_CHECKING: if TYPE_CHECKING:
_ = lambda x: x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist def _(x): return x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist
UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally
else:
UserDict = collections.UserDict # type: ignore # Otherwise simply use the actual class
# Define custom type for the dicts that hold CAPI data # Define custom type for the dicts that hold CAPI data
CAPIData = NewType('CAPIData', Dict) # CAPIData = NewType('CAPIData', Dict)
holdoff = 60 # be nice holdoff = 60 # be nice
timeout = 10 # requests timeout timeout = 10 # requests timeout
@ -117,6 +124,52 @@ ship_map = {
} }
class CAPIData(UserDict):
"""CAPI Response."""
def __init__(self, data: Union[str, Dict[str, Any], 'CAPIData', None] = None) -> None:
if data is None:
super().__init__()
elif isinstance(data, str):
super().__init__(json.loads(data))
else:
super().__init__(data)
self.original_data = self.data.copy() # Just in case
self.check_modules_ships()
def check_modules_ships(self) -> None:
modules: Dict[str, Any] = self.data['lastStarport'].get('modules')
if modules is None or not isinstance(modules, dict):
if modules is None:
logger.debug('modules was None. FC or Damaged Station?')
elif isinstance(modules, list):
if len(modules) == 0:
logger.debug('modules is empty list. Damaged Station?')
else:
logger.error(f'modules is non-empty list: {modules!r}')
else:
logger.error(f'modules was not None, a list, or a dict! type: {type(modules)}, content: {modules}')
# Set a safe value
self.data['lastStarport']['modules'] = modules = {}
ships: Dict[str, Any] = self.data['lastStarport'].get('ships')
if ships is None or not isinstance(ships, dict):
if ships is None:
logger.debug('ships was None')
else:
logger.error(f'ships was neither None nor a Dict! type: {type(ships)}, content: {ships}')
# Set a safe value
self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []}
def listify(thing: Union[List, Dict]) -> List: def listify(thing: Union[List, Dict]) -> List:
""" """
Convert actual JSON array or int-indexed dict into a Python list. Convert actual JSON array or int-indexed dict into a Python list.
@ -231,7 +284,7 @@ class Auth(object):
logger.debug(f'Trying for "{self.cmdr}"') logger.debug(f'Trying for "{self.cmdr}"')
self.verifier = None self.verifier = None
cmdrs = config.get('cmdrs') cmdrs = cast(List[str], config.get('cmdrs'))
logger.debug(f'Cmdrs: {cmdrs}') logger.debug(f'Cmdrs: {cmdrs}')
idx = cmdrs.index(self.cmdr) idx = cmdrs.index(self.cmdr)
@ -321,7 +374,7 @@ class Auth(object):
data = r.json() data = r.json()
if r.status_code == requests.codes.ok: if r.status_code == requests.codes.ok:
logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"') logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"')
cmdrs = config.get('cmdrs') cmdrs = cast(List[str], config.get('cmdrs'))
idx = cmdrs.index(self.cmdr) idx = cmdrs.index(self.cmdr)
tokens = config.get('fdev_apikeys') or [] tokens = config.get('fdev_apikeys') or []
tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens = tokens + [''] * (len(cmdrs) - len(tokens))
@ -350,9 +403,9 @@ class Auth(object):
def invalidate(cmdr: str) -> None: def invalidate(cmdr: str) -> None:
"""Invalidate Refresh Token for specified Commander.""" """Invalidate Refresh Token for specified Commander."""
logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"') logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"')
cmdrs = config.get('cmdrs') cmdrs = cast(List[str], config.get('cmdrs'))
idx = cmdrs.index(cmdr) idx = cmdrs.index(cmdr)
tokens = config.get('fdev_apikeys') or [] tokens = cast(List[str], config.get('fdev_apikeys') or [])
tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens = tokens + [''] * (len(cmdrs) - len(tokens))
tokens[idx] = '' tokens[idx] = ''
config.set('fdev_apikeys', tokens) config.set('fdev_apikeys', tokens)
@ -499,7 +552,7 @@ class Session(object):
try: try:
r.raise_for_status() # Typically 403 "Forbidden" on token expiry r.raise_for_status() # Typically 403 "Forbidden" on token expiry
data = r.json() # May also fail here if token expired since response is empty data = CAPIData(r.json()) # May also fail here if token expired since response is empty
except (requests.HTTPError, ValueError) as e: except (requests.HTTPError, ValueError) as e:
logger.exception('Frontier CAPI Auth: GET ') logger.exception('Frontier CAPI Auth: GET ')
@ -670,14 +723,14 @@ def fixup(data: CAPIData) -> CAPIData: # noqa: C901, CCR001 # Can't be usefully
datacopy = data.copy() datacopy = data.copy()
datacopy['lastStarport'] = data['lastStarport'].copy() datacopy['lastStarport'] = data['lastStarport'].copy()
datacopy['lastStarport']['commodities'] = commodities datacopy['lastStarport']['commodities'] = commodities
return CAPIData(datacopy) return datacopy
def ship(data: CAPIData) -> CAPIData: def ship(data: CAPIData) -> CAPIData:
"""Construct a subset of the received data describing the current ship.""" """Construct a subset of the received data describing the current ship."""
def filter_ship(d: CAPIData) -> CAPIData: def filter_ship(d: CAPIData) -> CAPIData:
"""Filter provided ship data.""" """Filter provided ship data."""
filtered: CAPIData = CAPIData({}) filtered = CAPIData()
for k, v in d.items(): for k, v in d.items():
if not v: if not v:
pass # just skip empty fields for brevity pass # just skip empty fields for brevity