mirror of
https://github.com/EDCD/EDMarketConnector.git
synced 2025-04-20 19:07:38 +03:00
Added docstrings and further type annotations
This commit is contained in:
parent
a21280ed3b
commit
b5d3b89a3c
111
plugins/eddn.py
111
plugins/eddn.py
@ -42,7 +42,9 @@ this.planet = None
|
||||
|
||||
# Avoid duplicates
|
||||
this.marketId = None
|
||||
this.commodities = this.outfitting = this.shipyard = None
|
||||
this.commodities = None
|
||||
this.outfitting: Optional[Tuple[bool, MutableMapping[str, Any]]] = None
|
||||
this.shipyard = None
|
||||
|
||||
HORIZ_SKU = 'ELITE_HORIZONS_V_PLANETARY_LANDINGS'
|
||||
|
||||
@ -70,7 +72,12 @@ class EDDN:
|
||||
self.replayfile: Optional[TextIO] = None # For delayed messages
|
||||
self.replaylog: List[str] = []
|
||||
|
||||
def load(self) -> bool:
|
||||
def load_journal_replay(self) -> bool:
|
||||
"""
|
||||
Load cached journal entries from disk
|
||||
|
||||
:return: a bool indicating success
|
||||
"""
|
||||
# Try to obtain exclusive access to the journal cache
|
||||
filename = join(config.app_dir, 'replay.jsonl')
|
||||
try:
|
||||
@ -100,6 +107,9 @@ class EDDN:
|
||||
return True
|
||||
|
||||
def flush(self):
|
||||
"""
|
||||
flush flushes the replay file, clearing any data currently there that is not in the replaylog list
|
||||
"""
|
||||
self.replayfile.seek(0, SEEK_SET)
|
||||
self.replayfile.truncate()
|
||||
for line in self.replaylog:
|
||||
@ -108,12 +118,21 @@ class EDDN:
|
||||
self.replayfile.flush()
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
close closes the replay file
|
||||
"""
|
||||
if self.replayfile:
|
||||
self.replayfile.close()
|
||||
|
||||
self.replayfile = None
|
||||
|
||||
def send(self, cmdr: str, msg: Mapping[str, Any]):
|
||||
def send(self, cmdr: str, msg: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
Send sends an update to EDDN
|
||||
|
||||
:param cmdr: the CMDR to use as the uploader ID
|
||||
:param msg: the payload to send
|
||||
"""
|
||||
uploader_id = cmdr
|
||||
|
||||
to_send: OrderedDictT[str, str] = OrderedDict([
|
||||
@ -132,7 +151,10 @@ class EDDN:
|
||||
|
||||
r.raise_for_status()
|
||||
|
||||
def sendreplay(self):
|
||||
def sendreplay(self) -> None:
|
||||
"""
|
||||
sendreplay updates EDDN with cached journal lines
|
||||
"""
|
||||
if not self.replayfile:
|
||||
return # Probably closing app
|
||||
|
||||
@ -183,7 +205,14 @@ class EDDN:
|
||||
|
||||
self.parent.after(self.REPLAYPERIOD, self.sendreplay)
|
||||
|
||||
def export_commodities(self, data: Mapping[str, Any], is_beta: bool):
|
||||
def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None:
|
||||
"""
|
||||
export_commodities updates EDDN with the commodities on the current (lastStarport) station.
|
||||
Once the send is complete, this.commodities is updated with the new data.
|
||||
|
||||
:param data: a dict containing the starport data
|
||||
:param is_beta: whether or not we're currently in beta mode
|
||||
"""
|
||||
commodities: List[OrderedDictT[str, Any]] = []
|
||||
for commodity in data['lastStarport'].get('commodities') or []:
|
||||
# Check 'marketable' and 'not prohibited'
|
||||
@ -229,7 +258,14 @@ class EDDN:
|
||||
|
||||
this.commodities = commodities
|
||||
|
||||
def export_outfitting(self, data: Mapping[str, Any], is_beta: bool):
|
||||
def export_outfitting(self, data: Mapping[str, Any], is_beta: bool) -> None:
|
||||
"""
|
||||
export_outfitting updates EDDN with the current (lastStarport) station's outfitting options, if any.
|
||||
Once the send is complete, this.outfitting is updated with the given data.
|
||||
|
||||
:param data: dict containing the outfitting data
|
||||
:param is_beta: whether or not we're currently in beta mode
|
||||
"""
|
||||
economies: Dict[str, Any] = data['lastStarport'].get('economies') or {}
|
||||
modules: Dict[str, Any] = data['lastStarport'].get('modules') or {}
|
||||
ships: Dict[str, Union[Dict[str, Any], List]] = data['lastStarport'].get('ships') or {
|
||||
@ -269,7 +305,14 @@ class EDDN:
|
||||
|
||||
this.outfitting = (horizons, outfitting)
|
||||
|
||||
def export_shipyard(self, data: Dict[str, Any], is_beta: bool):
|
||||
def export_shipyard(self, data: Dict[str, Any], is_beta: bool) -> None:
|
||||
"""
|
||||
export_shipyard updates EDDN with the current (lastStarport) station's outfitting options, if any.
|
||||
once the send is complete, this.shipyard is updated to the new data.
|
||||
|
||||
:param data: dict containing the shipyard data
|
||||
:param is_beta: whether or not we are in beta mode
|
||||
"""
|
||||
economies: Dict[str, Any] = data['lastStarport'].get('economies') or {}
|
||||
modules: Dict[str, Any] = data['lastStarport'].get('modules') or {}
|
||||
ships: Dict[str, Any] = data['lastStarport'].get('ships') or {'shipyard_list': {}, 'unavailable_list': []}
|
||||
@ -301,7 +344,15 @@ class EDDN:
|
||||
|
||||
this.shipyard = (horizons, shipyard)
|
||||
|
||||
def export_journal_commodities(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]):
|
||||
def export_journal_commodities(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
export_journal_commodities updates EDDN with the commodities list on the current station (lastStarport) from
|
||||
data in the journal. As a side effect, it also updates this.commodities with the data
|
||||
|
||||
:param cmdr: The commander to send data under
|
||||
:param is_beta: whether or not we're in beta mode
|
||||
:param entry: the journal entry containing the commodities data
|
||||
"""
|
||||
items: List[Mapping[str, Any]] = entry.get('Items') or []
|
||||
commodities: Sequence[OrderedDictT[AnyStr, Any]] = sorted((OrderedDict([
|
||||
('name', self.canonicalise(commodity['Name'])),
|
||||
@ -328,7 +379,15 @@ class EDDN:
|
||||
|
||||
this.commodities: OrderedDictT[str, Any] = commodities
|
||||
|
||||
def export_journal_outfitting(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]):
|
||||
def export_journal_outfitting(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
export_journal_outfitting updates EDDN with station outfitting based on a journal entry. As a side effect,
|
||||
it also updates this.outfitting with the data
|
||||
|
||||
:param cmdr: The commander to send data under
|
||||
:param is_beta: Whether or not we're in beta mode
|
||||
:param entry: The relevant journal entry
|
||||
"""
|
||||
modules: List[Mapping[str, Any]] = entry.get('Items', [])
|
||||
horizons: bool = entry.get('Horizons', False)
|
||||
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
|
||||
@ -353,7 +412,15 @@ class EDDN:
|
||||
|
||||
this.outfitting = (horizons, outfitting)
|
||||
|
||||
def export_journal_shipyard(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]):
|
||||
def export_journal_shipyard(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
export_journal_shipyard updates EDDN with station shipyard data based on a journal entry. As a side effect,
|
||||
this.shipyard is updated with the data.
|
||||
|
||||
:param cmdr: the commander to send this update under
|
||||
:param is_beta: Whether or not we're in beta mode
|
||||
:param entry: the relevant journal entry
|
||||
"""
|
||||
ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
|
||||
horizons: bool = entry.get('Horizons', False)
|
||||
shipyard = sorted(ship['ShipType'] for ship in ships)
|
||||
@ -373,13 +440,21 @@ class EDDN:
|
||||
|
||||
this.shipyard = (horizons, shipyard)
|
||||
|
||||
def export_journal_entry(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]):
|
||||
def export_journal_entry(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
export_journal_entry updates EDDN with a line from the journal. Additionally if additional lines are cached,
|
||||
it may send those as well.
|
||||
|
||||
:param cmdr: the commander under which this upload is made
|
||||
:param is_beta: whether or not we are in beta mode
|
||||
:param entry: the journal entry to send
|
||||
"""
|
||||
msg = {
|
||||
'$schemaRef': f'https://eddn.edcd.io/schemas/journal/1{"/test" if is_beta else ""}',
|
||||
'message': entry
|
||||
}
|
||||
|
||||
if self.replayfile or self.load():
|
||||
if self.replayfile or self.load_journal_replay():
|
||||
# Store the entry
|
||||
self.replaylog.append(json.dumps([cmdr, msg]))
|
||||
self.replayfile.write(f'{self.replaylog[-1]}\n')
|
||||
@ -405,15 +480,15 @@ class EDDN:
|
||||
|
||||
# Plugin callbacks
|
||||
|
||||
def plugin_start3(plugin_dir):
|
||||
def plugin_start3(plugin_dir: str) -> str:
|
||||
return 'EDDN'
|
||||
|
||||
|
||||
def plugin_app(parent: tk.Tk):
|
||||
def plugin_app(parent: tk.Tk) -> None:
|
||||
this.parent = parent
|
||||
this.eddn = EDDN(parent)
|
||||
# Try to obtain exclusive lock on journal cache, even if we don't need it yet
|
||||
if not this.eddn.load():
|
||||
if not this.eddn.load_journal_replay():
|
||||
# Shouldn't happen - don't bother localizing
|
||||
this.status['text'] = 'Error: Is another copy of this app already running?'
|
||||
|
||||
@ -469,13 +544,13 @@ def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame:
|
||||
return eddnframe
|
||||
|
||||
|
||||
def prefsvarchanged(event=None):
|
||||
def prefsvarchanged(event=None) -> None:
|
||||
this.eddn_station_button['state'] = tk.NORMAL
|
||||
this.eddn_system_button['state'] = tk.NORMAL
|
||||
this.eddn_delay_button['state'] = this.eddn.replayfile and this.eddn_system.get() and tk.NORMAL or tk.DISABLED
|
||||
|
||||
|
||||
def prefs_changed(cmdr: str, is_beta: bool):
|
||||
def prefs_changed(cmdr: str, is_beta: bool) -> None:
|
||||
config.set(
|
||||
'output',
|
||||
(config.getint('output') & (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP | config.OUT_MKT_MANUAL)) +
|
||||
@ -485,7 +560,7 @@ def prefs_changed(cmdr: str, is_beta: bool):
|
||||
)
|
||||
|
||||
|
||||
def plugin_stop():
|
||||
def plugin_stop() -> None:
|
||||
this.eddn.close()
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user