mirror of
https://github.com/EDCD/EDMarketConnector.git
synced 2025-06-16 07:12:17 +03:00
Merge pull request #1409 from A-UNDERSCORE-D/fix/1390/Ensure-we-handle-413
-from-EDDN-properly
Compress outgoing EDDN data if its large
This commit is contained in:
commit
c6a52cb080
@ -1,10 +1,12 @@
|
|||||||
"""Simple HTTP listener to be used with debugging various EDMC sends."""
|
"""Simple HTTP listener to be used with debugging various EDMC sends."""
|
||||||
|
import gzip
|
||||||
import json
|
import json
|
||||||
import pathlib
|
import pathlib
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
|
import zlib
|
||||||
from http import server
|
from http import server
|
||||||
from typing import Any, Callable, Tuple, Union
|
from typing import Any, Callable, Literal, Tuple, Union
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
|
|
||||||
from config import appname
|
from config import appname
|
||||||
@ -30,8 +32,11 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
|
|||||||
def do_POST(self) -> None: # noqa: N802 # I cant change it
|
def do_POST(self) -> None: # noqa: N802 # I cant change it
|
||||||
"""Handle POST."""
|
"""Handle POST."""
|
||||||
logger.info(f"Received a POST for {self.path!r}!")
|
logger.info(f"Received a POST for {self.path!r}!")
|
||||||
data = self.rfile.read(int(self.headers['Content-Length'])).decode('utf-8', errors='replace')
|
data_raw: bytes = self.rfile.read(int(self.headers['Content-Length']))
|
||||||
to_save = data
|
|
||||||
|
encoding = self.headers.get('Content-Encoding')
|
||||||
|
|
||||||
|
to_save = data = self.get_printable(data_raw, encoding)
|
||||||
|
|
||||||
target_path = self.path
|
target_path = self.path
|
||||||
if len(target_path) > 1 and target_path[0] == '/':
|
if len(target_path) > 1 and target_path[0] == '/':
|
||||||
@ -42,7 +47,7 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path)
|
response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path)
|
||||||
if callable(response):
|
if callable(response):
|
||||||
response = response(data)
|
response = response(to_save)
|
||||||
|
|
||||||
self.send_response_only(200, "OK")
|
self.send_response_only(200, "OK")
|
||||||
if response is not None:
|
if response is not None:
|
||||||
@ -64,12 +69,37 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
|
|||||||
target_file = output_data_path / (safe_file_name(target_path) + '.log')
|
target_file = output_data_path / (safe_file_name(target_path) + '.log')
|
||||||
if target_file.parent != output_data_path:
|
if target_file.parent != output_data_path:
|
||||||
logger.warning(f"REFUSING TO WRITE FILE THAT ISN'T IN THE RIGHT PLACE! {target_file=}")
|
logger.warning(f"REFUSING TO WRITE FILE THAT ISN'T IN THE RIGHT PLACE! {target_file=}")
|
||||||
logger.warning(f'DATA FOLLOWS\n{data}')
|
logger.warning(f'DATA FOLLOWS\n{data}') # type: ignore # mypy thinks data is a byte string here
|
||||||
return
|
return
|
||||||
|
|
||||||
with output_lock, target_file.open('a') as f:
|
with output_lock, target_file.open('a') as f:
|
||||||
f.write(to_save + "\n\n")
|
f.write(to_save + "\n\n")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str:
|
||||||
|
"""
|
||||||
|
Convert an incoming data stream into a string.
|
||||||
|
|
||||||
|
:param data: The data to convert
|
||||||
|
:param compression: The compression to remove, defaults to None
|
||||||
|
:raises ValueError: If compression is unknown
|
||||||
|
:return: printable strings
|
||||||
|
"""
|
||||||
|
ret: bytes = b''
|
||||||
|
if compression is None:
|
||||||
|
ret = data
|
||||||
|
|
||||||
|
elif compression == 'deflate':
|
||||||
|
ret = zlib.decompress(data)
|
||||||
|
|
||||||
|
elif compression == 'gzip':
|
||||||
|
ret = gzip.decompress(data)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Unknown encoding for data {compression!r}')
|
||||||
|
|
||||||
|
return ret.decode('utf-8', errors='replace')
|
||||||
|
|
||||||
|
|
||||||
def safe_file_name(name: str):
|
def safe_file_name(name: str):
|
||||||
"""
|
"""
|
||||||
@ -103,6 +133,7 @@ def generate_inara_response(raw_data: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def extract_edsm_data(data: str) -> dict[str, Any]:
|
def extract_edsm_data(data: str) -> dict[str, Any]:
|
||||||
|
"""Extract relevant data from edsm data."""
|
||||||
res = parse_qs(data)
|
res = parse_qs(data)
|
||||||
return {name: data[0] for name, data in res.items()}
|
return {name: data[0] for name, data in res.items()}
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@ from collections import OrderedDict
|
|||||||
from os import SEEK_SET
|
from os import SEEK_SET
|
||||||
from os.path import join
|
from os.path import join
|
||||||
from platform import system
|
from platform import system
|
||||||
|
from textwrap import dedent
|
||||||
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, MutableMapping, Optional
|
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, MutableMapping, Optional
|
||||||
from typing import OrderedDict as OrderedDictT
|
from typing import OrderedDict as OrderedDictT
|
||||||
from typing import TextIO, Tuple, Union
|
from typing import TextIO, Tuple, Union
|
||||||
@ -27,6 +28,7 @@ from monitor import monitor
|
|||||||
from myNotebook import Frame
|
from myNotebook import Frame
|
||||||
from prefs import prefsVersion
|
from prefs import prefsVersion
|
||||||
from ttkHyperlinkLabel import HyperlinkLabel
|
from ttkHyperlinkLabel import HyperlinkLabel
|
||||||
|
from util import text
|
||||||
|
|
||||||
if sys.platform != 'win32':
|
if sys.platform != 'win32':
|
||||||
from fcntl import LOCK_EX, LOCK_NB, lockf
|
from fcntl import LOCK_EX, LOCK_NB, lockf
|
||||||
@ -168,6 +170,10 @@ class EDDN:
|
|||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
"""Flush the replay file, clearing any data currently there that is not in the replaylog list."""
|
"""Flush the replay file, clearing any data currently there that is not in the replaylog list."""
|
||||||
|
if self.replayfile is None:
|
||||||
|
logger.error('replayfile is None!')
|
||||||
|
return
|
||||||
|
|
||||||
self.replayfile.seek(0, SEEK_SET)
|
self.replayfile.seek(0, SEEK_SET)
|
||||||
self.replayfile.truncate()
|
self.replayfile.truncate()
|
||||||
for line in self.replaylog:
|
for line in self.replaylog:
|
||||||
@ -213,7 +219,20 @@ class EDDN:
|
|||||||
('message', msg['message']),
|
('message', msg['message']),
|
||||||
])
|
])
|
||||||
|
|
||||||
r = self.session.post(self.eddn_url, data=json.dumps(to_send), timeout=self.TIMEOUT)
|
# About the smallest request is going to be (newlines added for brevity):
|
||||||
|
# {"$schemaRef":"https://eddn.edcd.io/schemas/commodity/3","header":{"softwareName":"E:D Market
|
||||||
|
# Connector Windows","softwareVersion":"5.3.0-beta4extra","uploaderID":"abcdefghijklm"},"messag
|
||||||
|
# e":{"systemName":"delphi","stationName":"The Oracle","marketId":128782803,"timestamp":"2022-0
|
||||||
|
# 1-26T12:00:00Z","commodities":[]}}
|
||||||
|
#
|
||||||
|
# Which comes to 315 bytes (including \n) and compresses to 244 bytes. So lets just compress everything
|
||||||
|
|
||||||
|
encoded, compressed = text.gzip(json.dumps(to_send, separators=(',', ':')), max_size=0)
|
||||||
|
headers: None | dict[str, str] = None
|
||||||
|
if compressed:
|
||||||
|
headers = {'Content-Encoding': 'gzip'}
|
||||||
|
|
||||||
|
r = self.session.post(self.eddn_url, data=encoded, timeout=self.TIMEOUT, headers=headers)
|
||||||
if r.status_code != requests.codes.ok:
|
if r.status_code != requests.codes.ok:
|
||||||
|
|
||||||
# Check if EDDN is still objecting to an empty commodities list
|
# Check if EDDN is still objecting to an empty commodities list
|
||||||
@ -226,18 +245,48 @@ class EDDN:
|
|||||||
logger.trace_if('plugin.eddn', "EDDN is still objecting to empty commodities data")
|
logger.trace_if('plugin.eddn', "EDDN is still objecting to empty commodities data")
|
||||||
return # We want to silence warnings otherwise
|
return # We want to silence warnings otherwise
|
||||||
|
|
||||||
|
if r.status_code == 413:
|
||||||
|
extra_data = {
|
||||||
|
'schema_ref': msg.get('$schemaRef', 'Unset $schemaRef!'),
|
||||||
|
'sent_data_len': str(len(encoded)),
|
||||||
|
}
|
||||||
|
|
||||||
|
if '/journal/' in extra_data['schema_ref']:
|
||||||
|
extra_data['event'] = msg.get('message', {}).get('event', 'No Event Set')
|
||||||
|
|
||||||
|
self._log_response(r, header_msg='Got a 413 while POSTing data', **extra_data)
|
||||||
|
return # drop the error
|
||||||
|
|
||||||
if not self.UNKNOWN_SCHEMA_RE.match(r.text):
|
if not self.UNKNOWN_SCHEMA_RE.match(r.text):
|
||||||
logger.debug(
|
self._log_response(r, header_msg='Status from POST wasn\'t 200 (OK)')
|
||||||
f'''Status from POST wasn't OK:
|
|
||||||
Status\t{r.status_code}
|
|
||||||
URL\t{r.url}
|
|
||||||
Headers\t{r.headers}
|
|
||||||
Content:\n{r.text}
|
|
||||||
Msg:\n{msg}'''
|
|
||||||
)
|
|
||||||
|
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
|
|
||||||
|
def _log_response(
|
||||||
|
self,
|
||||||
|
response: requests.Response,
|
||||||
|
header_msg='Failed to POST to EDDN',
|
||||||
|
**kwargs
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Log a response object with optional additional data.
|
||||||
|
|
||||||
|
:param response: The response to log
|
||||||
|
:param header_msg: A header message to add to the log, defaults to 'Failed to POST to EDDN'
|
||||||
|
:param kwargs: Any other notes to add, will be added below the main data in the same format.
|
||||||
|
"""
|
||||||
|
additional_data = "\n".join(
|
||||||
|
f'''{name.replace('_', ' ').title():<8}:\t{value}''' for name, value in kwargs.items()
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug(dedent(f'''\
|
||||||
|
{header_msg}:
|
||||||
|
Status :\t{response.status_code}
|
||||||
|
URL :\t{response.url}
|
||||||
|
Headers :\t{response.headers}
|
||||||
|
Content :\t{response.text}
|
||||||
|
''')+additional_data)
|
||||||
|
|
||||||
def sendreplay(self) -> None: # noqa: CCR001
|
def sendreplay(self) -> None: # noqa: CCR001
|
||||||
"""Send cached Journal lines to EDDN."""
|
"""Send cached Journal lines to EDDN."""
|
||||||
if not self.replayfile:
|
if not self.replayfile:
|
||||||
|
27
util/text.py
Normal file
27
util/text.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
"""Utilities for dealing with text (and byte representations thereof)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from gzip import compress
|
||||||
|
|
||||||
|
__all__ = ['gzip']
|
||||||
|
|
||||||
|
|
||||||
|
def gzip(data: str | bytes, max_size: int = 512, encoding='utf-8') -> tuple[bytes, bool]:
|
||||||
|
"""
|
||||||
|
Compress the given data if the max size is greater than specified.
|
||||||
|
|
||||||
|
The default was chosen somewhat arbitrarily, see eddn.py for some more careful
|
||||||
|
work towards keeping the data almost always compressed
|
||||||
|
|
||||||
|
:param data: The data to compress
|
||||||
|
:param max_size: The max size of data, in bytes, defaults to 512
|
||||||
|
:param encoding: The encoding to use if data is a str, defaults to 'utf-8'
|
||||||
|
:return: the payload to send, and a bool indicating compression state
|
||||||
|
"""
|
||||||
|
if isinstance(data, str):
|
||||||
|
data = data.encode(encoding=encoding)
|
||||||
|
|
||||||
|
if len(data) <= max_size:
|
||||||
|
return data, False
|
||||||
|
|
||||||
|
return compress(data), True
|
Loading…
x
Reference in New Issue
Block a user