1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-17 17:42:20 +03:00

eddn: Working with tk after(), on timer or when docked

* An aborted attempt was made to use a thread worker, but:
  1. sqlite3 doesn't allow cross-thread use of the same sqlite3 connection.
  2. Having an on-going query on one cursor, e.g. gathering all the
    outstanding message `id`, whilst trying to DELETE a row hits a
    "database is locked" error.
* So, back to tk `after()`.  `send_message_by_id()` has been audited to ensure
  its boolean return is accurate.  So there shouldn't be any way in which to
  get hung up on a single message *other than if the EDDN Gateway is having
  issues, and thus it should be retried anyway*.  Any reason for a 'bad
  message' will cause `True` return and thus deletion of the message in
  *this* call to `queue_check_and_send()`.
* There is a new `reschedule` parameter to `queue_check_and_send()`.  If
  `True` then at the end it should re-schedule.

  There is a check in `journal_entry()` for the `Docked` event, and if this
  occurs it will schedule `queue_check_and_send()` with `reschedule` set to
  `False` so that we don't end up with multiple parallel schedulings.

  It's still possible for a docking to have coincided with a scheduled run
  and thus cause double-rate sending to EDDN, but we can live with that.

* The same scheduling mechanism is used, with a much smaller delay, to
  process more than one queued message per run.

  Hence the `have_rescheduled` bool *in* the function to indicate if a 'fast'
  reschedule has already been set.  This prevents the slow one *also* being
  set in this scenario.  The latter will be scheduled when the fast one
  found no more rows to process.
This commit is contained in:
Athanasius 2022-11-23 13:29:47 +00:00
parent f2dbfacf70
commit fda91df04f
No known key found for this signature in database
GPG Key ID: 772697E181BB2767

View File

@ -176,7 +176,7 @@ class EDDNSender:
self.queue_processing = Lock()
# Initiate retry/send-now timer
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send)
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, True)
def sqlite_queue_v1(self) -> sqlite3.Connection:
"""
@ -431,15 +431,24 @@ class EDDNSender:
return False
def queue_check_and_send(self) -> None:
"""Check if we should be sending queued messages, and send if we should."""
# logger.debug("Called")
def queue_check_and_send(self, reschedule: bool = False) -> None:
"""
Check if we should be sending queued messages, and send if we should.
:param reschedule: Boolean indicating if we should call `after()` again.
"""
logger.debug("Called")
# Mutex in case we're already processing
if not self.queue_processing.acquire(blocking=False):
logger.debug("Couldn't obtain mutex")
if reschedule:
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule)
return
# logger.debug("Obtained mutex")
# Used to indicate if we've rescheduled at the faster rate already.
have_rescheduled = False
# We send either if docked or 'Delay sending until docked' not set
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY):
# logger.debug("Should send")
@ -461,6 +470,7 @@ class EDDNSender:
"""
SELECT id FROM messages
ORDER BY created ASC
LIMIT 1
"""
)
@ -468,20 +478,24 @@ class EDDNSender:
logger.exception("DB error querying queued messages")
else:
while row := db_cursor.fetchone():
row = db_cursor.fetchone()
if row:
row = dict(zip([c[0] for c in db_cursor.description], row))
self.send_message_by_id(row['id'])
time.sleep(self.eddn.REPLAY_DELAY)
# Always re-schedule as this is only a "Don't hammer EDDN" delay
self.eddn.parent.after(self.eddn.REPLAY_DELAY, self.queue_check_and_send, reschedule)
have_rescheduled = True
db_cursor.close()
# else:
# logger.debug("Should NOT send")
# Set us up to run again after a delay
self.queue_processing.release()
# logger.debug("Mutex released")
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send)
if reschedule and not have_rescheduled:
# Set us up to run again per the configured period
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send, reschedule)
def _log_response(
self,
@ -539,8 +553,8 @@ class EDDN:
DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn'
# FIXME: Change back to `300_000`
REPLAY_PERIOD = 1_000 # How often to try (re-)sending the queue, [milliseconds]
REPLAY_DELAY = 0.400 # Roughly two messages per second, accounting for send delays [seconds]
REPLAY_PERIOD = 300_000 # How often to try (re-)sending the queue, [milliseconds]
REPLAY_DELAY = 400 # Roughly two messages per second, accounting for send delays [milliseconds]
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
@ -2081,6 +2095,9 @@ def journal_entry( # noqa: C901, CCR001
# Yes, explicitly state `None` here, so it's crystal clear.
this.systemaddress = entry.get('SystemAddress', None) # type: ignore
if event_name == 'docked':
this.eddn.parent.after(this.eddn.REPLAY_DELAY, this.eddn.sender.queue_check_and_send, False)
elif event_name == 'approachbody':
this.body_name = entry['Body']
this.body_id = entry.get('BodyID')