1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-18 18:07:37 +03:00

CAPI: 'station' query going via work queue

* Defined inner functions in the worker function to handle the actual
  queries.  This allows for *them* to simply return data or raise
  exceptions to be caught by the main worker function.  *IT* then
  handles returning any error appropriately via the queue to AppWindow.

* Due to the coupling between AppWindow and these queries there are
  several extra parameters passed into the queue worker and then back
  out.  This is largely due to having to split AppWindow.getandsend()
  into two functions: capi_request_data() and capi_handle_response().

  This might get changed to use a class to encapsulate those values,
  rather than the bare tuple currently being used.

* No full flake8 & mypy pass done yet.

* Some companion.py globals renamed so their use is more obvious.
This commit is contained in:
Athanasius 2021-08-17 16:12:58 +01:00
parent 3fe097a955
commit c1deb9113e
No known key found for this signature in database
GPG Key ID: AE3E527847057C7D
2 changed files with 203 additions and 122 deletions

View File

@ -18,7 +18,7 @@ from os.path import dirname, join
from queue import Queue
from sys import platform
from time import localtime, strftime, time
from typing import TYPE_CHECKING, Optional, Tuple
from typing import TYPE_CHECKING, Optional, Tuple, Union
# Have this as early as possible for people running EDMarketConnector.exe
# from cmd.exe or a bat file or similar. Else they might not be in the correct
@ -385,7 +385,7 @@ class AppWindow(object):
def __init__(self, master: tk.Tk): # noqa: C901, CCR001 # TODO - can possibly factor something out
self.holdofftime = config.get_int('querytime', default=0) + companion.holdoff
self.capi_query_holdoff_time = config.get_int('querytime', default=0) + companion.capi_query_cooldown
self.capi_response_queue: Queue = Queue()
companion.session.set_capi_response_queue(self.capi_response_queue)
@ -894,7 +894,7 @@ class AppWindow(object):
return True
def capi_request_data(self, event=None, retrying: bool = False): # noqa: C901, CCR001
def capi_request_data(self, event=None, retrying: bool = False):
"""
Perform CAPI data retrieval and associated actions.
@ -903,23 +903,23 @@ class AppWindow(object):
"""
auto_update = not event
play_sound = (auto_update or int(event.type) == self.EVENT_VIRTUAL) and not config.get_int('hotkey_mute')
play_bad = False
err: Optional[str] = None
if (
not monitor.cmdr or not monitor.mode or monitor.state['Captain']
or not monitor.system or monitor.mode == 'CQC'
):
logger.trace_if('capi.worker', 'CQC detected, aborting query')
return # In CQC or on crew - do nothing
if companion.session.state == companion.Session.STATE_AUTH:
logger.trace_if('capi.worker', 'Auth in progress? Aborting query')
# Attempt another Auth
self.login()
return
if not retrying:
if time() < self.holdofftime: # Was invoked by key while in cooldown
if play_sound and (self.holdofftime - time()) < companion.holdoff * 0.75:
if time() < self.capi_query_holdoff_time: # Was invoked by key while in cooldown
if play_sound and (self.capi_query_holdoff_time - time()) < companion.capi_query_cooldown * 0.75:
self.status['text'] = ''
hotkeymgr.play_bad() # Don't play sound in first few seconds to prevent repeats
@ -934,14 +934,23 @@ class AppWindow(object):
self.w.update_idletasks()
querytime = int(time())
companion.session.station()
logger.trace_if('capi.worker', 'Requesting full station data')
companion.session.station(querytime=querytime, play_sound=play_sound)
config.set('querytime', querytime)
def capi_handle_response(self, event=None):
"""Handle the resulting data from a CAPI query."""
play_bad = False
err: Optional[str] = None
data: Union[companion.CAPIData, companion.CAPIFailedRequest]
querytime: int
play_sound: bool
auto_update: bool
try:
data = self.capi_response_queue.get(block=False)
logger.trace_if('capi.worker', 'Pulling answer off queue')
data, querytime, play_sound, auto_update = self.capi_response_queue.get(block=False)
if isinstance(data, companion.CAPIFailedRequest):
logger.trace_if('capi.worker', f'Failed Request: {data.message}')
if data.exception:
@ -950,6 +959,7 @@ class AppWindow(object):
else:
raise ValueError(data.message)
logger.trace_if('capi.worker', 'Answer is not a Failure')
# Validation
if 'commander' not in data:
# This can happen with EGS Auth if no commander created yet
@ -972,6 +982,7 @@ class AppWindow(object):
elif monitor.cmdr and data['commander']['name'] != monitor.cmdr:
# Companion API Commander doesn't match Journal
logger.trace_if('capi.worker', 'Raising CmdrError()')
raise companion.CmdrError()
elif auto_update and not monitor.state['OnFoot'] and not data['commander'].get('docked'):
@ -1010,7 +1021,7 @@ class AppWindow(object):
f"{last_station!r} != {monitor.station!r}")
raise companion.ServerLagging()
self.holdofftime = querytime + companion.holdoff
self.capi_query_holdoff_time = querytime + companion.capi_query_cooldown
elif not monitor.state['OnFoot'] and data['ship']['id'] != monitor.state['ShipID']:
# CAPI ship must match
@ -1025,6 +1036,7 @@ class AppWindow(object):
raise companion.ServerLagging()
else:
# TODO: Change to depend on its own CL arg
if __debug__: # Recording
companion.session.dump_capi_data(data)
@ -1068,7 +1080,7 @@ class AppWindow(object):
err = 'Error: Exporting Market data'
play_bad = True
self.holdofftime = querytime + companion.holdoff
self.capi_query_holdoff_time = querytime + companion.capi_query_cooldown
except queue.Empty:
logger.error('There was no response in the queue!')
@ -1118,7 +1130,7 @@ class AppWindow(object):
hotkeymgr.play_bad()
# Update Odyssey Suit data
companion.suit_update(data)
companion.session.suit_update(data)
self.update_suit_text()
self.suit_show_if_set()
@ -1398,10 +1410,10 @@ class AppWindow(object):
def cooldown(self) -> None:
"""Display and update the cooldown timer for 'Update' button."""
if time() < self.holdofftime:
if time() < self.capi_query_holdoff_time:
# Update button in main window
self.button['text'] = self.theme_button['text'] \
= _('cooldown {SS}s').format(SS=int(self.holdofftime - time())) # LANG: Cooldown on 'Update' button
= _('cooldown {SS}s').format(SS=int(self.capi_query_holdoff_time - time())) # LANG: Cooldown on 'Update' button
self.w.after(1000, self.cooldown)
else:

View File

@ -47,8 +47,8 @@ else:
# Define custom type for the dicts that hold CAPI data
# CAPIData = NewType('CAPIData', Dict)
holdoff = 60 # be nice
timeout = 10 # requests timeout
capi_query_cooldown = 60 # be nice
capi_default_timeout = 10 # requests timeout
auth_timeout = 30 # timeout for initial auth
# Used by both class Auth and Session
@ -79,7 +79,7 @@ class CAPIData(UserDict):
if source_endpoint is None:
return
if source_endpoint == self.FRONTIER_CAPI_PATH_SHIPYARD and self.data.get('lastStarport'):
if source_endpoint == Session.FRONTIER_CAPI_PATH_SHIPYARD and self.data.get('lastStarport'):
# All the other endpoints may or may not have a lastStarport, but definitely wont have valid data
# for this check, which means it'll just make noise for no reason while we're working on other things
self.check_modules_ships()
@ -527,7 +527,7 @@ class Session(object):
:return: True if login succeeded, False if re-authorization initiated.
"""
if not self.CLIENT_ID:
if not Auth.CLIENT_ID:
logger.error('self.CLIENT_ID is None')
raise CredentialsError('cannot login without a valid Client ID')
@ -617,88 +617,23 @@ class Session(object):
"""Worker thread that performs actual CAPI queries."""
logger.info('CAPI worker thread starting')
while True:
endpoint: Optional[str] = self.capi_query_queue.get()
if not endpoint:
logger.info('Empty queue message, exiting...')
break
def capi_single_query(capi_endpoint: str, timeout: int = capi_default_timeout) -> CAPIData:
"""
Perform a *single* CAPI endpoint query within the thread worker.
logger.trace_if('capi.worker', f'Processing query: {endpoint}')
self.query(self.FRONTIER_CAPI_PATH_PROFILE)
if not data['commander'].get('docked') and not monitor.state['OnFoot']:
return data
# Sanity checks in case data isn't as we expect, and maybe 'docked' flag
# is also lagging.
if (last_starport := data.get('lastStarport')) is None:
logger.error("No lastStarport in data!")
return data
if ((last_starport_name := last_starport.get('name')) is None
or last_starport_name == ''):
# This could well be valid if you've been out exploring for a long
# time.
logger.warning("No lastStarport name!")
return data
# WORKAROUND: n/a | 06-08-2021: Issue 1198 and https://issues.frontierstore.net/issue-detail/40706
# -- strip "+" chars off star port names returned by the CAPI
last_starport_name = last_starport["name"] = last_starport_name.rstrip(" +")
services = last_starport.get('services', {})
if not isinstance(services, dict):
# Odyssey Alpha Phase 3 4.0.0.20 has been observed having
# this be an empty list when you've jumped to another system
# and not yet docked. As opposed to no services key at all
# or an empty dict.
logger.error(f'services is "{type(services)}", not dict !')
if __debug__:
self.dump_capi_data(data)
# Set an empty dict so as to not have to retest below.
services = {}
last_starport_id = int(last_starport.get('id'))
if services.get('commodities'):
marketdata = self.query(self.FRONTIER_CAPI_PATH_MARKET)
if last_starport_id != int(marketdata['id']):
logger.warning(f"{last_starport_id!r} != {int(marketdata['id'])!r}")
raise ServerLagging()
else:
marketdata['name'] = last_starport_name
data['lastStarport'].update(marketdata)
if services.get('outfitting') or services.get('shipyard'):
shipdata = self.query(self.FRONTIER_CAPI_PATH_SHIPYARD)
if last_starport_id != int(shipdata['id']):
logger.warning(f"{last_starport_id!r} != {int(shipdata['id'])!r}")
raise ServerLagging()
else:
shipdata['name'] = last_starport_name
data['lastStarport'].update(shipdata)
# WORKAROUND END
return data
:param capi_endpoint: An actual Frontier CAPI endpoint to query.
:param timeout: requests query timeout to use.
:return: The resulting CAPI data, of type CAPIData.
"""
capi_data: CAPIData
try:
r = self.session.get(self.server + endpoint, timeout=timeout) # type: ignore
r = self.session.get(self.server + capi_endpoint, timeout=timeout) # type: ignore
r.raise_for_status() # Typically 403 "Forbidden" on token expiry
self.capi_response_queue.put(
CAPIFailedRequest(f'Redirected back to Auth Server', exception=CredentialsError())
)
continue
data = CAPIData(r.json(), endpoint) # May also fail here if token expired since response is empty
capi_data = CAPIData(r.json(), capi_endpoint) # May also fail here if token expired since response is empty
except requests.ConnectionError as e:
logger.warning(f'Unable to resolve name for CAPI: {e} (for request: {endpoint})')
self.capi_response_queue.put(
CAPIFailedRequest(f'Unable to connect to endpoint {endpoint}', exception=e)
)
continue
# raise ServerConnectionError(f'Unable to connect to endpoint {endpoint}') from e
logger.warning(f'Unable to resolve name for CAPI: {e} (for request: {capi_endpoint})')
raise ServerConnectionError(f'Unable to connect to endpoint: {capi_endpoint}') from e
except (requests.HTTPError, ValueError) as e:
logger.exception('Frontier CAPI Auth: GET ')
@ -712,10 +647,12 @@ class Session(object):
self.login()
raise CredentialsError('query failed after refresh') from e
# TODO: Better to return error and have upstream re-try auth ?
elif self.login(): # Maybe our token expired. Re-authorize in any case
logger.debug('Initial query failed, but login() just worked, trying again...')
self.retrying = True
return self.query(endpoint)
# TODO: This, or raise (custom?) exception for upstream to do it?
return capi_single_query(capi_endpoint)
else:
self.retrying = False
@ -725,18 +662,11 @@ class Session(object):
except Exception as e:
logger.debug('Attempting GET', exc_info=e)
# LANG: Frontier CAPI data retrieval failed
# raise ServerError(f'{_("Frontier CAPI query failure")}: {endpoint}') from e
self.capi_response_queue.put(
CAPIFailedRequest(f'Frontier CAPI query failure: {endpoint}', exception=e)
)
continue
raise ServerError(f'{_("Frontier CAPI query failure")}: {capi_endpoint}') from e
if r.url.startswith(FRONTIER_AUTH_SERVER):
logger.info('Redirected back to Auth Server')
self.capi_response_queue.put(
CAPIFailedRequest(f'Redirected back to Auth Server', exception=CredentialsError())
)
continue
raise CredentialsError('Redirected back to Auth Server')
elif 500 <= r.status_code < 600:
# Server error. Typically 500 "Internal Server Error" if server is down
@ -747,15 +677,145 @@ class Session(object):
self.retrying = False
if endpoint == self.FRONTIER_CAPI_PATH_PROFILE and 'commander' not in data:
if capi_endpoint == self.FRONTIER_CAPI_PATH_PROFILE and 'commander' not in capi_data:
logger.error('No commander in returned data')
if 'timestamp' not in data:
data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date'])) # type: ignore
if 'timestamp' not in capi_data:
capi_data['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', parsedate(r.headers['Date'])) # type: ignore
return capi_data
def capi_station_queries(timeout: int = capi_default_timeout) -> CAPIData:
"""
Perform all 'station' queries for the caller.
A /profile query is performed to check that we are docked (or on foot)
and the station name and marketid match the prior Docked event.
If they do match, and the services list says they're present, also
retrieve CAPI market and/or shipyard/outfitting data and merge into
the /profile data.
:param timeout: requests timeout to use.
:return: CAPIData instance with what we retrieved.
"""
station_data = capi_single_query(self.FRONTIER_CAPI_PATH_PROFILE, timeout=timeout)
if not station_data['commander'].get('docked') and not monitor.state['OnFoot']:
return station_data
# Sanity checks in case data isn't as we expect, and maybe 'docked' flag
# is also lagging.
if (last_starport := station_data.get('lastStarport')) is None:
logger.error("No lastStarport in data!")
return station_data
if ((last_starport_name := last_starport.get('name')) is None
or last_starport_name == ''):
# This could well be valid if you've been out exploring for a long
# time.
logger.warning("No lastStarport name!")
return station_data
# WORKAROUND: n/a | 06-08-2021: Issue 1198 and https://issues.frontierstore.net/issue-detail/40706
# -- strip "+" chars off star port names returned by the CAPI
last_starport_name = last_starport["name"] = last_starport_name.rstrip(" +")
services = last_starport.get('services', {})
if not isinstance(services, dict):
# Odyssey Alpha Phase 3 4.0.0.20 has been observed having
# this be an empty list when you've jumped to another system
# and not yet docked. As opposed to no services key at all
# or an empty dict.
logger.error(f'services is "{type(services)}", not dict !')
# TODO: Change this to be dependent on its own CL arg
if __debug__:
self.dump_capi_data(station_data)
# Set an empty dict so as to not have to retest below.
services = {}
last_starport_id = int(last_starport.get('id'))
if services.get('commodities'):
market_data = capi_single_query(self.FRONTIER_CAPI_PATH_MARKET, timeout=timeout)
if last_starport_id != int(market_data['id']):
logger.warning(f"{last_starport_id!r} != {int(market_data['id'])!r}")
raise ServerLagging()
else:
market_data['name'] = last_starport_name
station_data['lastStarport'].update(market_data)
if services.get('outfitting') or services.get('shipyard'):
shipyard_data = capi_single_query(self.FRONTIER_CAPI_PATH_SHIPYARD, timeout=timeout)
if last_starport_id != int(shipyard_data['id']):
logger.warning(f"{last_starport_id!r} != {int(shipyard_data['id'])!r}")
raise ServerLagging()
else:
shipyard_data['name'] = last_starport_name
station_data['lastStarport'].update(shipyard_data)
# WORKAROUND END
return station_data
while True:
endpoint: Optional[str]
querytime: int
play_sound: bool
auto_update: bool
endpoint, querytime, play_sound, auto_update = self.capi_query_queue.get()
if not endpoint:
logger.info('Empty queue message, exiting...')
break
logger.trace_if('capi.worker', f'Processing query: {endpoint}')
data: CAPIData
if endpoint == self._CAPI_PATH_STATION:
try:
data = capi_station_queries()
except Exception as e:
self.capi_response_queue.put(
data
(
CAPIFailedRequest(
message=e.args,
exception=e
),
querytime,
play_sound,
auto_update
)
)
else:
self.capi_response_queue.put(
(data, querytime, play_sound, auto_update)
)
else:
try:
data = capi_single_query(self.FRONTIER_CAPI_PATH_PROFILE)
except Exception as e:
self.capi_response_queue.put(
(
CAPIFailedRequest(
message=e.args,
exception=e
),
querytime,
play_sound,
auto_update
)
)
else:
self.capi_response_queue.put(
(data, querytime, play_sound, auto_update)
)
self.tk_master.event_generate('<<CAPIResponse>>')
logger.info('CAPI worker thread DONE')
@ -763,8 +823,16 @@ class Session(object):
"""Ask the CAPI query thread to finish."""
self.capi_query_queue.put(None)
def query(self, endpoint: str) -> None:
"""Perform a query against the specified CAPI endpoint."""
def query(self, endpoint: str, querytime: int, play_sound: bool = False, auto_update: bool = False) -> None:
"""
Perform a query against the specified CAPI endpoint.
:param querytime: When this query was initiated.
:param play_sound: Whether the app should play a sound on error.
:param endpoint: The CAPI endpoint to query, might be a pseudo-value.
:param auto_update: Whether this request was triggered automatically.
:return:
"""
logger.trace_if('capi.query', f'Performing query for endpoint "{endpoint}"')
if self.state == Session.STATE_INIT:
if self.login():
@ -778,26 +846,27 @@ class Session(object):
if conf_module.capi_pretend_down:
raise ServerConnectionError(f'Pretending CAPI down: {endpoint}')
self.capi_query_queue.put(endpoint)
self.capi_query_queue.put(
(endpoint, querytime, play_sound, auto_update)
)
def profile(self):
"""Perform general CAPI /profile endpoint query."""
self.query(self.FRONTIER_CAPI_PATH_PROFILE)
def station(self) -> CAPIData: # noqa: CCR001
def station(self, querytime: int, play_sound: bool = False, auto_update: bool = False) -> CAPIData:
"""
Perform CAPI quer(y|ies) for station data.
A /profile query is performed to check that we are docked (or on foot)
and the station name and marketid match the prior Docked event.
If they do match, and the services list says they're present, also
retrieve CAPI market and/or shipyard/outfitting data and merge into
the /profile data.
:param querytime: When this query was initiated.
:param play_sound: Whether the app should play a sound on error.
:param auto_update: Whether this request was triggered automatically.
:return: Possibly augmented CAPI data.
"""
# Ask the thread worker to perform all three queries
self.capi_query_queue.put(_CAPI_PATH_STATION)
self.capi_query_queue.put(
(self._CAPI_PATH_STATION, querytime, play_sound, auto_update)
)
######################################################################
######################################################################