1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-06-05 18:03:17 +03:00

CAPI: Some progress in using a Queue to signal result/error

Due to main app being Tk we can't just use Python async functionality.

So instead we have a class to hold a message and optional exception.
And instance of that goes into a queue.
The reading of that in the main thread is triggered by sending a Tk
event.

Much more to come.
This commit is contained in:
Athanasius 2021-08-16 15:37:55 +01:00
parent 2fc1568bf7
commit 655c7ea1ca
No known key found for this signature in database
GPG Key ID: AE3E527847057C7D
2 changed files with 179 additions and 133 deletions

View File

@ -7,6 +7,7 @@ import html
import json import json
import locale import locale
import pathlib import pathlib
import queue
import re import re
import sys import sys
# import threading # import threading
@ -14,6 +15,7 @@ import webbrowser
from builtins import object, str from builtins import object, str
from os import chdir, environ from os import chdir, environ
from os.path import dirname, join from os.path import dirname, join
from queue import Queue
from sys import platform from sys import platform
from time import localtime, strftime, time from time import localtime, strftime, time
from typing import TYPE_CHECKING, Optional, Tuple from typing import TYPE_CHECKING, Optional, Tuple
@ -384,6 +386,8 @@ class AppWindow(object):
def __init__(self, master: tk.Tk): # noqa: C901, CCR001 # TODO - can possibly factor something out def __init__(self, master: tk.Tk): # noqa: C901, CCR001 # TODO - can possibly factor something out
self.holdofftime = config.get_int('querytime', default=0) + companion.holdoff self.holdofftime = config.get_int('querytime', default=0) + companion.holdoff
self.capi_response_queue: Queue = Queue()
companion.session.set_capi_response_queue(self.capi_response_queue)
self.w = master self.w = master
self.w.title(applongname) self.w.title(applongname)
@ -933,7 +937,19 @@ class AppWindow(object):
def capi_handle_response(self, event=None): def capi_handle_response(self, event=None):
"""Handle the resulting data from a CAPI query.""" """Handle the resulting data from a CAPI query."""
data = ... try:
data = self.capi_response_queue.get(block=False)
except queue.Empty:
logger.error('There was no response in the queue!')
# TODO: Set status text
return
else:
if isinstance(data, companion.CAPIFailedRequest):
logger.trace_if('capi.worker', f'Failed Request: {data.message}')
raise data.exception
# Validation # Validation
if 'commander' not in data: if 'commander' not in data:
# This can happen with EGS Auth if no commander created yet # This can happen with EGS Auth if no commander created yet

View File

@ -15,6 +15,7 @@ import numbers
import os import os
import random import random
import threading import threading
import tkinter as tk
import time import time
import urllib.parse import urllib.parse
import webbrowser import webbrowser
@ -465,6 +466,13 @@ class Auth(object):
return base64.urlsafe_b64encode(text).decode().replace('=', '') return base64.urlsafe_b64encode(text).decode().replace('=', '')
class CAPIFailedRequest():
"""CAPI failed query error class."""
def __init__(self, message, exception=None):
self.message = message
self.exception = exception
class Session(object): class Session(object):
"""Methods for handling Frontier Auth and CAPI queries.""" """Methods for handling Frontier Auth and CAPI queries."""
@ -477,8 +485,10 @@ class Session(object):
self.session: Optional[requests.Session] = None self.session: Optional[requests.Session] = None
self.auth: Optional[Auth] = None self.auth: Optional[Auth] = None
self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query
self.tk_master: Optional[tk.Tk] = None
logger.info('Starting CAPI queries thread...') logger.info('Starting CAPI queries thread...')
self.capi_response_queue: Queue
self.capi_query_queue: Queue = Queue() self.capi_query_queue: Queue = Queue()
self.capi_query_thread = threading.Thread( self.capi_query_thread = threading.Thread(
target=self.capi_query_worker, target=self.capi_query_worker,
@ -488,6 +498,12 @@ class Session(object):
self.capi_query_thread.start() self.capi_query_thread.start()
logger.info('Done') logger.info('Done')
def set_capi_response_queue(self, capi_response_queue: Queue) -> None:
self.capi_response_queue = capi_response_queue
def set_tk_master(self, master: tk.Tk) -> None:
self.tk_master = master
###################################################################### ######################################################################
# Frontier Authorization # Frontier Authorization
###################################################################### ######################################################################
@ -591,18 +607,61 @@ class Session(object):
###################################################################### ######################################################################
# CAPI queries # CAPI queries
###################################################################### ######################################################################
def capi_query_worker(self, ): def capi_query_worker(self):
"""Worker thread that performs actual CAPI queries.""" """Worker thread that performs actual CAPI queries."""
logger.info('CAPI worker thread starting') logger.info('CAPI worker thread starting')
while True: while True:
query: Optional[str] = self.capi_query_queue.get() endpoint: Optional[str] = self.capi_query_queue.get()
if not query: if not endpoint:
logger.info('Empty queue message, exiting...') logger.info('Empty queue message, exiting...')
break break
logger.trace_if('capi.worker', f'Processing query: {query}') logger.trace_if('capi.worker', f'Processing query: {endpoint}')
time.sleep(1) # XXX
self.capi_response_queue.put(
CAPIFailedRequest(f'Unable to connect to endpoint {endpoint}')
)
self.tk_master.event_generate('<<CAPIResponse>>')
continue
# XXX
try:
r = self.session.get(self.server + endpoint, timeout=timeout) # type: ignore
r.raise_for_status() # Typically 403 "Forbidden" on token expiry
data = CAPIData(r.json(), endpoint) # May also fail here if token expired since response is empty
except requests.ConnectionError as e:
logger.warning(f'Unable to resolve name for CAPI: {e} (for request: {endpoint})')
self.capi_response_queue.put(
CAPIFailedRequest(f'Unable to connect to endpoint {endpoint}', exception=e)
)
continue
# raise ServerConnectionError(f'Unable to connect to endpoint {endpoint}') from e
except Exception as e:
logger.debug('Attempting GET', exc_info=e)
# LANG: Frontier CAPI data retrieval failed
# raise ServerError(f'{_("Frontier CAPI query failure")}: {endpoint}') from e
self.capi_response_queue.put(
CAPIFailedRequest(f'Frontier CAPI query failure: {endpoint}', exception=e)
)
continue
if r.url.startswith(SERVER_AUTH):
logger.info('Redirected back to Auth Server')
# Redirected back to Auth server - force full re-authentication
self.dump(r)
self.invalidate()
self.retrying = False
self.login()
raise CredentialsError()
elif 500 <= r.status_code < 600:
# Server error. Typically 500 "Internal Server Error" if server is down
logger.debug('500 status back from CAPI')
self.dump(r)
# LANG: Frontier CAPI data retrieval failed with 5XX code
raise ServerError(f'{_("Frontier CAPI server error")}: {r.status_code}')
logger.info('CAPI worker thread DONE') logger.info('CAPI worker thread DONE')
@ -625,38 +684,9 @@ class Session(object):
if conf_module.capi_pretend_down: if conf_module.capi_pretend_down:
raise ServerConnectionError(f'Pretending CAPI down: {endpoint}') raise ServerConnectionError(f'Pretending CAPI down: {endpoint}')
try:
self.capi_query_queue.put(endpoint) self.capi_query_queue.put(endpoint)
r = self.session.get(self.server + endpoint, timeout=timeout) # type: ignore
except requests.ConnectionError as e:
logger.warning(f'Unable to resolve name for CAPI: {e} (for request: {endpoint})')
raise ServerConnectionError(f'Unable to connect to endpoint {endpoint}') from e
except Exception as e:
logger.debug('Attempting GET', exc_info=e)
# LANG: Frontier CAPI data retrieval failed
raise ServerError(f'{_("Frontier CAPI query failure")}: {endpoint}') from e
if r.url.startswith(SERVER_AUTH):
logger.info('Redirected back to Auth Server')
# Redirected back to Auth server - force full re-authentication
self.dump(r)
self.invalidate()
self.retrying = False
self.login()
raise CredentialsError()
elif 500 <= r.status_code < 600:
# Server error. Typically 500 "Internal Server Error" if server is down
logger.debug('500 status back from CAPI')
self.dump(r)
# LANG: Frontier CAPI data retrieval failed with 5XX code
raise ServerError(f'{_("Frontier CAPI server error")}: {r.status_code}')
try: try:
r.raise_for_status() # Typically 403 "Forbidden" on token expiry ...
data = CAPIData(r.json(), endpoint) # May also fail here if token expired since response is empty
except (requests.HTTPError, ValueError) as e: except (requests.HTTPError, ValueError) as e:
logger.exception('Frontier CAPI Auth: GET ') logger.exception('Frontier CAPI Auth: GET ')