1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-15 00:30:33 +03:00

EDDNSender: Closer to actually sending messages now

This commit is contained in:
Athanasius 2022-09-30 16:10:25 +01:00 committed by Athanasius
parent 2b957d140c
commit f66a98464e
No known key found for this signature in database
GPG Key ID: 772697E181BB2767

View File

@ -22,6 +22,7 @@
#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $# ! $#
import http
import itertools
import json
import os
@ -129,8 +130,14 @@ class EDDNSender:
"""Handle sending of EDDN messages to the Gateway."""
SQLITE_DB_FILENAME_V1 = 'eddn_queue-v1.db'
TIMEOUT = 10 # requests timeout
UNKNOWN_SCHEMA_RE = re.compile(
r"^FAIL: \[JsonValidationException\('Schema "
r"https://eddn.edcd.io/schemas/(?P<schema_name>.+)/(?P<schema_version>[0-9]+) is unknown, "
r"unable to validate.',\)\]$"
)
def __init__(self, eddn_endpoint: str) -> None:
def __init__(self, eddn: 'EDDN', eddn_endpoint: str) -> None:
"""
Prepare the system for processing messages.
@ -138,9 +145,13 @@ class EDDNSender:
- Convert any legacy file into the database.
- (Future) Handle any database migrations.
:param eddn: Reference to the `EDDN` instance this is for.
:param eddn_endpoint: Where messages should be sent.
"""
self.eddn = eddn
self.eddn_endpoint = eddn_endpoint
self.session = requests.Session()
self.session.headers['User-Agent'] = user_agent
self.db_conn = self.sqlite_queue_v1()
self.db = self.db_conn.cursor()
@ -224,7 +235,7 @@ class EDDNSender:
replay_file = open(filename, 'w') # Will truncate
replay_file.close()
os.unlink(filename)
def close(self) -> None:
"""Clean up any resources."""
if self.db:
@ -263,8 +274,8 @@ class EDDNSender:
created = msg['message']['timestamp']
edmc_version = msg['header']['softwareVersion']
game_version = msg['header']['gameversion']
game_build = msg['header']['gamebuild']
game_version = msg['header'].get('gameversion', '')
game_build = msg['header'].get('gamebuild', '')
uploader = msg['header']['uploaderID']
try:
@ -300,121 +311,103 @@ class EDDNSender:
)
self.db_conn.commit()
# TODO: a good few of these methods are static or could be classmethods. they should be created as such.
class EDDN:
"""EDDN Data export."""
DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/'
if 'eddn' in debug_senders:
DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn'
REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms]
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
TIMEOUT = 10 # requests timeout
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
UNKNOWN_SCHEMA_RE = re.compile(
r"^FAIL: \[JsonValidationException\('Schema "
r"https://eddn.edcd.io/schemas/(?P<schema_name>.+)/(?P<schema_version>[0-9]+) is unknown, "
r"unable to validate.',\)\]$"
)
CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+')
def __init__(self, parent: tk.Tk):
self.parent: tk.Tk = parent
self.session = requests.Session()
self.session.headers['User-Agent'] = user_agent
if config.eddn_url is not None:
self.eddn_url = config.eddn_url
else:
self.eddn_url = self.DEFAULT_URL
self.sender = EDDNSender(self.eddn_url)
self.fss_signals: List[Mapping[str, Any]] = []
def close(self):
"""Close down the EDDN class instance."""
logger.debug('Closing Sender...')
if self.sender:
self.sender.close()
logger.debug('Done.')
logger.debug('Closing EDDN requests.Session.')
self.session.close()
def send(self, cmdr: str, msg: Mapping[str, Any]) -> None:
def send_message_by_id(self, id: int):
"""
Send sends an update to EDDN.
Transmit the message identified by the given ID.
:param cmdr: the CMDR to use as the uploader ID.
:param msg: the payload to send.
:param id:
:return:
"""
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', msg)
self.db.execute(
"""
SELECT * FROM messages WHERE id = :row_id
""",
{'row_id': id}
)
row = dict(zip([c[0] for c in self.db.description], self.db.fetchone()))
try:
self.send_message(row['message'])
except requests.exceptions.HTTPError as e:
logger.warning(f"HTTPError: {str(e)}")
finally:
# Remove from queue
...
def send_message(self, msg: str) -> bool:
"""
Transmit a fully-formed EDDN message to the Gateway.
Should catch and handle all failure conditions. A `True` return might
mean that the message was successfully sent, *or* that this message
should not be retried after a failure, i.e. too large.
:param msg: Fully formed, string, message.
:return: `True` for "now remove this message from the queue"
"""
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg))
if should_return:
logger.warning('eddn.send has been disabled via killswitch. Returning.')
return
msg = new_data
msg = json.dumps(new_data)
uploader_id = cmdr
to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([
('$schemaRef', msg['$schemaRef']),
('header', OrderedDict([
('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'),
('softwareVersion', str(appversion_nobuild())),
('uploaderID', uploader_id),
])),
('message', msg['message']),
])
# About the smallest request is going to be (newlines added for brevity):
# {"$schemaRef":"https://eddn.edcd.io/schemas/commodity/3","header":{"softwareName":"E:D Market
# Connector Windows","softwareVersion":"5.3.0-beta4extra","uploaderID":"abcdefghijklm"},"messag
# e":{"systemName":"delphi","stationName":"The Oracle","marketId":128782803,"timestamp":"2022-0
# 1-26T12:00:00Z","commodities":[]}}
#
# Which comes to 315 bytes (including \n) and compresses to 244 bytes. So lets just compress everything
encoded, compressed = text.gzip(json.dumps(to_send, separators=(',', ':')), max_size=0)
status: tk.Widget = self.eddn.parent.children['status']
# Even the smallest possible message compresses somewhat, so always compress
encoded, compressed = text.gzip(json.dumps(msg, separators=(',', ':')), max_size=0)
headers: None | dict[str, str] = None
if compressed:
headers = {'Content-Encoding': 'gzip'}
r = self.session.post(self.eddn_url, data=encoded, timeout=self.TIMEOUT, headers=headers)
if r.status_code != requests.codes.ok:
try:
r = self.session.post(self.eddn_endpoint, data=encoded, timeout=self.TIMEOUT, headers=headers)
if r.status_code == requests.codes.ok:
return True
# Check if EDDN is still objecting to an empty commodities list
if (
r.status_code == 400
and msg['$schemaRef'] == 'https://eddn.edcd.io/schemas/commodity/3'
and msg['message']['commodities'] == []
and r.text == "FAIL: [<ValidationError: '[] is too short'>]"
):
logger.trace_if('plugin.eddn', "EDDN is still objecting to empty commodities data")
return # We want to silence warnings otherwise
if r.status_code == 413:
if r.status_code == http.HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
extra_data = {
'schema_ref': msg.get('$schemaRef', 'Unset $schemaRef!'),
'schema_ref': new_data.get('$schemaRef', 'Unset $schemaRef!'),
'sent_data_len': str(len(encoded)),
}
if '/journal/' in extra_data['schema_ref']:
extra_data['event'] = msg.get('message', {}).get('event', 'No Event Set')
extra_data['event'] = new_data.get('message', {}).get('event', 'No Event Set')
self._log_response(r, header_msg='Got a 413 while POSTing data', **extra_data)
return # drop the error
self._log_response(r, header_msg='Got "Payload Too Large" while POSTing data', **extra_data)
return True
if not self.UNKNOWN_SCHEMA_RE.match(r.text):
self._log_response(r, header_msg='Status from POST wasn\'t 200 (OK)')
self._log_response(r, header_msg="Status from POST wasn't 200 (OK)")
r.raise_for_status()
r.raise_for_status()
except requests.exceptions.HTTPError as e:
if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text):
logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}"
f"/{unknown_schema['schema_version']}")
# This dropping is to cater for the time period when EDDN doesn't *yet* support a new schema.
return True
elif e.response.status_code == http.HTTPStatus.BAD_REQUEST:
# EDDN straight up says no, so drop the message
logger.debug(f"EDDN responded '400 Bad Request' to the message, dropping:\n{msg!r}")
return True
else:
# This should catch anything else, e.g. timeouts, gateway errors
status['text'] = self.http_error_to_log(e)
except requests.exceptions.RequestException as e:
logger.debug('Failed sending', exc_info=e)
# LANG: Error while trying to send data to EDDN
status['text'] = _("Error: Can't connect to EDDN")
except Exception as e:
logger.debug('Failed sending', exc_info=e)
status['text'] = str(e)
return False
def _log_response(
self,
@ -441,6 +434,92 @@ class EDDN:
Content :\t{response.text}
''')+additional_data)
@staticmethod
def http_error_to_log(exception: requests.exceptions.HTTPError) -> str:
"""Convert an exception from raise_for_status to a log message and displayed error."""
status_code = exception.errno
if status_code == 429: # HTTP UPGRADE REQUIRED
logger.warning('EDMC is sending schemas that are too old')
# LANG: EDDN has banned this version of our client
return _('EDDN Error: EDMC is too old for EDDN. Please update.')
elif status_code == 400:
# we a validation check or something else.
logger.warning(f'EDDN Error: {status_code} -- {exception.response}')
# LANG: EDDN returned an error that indicates something about what we sent it was wrong
return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log')
else:
logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}')
# LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number
return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
# TODO: a good few of these methods are static or could be classmethods. they should be created as such.
class EDDN:
"""EDDN Data export."""
DEFAULT_URL = 'https://eddn.edcd.io:4430/upload/'
if 'eddn' in debug_senders:
DEFAULT_URL = f'http://{edmc_data.DEBUG_WEBSERVER_HOST}:{edmc_data.DEBUG_WEBSERVER_PORT}/eddn'
REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms]
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
CAPI_LOCALISATION_RE = re.compile(r'^loc[A-Z].+')
def __init__(self, parent: tk.Tk):
self.parent: tk.Tk = parent
if config.eddn_url is not None:
self.eddn_url = config.eddn_url
else:
self.eddn_url = self.DEFAULT_URL
self.sender = EDDNSender(self, self.eddn_url)
self.fss_signals: List[Mapping[str, Any]] = []
def close(self):
"""Close down the EDDN class instance."""
logger.debug('Closing Sender...')
if self.sender:
self.sender.close()
logger.debug('Done.')
logger.debug('Closing EDDN requests.Session.')
self.session.close()
def send(self, cmdr: str, msg: Mapping[str, Any]) -> None:
"""
Enqueue a message for transmission.
:param cmdr: the CMDR to use as the uploader ID.
:param msg: the payload to send.
"""
to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([
('$schemaRef', msg['$schemaRef']),
('header', OrderedDict([
('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'),
('softwareVersion', str(appversion_nobuild())),
('uploaderID', cmdr),
# TODO: Add `gameversion` and `gamebuild` if that change is live
# on EDDN.
])),
('message', msg['message']),
])
# Ensure it's en-queued
msg_id = self.sender.add_message(cmdr, to_send)
# Now try to transmit it immediately
if self.sender.send_message_by_id(msg_id):
# De-queue
self.sender.delete_message(msg_id)
def sendreplay(self) -> None: # noqa: CCR001
"""Send cached Journal lines to EDDN."""
# TODO: Convert to using the sqlite3 db
@ -493,63 +572,13 @@ class EDDN:
'https://eddn.edcd.io/schemas/'
)
try:
self.send(cmdr, msg)
self.replaylog.pop(0)
if not len(self.replaylog) % self.REPLAYFLUSH:
self.flush()
except requests.exceptions.HTTPError as e:
if unknown_schema := self.UNKNOWN_SCHEMA_RE.match(e.response.text):
logger.debug(f"EDDN doesn't (yet?) know about schema: {unknown_schema['schema_name']}"
f"/{unknown_schema['schema_version']}")
# NB: This dropping is to cater for the time when EDDN
# doesn't *yet* support a new schema.
self.replaylog.pop(0) # Drop the message
self.flush() # Truncates the file, then writes the extant data
elif e.response.status_code == 400:
# EDDN straight up says no, so drop the message
logger.debug(f"EDDN responded '400' to the message, dropping:\n{msg!r}")
self.replaylog.pop(0) # Drop the message
self.flush() # Truncates the file, then writes the extant data
else:
status['text'] = self.http_error_to_log(e)
except requests.exceptions.RequestException as e:
logger.debug('Failed sending', exc_info=e)
# LANG: Error while trying to send data to EDDN
status['text'] = _("Error: Can't connect to EDDN")
return # stop sending
except Exception as e:
logger.debug('Failed sending', exc_info=e)
status['text'] = str(e)
return # stop sending
self.send(cmdr, msg)
self.replaylog.pop(0)
if not len(self.replaylog) % self.REPLAYFLUSH:
self.flush()
self.parent.after(self.REPLAYPERIOD, self.sendreplay)
@staticmethod
def http_error_to_log(exception: requests.exceptions.HTTPError) -> str:
"""Convert an exception from raise_for_status to a log message and displayed error."""
status_code = exception.errno
if status_code == 429: # HTTP UPGRADE REQUIRED
logger.warning('EDMC is sending schemas that are too old')
# LANG: EDDN has banned this version of our client
return _('EDDN Error: EDMC is too old for EDDN. Please update.')
elif status_code == 400:
# we a validation check or something else.
logger.warning(f'EDDN Error: {status_code} -- {exception.response}')
# LANG: EDDN returned an error that indicates something about what we sent it was wrong
return _('EDDN Error: Validation Failed (EDMC Too Old?). See Log')
else:
logger.warning(f'Unknown status code from EDDN: {status_code} -- {exception.response}')
# LANG: EDDN returned some sort of HTTP error, one we didn't expect. {STATUS} contains a number
return _('EDDN Error: Returned {STATUS} status code').format(STATUS=status_code)
def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None: # noqa: CCR001
"""
@ -883,33 +912,13 @@ class EDDN:
def export_journal_entry(self, cmdr: str, entry: Mapping[str, Any], msg: Mapping[str, Any]) -> None:
"""
Update EDDN with an event from the journal.
Additionally if other lines have been saved for retry, it may send
those as well.
Send a Journal-sourced EDDN message.
:param cmdr: Commander name as passed in through `journal_entry()`.
:param entry: The full journal event dictionary (due to checks in this function).
:param msg: The EDDN message body to be sent.
"""
if self.replayfile or self.journal_replay_load_file():
# Store the entry
self.replaylog.append(json.dumps([cmdr, msg]))
self.replayfile.write(f'{self.replaylog[-1]}\n') # type: ignore
if (
entry['event'] == 'Docked' or (entry['event'] == 'Location' and entry['Docked']) or not
(config.get_int('output') & config.OUT_SYS_DELAY)
):
self.parent.after(self.REPLAYPERIOD, self.sendreplay) # Try to send this and previous entries
else:
# Can't access replay file! Send immediately.
# LANG: Status text shown while attempting to send data
self.parent.children['status']['text'] = _('Sending data to EDDN...')
self.parent.update_idletasks()
self.send(cmdr, msg)
self.parent.children['status']['text'] = ''
self.send(cmdr, msg)
def export_journal_generic(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
"""