1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-06-01 08:01:22 +03:00

[2051] Revert Linux Changes

I can't adequately test this right now, so not touching it.
This commit is contained in:
David Sangrey 2023-11-10 11:22:49 -05:00
parent 28e10b70ef
commit cb4a26186a
No known key found for this signature in database
GPG Key ID: 3AEADBB0186884BC

View File

@ -9,7 +9,6 @@ import os
import pathlib import pathlib
import sys import sys
from configparser import ConfigParser from configparser import ConfigParser
from typing import Optional, Union, List
from config import AbstractConfig, appname, logger from config import AbstractConfig, appname, logger
assert sys.platform == 'linux' assert sys.platform == 'linux'
@ -19,97 +18,100 @@ class LinuxConfig(AbstractConfig):
"""Linux implementation of AbstractConfig.""" """Linux implementation of AbstractConfig."""
SECTION = 'config' SECTION = 'config'
# TODO: I dislike this, would rather use a sane config file format. But here we are.
__unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'} __unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'}
__escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'} __escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'}
def __init__(self, filename: Optional[str] = None) -> None: def __init__(self, filename: str | None = None) -> None:
"""
Initialize LinuxConfig instance.
:param filename: Optional file name to use for configuration storage.
"""
super().__init__() super().__init__()
# http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html
# Initialize directory paths
xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser() xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser()
self.app_dir_path = xdg_data_home / appname self.app_dir_path = xdg_data_home / appname
self.app_dir_path.mkdir(exist_ok=True, parents=True) self.app_dir_path.mkdir(exist_ok=True, parents=True)
self.plugin_dir_path = self.app_dir_path / 'plugins' self.plugin_dir_path = self.app_dir_path / 'plugins'
self.plugin_dir_path.mkdir(exist_ok=True) self.plugin_dir_path.mkdir(exist_ok=True)
self.respath_path = pathlib.Path(__file__).parent.parent self.respath_path = pathlib.Path(__file__).parent.parent
self.internal_plugin_dir_path = self.respath_path / 'plugins' self.internal_plugin_dir_path = self.respath_path / 'plugins'
self.default_journal_dir_path = None # type: ignore self.default_journal_dir_path = None # type: ignore
self.identifier = f'uk.org.marginal.{appname.lower()}' # TODO: Unused?
# Configure the filename
config_home = pathlib.Path(os.getenv('XDG_CONFIG_HOME', default='~/.config')).expanduser() config_home = pathlib.Path(os.getenv('XDG_CONFIG_HOME', default='~/.config')).expanduser()
self.filename = pathlib.Path(filename) if filename is not None else config_home / appname / f'{appname}.ini'
self.filename = config_home / appname / f'{appname}.ini'
if filename is not None:
self.filename = pathlib.Path(filename)
self.filename.parent.mkdir(exist_ok=True, parents=True) self.filename.parent.mkdir(exist_ok=True, parents=True)
# Initialize the configuration self.config: ConfigParser | None = ConfigParser(comment_prefixes=('#',), interpolation=None)
self.config = ConfigParser(comment_prefixes=('#',), interpolation=None) self.config.read(self.filename) # read() ignores files that dont exist
self.config.read(self.filename)
# Ensure that our section exists. This is here because configparser will happily create files for us, but it # Ensure that our section exists. This is here because configparser will happily create files for us, but it
# does not magically create sections # does not magically create sections
try: try:
self.config[self.SECTION].get("this_does_not_exist") self.config[self.SECTION].get("this_does_not_exist", fallback=None)
except KeyError: except KeyError:
logger.info("Config section not found. Backing up existing file (if any) and re-adding a section header") logger.info("Config section not found. Backing up existing file (if any) and readding a section header")
if self.filename.exists(): if self.filename.exists():
backup_filename = self.filename.parent / f'{appname}.ini.backup' (self.filename.parent / f'{appname}.ini.backup').write_bytes(self.filename.read_bytes())
backup_filename.write_bytes(self.filename.read_bytes())
self.config.add_section(self.SECTION) self.config.add_section(self.SECTION)
# Set 'outdir' if not specified or invalid if (outdir := self.get_str('outdir')) is None or not pathlib.Path(outdir).is_dir():
outdir = self.get_str('outdir')
if outdir is None or not pathlib.Path(outdir).is_dir():
self.set('outdir', self.home) self.set('outdir', self.home)
def __escape(self, s: str) -> str: def __escape(self, s: str) -> str:
""" """
Escape special characters in a string. Escape a string using self.__escape_lut.
:param s: The input string. This does NOT support multi-character escapes.
:return: The escaped string.
:param s: str - String to be escaped.
:return: str - The escaped string.
""" """
escaped_chars = [] out = ""
for c in s: for c in s:
escaped_chars.append(self.__escape_lut.get(c, c)) if c not in self.__escape_lut:
out += c
continue
return ''.join(escaped_chars) out += '\\' + self.__escape_lut[c]
return out
def __unescape(self, s: str) -> str: def __unescape(self, s: str) -> str:
""" """
Unescape special characters in a string. Unescape a string.
:param s: The input string. :param s: str - The string to unescape.
:return: The unescaped string. :return: str - The unescaped string.
""" """
unescaped_chars = [] out: list[str] = []
i = 0 i = 0
while i < len(s): while i < len(s):
current_char = s[i] c = s[i]
if current_char != '\\': if c != '\\':
unescaped_chars.append(current_char) out.append(c)
i += 1 i += 1
continue continue
if i == len(s) - 1: # We have a backslash, check what its escaping
if i == len(s)-1:
raise ValueError('Escaped string has unescaped trailer') raise ValueError('Escaped string has unescaped trailer')
unescaped = self.__unescape_lut.get(s[i + 1]) unescaped = self.__unescape_lut.get(s[i+1])
if unescaped is None: if unescaped is None:
raise ValueError(f'Unknown escape: \\{s[i + 1]}') raise ValueError(f'Unknown escape: \\ {s[i+1]}')
unescaped_chars.append(unescaped) out.append(unescaped)
i += 2 i += 2
return "".join(unescaped_chars) return "".join(out)
def __raw_get(self, key: str) -> Optional[str]: def __raw_get(self, key: str) -> str | None:
""" """
Get a raw data value from the config file. Get a raw data value from the config file.
@ -121,7 +123,7 @@ class LinuxConfig(AbstractConfig):
return self.config[self.SECTION].get(key) return self.config[self.SECTION].get(key)
def get_str(self, key: str, *, default: Optional[str] = None) -> str: def get_str(self, key: str, *, default: str | None = None) -> str:
""" """
Return the string referred to by the given key if it exists, or the default. Return the string referred to by the given key if it exists, or the default.
@ -129,28 +131,29 @@ class LinuxConfig(AbstractConfig):
""" """
data = self.__raw_get(key) data = self.__raw_get(key)
if data is None: if data is None:
return default or "" return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
if '\n' in data: if '\n' in data:
raise ValueError('Expected string, but got list') raise ValueError('asked for string, got list')
return self.__unescape(data) return self.__unescape(data)
def get_list(self, key: str, *, default: Optional[list] = None) -> list: def get_list(self, key: str, *, default: list | None = None) -> list:
""" """
Return the list referred to by the given key if it exists, or the default. Return the list referred to by the given key if it exists, or the default.
Implements :meth:`AbstractConfig.get_list`. Implements :meth:`AbstractConfig.get_list`.
""" """
data = self.__raw_get(key) data = self.__raw_get(key)
if data is None: if data is None:
return default or [] return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
split = data.split('\n') split = data.split('\n')
if split[-1] != ';': if split[-1] != ';':
raise ValueError('Encoded list does not have trailer sentinel') raise ValueError('Encoded list does not have trailer sentinel')
return [self.__unescape(item) for item in split[:-1]] return list(map(self.__unescape, split[:-1]))
def get_int(self, key: str, *, default: int = 0) -> int: def get_int(self, key: str, *, default: int = 0) -> int:
""" """
@ -159,47 +162,55 @@ class LinuxConfig(AbstractConfig):
Implements :meth:`AbstractConfig.get_int`. Implements :meth:`AbstractConfig.get_int`.
""" """
data = self.__raw_get(key) data = self.__raw_get(key)
if data is None: if data is None:
return default return default
try: try:
return int(data) return int(data)
except ValueError as e:
raise ValueError(f'Failed to convert {key=} to int') from e
def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: except ValueError as e:
raise ValueError(f'requested {key=} as int cannot be converted to int') from e
def get_bool(self, key: str, *, default: bool | None = None) -> bool:
""" """
Return the bool referred to by the given key if it exists, or the default. Return the bool referred to by the given key if it exists, or the default.
Implements :meth:`AbstractConfig.get_bool`. Implements :meth:`AbstractConfig.get_bool`.
""" """
if self.config is None: if self.config is None:
raise ValueError('Attempt to use a closed config') raise ValueError('attempt to use a closed config')
data = self.__raw_get(key) data = self.__raw_get(key)
if data is None: if data is None:
return default or False return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default
return bool(int(data)) return bool(int(data))
def set(self, key: str, val: Union[int, str, List[str]]) -> None: def set(self, key: str, val: int | str | list[str]) -> None:
""" """
Set the given key's data to the given value. Set the given key's data to the given value.
Implements :meth:`AbstractConfig.set`. Implements :meth:`AbstractConfig.set`.
""" """
if self.config is None: if self.config is None:
raise ValueError('Attempt to use a closed config') raise ValueError('attempt to use a closed config')
to_set: str | None = None
if isinstance(val, bool): if isinstance(val, bool):
to_set = str(int(val)) to_set = str(int(val))
elif isinstance(val, str): elif isinstance(val, str):
to_set = self.__escape(val) to_set = self.__escape(val)
elif isinstance(val, int): elif isinstance(val, int):
to_set = str(val) to_set = str(val)
elif isinstance(val, list): elif isinstance(val, list):
to_set = '\n'.join([self.__escape(s) for s in val] + [';']) to_set = '\n'.join([self.__escape(s) for s in val] + [';'])
else: else:
raise ValueError(f'Unexpected type for value {type(val).__name__}') raise ValueError(f'Unexpected type for value {type(val)=}')
self.config.set(self.SECTION, key, to_set) self.config.set(self.SECTION, key, to_set)
self.save() self.save()
@ -211,7 +222,7 @@ class LinuxConfig(AbstractConfig):
Implements :meth:`AbstractConfig.delete`. Implements :meth:`AbstractConfig.delete`.
""" """
if self.config is None: if self.config is None:
raise ValueError('Attempt to delete from a closed config') raise ValueError('attempt to use a closed config')
self.config.remove_option(self.SECTION, key) self.config.remove_option(self.SECTION, key)
self.save() self.save()
@ -223,7 +234,7 @@ class LinuxConfig(AbstractConfig):
Implements :meth:`AbstractConfig.save`. Implements :meth:`AbstractConfig.save`.
""" """
if self.config is None: if self.config is None:
raise ValueError('Attempt to save a closed config') raise ValueError('attempt to use a closed config')
with open(self.filename, 'w', encoding='utf-8') as f: with open(self.filename, 'w', encoding='utf-8') as f:
self.config.write(f) self.config.write(f)
@ -235,4 +246,4 @@ class LinuxConfig(AbstractConfig):
Implements :meth:`AbstractConfig.close`. Implements :meth:`AbstractConfig.close`.
""" """
self.save() self.save()
self.config = None # type: ignore self.config = None