From c12c739c11af5e55750b4e9dd7cc438a350e4312 Mon Sep 17 00:00:00 2001 From: Athanasius <github@miggy.org> Date: Tue, 15 Sep 2020 14:15:07 +0100 Subject: [PATCH] companion.py: Full pass to 100% pass flake8 * docstrings added throughout. * .format() all now f-strings. * Type annotations added throughout. * White space tweaked. --- companion.py | 326 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 192 insertions(+), 134 deletions(-) diff --git a/companion.py b/companion.py index 947ec170..b6396827 100644 --- a/companion.py +++ b/companion.py @@ -1,31 +1,38 @@ -from builtins import str -from builtins import range -from builtins import object +""" +Handle use of Frontier's Companion API (CAPI) service. + +Deals with initiating authentication for, and use of, CAPI. +Some associated code is in protocol.py which creates and handles the edmc:// +protocol used for the callback. +""" + import base64 import csv -import requests -from typing import TYPE_CHECKING - -# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569 -from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it -from email.utils import parsedate import hashlib +import logging import numbers import os -from os.path import join import random import time import urllib.parse import webbrowser +from builtins import object, range, str +from email.utils import parsedate +# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569 +from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it +from os.path import join +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Union + +import requests from config import appname, appversion, config from protocol import protocolhandler -import logging + logger = logging.getLogger(appname) if TYPE_CHECKING: - _ = lambda x: x # noqa # to make flake8 stop complaining that the hacked in _ method doesnt exist + _ = lambda x: x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist holdoff = 60 # be nice @@ -35,83 +42,89 @@ auth_timeout = 30 # timeout for initial auth # Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in # Athanasius' Frontier account # Obtain from https://auth.frontierstore.net/client/signup -CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a' +CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a' SERVER_AUTH = 'https://auth.frontierstore.net' -URL_AUTH = '/auth' +URL_AUTH = '/auth' URL_TOKEN = '/token' -USER_AGENT = 'EDCD-{}-{}'.format(appname, appversion) +USER_AGENT = f'EDCD-{appname}-{appversion}' SERVER_LIVE = 'https://companion.orerve.net' SERVER_BETA = 'https://pts-companion.orerve.net' -URL_QUERY = '/profile' -URL_MARKET = '/market' -URL_SHIPYARD= '/shipyard' +URL_QUERY = '/profile' +URL_MARKET = '/market' +URL_SHIPYARD = '/shipyard' # Map values reported by the Companion interface to names displayed in-game # May be imported by plugins category_map = { - 'Narcotics' : 'Legal Drugs', - 'Slaves' : 'Slavery', - 'Waste ' : 'Waste', - 'NonMarketable' : False, # Don't appear in the in-game market so don't report + 'Narcotics': 'Legal Drugs', + 'Slaves': 'Slavery', + 'Waste ': 'Waste', + 'NonMarketable': False, # Don't appear in the in-game market so don't report } commodity_map = {} ship_map = { - 'adder' : 'Adder', - 'anaconda' : 'Anaconda', - 'asp' : 'Asp Explorer', - 'asp_scout' : 'Asp Scout', - 'belugaliner' : 'Beluga Liner', - 'cobramkiii' : 'Cobra MkIII', - 'cobramkiv' : 'Cobra MkIV', - 'clipper' : 'Panther Clipper', - 'cutter' : 'Imperial Cutter', - 'diamondback' : 'Diamondback Scout', - 'diamondbackxl' : 'Diamondback Explorer', - 'dolphin' : 'Dolphin', - 'eagle' : 'Eagle', - 'empire_courier' : 'Imperial Courier', - 'empire_eagle' : 'Imperial Eagle', - 'empire_fighter' : 'Imperial Fighter', - 'empire_trader' : 'Imperial Clipper', - 'federation_corvette' : 'Federal Corvette', - 'federation_dropship' : 'Federal Dropship', - 'federation_dropship_mkii' : 'Federal Assault Ship', - 'federation_gunship' : 'Federal Gunship', - 'federation_fighter' : 'F63 Condor', - 'ferdelance' : 'Fer-de-Lance', - 'hauler' : 'Hauler', - 'independant_trader' : 'Keelback', - 'independent_fighter' : 'Taipan Fighter', - 'krait_mkii' : 'Krait MkII', - 'krait_light' : 'Krait Phantom', - 'mamba' : 'Mamba', - 'orca' : 'Orca', - 'python' : 'Python', - 'scout' : 'Taipan Fighter', - 'sidewinder' : 'Sidewinder', - 'testbuggy' : 'Scarab', - 'type6' : 'Type-6 Transporter', - 'type7' : 'Type-7 Transporter', - 'type9' : 'Type-9 Heavy', - 'type9_military' : 'Type-10 Defender', - 'typex' : 'Alliance Chieftain', - 'typex_2' : 'Alliance Crusader', - 'typex_3' : 'Alliance Challenger', - 'viper' : 'Viper MkIII', - 'viper_mkiv' : 'Viper MkIV', - 'vulture' : 'Vulture', + 'adder': 'Adder', + 'anaconda': 'Anaconda', + 'asp': 'Asp Explorer', + 'asp_scout': 'Asp Scout', + 'belugaliner': 'Beluga Liner', + 'cobramkiii': 'Cobra MkIII', + 'cobramkiv': 'Cobra MkIV', + 'clipper': 'Panther Clipper', + 'cutter': 'Imperial Cutter', + 'diamondback': 'Diamondback Scout', + 'diamondbackxl': 'Diamondback Explorer', + 'dolphin': 'Dolphin', + 'eagle': 'Eagle', + 'empire_courier': 'Imperial Courier', + 'empire_eagle': 'Imperial Eagle', + 'empire_fighter': 'Imperial Fighter', + 'empire_trader': 'Imperial Clipper', + 'federation_corvette': 'Federal Corvette', + 'federation_dropship': 'Federal Dropship', + 'federation_dropship_mkii': 'Federal Assault Ship', + 'federation_gunship': 'Federal Gunship', + 'federation_fighter': 'F63 Condor', + 'ferdelance': 'Fer-de-Lance', + 'hauler': 'Hauler', + 'independant_trader': 'Keelback', + 'independent_fighter': 'Taipan Fighter', + 'krait_mkii': 'Krait MkII', + 'krait_light': 'Krait Phantom', + 'mamba': 'Mamba', + 'orca': 'Orca', + 'python': 'Python', + 'scout': 'Taipan Fighter', + 'sidewinder': 'Sidewinder', + 'testbuggy': 'Scarab', + 'type6': 'Type-6 Transporter', + 'type7': 'Type-7 Transporter', + 'type9': 'Type-9 Heavy', + 'type9_military': 'Type-10 Defender', + 'typex': 'Alliance Chieftain', + 'typex_2': 'Alliance Crusader', + 'typex_3': 'Alliance Challenger', + 'viper': 'Viper MkIII', + 'viper_mkiv': 'Viper MkIV', + 'vulture': 'Vulture', } -# Companion API sometimes returns an array as a json array, sometimes as a json object indexed by "int". -# This seems to depend on whether the there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse. -# In practice these arrays aren't very sparse so just convert them to lists with any 'gaps' holding None. -def listify(thing): +def listify(thing: Union[List, Dict]) -> List: + """ + Convert actual JSON array or int-indexed dict into a Python list. + + Companion API sometimes returns an array as a json array, sometimes as + a json object indexed by "int". This seems to depend on whether the + there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse. + In practice these arrays aren't very sparse so just convert them to + lists with any 'gaps' holding None. + """ if thing is None: return [] # data is not present @@ -132,10 +145,12 @@ def listify(thing): return retval else: - raise ValueError("expected an array or sparse array, got {!r}".format(thing)) + raise ValueError(f"expected an array or sparse array, got {thing!r}") class ServerError(Exception): + """Exception Class for CAPI ServerErrors.""" + def __init__(self, *args): # Raised when cannot contact the Companion API server self.args = args @@ -144,23 +159,34 @@ class ServerError(Exception): class ServerLagging(Exception): + """Exception Class for CAPI Server lagging. + + Raised when Companion API server is returning old data, e.g. when the + servers are too busy. + """ + def __init__(self, *args): - # Raised when Companion API server is returning old data, e.g. when the servers are too busy self.args = args if not args: self.args = (_('Error: Frontier server is lagging'),) class SKUError(Exception): + """Exception Class for CAPI SKU error. + + Raised when the Companion API server thinks that the user has not + purchased E:D i.e. doesn't have the correct 'SKU'. + """ + def __init__(self, *args): - # Raised when the Companion API server thinks that the user has not purchased E:D - # i.e. doesn't have the correct 'SKU' self.args = args if not args: self.args = (_('Error: Frontier server SKU problem'),) class CredentialsError(Exception): + """Exception Class for CAPI Credentials error.""" + def __init__(self, *args): self.args = args if not args: @@ -168,24 +194,35 @@ class CredentialsError(Exception): class CmdrError(Exception): + """Exception Class for CAPI Commander error. + + Raised when the user has multiple accounts and the username/password + setting is not for the account they're currently playing OR the user has + reset their Cmdr and the Companion API server is still returning data + for the old Cmdr. + """ + def __init__(self, *args): - # Raised when the user has multiple accounts and the username/password setting is not for - # the account they're currently playing OR the user has reset their Cmdr and the Companion API - # server is still returning data for the old Cmdr self.args = args if not args: self.args = (_('Error: Wrong Cmdr'),) class Auth(object): - def __init__(self, cmdr): + """Handles authentication with the Frontier CAPI service via oAuth2.""" + + def __init__(self, cmdr: str): self.cmdr = cmdr self.session = requests.Session() self.session.headers['User-Agent'] = USER_AGENT self.verifier = self.state = None - def refresh(self): - # Try refresh token. Returns new refresh token if successful, otherwise makes new authorization request. + def refresh(self) -> None: + """ + Attempt use of Refresh Token to get a valid Access Token. + + If the Refresh Token doesn't work, make a new authorization request. + """ logger.debug(f'Trying for "{self.cmdr}"') self.verifier = None @@ -219,7 +256,7 @@ class Auth(object): logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") self.dump(r) - except Exception as e: + except Exception: logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") else: @@ -233,21 +270,17 @@ class Auth(object): self.state = self.base64_url_encode(s.to_bytes(32, byteorder='big')) # Won't work under IE: https://blogs.msdn.microsoft.com/ieinternals/2011/07/13/understanding-protocols/ logger.debug(f'Trying auth from scratch for Commander "{self.cmdr}"') + challenge = self.base64_url_encode(hashlib.sha256(self.verifier).digest()) webbrowser.open( - '{server_auth}{url_auth}?response_type=code&audience=frontier&scope=capi&client_id={client_id}&code_challenge={challenge}&code_challenge_method=S256&state={state}&redirect_uri={redirect}'.format( # noqa: E501 # I cant make this any shorter - server_auth=SERVER_AUTH, - url_auth=URL_AUTH, - client_id=CLIENT_ID, - challenge=self.base64_url_encode(hashlib.sha256(self.verifier).digest()), - state=self.state, - redirect=protocolhandler.redirect - ) + f'{SERVER_AUTH}{URL_AUTH}?response_type=code&audience=frontier&scope=capi&client_id={CLIENT_ID}&code_challenge={challenge}&code_challenge_method=S256&state={self.state}&redirect_uri={protocolhandler.redirect}' # noqa: E501 # I cant make this any shorter ) - def authorize(self, payload): + def authorize(self, payload: str) -> str: + """Handle oAuth authorization callback. + + :return: access token if successful, otherwise raises CredentialsError. + """ logger.debug('Checking oAuth authorization callback') - # Handle OAuth authorization code callback. - # Returns access token if successful, otherwise raises CredentialsError if '?' not in payload: logger.error(f'Frontier CAPI Auth: Malformed response (no "?" in payload)\n{payload}\n') raise CredentialsError('malformed payload') # Not well formed @@ -255,14 +288,14 @@ class Auth(object): data = urllib.parse.parse_qs(payload[(payload.index('?') + 1):]) if not self.state or not data.get('state') or data['state'][0] != self.state: logger.error(f'Frontier CAPI Auth: Unexpected response\n{payload}\n') - raise CredentialsError('Unexpected response from authorization {!r}'.format(payload)) # Unexpected reply + raise CredentialsError(f'Unexpected response from authorization {payload!r}') if not data.get('code'): logger.error(f'Frontier CAPI Auth: Negative response (no "code" in returned data)\n{payload}\n') error = next( (data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',) ) - raise CredentialsError('Error: {!r}'.format(error)[0]) + raise CredentialsError(f'Error: {error[0]!r}') try: logger.debug('Got code, posting it back...') @@ -299,10 +332,11 @@ class Auth(object): logger.error(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"") self.dump(r) error = next((data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',)) - raise CredentialsError('Error: {!r}'.format(error)[0]) + raise CredentialsError(f'Error: {error[0]!r}') @staticmethod - def invalidate(cmdr): + def invalidate(cmdr: str) -> None: + """Invalidate Refresh Token for specified Commander.""" logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"') cmdrs = config.get('cmdrs') idx = cmdrs.index(cmdr) @@ -312,14 +346,18 @@ class Auth(object): config.set('fdev_apikeys', tokens) config.save() # Save settings now for use by command-line app - def dump(self, r): + def dump(self, r: requests.Response) -> None: + """Dump details of HTTP failure from oAuth attempt.""" logger.debug(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason if r.reason else "None"} {r.text}') - def base64_url_encode(self, text): + def base64_url_encode(self, text: str) -> str: + """Base64 encode text for URL.""" return base64.urlsafe_b64encode(text).decode().replace('=', '') class Session(object): + """Methods for handling an oAuth2 session.""" + STATE_INIT, STATE_AUTH, STATE_OK = list(range(3)) def __init__(self): @@ -329,8 +367,12 @@ class Session(object): self.auth = None self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query - def login(self, cmdr=None, is_beta=None): - # Returns True if login succeeded, False if re-authorization initiated. + def login(self, cmdr: str = None, is_beta: Union[None, bool] = None) -> bool: + """ + Attempt oAuth2 login. + + :return: True if login succeeded, False if re-authorization initiated. + """ if not CLIENT_ID: logger.error('CLIENT_ID is None') raise CredentialsError('cannot login without a valid Client ID') @@ -376,7 +418,8 @@ class Session(object): # Wait for callback # Callback from protocol handler - def auth_callback(self): + def auth_callback(self) -> None: + """Handle callback from edmc:// handler.""" logger.debug('Handling callback from edmc:// handler') if self.state != Session.STATE_AUTH: # Shouldn't be getting a callback @@ -394,14 +437,16 @@ class Session(object): self.auth = None raise # Bad thing happened - def start(self, access_token): + def start(self, access_token: str) -> None: + """Start an oAuth2 session.""" logger.debug('Starting session') self.session = requests.Session() - self.session.headers['Authorization'] = 'Bearer {}'.format(access_token) + self.session.headers['Authorization'] = f'Bearer {access_token}' self.session.headers['User-Agent'] = USER_AGENT self.state = Session.STATE_OK - def query(self, endpoint): + def query(self, endpoint: str) -> Mapping[str, Any]: + """Perform a query against the specified CAPI endpoint.""" logger.debug(f'Performing query for endpoint "{endpoint}"') if self.state == Session.STATE_INIT: if self.login(): @@ -432,7 +477,7 @@ class Session(object): # Server error. Typically 500 "Internal Server Error" if server is down logger.debug('500 status back from CAPI') self.dump(r) - raise ServerError('Received error {} from server'.format(r.status_code)) + raise ServerError(f'Received error {r.status_code} from server') try: r.raise_for_status() # Typically 403 "Forbidden" on token expiry @@ -467,36 +512,41 @@ class Session(object): return data - def profile(self): + def profile(self) -> Mapping[str, Any]: + """Perform general CAPI /profile endpoint query.""" return self.query(URL_QUERY) - def station(self): + def station(self) -> Union[Mapping[str, Any], None]: + """Perform CAPI /profile endpoint query for station data.""" data = self.query(URL_QUERY) - if data['commander'].get('docked'): - services = data['lastStarport'].get('services', {}) + if not data['commander'].get('docked'): + return None - last_starport_name = data['lastStarport']['name'] - last_starport_id = int(data['lastStarport']['id']) + services = data['lastStarport'].get('services', {}) - if services.get('commodities'): - marketdata = self.query(URL_MARKET) - if (last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id'])): - raise ServerLagging() + last_starport_name = data['lastStarport']['name'] + last_starport_id = int(data['lastStarport']['id']) - else: - data['lastStarport'].update(marketdata) + if services.get('commodities'): + marketdata = self.query(URL_MARKET) + if (last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id'])): + raise ServerLagging() - if services.get('outfitting') or services.get('shipyard'): - shipdata = self.query(URL_SHIPYARD) - if (last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id'])): - raise ServerLagging() + else: + data['lastStarport'].update(marketdata) - else: - data['lastStarport'].update(shipdata) + if services.get('outfitting') or services.get('shipyard'): + shipdata = self.query(URL_SHIPYARD) + if (last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id'])): + raise ServerLagging() + + else: + data['lastStarport'].update(shipdata) return data - def close(self): + def close(self) -> None: + """Close CAPI authorization session.""" self.state = Session.STATE_INIT if self.session: try: @@ -507,18 +557,25 @@ class Session(object): self.session = None - def invalidate(self): + def invalidate(self) -> None: + """Invalidate oAuth2 credentials.""" logger.debug('Forcing a full re-authentication') # Force a full re-authentication self.close() Auth.invalidate(self.credentials['cmdr']) - def dump(self, r): + def dump(self, r: requests.Response) -> None: + """Log, as error, status of requests.Response from CAPI request.""" logger.error(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason and r.reason or "None"} {r.text}') -# Returns a shallow copy of the received data suitable for export to older tools -# English commodity names and anomalies fixed up -def fixup(data): + +def fixup(data: Mapping[str, Any]) -> Mapping[str, Any]: # noqa: C901, CCR001 # Can't be usefully simplified + """ + Fix up commodity names to English & miscellaneous anomalies fixes. + + :return: a shallow copy of the received data suitable for export to + older tools. + """ if not commodity_map: # Lazily populate for f in ('commodity.csv', 'rare_commodity.csv'): @@ -589,9 +646,10 @@ def fixup(data): return datacopy -# Return a subset of the received data describing the current ship -def ship(data): - def filter_ship(d): +def ship(data: Mapping[str, Any]) -> Mapping[str, Any]: + """Construct a subset of the received data describing the current ship.""" + def filter_ship(d: Mapping[str, Any]) -> Mapping[str, Any]: + """Filter provided ship data.""" filtered = {} for k, v in d.items(): if v == []: @@ -619,8 +677,8 @@ def ship(data): return filter_ship(data['ship']) -# Ship name suitable for writing to a file -def ship_file_name(ship_name, ship_type): +def ship_file_name(ship_name: str, ship_type: str) -> str: + """Return a ship name suitable for a filename.""" name = str(ship_name or ship_map.get(ship_type.lower(), ship_type)).strip() if name.endswith('.'): name = name[:-1]