1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-17 17:42:20 +03:00

pull decode/decompress out into its own function

This commit is contained in:
A_D 2022-01-27 19:40:06 +02:00
parent 17c886af84
commit f4ba4775f2
No known key found for this signature in database
GPG Key ID: 4BE9EB7DF45076C4

View File

@ -6,7 +6,7 @@ import tempfile
import threading import threading
import zlib import zlib
from http import server from http import server
from typing import Any, Callable, Tuple, Union from typing import Any, Callable, Literal, Tuple, Union
from urllib.parse import parse_qs from urllib.parse import parse_qs
from config import appname from config import appname
@ -37,16 +37,7 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
encoding = self.headers.get('Content-Encoding') encoding = self.headers.get('Content-Encoding')
if encoding == 'gzip': to_save = self.get_printable(data_raw, encoding)
data = gzip.decompress(data_raw).decode('utf-8', errors='replace')
elif encoding == 'deflate':
data = zlib.decompress(data_raw).decode('utf-8', errors='replace')
else:
data = data_raw.decode('utf-8', errors='replace')
to_save = data
target_path = self.path target_path = self.path
if len(target_path) > 1 and target_path[0] == '/': if len(target_path) > 1 and target_path[0] == '/':
@ -57,7 +48,7 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path) response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path)
if callable(response): if callable(response):
response = response(data) response = response(to_save)
self.send_response_only(200, "OK") self.send_response_only(200, "OK")
if response is not None: if response is not None:
@ -85,6 +76,31 @@ class LoggingHandler(server.BaseHTTPRequestHandler):
with output_lock, target_file.open('a') as f: with output_lock, target_file.open('a') as f:
f.write(to_save + "\n\n") f.write(to_save + "\n\n")
@staticmethod
def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str:
"""
Convert an incoming data stream into a string.
:param data: The data to convert
:param compression: The compression to remove, defaults to None
:raises ValueError: If compression is unknown
:return: printable strings
"""
ret: bytes = b''
if compression is None:
ret = data
elif compression == 'deflate':
ret = zlib.decompress(data)
elif compression == 'gzip':
ret = gzip.decompress(data)
else:
raise ValueError(f'Unknown encoding for data {compression!r}')
return ret.decode('utf-8', errors='replace')
def safe_file_name(name: str): def safe_file_name(name: str):
""" """