mirror of
https://github.com/EDCD/EDMarketConnector.git
synced 2025-06-08 03:12:33 +03:00
eddn: OUT_EDDN_DELAY (not inverted) & further work on the sending
* The eddn parts of the OUT_EDDN_DO_NOT_DELAY -> OUT_EDDN_DELAY change. This includes the 'sense' of it being inverted from what it was. * EDDN.REPLAY_DELAY is now a float, as it's used with `time.sleep()`. *This* is the 400ms value for inter-message cooldown. * EDDN.REPLAY_PERIOD is still an int, used with tk `after()`. This is how often we attempt the queue. * EDDN.session is no longer a thing, move that part of EDDN.close() to EDDNSender.close(). * EDDN queue DB has `id`, not `message_id`. * Now *looping* in the queue sender, not only sending the oldest message.
This commit is contained in:
parent
6070f82c6b
commit
06fa3629ea
@ -30,6 +30,7 @@ import pathlib
|
|||||||
import re
|
import re
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
import tkinter as tk
|
import tkinter as tk
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from platform import system
|
from platform import system
|
||||||
@ -169,7 +170,7 @@ class EDDNSender:
|
|||||||
|
|
||||||
self.queue_processing = Lock()
|
self.queue_processing = Lock()
|
||||||
# Initiate retry/send-now timer
|
# Initiate retry/send-now timer
|
||||||
self.eddn.parent.after(self.eddn.REPLAYPERIOD, self.queue_check_and_send)
|
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send)
|
||||||
|
|
||||||
def sqlite_queue_v1(self) -> sqlite3.Connection:
|
def sqlite_queue_v1(self) -> sqlite3.Connection:
|
||||||
"""
|
"""
|
||||||
@ -253,6 +254,9 @@ class EDDNSender:
|
|||||||
if self.db_conn:
|
if self.db_conn:
|
||||||
self.db_conn.close()
|
self.db_conn.close()
|
||||||
|
|
||||||
|
logger.debug('Closing EDDN requests.Session.')
|
||||||
|
self.session.close()
|
||||||
|
|
||||||
def add_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> int:
|
def add_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> int:
|
||||||
"""
|
"""
|
||||||
Add an EDDN message to the database.
|
Add an EDDN message to the database.
|
||||||
@ -423,12 +427,16 @@ class EDDNSender:
|
|||||||
|
|
||||||
def queue_check_and_send(self) -> None:
|
def queue_check_and_send(self) -> None:
|
||||||
"""Check if we should be sending queued messages, and send if we should."""
|
"""Check if we should be sending queued messages, and send if we should."""
|
||||||
|
# logger.debug("Called")
|
||||||
# Mutex in case we're already processing
|
# Mutex in case we're already processing
|
||||||
if not self.queue_processing.acquire(blocking=False):
|
if not self.queue_processing.acquire(blocking=False):
|
||||||
|
logger.debug("Couldn't obtain mutex")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# logger.debug("Obtained mutex")
|
||||||
# We send either if docked or 'Delay sending until docked' not set
|
# We send either if docked or 'Delay sending until docked' not set
|
||||||
if this.docked or config.get_int('output') & config.OUT_EDDN_DO_NOT_DELAY:
|
if this.docked or not (config.get_int('output') & config.OUT_EDDN_DELAY):
|
||||||
|
# logger.debug("Should send")
|
||||||
# We need our own cursor here, in case the semantics of
|
# We need our own cursor here, in case the semantics of
|
||||||
# tk `after()` could allow this to run in the middle of other
|
# tk `after()` could allow this to run in the middle of other
|
||||||
# database usage.
|
# database usage.
|
||||||
@ -445,7 +453,7 @@ class EDDNSender:
|
|||||||
try:
|
try:
|
||||||
db_cursor.execute(
|
db_cursor.execute(
|
||||||
"""
|
"""
|
||||||
SELECT message_id FROM messages
|
SELECT id FROM messages
|
||||||
ORDER BY created ASC
|
ORDER BY created ASC
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
@ -454,14 +462,20 @@ class EDDNSender:
|
|||||||
logger.exception("DB error querying queued messages")
|
logger.exception("DB error querying queued messages")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
row = dict(zip([c[0] for c in db_cursor.description], db_cursor.fetchone()))
|
while row := db_cursor.fetchone():
|
||||||
self.send_message_by_id(row['message_id'])
|
row = dict(zip([c[0] for c in db_cursor.description], row))
|
||||||
|
self.send_message_by_id(row['id'])
|
||||||
|
time.sleep(self.eddn.REPLAY_DELAY)
|
||||||
|
|
||||||
db_cursor.close()
|
db_cursor.close()
|
||||||
|
|
||||||
|
# else:
|
||||||
|
# logger.debug("Should NOT send")
|
||||||
|
|
||||||
# Set us up to run again after a delay
|
# Set us up to run again after a delay
|
||||||
self.queue_processing.release()
|
self.queue_processing.release()
|
||||||
self.eddn.parent.after(self.eddn.REPLAYPERIOD, self.queue_check_and_send)
|
# logger.debug("Mutex released")
|
||||||
|
self.eddn.parent.after(self.eddn.REPLAY_PERIOD, self.queue_check_and_send)
|
||||||
|
|
||||||
def _log_response(
|
def _log_response(
|
||||||
self,
|
self,
|
||||||
@ -518,7 +532,9 @@ class EDDN:
|
|||||||
if 'eddn' in debug_senders:
|
if 'eddn' in debug_senders:
|
||||||
DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn'
|
DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn'
|
||||||
|
|
||||||
REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms]
|
# FIXME: Change back to `300_000`
|
||||||
|
REPLAY_PERIOD = 1_000 # How often to try (re-)sending the queue, [milliseconds]
|
||||||
|
REPLAY_DELAY = 0.400 # Roughly two messages per second, accounting for send delays [seconds]
|
||||||
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
|
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
|
||||||
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
|
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
|
||||||
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
|
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
|
||||||
@ -545,9 +561,6 @@ class EDDN:
|
|||||||
|
|
||||||
logger.debug('Done.')
|
logger.debug('Done.')
|
||||||
|
|
||||||
logger.debug('Closing EDDN requests.Session.')
|
|
||||||
self.session.close()
|
|
||||||
|
|
||||||
def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None: # noqa: CCR001
|
def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None: # noqa: CCR001
|
||||||
"""
|
"""
|
||||||
Update EDDN with the commodities on the current (lastStarport) station.
|
Update EDDN with the commodities on the current (lastStarport) station.
|
||||||
@ -888,7 +901,7 @@ class EDDN:
|
|||||||
# Check if the user configured messages to be sent.
|
# Check if the user configured messages to be sent.
|
||||||
#
|
#
|
||||||
# 1. If this is a 'station' data message then check config.EDDN_SEND_STATION_DATA
|
# 1. If this is a 'station' data message then check config.EDDN_SEND_STATION_DATA
|
||||||
# 2. Else check against config.EDDN_SEND_NON_STATION *and* config.OUT_EDDN_DO_NOT_DELAY
|
# 2. Else check against config.EDDN_SEND_NON_STATION *and* config.OUT_EDDN_DELAY
|
||||||
if any(f'{s}' in msg['$schemaRef'] for s in EDDNSender.STATION_SCHEMAS):
|
if any(f'{s}' in msg['$schemaRef'] for s in EDDNSender.STATION_SCHEMAS):
|
||||||
# 'Station data'
|
# 'Station data'
|
||||||
if config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA:
|
if config.get_int('output') & config.OUT_EDDN_SEND_STATION_DATA:
|
||||||
@ -900,7 +913,7 @@ class EDDN:
|
|||||||
elif config.get_int('output') & config.OUT_EDDN_SEND_NON_STATION:
|
elif config.get_int('output') & config.OUT_EDDN_SEND_NON_STATION:
|
||||||
# Any data that isn't 'station' is configured to be sent
|
# Any data that isn't 'station' is configured to be sent
|
||||||
msg_id = self.sender.add_message(cmdr, msg)
|
msg_id = self.sender.add_message(cmdr, msg)
|
||||||
if config.get_int('output') & config.OUT_EDDN_DO_NOT_DELAY:
|
if not (config.get_int('output') & config.OUT_EDDN_DELAY):
|
||||||
# No delay in sending configured, so attempt immediately
|
# No delay in sending configured, so attempt immediately
|
||||||
self.sender.send_message_by_id(msg_id)
|
self.sender.send_message_by_id(msg_id)
|
||||||
|
|
||||||
@ -1843,7 +1856,7 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame:
|
|||||||
)
|
)
|
||||||
|
|
||||||
this.eddn_system_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W)
|
this.eddn_system_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W)
|
||||||
this.eddn_delay = tk.IntVar(value=(output & config.OUT_EDDN_DO_NOT_DELAY) and 1)
|
this.eddn_delay = tk.IntVar(value=(output & config.OUT_EDDN_DELAY) and 1)
|
||||||
# Output setting under 'Send system and scan data to the Elite Dangerous Data Network' new in E:D 2.2
|
# Output setting under 'Send system and scan data to the Elite Dangerous Data Network' new in E:D 2.2
|
||||||
this.eddn_delay_button = nb.Checkbutton(
|
this.eddn_delay_button = nb.Checkbutton(
|
||||||
eddnframe,
|
eddnframe,
|
||||||
@ -1883,7 +1896,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None:
|
|||||||
& (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP | config.OUT_MKT_MANUAL)) +
|
& (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP | config.OUT_MKT_MANUAL)) +
|
||||||
(this.eddn_station.get() and config.OUT_EDDN_SEND_STATION_DATA) +
|
(this.eddn_station.get() and config.OUT_EDDN_SEND_STATION_DATA) +
|
||||||
(this.eddn_system.get() and config.OUT_EDDN_SEND_NON_STATION) +
|
(this.eddn_system.get() and config.OUT_EDDN_SEND_NON_STATION) +
|
||||||
(this.eddn_delay.get() and config.OUT_EDDN_DO_NOT_DELAY)
|
(this.eddn_delay.get() and config.OUT_EDDN_DELAY)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user