From 40a8a19ce4cb21fcea332cb1eb586c7f3775830e Mon Sep 17 00:00:00 2001 From: Athanasius Date: Wed, 25 Aug 2021 12:14:33 +0100 Subject: [PATCH] CAPI: Use special EDMCCAPIRequest endpoint, not 'None' for worker shutdown --- companion.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/companion.py b/companion.py index 8d78b619..5528262c 100644 --- a/companion.py +++ b/companion.py @@ -537,6 +537,8 @@ class EDMCCAPIReturn: class EDMCCAPIRequest(EDMCCAPIReturn): """Encapsulates a request for CAPI data.""" + REQUEST_WORKER_SHUTDOWN = '__EDMC_WORKER_SHUTDOWN' + def __init__( self, endpoint: str, query_time: int, tk_response_event: Optional[str] = None, retrying: bool = False, @@ -597,7 +599,7 @@ class Session(object): self.capi_raw_data = CAPIDataRaw() # Cache of raw replies from CAPI service # Queue that holds requests for CAPI queries, the items should always # be EDMCCAPIRequest objects. - self.capi_query_queue: Queue[Optional[EDMCCAPIRequest]] = Queue() + self.capi_request_queue: Queue[EDMCCAPIRequest] = Queue() # This queue is used to pass the result, possibly a failure, of CAPI # queries back to the requesting code (technically anything checking # this queue, but it should be either EDMarketConnector.AppWindow or @@ -876,15 +878,14 @@ class Session(object): return station_data while True: - query = self.capi_query_queue.get() + query = self.capi_request_queue.get() if not isinstance(query, EDMCCAPIRequest): - if query is not None: - logger.error("Item from queue wasn't an EDMCCAPIRequest") - break + logger.error("Item from queue wasn't an EDMCCAPIRequest") + break - else: - logger.info('Empty queue message, exiting...') - break + if query.endpoint == query.REQUEST_WORKER_SHUTDOWN: + logger.info(f'endpoint {query.REQUEST_WORKER_SHUTDOWN}, exiting...') + break logger.trace_if('capi.worker', f'Processing query: {query.endpoint}') capi_data: CAPIData @@ -949,7 +950,12 @@ class Session(object): def capi_query_close_worker(self) -> None: """Ask the CAPI query thread to finish.""" - self.capi_query_queue.put(None) + self.capi_request_queue.put( + EDMCCAPIRequest( + endpoint=EDMCCAPIRequest.REQUEST_WORKER_SHUTDOWN, + query_time=int(time.time()) + ) + ) def query( self, endpoint: str, query_time: int, @@ -983,7 +989,7 @@ class Session(object): if conf_module.capi_pretend_down: raise ServerConnectionError(f'Pretending CAPI down: {endpoint}') - self.capi_query_queue.put( + self.capi_request_queue.put( EDMCCAPIRequest( endpoint=endpoint, tk_response_event=tk_response_event, @@ -1032,7 +1038,7 @@ class Session(object): :param auto_update: Whether this request was triggered automatically. """ # Ask the thread worker to perform all three queries - self.capi_query_queue.put( + self.capi_request_queue.put( EDMCCAPIRequest( endpoint=self._CAPI_PATH_STATION, tk_response_event=tk_response_event,