1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-06-04 17:41:18 +03:00

Merge pull request #704 from EDCD/cleanup/commodity.py

Cleanup/commodity.py
This commit is contained in:
Athanasius 2020-09-16 21:01:27 +01:00 committed by GitHub
commit 42c826f04b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,33 +1,42 @@
from builtins import str """
from builtins import range Handle use of Frontier's Companion API (CAPI) service.
from builtins import object
Deals with initiating authentication for, and use of, CAPI.
Some associated code is in protocol.py which creates and handles the edmc://
protocol used for the callback.
"""
import base64 import base64
import csv import csv
import requests
from typing import TYPE_CHECKING
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
from email.utils import parsedate
import hashlib import hashlib
import logging
import numbers import numbers
import os import os
from os.path import join
import random import random
import time import time
import urllib.parse import urllib.parse
import webbrowser import webbrowser
from builtins import object, range, str
from email.utils import parsedate
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
from os.path import join
from typing import TYPE_CHECKING, Any, Dict, List, NewType, Union
import requests
from config import appname, appversion, config from config import appname, appversion, config
from protocol import protocolhandler from protocol import protocolhandler
import logging
logger = logging.getLogger(appname) logger = logging.getLogger(appname)
if TYPE_CHECKING: if TYPE_CHECKING:
_ = lambda x: x # noqa # to make flake8 stop complaining that the hacked in _ method doesnt exist _ = lambda x: x # noqa: E731 # to make flake8 stop complaining that the hacked in _ method doesnt exist
# Define custom type for the dicts that hold CAPI data
CAPIData = NewType('CAPIData', Dict)
holdoff = 60 # be nice holdoff = 60 # be nice
timeout = 10 # requests timeout timeout = 10 # requests timeout
auth_timeout = 30 # timeout for initial auth auth_timeout = 30 # timeout for initial auth
@ -35,83 +44,89 @@ auth_timeout = 30 # timeout for initial auth
# Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in # Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in
# Athanasius' Frontier account # Athanasius' Frontier account
# Obtain from https://auth.frontierstore.net/client/signup # Obtain from https://auth.frontierstore.net/client/signup
CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a' CLIENT_ID = os.getenv('CLIENT_ID') or 'fb88d428-9110-475f-a3d2-dc151c2b9c7a'
SERVER_AUTH = 'https://auth.frontierstore.net' SERVER_AUTH = 'https://auth.frontierstore.net'
URL_AUTH = '/auth' URL_AUTH = '/auth'
URL_TOKEN = '/token' URL_TOKEN = '/token'
USER_AGENT = 'EDCD-{}-{}'.format(appname, appversion) USER_AGENT = f'EDCD-{appname}-{appversion}'
SERVER_LIVE = 'https://companion.orerve.net' SERVER_LIVE = 'https://companion.orerve.net'
SERVER_BETA = 'https://pts-companion.orerve.net' SERVER_BETA = 'https://pts-companion.orerve.net'
URL_QUERY = '/profile' URL_QUERY = '/profile'
URL_MARKET = '/market' URL_MARKET = '/market'
URL_SHIPYARD= '/shipyard' URL_SHIPYARD = '/shipyard'
# Map values reported by the Companion interface to names displayed in-game # Map values reported by the Companion interface to names displayed in-game
# May be imported by plugins # May be imported by plugins
category_map = { category_map = {
'Narcotics' : 'Legal Drugs', 'Narcotics': 'Legal Drugs',
'Slaves' : 'Slavery', 'Slaves': 'Slavery',
'Waste ' : 'Waste', 'Waste ': 'Waste',
'NonMarketable' : False, # Don't appear in the in-game market so don't report 'NonMarketable': False, # Don't appear in the in-game market so don't report
} }
commodity_map = {} commodity_map: Dict = {}
ship_map = { ship_map = {
'adder' : 'Adder', 'adder': 'Adder',
'anaconda' : 'Anaconda', 'anaconda': 'Anaconda',
'asp' : 'Asp Explorer', 'asp': 'Asp Explorer',
'asp_scout' : 'Asp Scout', 'asp_scout': 'Asp Scout',
'belugaliner' : 'Beluga Liner', 'belugaliner': 'Beluga Liner',
'cobramkiii' : 'Cobra MkIII', 'cobramkiii': 'Cobra MkIII',
'cobramkiv' : 'Cobra MkIV', 'cobramkiv': 'Cobra MkIV',
'clipper' : 'Panther Clipper', 'clipper': 'Panther Clipper',
'cutter' : 'Imperial Cutter', 'cutter': 'Imperial Cutter',
'diamondback' : 'Diamondback Scout', 'diamondback': 'Diamondback Scout',
'diamondbackxl' : 'Diamondback Explorer', 'diamondbackxl': 'Diamondback Explorer',
'dolphin' : 'Dolphin', 'dolphin': 'Dolphin',
'eagle' : 'Eagle', 'eagle': 'Eagle',
'empire_courier' : 'Imperial Courier', 'empire_courier': 'Imperial Courier',
'empire_eagle' : 'Imperial Eagle', 'empire_eagle': 'Imperial Eagle',
'empire_fighter' : 'Imperial Fighter', 'empire_fighter': 'Imperial Fighter',
'empire_trader' : 'Imperial Clipper', 'empire_trader': 'Imperial Clipper',
'federation_corvette' : 'Federal Corvette', 'federation_corvette': 'Federal Corvette',
'federation_dropship' : 'Federal Dropship', 'federation_dropship': 'Federal Dropship',
'federation_dropship_mkii' : 'Federal Assault Ship', 'federation_dropship_mkii': 'Federal Assault Ship',
'federation_gunship' : 'Federal Gunship', 'federation_gunship': 'Federal Gunship',
'federation_fighter' : 'F63 Condor', 'federation_fighter': 'F63 Condor',
'ferdelance' : 'Fer-de-Lance', 'ferdelance': 'Fer-de-Lance',
'hauler' : 'Hauler', 'hauler': 'Hauler',
'independant_trader' : 'Keelback', 'independant_trader': 'Keelback',
'independent_fighter' : 'Taipan Fighter', 'independent_fighter': 'Taipan Fighter',
'krait_mkii' : 'Krait MkII', 'krait_mkii': 'Krait MkII',
'krait_light' : 'Krait Phantom', 'krait_light': 'Krait Phantom',
'mamba' : 'Mamba', 'mamba': 'Mamba',
'orca' : 'Orca', 'orca': 'Orca',
'python' : 'Python', 'python': 'Python',
'scout' : 'Taipan Fighter', 'scout': 'Taipan Fighter',
'sidewinder' : 'Sidewinder', 'sidewinder': 'Sidewinder',
'testbuggy' : 'Scarab', 'testbuggy': 'Scarab',
'type6' : 'Type-6 Transporter', 'type6': 'Type-6 Transporter',
'type7' : 'Type-7 Transporter', 'type7': 'Type-7 Transporter',
'type9' : 'Type-9 Heavy', 'type9': 'Type-9 Heavy',
'type9_military' : 'Type-10 Defender', 'type9_military': 'Type-10 Defender',
'typex' : 'Alliance Chieftain', 'typex': 'Alliance Chieftain',
'typex_2' : 'Alliance Crusader', 'typex_2': 'Alliance Crusader',
'typex_3' : 'Alliance Challenger', 'typex_3': 'Alliance Challenger',
'viper' : 'Viper MkIII', 'viper': 'Viper MkIII',
'viper_mkiv' : 'Viper MkIV', 'viper_mkiv': 'Viper MkIV',
'vulture' : 'Vulture', 'vulture': 'Vulture',
} }
# Companion API sometimes returns an array as a json array, sometimes as a json object indexed by "int". def listify(thing: Union[List, Dict]) -> List:
# This seems to depend on whether the there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse. """
# In practice these arrays aren't very sparse so just convert them to lists with any 'gaps' holding None. Convert actual JSON array or int-indexed dict into a Python list.
def listify(thing):
Companion API sometimes returns an array as a json array, sometimes as
a json object indexed by "int". This seems to depend on whether the
there are 'gaps' in the Cmdr's data - i.e. whether the array is sparse.
In practice these arrays aren't very sparse so just convert them to
lists with any 'gaps' holding None.
"""
if thing is None: if thing is None:
return [] # data is not present return [] # data is not present
@ -119,7 +134,7 @@ def listify(thing):
return list(thing) # array is not sparse return list(thing) # array is not sparse
elif isinstance(thing, dict): elif isinstance(thing, dict):
retval = [] retval: List[Any] = []
for k, v in thing.items(): for k, v in thing.items():
idx = int(k) idx = int(k)
@ -132,10 +147,12 @@ def listify(thing):
return retval return retval
else: else:
raise ValueError("expected an array or sparse array, got {!r}".format(thing)) raise ValueError(f"expected an array or sparse array, got {thing!r}")
class ServerError(Exception): class ServerError(Exception):
"""Exception Class for CAPI ServerErrors."""
def __init__(self, *args): def __init__(self, *args):
# Raised when cannot contact the Companion API server # Raised when cannot contact the Companion API server
self.args = args self.args = args
@ -144,23 +161,34 @@ class ServerError(Exception):
class ServerLagging(Exception): class ServerLagging(Exception):
"""Exception Class for CAPI Server lagging.
Raised when Companion API server is returning old data, e.g. when the
servers are too busy.
"""
def __init__(self, *args): def __init__(self, *args):
# Raised when Companion API server is returning old data, e.g. when the servers are too busy
self.args = args self.args = args
if not args: if not args:
self.args = (_('Error: Frontier server is lagging'),) self.args = (_('Error: Frontier server is lagging'),)
class SKUError(Exception): class SKUError(Exception):
"""Exception Class for CAPI SKU error.
Raised when the Companion API server thinks that the user has not
purchased E:D i.e. doesn't have the correct 'SKU'.
"""
def __init__(self, *args): def __init__(self, *args):
# Raised when the Companion API server thinks that the user has not purchased E:D
# i.e. doesn't have the correct 'SKU'
self.args = args self.args = args
if not args: if not args:
self.args = (_('Error: Frontier server SKU problem'),) self.args = (_('Error: Frontier server SKU problem'),)
class CredentialsError(Exception): class CredentialsError(Exception):
"""Exception Class for CAPI Credentials error."""
def __init__(self, *args): def __init__(self, *args):
self.args = args self.args = args
if not args: if not args:
@ -168,24 +196,38 @@ class CredentialsError(Exception):
class CmdrError(Exception): class CmdrError(Exception):
"""Exception Class for CAPI Commander error.
Raised when the user has multiple accounts and the username/password
setting is not for the account they're currently playing OR the user has
reset their Cmdr and the Companion API server is still returning data
for the old Cmdr.
"""
def __init__(self, *args): def __init__(self, *args):
# Raised when the user has multiple accounts and the username/password setting is not for
# the account they're currently playing OR the user has reset their Cmdr and the Companion API
# server is still returning data for the old Cmdr
self.args = args self.args = args
if not args: if not args:
self.args = (_('Error: Wrong Cmdr'),) self.args = (_('Error: Wrong Cmdr'),)
class Auth(object): class Auth(object):
def __init__(self, cmdr): """Handles authentication with the Frontier CAPI service via oAuth2."""
self.cmdr = cmdr
def __init__(self, cmdr: str):
self.cmdr: str = cmdr
self.session = requests.Session() self.session = requests.Session()
self.session.headers['User-Agent'] = USER_AGENT self.session.headers['User-Agent'] = USER_AGENT
self.verifier = self.state = None self.verifier: Union[bytes, None] = None
self.state: Union[str, None] = None
def refresh(self): def refresh(self) -> Union[str, None]:
# Try refresh token. Returns new refresh token if successful, otherwise makes new authorization request. """
Attempt use of Refresh Token to get a valid Access Token.
If the Refresh Token doesn't work, make a new authorization request.
:return: Access Token if retrieved, else None.
"""
logger.debug(f'Trying for "{self.cmdr}"') logger.debug(f'Trying for "{self.cmdr}"')
self.verifier = None self.verifier = None
@ -199,14 +241,14 @@ class Auth(object):
tokens = tokens + [''] * (len(cmdrs) - len(tokens)) tokens = tokens + [''] * (len(cmdrs) - len(tokens))
if tokens[idx]: if tokens[idx]:
logger.debug('We have a refresh token for that idx') logger.debug('We have a refresh token for that idx')
try: data = {
data = { 'grant_type': 'refresh_token',
'grant_type': 'refresh_token', 'client_id': CLIENT_ID,
'client_id': CLIENT_ID, 'refresh_token': tokens[idx],
'refresh_token': tokens[idx], }
}
logger.debug('Attempting refresh with Frontier...') logger.debug('Attempting refresh with Frontier...')
try:
r = self.session.post(SERVER_AUTH + URL_TOKEN, data=data, timeout=auth_timeout) r = self.session.post(SERVER_AUTH + URL_TOKEN, data=data, timeout=auth_timeout)
if r.status_code == requests.codes.ok: if r.status_code == requests.codes.ok:
data = r.json() data = r.json()
@ -219,7 +261,7 @@ class Auth(object):
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
self.dump(r) self.dump(r)
except Exception as e: except (ValueError, requests.RequestException, ):
logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
else: else:
@ -233,21 +275,19 @@ class Auth(object):
self.state = self.base64_url_encode(s.to_bytes(32, byteorder='big')) self.state = self.base64_url_encode(s.to_bytes(32, byteorder='big'))
# Won't work under IE: https://blogs.msdn.microsoft.com/ieinternals/2011/07/13/understanding-protocols/ # Won't work under IE: https://blogs.msdn.microsoft.com/ieinternals/2011/07/13/understanding-protocols/
logger.debug(f'Trying auth from scratch for Commander "{self.cmdr}"') logger.debug(f'Trying auth from scratch for Commander "{self.cmdr}"')
challenge = self.base64_url_encode(hashlib.sha256(self.verifier).digest())
webbrowser.open( webbrowser.open(
'{server_auth}{url_auth}?response_type=code&audience=frontier&scope=capi&client_id={client_id}&code_challenge={challenge}&code_challenge_method=S256&state={state}&redirect_uri={redirect}'.format( # noqa: E501 # I cant make this any shorter f'{SERVER_AUTH}{URL_AUTH}?response_type=code&audience=frontier&scope=capi&client_id={CLIENT_ID}&code_challenge={challenge}&code_challenge_method=S256&state={self.state}&redirect_uri={protocolhandler.redirect}' # noqa: E501 # I cant make this any shorter
server_auth=SERVER_AUTH,
url_auth=URL_AUTH,
client_id=CLIENT_ID,
challenge=self.base64_url_encode(hashlib.sha256(self.verifier).digest()),
state=self.state,
redirect=protocolhandler.redirect
)
) )
def authorize(self, payload): return None
def authorize(self, payload: str) -> str:
"""Handle oAuth authorization callback.
:return: access token if successful, otherwise raises CredentialsError.
"""
logger.debug('Checking oAuth authorization callback') logger.debug('Checking oAuth authorization callback')
# Handle OAuth authorization code callback.
# Returns access token if successful, otherwise raises CredentialsError
if '?' not in payload: if '?' not in payload:
logger.error(f'Frontier CAPI Auth: Malformed response (no "?" in payload)\n{payload}\n') logger.error(f'Frontier CAPI Auth: Malformed response (no "?" in payload)\n{payload}\n')
raise CredentialsError('malformed payload') # Not well formed raise CredentialsError('malformed payload') # Not well formed
@ -255,19 +295,20 @@ class Auth(object):
data = urllib.parse.parse_qs(payload[(payload.index('?') + 1):]) data = urllib.parse.parse_qs(payload[(payload.index('?') + 1):])
if not self.state or not data.get('state') or data['state'][0] != self.state: if not self.state or not data.get('state') or data['state'][0] != self.state:
logger.error(f'Frontier CAPI Auth: Unexpected response\n{payload}\n') logger.error(f'Frontier CAPI Auth: Unexpected response\n{payload}\n')
raise CredentialsError('Unexpected response from authorization {!r}'.format(payload)) # Unexpected reply raise CredentialsError(f'Unexpected response from authorization {payload!r}')
if not data.get('code'): if not data.get('code'):
logger.error(f'Frontier CAPI Auth: Negative response (no "code" in returned data)\n{payload}\n') logger.error(f'Frontier CAPI Auth: Negative response (no "code" in returned data)\n{payload}\n')
error = next( error = next(
(data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',) (data[k] for k in ('error_description', 'error', 'message') if k in data),
'<unknown error>'
) )
raise CredentialsError('Error: {!r}'.format(error)[0]) raise CredentialsError(f'Error: {error!r}')
r = None
try: try:
logger.debug('Got code, posting it back...') logger.debug('Got code, posting it back...')
r = None request_data = {
data = {
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
'client_id': CLIENT_ID, 'client_id': CLIENT_ID,
'code_verifier': self.verifier, 'code_verifier': self.verifier,
@ -275,7 +316,7 @@ class Auth(object):
'redirect_uri': protocolhandler.redirect, 'redirect_uri': protocolhandler.redirect,
} }
r = self.session.post(SERVER_AUTH + URL_TOKEN, data=data, timeout=auth_timeout) r = self.session.post(SERVER_AUTH + URL_TOKEN, data=request_data, timeout=auth_timeout)
data = r.json() data = r.json()
if r.status_code == requests.codes.ok: if r.status_code == requests.codes.ok:
logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"') logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"')
@ -287,7 +328,7 @@ class Auth(object):
config.set('fdev_apikeys', tokens) config.set('fdev_apikeys', tokens)
config.save() # Save settings now for use by command-line app config.save() # Save settings now for use by command-line app
return data.get('access_token') return str(data.get('access_token'))
except Exception as e: except Exception as e:
logger.exception(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"") logger.exception(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
@ -298,11 +339,15 @@ class Auth(object):
logger.error(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"") logger.error(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
self.dump(r) self.dump(r)
error = next((data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',)) error = next(
raise CredentialsError('Error: {!r}'.format(error)[0]) (data[k] for k in ('error_description', 'error', 'message') if k in data),
'<unknown error>'
)
raise CredentialsError(f'Error: {error!r}')
@staticmethod @staticmethod
def invalidate(cmdr): def invalidate(cmdr: str) -> None:
"""Invalidate Refresh Token for specified Commander."""
logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"') logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"')
cmdrs = config.get('cmdrs') cmdrs = config.get('cmdrs')
idx = cmdrs.index(cmdr) idx = cmdrs.index(cmdr)
@ -312,25 +357,36 @@ class Auth(object):
config.set('fdev_apikeys', tokens) config.set('fdev_apikeys', tokens)
config.save() # Save settings now for use by command-line app config.save() # Save settings now for use by command-line app
def dump(self, r): # noinspection PyMethodMayBeStatic
def dump(self, r: requests.Response) -> None:
"""Dump details of HTTP failure from oAuth attempt."""
logger.debug(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason if r.reason else "None"} {r.text}') logger.debug(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason if r.reason else "None"} {r.text}')
def base64_url_encode(self, text): # noinspection PyMethodMayBeStatic
def base64_url_encode(self, text: bytes) -> str:
"""Base64 encode text for URL."""
return base64.urlsafe_b64encode(text).decode().replace('=', '') return base64.urlsafe_b64encode(text).decode().replace('=', '')
class Session(object): class Session(object):
"""Methods for handling an oAuth2 session."""
STATE_INIT, STATE_AUTH, STATE_OK = list(range(3)) STATE_INIT, STATE_AUTH, STATE_OK = list(range(3))
def __init__(self): def __init__(self):
self.state = Session.STATE_INIT self.state = Session.STATE_INIT
self.server = None
self.credentials = None self.credentials = None
self.session = None self.session = None
self.auth = None self.auth = None
self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query
def login(self, cmdr=None, is_beta=None): def login(self, cmdr: str = None, is_beta: Union[None, bool] = None) -> bool:
# Returns True if login succeeded, False if re-authorization initiated. """
Attempt oAuth2 login.
:return: True if login succeeded, False if re-authorization initiated.
"""
if not CLIENT_ID: if not CLIENT_ID:
logger.error('CLIENT_ID is None') logger.error('CLIENT_ID is None')
raise CredentialsError('cannot login without a valid Client ID') raise CredentialsError('cannot login without a valid Client ID')
@ -376,7 +432,8 @@ class Session(object):
# Wait for callback # Wait for callback
# Callback from protocol handler # Callback from protocol handler
def auth_callback(self): def auth_callback(self) -> None:
"""Handle callback from edmc:// handler."""
logger.debug('Handling callback from edmc:// handler') logger.debug('Handling callback from edmc:// handler')
if self.state != Session.STATE_AUTH: if self.state != Session.STATE_AUTH:
# Shouldn't be getting a callback # Shouldn't be getting a callback
@ -394,14 +451,16 @@ class Session(object):
self.auth = None self.auth = None
raise # Bad thing happened raise # Bad thing happened
def start(self, access_token): def start(self, access_token: str) -> None:
"""Start an oAuth2 session."""
logger.debug('Starting session') logger.debug('Starting session')
self.session = requests.Session() self.session = requests.Session()
self.session.headers['Authorization'] = 'Bearer {}'.format(access_token) self.session.headers['Authorization'] = f'Bearer {access_token}'
self.session.headers['User-Agent'] = USER_AGENT self.session.headers['User-Agent'] = USER_AGENT
self.state = Session.STATE_OK self.state = Session.STATE_OK
def query(self, endpoint): def query(self, endpoint: str) -> CAPIData:
"""Perform a query against the specified CAPI endpoint."""
logger.debug(f'Performing query for endpoint "{endpoint}"') logger.debug(f'Performing query for endpoint "{endpoint}"')
if self.state == Session.STATE_INIT: if self.state == Session.STATE_INIT:
if self.login(): if self.login():
@ -432,7 +491,7 @@ class Session(object):
# Server error. Typically 500 "Internal Server Error" if server is down # Server error. Typically 500 "Internal Server Error" if server is down
logger.debug('500 status back from CAPI') logger.debug('500 status back from CAPI')
self.dump(r) self.dump(r)
raise ServerError('Received error {} from server'.format(r.status_code)) raise ServerError(f'Received error {r.status_code} from server')
try: try:
r.raise_for_status() # Typically 403 "Forbidden" on token expiry r.raise_for_status() # Typically 403 "Forbidden" on token expiry
@ -463,40 +522,45 @@ class Session(object):
self.retrying = False self.retrying = False
if 'timestamp' not in data: if 'timestamp' not in data:
logger.debug('timestamp not in data, adding from response headers') logger.debug('timestamp not in data, adding from response headers')
data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date'])) data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date'])) # type: ignore
return data return data
def profile(self): def profile(self) -> CAPIData:
"""Perform general CAPI /profile endpoint query."""
return self.query(URL_QUERY) return self.query(URL_QUERY)
def station(self): def station(self) -> Union[CAPIData, None]:
"""Perform CAPI /profile endpoint query for station data."""
data = self.query(URL_QUERY) data = self.query(URL_QUERY)
if data['commander'].get('docked'): if not data['commander'].get('docked'):
services = data['lastStarport'].get('services', {}) return None
last_starport_name = data['lastStarport']['name'] services = data['lastStarport'].get('services', {})
last_starport_id = int(data['lastStarport']['id'])
if services.get('commodities'): last_starport_name = data['lastStarport']['name']
marketdata = self.query(URL_MARKET) last_starport_id = int(data['lastStarport']['id'])
if (last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id'])):
raise ServerLagging()
else: if services.get('commodities'):
data['lastStarport'].update(marketdata) marketdata = self.query(URL_MARKET)
if last_starport_name != marketdata['name'] or last_starport_id != int(marketdata['id']):
raise ServerLagging()
if services.get('outfitting') or services.get('shipyard'): else:
shipdata = self.query(URL_SHIPYARD) data['lastStarport'].update(marketdata)
if (last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id'])):
raise ServerLagging()
else: if services.get('outfitting') or services.get('shipyard'):
data['lastStarport'].update(shipdata) shipdata = self.query(URL_SHIPYARD)
if last_starport_name != shipdata['name'] or last_starport_id != int(shipdata['id']):
raise ServerLagging()
else:
data['lastStarport'].update(shipdata)
return data return data
def close(self): def close(self) -> None:
"""Close CAPI authorization session."""
self.state = Session.STATE_INIT self.state = Session.STATE_INIT
if self.session: if self.session:
try: try:
@ -507,18 +571,26 @@ class Session(object):
self.session = None self.session = None
def invalidate(self): def invalidate(self) -> None:
"""Invalidate oAuth2 credentials."""
logger.debug('Forcing a full re-authentication') logger.debug('Forcing a full re-authentication')
# Force a full re-authentication # Force a full re-authentication
self.close() self.close()
Auth.invalidate(self.credentials['cmdr']) Auth.invalidate(self.credentials['cmdr'])
def dump(self, r): # noinspection PyMethodMayBeStatic
def dump(self, r: requests.Response) -> None:
"""Log, as error, status of requests.Response from CAPI request."""
logger.error(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason and r.reason or "None"} {r.text}') logger.error(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason and r.reason or "None"} {r.text}')
# Returns a shallow copy of the received data suitable for export to older tools
# English commodity names and anomalies fixed up def fixup(data: CAPIData) -> CAPIData: # noqa: C901, CCR001 # Can't be usefully simplified
def fixup(data): """
Fix up commodity names to English & miscellaneous anomalies fixes.
:return: a shallow copy of the received data suitable for export to
older tools.
"""
if not commodity_map: if not commodity_map:
# Lazily populate # Lazily populate
for f in ('commodity.csv', 'rare_commodity.csv'): for f in ('commodity.csv', 'rare_commodity.csv'):
@ -586,15 +658,16 @@ def fixup(data):
datacopy = data.copy() datacopy = data.copy()
datacopy['lastStarport'] = data['lastStarport'].copy() datacopy['lastStarport'] = data['lastStarport'].copy()
datacopy['lastStarport']['commodities'] = commodities datacopy['lastStarport']['commodities'] = commodities
return datacopy return CAPIData(datacopy)
# Return a subset of the received data describing the current ship def ship(data: CAPIData) -> CAPIData:
def ship(data): """Construct a subset of the received data describing the current ship."""
def filter_ship(d): def filter_ship(d: CAPIData) -> CAPIData:
filtered = {} """Filter provided ship data."""
filtered: CAPIData = CAPIData({})
for k, v in d.items(): for k, v in d.items():
if v == []: if not v:
pass # just skip empty fields for brevity pass # just skip empty fields for brevity
elif k in ('alive', 'cargo', 'cockpitBreached', 'health', 'oxygenRemaining', elif k in ('alive', 'cargo', 'cockpitBreached', 'health', 'oxygenRemaining',
@ -619,8 +692,8 @@ def ship(data):
return filter_ship(data['ship']) return filter_ship(data['ship'])
# Ship name suitable for writing to a file def ship_file_name(ship_name: str, ship_type: str) -> str:
def ship_file_name(ship_name, ship_type): """Return a ship name suitable for a filename."""
name = str(ship_name or ship_map.get(ship_type.lower(), ship_type)).strip() name = str(ship_name or ship_map.get(ship_type.lower(), ship_type)).strip()
if name.endswith('.'): if name.endswith('.'):
name = name[:-1] name = name[:-1]