1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-19 02:17:38 +03:00

companion.py: Slight tweaks from running mypy

NB: Using `mypy --follow-imports skip` for now to limit how much it
checks each time.
This commit is contained in:
Athanasius 2020-09-15 16:15:13 +01:00
parent c311957eff
commit ed52528718

View File

@ -21,7 +21,7 @@ from email.utils import parsedate
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
from os.path import join
from typing import TYPE_CHECKING, Dict, List, NewType, Union
from typing import TYPE_CHECKING, Any, Dict, List, NewType, Union
import requests
@ -67,7 +67,7 @@ category_map = {
'NonMarketable': False, # Don't appear in the in-game market so don't report
}
commodity_map = {}
commodity_map: Dict = {}
ship_map = {
'adder': 'Adder',
@ -134,7 +134,7 @@ def listify(thing: Union[List, Dict]) -> List:
return list(thing) # array is not sparse
elif isinstance(thing, dict):
retval = []
retval: List[Any] = []
for k, v in thing.items():
idx = int(k)
@ -214,10 +214,11 @@ class Auth(object):
"""Handles authentication with the Frontier CAPI service via oAuth2."""
def __init__(self, cmdr: str):
self.cmdr = cmdr
self.cmdr: str = cmdr
self.session = requests.Session()
self.session.headers['User-Agent'] = USER_AGENT
self.verifier = self.state = None
self.verifier: Union[bytes, None] = None
self.state: Union[str, None] = None
def refresh(self) -> Union[str, None]:
"""
@ -279,6 +280,8 @@ class Auth(object):
f'{SERVER_AUTH}{URL_AUTH}?response_type=code&audience=frontier&scope=capi&client_id={CLIENT_ID}&code_challenge={challenge}&code_challenge_method=S256&state={self.state}&redirect_uri={protocolhandler.redirect}' # noqa: E501 # I cant make this any shorter
)
return None
def authorize(self, payload: str) -> str:
"""Handle oAuth authorization callback.
@ -304,7 +307,7 @@ class Auth(object):
r = None
try:
logger.debug('Got code, posting it back...')
data = {
request_data = {
'grant_type': 'authorization_code',
'client_id': CLIENT_ID,
'code_verifier': self.verifier,
@ -312,7 +315,7 @@ class Auth(object):
'redirect_uri': protocolhandler.redirect,
}
r = self.session.post(SERVER_AUTH + URL_TOKEN, data=data, timeout=auth_timeout)
r = self.session.post(SERVER_AUTH + URL_TOKEN, data=request_data, timeout=auth_timeout)
data = r.json()
if r.status_code == requests.codes.ok:
logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"')
@ -324,7 +327,7 @@ class Auth(object):
config.set('fdev_apikeys', tokens)
config.save() # Save settings now for use by command-line app
return data.get('access_token')
return str(data.get('access_token'))
except Exception as e:
logger.exception(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
@ -515,7 +518,7 @@ class Session(object):
self.retrying = False
if 'timestamp' not in data:
logger.debug('timestamp not in data, adding from response headers')
data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date']))
data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date'])) # type: ignore
return data