From fda91df04fa0c7d68a4a1c7f09ec5171acd49540 Mon Sep 17 00:00:00 2001 From: Athanasius Date: Wed, 23 Nov 2022 13:29:47 +0000 Subject: [PATCH] eddn: Working with tk `after()`, on timer or when docked * An aborted attempt was made to use a thread worker, but: 1. sqlite3 doesn't allow cross-thread use of the same sqlite3 connection. 2. Having an on-going query on one cursor, e.g. gathering all the outstanding message `id`, whilst trying to DELETE a row hits a "database is locked" error. * So, back to tk `after()`. `send_message_by_id()` has been audited to ensure its boolean return is accurate. So there shouldn't be any way in which to get hung up on a single message *other than if the EDDN Gateway is having issues, and thus it should be retried anyway*. Any reason for a 'bad message' will cause `True` return and thus deletion of the message in *this* call to `queue_check_and_send()`. * There is a new `reschedule` parameter to `queue_check_and_send()`. If `True` then at the end it should re-schedule. There is a check in `journal_entry()` for the `Docked` event, and if this occurs it will schedule `queue_check_and_send()` with `reschedule` set to `False` so that we don't end up with multiple parallel schedulings. It's still possible for a docking to have coincided with a scheduled run and thus cause double-rate sending to EDDN, but we can live with that. * The same scheduling mechanism is used, with a much smaller delay, to process more than one queued message per run. Hence the `have_rescheduled` bool *in* the function to indicate if a 'fast' reschedule has already been set. This prevents the slow one *also* being set in this scenario. The latter will be scheduled when the fast one found no more rows to process. --- plugins/eddn.py | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/plugins/eddn.py b/plugins/eddn.py index 1bd6e834..2158c6de 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -176,7 +176,7 @@ class EDDNSender: self.queue_processing = Lock() # Initiate retry/send-now timer - self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send) + self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, True) def sqlite_queue_v1(self) -> sqlite3.Connection: """ @@ -431,15 +431,24 @@ class EDDNSender: return False - def queue_check_and_send(self) -> None: - """Check if we should be sending queued messages, and send if we should.""" - # logger.debug("Called") + def queue_check_and_send(self, reschedule: bool = False) -> None: + """ + Check if we should be sending queued messages, and send if we should. + + :param reschedule: Boolean indicating if we should call `after()` again. + """ + logger.debug("Called") # Mutex in case we're already processing if not self.queue_processing.acquire(blocking=False): logger.debug("Couldn't obtain mutex") + if reschedule: + self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule) + return # logger.debug("Obtained mutex") + # Used to indicate if we've rescheduled at the faster rate already. + have_rescheduled = False # We send either if docked or 'Delay sending until docked' not set if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY): # logger.debug("Should send") @@ -461,6 +470,7 @@ class EDDNSender: """ SELECT id FROM messages ORDER BY created ASC + LIMIT 1 """ ) @@ -468,20 +478,24 @@ class EDDNSender: logger.exception("DB error querying queued messages") else: - while row := db_cursor.fetchone(): + row = db_cursor.fetchone() + if row: row = dict(zip([c[0] for c in db_cursor.description], row)) self.send_message_by_id(row['id']) - time.sleep(self.eddn.REPLAY_DELAY) + # Always re-schedule as this is only a "Don't hammer EDDN" delay + self.eddn.parent.after(self.eddn.REPLAY_DELAY, self.queue_check_and_send, reschedule) + have_rescheduled = True - db_cursor.close() + db_cursor.close() # else: # logger.debug("Should NOT send") - # Set us up to run again after a delay self.queue_processing.release() # logger.debug("Mutex released") - self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send) + if reschedule and not have_rescheduled: + # Set us up to run again per the configured period + self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule) def _log_response( self, @@ -539,8 +553,8 @@ class EDDN: DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn' # FIXME: Change back to `300_000` - REPLAY_PERIOD = 1_000 # How often to try (re-)sending the queue, [milliseconds] - REPLAY_DELAY = 0.400 # Roughly two messages per second, accounting for send delays [seconds] + REPLAY_PERIOD = 300_000 # How often to try (re-)sending the queue, [milliseconds] + REPLAY_DELAY = 400 # Roughly two messages per second, accounting for send delays [milliseconds] REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE) CANONICALISE_RE = re.compile(r'\$(.+)_name;') @@ -2081,6 +2095,9 @@ def journal_entry( # noqa: C901, CCR001 # Yes, explicitly state `None` here, so it's crystal clear. this.systemaddress = entry.get('SystemAddress', None) # type: ignore + if event_name == 'docked': + this.eddn.parent.after(this.eddn.REPLAY_DELAY, this.eddn.sender.queue_check_and_send, False) + elif event_name == 'approachbody': this.body_name = entry['Body'] this.body_id = entry.get('BodyID')