mirror of
https://github.com/EDCD/EDMarketConnector.git
synced 2025-04-14 08:17:13 +03:00
companion.py: Full pass to 100% pass flake8
* docstrings added throughout. * .format() all now f-strings. * Type annotations added throughout. * White space tweaked.
This commit is contained in:
parent
faf4906eea
commit
c12c739c11
326
companion.py
326
companion.py
@ -1,31 +1,38 @@
|
||||
from builtins import str
|
||||
from builtins import range
|
||||
from builtins import object
|
||||
"""
|
||||
Handle use of Frontier's Companion API (CAPI) service.
|
||||
|
||||
Deals with initiating authentication for, and use of, CAPI.
|
||||
Some associated code is in protocol.py which creates and handles the edmc://
|
||||
protocol used for the callback.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import csv
|
||||
import requests
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
|
||||
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
|
||||
from email.utils import parsedate
|
||||
import hashlib
|
||||
import logging
|
||||
import numbers
|
||||
import os
|
||||
from os.path import join
|
||||
import random
|
||||
import time
|
||||
import urllib.parse
|
||||
import webbrowser
|
||||
from builtins import object, range, str
|
||||
from email.utils import parsedate
|
||||
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
|
||||
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
|
||||
from os.path import join
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Union
|
||||
|
||||
import requests
|
||||
|
||||
from config import appname, appversion, config
|
||||
from protocol import protocolhandler
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(appname)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
_ = lambda x: x # noqa # to make flake8 stop complaining that the hacked in _ method doesnt exist
|
||||
_ = lambda x: x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist
|
||||
|
||||
|
||||
holdoff = 60 # be nice
|
||||
@ -35,83 +42,89 @@ auth_timeout = 30 # timeout for initial auth
|
||||
# Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in
|
||||
# Athanasius' Frontier account
|
||||
# Obtain from https://auth.frontierstore.net/client/signup
|
||||
CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a'
|
||||
CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a'
|
||||
SERVER_AUTH = 'https://auth.frontierstore.net'
|
||||
URL_AUTH = '/auth'
|
||||
URL_AUTH = '/auth'
|
||||
URL_TOKEN = '/token'
|
||||
|
||||
USER_AGENT = 'EDCD-{}-{}'.format(appname, appversion)
|
||||
USER_AGENT = f'EDCD-{appname}-{appversion}'
|
||||
|
||||
SERVER_LIVE = 'https://companion.orerve.net'
|
||||
SERVER_BETA = 'https://pts-companion.orerve.net'
|
||||
URL_QUERY = '/profile'
|
||||
URL_MARKET = '/market'
|
||||
URL_SHIPYARD= '/shipyard'
|
||||
URL_QUERY = '/profile'
|
||||
URL_MARKET = '/market'
|
||||
URL_SHIPYARD = '/shipyard'
|
||||
|
||||
|
||||
# Map values reported by the Companion interface to names displayed in-game
|
||||
# May be imported by plugins
|
||||
category_map = {
|
||||
'Narcotics' : 'Legal Drugs',
|
||||
'Slaves' : 'Slavery',
|
||||
'Waste ' : 'Waste',
|
||||
'NonMarketable' : False, # Don't appear in the in-game market so don't report
|
||||
'Narcotics': 'Legal Drugs',
|
||||
'Slaves': 'Slavery',
|
||||
'Waste ': 'Waste',
|
||||
'NonMarketable': False, # Don't appear in the in-game market so don't report
|
||||
}
|
||||
|
||||
commodity_map = {}
|
||||
|
||||
ship_map = {
|
||||
'adder' : 'Adder',
|
||||
'anaconda' : 'Anaconda',
|
||||
'asp' : 'Asp Explorer',
|
||||
'asp_scout' : 'Asp Scout',
|
||||
'belugaliner' : 'Beluga Liner',
|
||||
'cobramkiii' : 'Cobra MkIII',
|
||||
'cobramkiv' : 'Cobra MkIV',
|
||||
'clipper' : 'Panther Clipper',
|
||||
'cutter' : 'Imperial Cutter',
|
||||
'diamondback' : 'Diamondback Scout',
|
||||
'diamondbackxl' : 'Diamondback Explorer',
|
||||
'dolphin' : 'Dolphin',
|
||||
'eagle' : 'Eagle',
|
||||
'empire_courier' : 'Imperial Courier',
|
||||
'empire_eagle' : 'Imperial Eagle',
|
||||
'empire_fighter' : 'Imperial Fighter',
|
||||
'empire_trader' : 'Imperial Clipper',
|
||||
'federation_corvette' : 'Federal Corvette',
|
||||
'federation_dropship' : 'Federal Dropship',
|
||||
'federation_dropship_mkii' : 'Federal Assault Ship',
|
||||
'federation_gunship' : 'Federal Gunship',
|
||||
'federation_fighter' : 'F63 Condor',
|
||||
'ferdelance' : 'Fer-de-Lance',
|
||||
'hauler' : 'Hauler',
|
||||
'independant_trader' : 'Keelback',
|
||||
'independent_fighter' : 'Taipan Fighter',
|
||||
'krait_mkii' : 'Krait MkII',
|
||||
'krait_light' : 'Krait Phantom',
|
||||
'mamba' : 'Mamba',
|
||||
'orca' : 'Orca',
|
||||
'python' : 'Python',
|
||||
'scout' : 'Taipan Fighter',
|
||||
'sidewinder' : 'Sidewinder',
|
||||
'testbuggy' : 'Scarab',
|
||||
'type6' : 'Type-6 Transporter',
|
||||
'type7' : 'Type-7 Transporter',
|
||||
'type9' : 'Type-9 Heavy',
|
||||
'type9_military' : 'Type-10 Defender',
|
||||
'typex' : 'Alliance Chieftain',
|
||||
'typex_2' : 'Alliance Crusader',
|
||||
'typex_3' : 'Alliance Challenger',
|
||||
'viper' : 'Viper MkIII',
|
||||
'viper_mkiv' : 'Viper MkIV',
|
||||
'vulture' : 'Vulture',
|
||||
'adder': 'Adder',
|
||||
'anaconda': 'Anaconda',
|
||||
'asp': 'Asp Explorer',
|
||||
'asp_scout': 'Asp Scout',
|
||||
'belugaliner': 'Beluga Liner',
|
||||
'cobramkiii': 'Cobra MkIII',
|
||||
'cobramkiv': 'Cobra MkIV',
|
||||
'clipper': 'Panther Clipper',
|
||||
'cutter': 'Imperial Cutter',
|
||||
'diamondback': 'Diamondback Scout',
|
||||
'diamondbackxl': 'Diamondback Explorer',
|
||||
'dolphin': 'Dolphin',
|
||||
'eagle': 'Eagle',
|
||||
'empire_courier': 'Imperial Courier',
|
||||
'empire_eagle': 'Imperial Eagle',
|
||||
'empire_fighter': 'Imperial Fighter',
|
||||
'empire_trader': 'Imperial Clipper',
|
||||
'federation_corvette': 'Federal Corvette',
|
||||
'federation_dropship': 'Federal Dropship',
|
||||
'federation_dropship_mkii': 'Federal Assault Ship',
|
||||
'federation_gunship': 'Federal Gunship',
|
||||
'federation_fighter': 'F63 Condor',
|
||||
'ferdelance': 'Fer-de-Lance',
|
||||
'hauler': 'Hauler',
|
||||
'independant_trader': 'Keelback',
|
||||
'independent_fighter': 'Taipan Fighter',
|
||||
'krait_mkii': 'Krait MkII',
|
||||
'krait_light': 'Krait Phantom',
|
||||
'mamba': 'Mamba',
|
||||
'orca': 'Orca',
|
||||
'python': 'Python',
|
||||
'scout': 'Taipan Fighter',
|
||||
'sidewinder': 'Sidewinder',
|
||||
'testbuggy': 'Scarab',
|
||||
'type6': 'Type-6 Transporter',
|
||||
'type7': 'Type-7 Transporter',
|
||||
'type9': 'Type-9 Heavy',
|
||||
'type9_military': 'Type-10 Defender',
|
||||
'typex': 'Alliance Chieftain',
|
||||
'typex_2': 'Alliance Crusader',
|
||||
'typex_3': 'Alliance Challenger',
|
||||
'viper': 'Viper MkIII',
|
||||
'viper_mkiv': 'Viper MkIV',
|
||||
'vulture': 'Vulture',
|
||||
}
|
||||
|
||||
|
||||
# Companion API sometimes returns an array as a json array, sometimes as a json object indexed by "int".
|
||||
# This seems to depend on whether the there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse.
|
||||
# In practice these arrays aren't very sparse so just convert them to lists with any 'gaps' holding None.
|
||||
def listify(thing):
|
||||
def listify(thing: Union[List, Dict]) -> List:
|
||||
"""
|
||||
Convert actual JSON array or int-indexed dict into a Python list.
|
||||
|
||||
Companion API sometimes returns an array as a json array, sometimes as
|
||||
a json object indexed by "int". This seems to depend on whether the
|
||||
there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse.
|
||||
In practice these arrays aren't very sparse so just convert them to
|
||||
lists with any 'gaps' holding None.
|
||||
"""
|
||||
if thing is None:
|
||||
return [] # data is not present
|
||||
|
||||
@ -132,10 +145,12 @@ def listify(thing):
|
||||
return retval
|
||||
|
||||
else:
|
||||
raise ValueError("expected an array or sparse array, got {!r}".format(thing))
|
||||
raise ValueError(f"expected an array or sparse array, got {thing!r}")
|
||||
|
||||
|
||||
class ServerError(Exception):
|
||||
"""Exception Class for CAPI ServerErrors."""
|
||||
|
||||
def __init__(self, *args):
|
||||
# Raised when cannot contact the Companion API server
|
||||
self.args = args
|
||||
@ -144,23 +159,34 @@ class ServerError(Exception):
|
||||
|
||||
|
||||
class ServerLagging(Exception):
|
||||
"""Exception Class for CAPI Server lagging.
|
||||
|
||||
Raised when Companion API server is returning old data, e.g. when the
|
||||
servers are too busy.
|
||||
"""
|
||||
|
||||
def __init__(self, *args):
|
||||
# Raised when Companion API server is returning old data, e.g. when the servers are too busy
|
||||
self.args = args
|
||||
if not args:
|
||||
self.args = (_('Error: Frontier server is lagging'),)
|
||||
|
||||
|
||||
class SKUError(Exception):
|
||||
"""Exception Class for CAPI SKU error.
|
||||
|
||||
Raised when the Companion API server thinks that the user has not
|
||||
purchased E:D i.e. doesn't have the correct 'SKU'.
|
||||
"""
|
||||
|
||||
def __init__(self, *args):
|
||||
# Raised when the Companion API server thinks that the user has not purchased E:D
|
||||
# i.e. doesn't have the correct 'SKU'
|
||||
self.args = args
|
||||
if not args:
|
||||
self.args = (_('Error: Frontier server SKU problem'),)
|
||||
|
||||
|
||||
class CredentialsError(Exception):
|
||||
"""Exception Class for CAPI Credentials error."""
|
||||
|
||||
def __init__(self, *args):
|
||||
self.args = args
|
||||
if not args:
|
||||
@ -168,24 +194,35 @@ class CredentialsError(Exception):
|
||||
|
||||
|
||||
class CmdrError(Exception):
|
||||
"""Exception Class for CAPI Commander error.
|
||||
|
||||
Raised when the user has multiple accounts and the username/password
|
||||
setting is not for the account they're currently playing OR the user has
|
||||
reset their Cmdr and the Companion API server is still returning data
|
||||
for the old Cmdr.
|
||||
"""
|
||||
|
||||
def __init__(self, *args):
|
||||
# Raised when the user has multiple accounts and the username/password setting is not for
|
||||
# the account they're currently playing OR the user has reset their Cmdr and the Companion API
|
||||
# server is still returning data for the old Cmdr
|
||||
self.args = args
|
||||
if not args:
|
||||
self.args = (_('Error: Wrong Cmdr'),)
|
||||
|
||||
|
||||
class Auth(object):
|
||||
def __init__(self, cmdr):
|
||||
"""Handles authentication with the Frontier CAPI service via oAuth2."""
|
||||
|
||||
def __init__(self, cmdr: str):
|
||||
self.cmdr = cmdr
|
||||
self.session = requests.Session()
|
||||
self.session.headers['User-Agent'] = USER_AGENT
|
||||
self.verifier = self.state = None
|
||||
|
||||
def refresh(self):
|
||||
# Try refresh token. Returns new refresh token if successful, otherwise makes new authorization request.
|
||||
def refresh(self) -> None:
|
||||
"""
|
||||
Attempt use of Refresh Token to get a valid Access Token.
|
||||
|
||||
If the Refresh Token doesn't work, make a new authorization request.
|
||||
"""
|
||||
logger.debug(f'Trying for "{self.cmdr}"')
|
||||
|
||||
self.verifier = None
|
||||
@ -219,7 +256,7 @@ class Auth(object):
|
||||
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
|
||||
self.dump(r)
|
||||
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
|
||||
|
||||
else:
|
||||
@ -233,21 +270,17 @@ class Auth(object):
|
||||
self.state = self.base64_url_encode(s.to_bytes(32, byteorder='big'))
|
||||
# Won't work under IE: https://blogs.msdn.microsoft.com/ieinternals/2011/07/13/understanding-protocols/
|
||||
logger.debug(f'Trying auth from scratch for Commander "{self.cmdr}"')
|
||||
challenge = self.base64_url_encode(hashlib.sha256(self.verifier).digest())
|
||||
webbrowser.open(
|
||||
'{server_auth}{url_auth}?response_type=code&audience=frontier&scope=capi&client_id={client_id}&code_challenge={challenge}&code_challenge_method=S256&state={state}&redirect_uri={redirect}'.format( # noqa: E501 # I cant make this any shorter
|
||||
server_auth=SERVER_AUTH,
|
||||
url_auth=URL_AUTH,
|
||||
client_id=CLIENT_ID,
|
||||
challenge=self.base64_url_encode(hashlib.sha256(self.verifier).digest()),
|
||||
state=self.state,
|
||||
redirect=protocolhandler.redirect
|
||||
)
|
||||
f'{SERVER_AUTH}{URL_AUTH}?response_type=code&audience=frontier&scope=capi&client_id={CLIENT_ID}&code_challenge={challenge}&code_challenge_method=S256&state={self.state}&redirect_uri={protocolhandler.redirect}' # noqa: E501 # I cant make this any shorter
|
||||
)
|
||||
|
||||
def authorize(self, payload):
|
||||
def authorize(self, payload: str) -> str:
|
||||
"""Handle oAuth authorization callback.
|
||||
|
||||
:return: access token if successful, otherwise raises CredentialsError.
|
||||
"""
|
||||
logger.debug('Checking oAuth authorization callback')
|
||||
# Handle OAuth authorization code callback.
|
||||
# Returns access token if successful, otherwise raises CredentialsError
|
||||
if '?' not in payload:
|
||||
logger.error(f'Frontier CAPI Auth: Malformed response (no "?" in payload)\n{payload}\n')
|
||||
raise CredentialsError('malformed payload') # Not well formed
|
||||
@ -255,14 +288,14 @@ class Auth(object):
|
||||
data = urllib.parse.parse_qs(payload[(payload.index('?') + 1):])
|
||||
if not self.state or not data.get('state') or data['state'][0] != self.state:
|
||||
logger.error(f'Frontier CAPI Auth: Unexpected response\n{payload}\n')
|
||||
raise CredentialsError('Unexpected response from authorization {!r}'.format(payload)) # Unexpected reply
|
||||
raise CredentialsError(f'Unexpected response from authorization {payload!r}')
|
||||
|
||||
if not data.get('code'):
|
||||
logger.error(f'Frontier CAPI Auth: Negative response (no "code" in returned data)\n{payload}\n')
|
||||
error = next(
|
||||
(data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',)
|
||||
)
|
||||
raise CredentialsError('Error: {!r}'.format(error)[0])
|
||||
raise CredentialsError(f'Error: {error[0]!r}')
|
||||
|
||||
try:
|
||||
logger.debug('Got code, posting it back...')
|
||||
@ -299,10 +332,11 @@ class Auth(object):
|
||||
logger.error(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
|
||||
self.dump(r)
|
||||
error = next((data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',))
|
||||
raise CredentialsError('Error: {!r}'.format(error)[0])
|
||||
raise CredentialsError(f'Error: {error[0]!r}')
|
||||
|
||||
@staticmethod
|
||||
def invalidate(cmdr):
|
||||
def invalidate(cmdr: str) -> None:
|
||||
"""Invalidate Refresh Token for specified Commander."""
|
||||
logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"')
|
||||
cmdrs = config.get('cmdrs')
|
||||
idx = cmdrs.index(cmdr)
|
||||
@ -312,14 +346,18 @@ class Auth(object):
|
||||
config.set('fdev_apikeys', tokens)
|
||||
config.save() # Save settings now for use by command-line app
|
||||
|
||||
def dump(self, r):
|
||||
def dump(self, r: requests.Response) -> None:
|
||||
"""Dump details of HTTP failure from oAuth attempt."""
|
||||
logger.debug(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason if r.reason else "None"} {r.text}')
|
||||
|
||||
def base64_url_encode(self, text):
|
||||
def base64_url_encode(self, text: str) -> str:
|
||||
"""Base64 encode text for URL."""
|
||||
return base64.urlsafe_b64encode(text).decode().replace('=', '')
|
||||
|
||||
|
||||
class Session(object):
|
||||
"""Methods for handling an oAuth2 session."""
|
||||
|
||||
STATE_INIT, STATE_AUTH, STATE_OK = list(range(3))
|
||||
|
||||
def __init__(self):
|
||||
@ -329,8 +367,12 @@ class Session(object):
|
||||
self.auth = None
|
||||
self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query
|
||||
|
||||
def login(self, cmdr=None, is_beta=None):
|
||||
# Returns True if login succeeded, False if re-authorization initiated.
|
||||
def login(self, cmdr: str = None, is_beta: Union[None, bool] = None) -> bool:
|
||||
"""
|
||||
Attempt oAuth2 login.
|
||||
|
||||
:return: True if login succeeded, False if re-authorization initiated.
|
||||
"""
|
||||
if not CLIENT_ID:
|
||||
logger.error('CLIENT_ID is None')
|
||||
raise CredentialsError('cannot login without a valid Client ID')
|
||||
@ -376,7 +418,8 @@ class Session(object):
|
||||
# Wait for callback
|
||||
|
||||
# Callback from protocol handler
|
||||
def auth_callback(self):
|
||||
def auth_callback(self) -> None:
|
||||
"""Handle callback from edmc:// handler."""
|
||||
logger.debug('Handling callback from edmc:// handler')
|
||||
if self.state != Session.STATE_AUTH:
|
||||
# Shouldn't be getting a callback
|
||||
@ -394,14 +437,16 @@ class Session(object):
|
||||
self.auth = None
|
||||
raise # Bad thing happened
|
||||
|
||||
def start(self, access_token):
|
||||
def start(self, access_token: str) -> None:
|
||||
"""Start an oAuth2 session."""
|
||||
logger.debug('Starting session')
|
||||
self.session = requests.Session()
|
||||
self.session.headers['Authorization'] = 'Bearer {}'.format(access_token)
|
||||
self.session.headers['Authorization'] = f'Bearer {access_token}'
|
||||
self.session.headers['User-Agent'] = USER_AGENT
|
||||
self.state = Session.STATE_OK
|
||||
|
||||
def query(self, endpoint):
|
||||
def query(self, endpoint: str) -> Mapping[str, Any]:
|
||||
"""Perform a query against the specified CAPI endpoint."""
|
||||
logger.debug(f'Performing query for endpoint "{endpoint}"')
|
||||
if self.state == Session.STATE_INIT:
|
||||
if self.login():
|
||||
@ -432,7 +477,7 @@ class Session(object):
|
||||
# Server error. Typically 500 "Internal Server Error" if server is down
|
||||
logger.debug('500 status back from CAPI')
|
||||
self.dump(r)
|
||||
raise ServerError('Received error {} from server'.format(r.status_code))
|
||||
raise ServerError(f'Received error {r.status_code} from server')
|
||||
|
||||
try:
|
||||
r.raise_for_status() # Typically 403 "Forbidden" on token expiry
|
||||
@ -467,36 +512,41 @@ class Session(object):
|
||||
|
||||
return data
|
||||
|
||||
def profile(self):
|
||||
def profile(self) -> Mapping[str, Any]:
|
||||
"""Perform general CAPI /profile endpoint query."""
|
||||
return self.query(URL_QUERY)
|
||||
|
||||
def station(self):
|
||||
def station(self) -> Union[Mapping[str, Any], None]:
|
||||
"""Perform CAPI /profile endpoint query for station data."""
|
||||
data = self.query(URL_QUERY)
|
||||
if data['commander'].get('docked'):
|
||||
services = data['lastStarport'].get('services', {})
|
||||
if not data['commander'].get('docked'):
|
||||
return None
|
||||
|
||||
last_starport_name = data['lastStarport']['name']
|
||||
last_starport_id = int(data['lastStarport']['id'])
|
||||
services = data['lastStarport'].get('services', {})
|
||||
|
||||
if services.get('commodities'):
|
||||
marketdata = self.query(URL_MARKET)
|
||||
if (last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id'])):
|
||||
raise ServerLagging()
|
||||
last_starport_name = data['lastStarport']['name']
|
||||
last_starport_id = int(data['lastStarport']['id'])
|
||||
|
||||
else:
|
||||
data['lastStarport'].update(marketdata)
|
||||
if services.get('commodities'):
|
||||
marketdata = self.query(URL_MARKET)
|
||||
if (last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id'])):
|
||||
raise ServerLagging()
|
||||
|
||||
if services.get('outfitting') or services.get('shipyard'):
|
||||
shipdata = self.query(URL_SHIPYARD)
|
||||
if (last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id'])):
|
||||
raise ServerLagging()
|
||||
else:
|
||||
data['lastStarport'].update(marketdata)
|
||||
|
||||
else:
|
||||
data['lastStarport'].update(shipdata)
|
||||
if services.get('outfitting') or services.get('shipyard'):
|
||||
shipdata = self.query(URL_SHIPYARD)
|
||||
if (last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id'])):
|
||||
raise ServerLagging()
|
||||
|
||||
else:
|
||||
data['lastStarport'].update(shipdata)
|
||||
|
||||
return data
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""Close CAPI authorization session."""
|
||||
self.state = Session.STATE_INIT
|
||||
if self.session:
|
||||
try:
|
||||
@ -507,18 +557,25 @@ class Session(object):
|
||||
|
||||
self.session = None
|
||||
|
||||
def invalidate(self):
|
||||
def invalidate(self) -> None:
|
||||
"""Invalidate oAuth2 credentials."""
|
||||
logger.debug('Forcing a full re-authentication')
|
||||
# Force a full re-authentication
|
||||
self.close()
|
||||
Auth.invalidate(self.credentials['cmdr'])
|
||||
|
||||
def dump(self, r):
|
||||
def dump(self, r: requests.Response) -> None:
|
||||
"""Log, as error, status of requests.Response from CAPI request."""
|
||||
logger.error(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason and r.reason or "None"} {r.text}')
|
||||
|
||||
# Returns a shallow copy of the received data suitable for export to older tools
|
||||
# English commodity names and anomalies fixed up
|
||||
def fixup(data):
|
||||
|
||||
def fixup(data: Mapping[str, Any]) -> Mapping[str, Any]: # noqa: C901, CCR001 # Can't be usefully simplified
|
||||
"""
|
||||
Fix up commodity names to English & miscellaneous anomalies fixes.
|
||||
|
||||
:return: a shallow copy of the received data suitable for export to
|
||||
older tools.
|
||||
"""
|
||||
if not commodity_map:
|
||||
# Lazily populate
|
||||
for f in ('commodity.csv', 'rare_commodity.csv'):
|
||||
@ -589,9 +646,10 @@ def fixup(data):
|
||||
return datacopy
|
||||
|
||||
|
||||
# Return a subset of the received data describing the current ship
|
||||
def ship(data):
|
||||
def filter_ship(d):
|
||||
def ship(data: Mapping[str, Any]) -> Mapping[str, Any]:
|
||||
"""Construct a subset of the received data describing the current ship."""
|
||||
def filter_ship(d: Mapping[str, Any]) -> Mapping[str, Any]:
|
||||
"""Filter provided ship data."""
|
||||
filtered = {}
|
||||
for k, v in d.items():
|
||||
if v == []:
|
||||
@ -619,8 +677,8 @@ def ship(data):
|
||||
return filter_ship(data['ship'])
|
||||
|
||||
|
||||
# Ship name suitable for writing to a file
|
||||
def ship_file_name(ship_name, ship_type):
|
||||
def ship_file_name(ship_name: str, ship_type: str) -> str:
|
||||
"""Return a ship name suitable for a filename."""
|
||||
name = str(ship_name or ship_map.get(ship_type.lower(), ship_type)).strip()
|
||||
if name.endswith('.'):
|
||||
name = name[:-1]
|
||||
|
Loading…
x
Reference in New Issue
Block a user