From dd85fc050469a3719593f2266aeee56b76981bec Mon Sep 17 00:00:00 2001 From: A_D Date: Mon, 3 Aug 2020 21:49:52 +0200 Subject: [PATCH 1/4] Rewrote inara queue system This replaces the list+queue system that the inara plugin originally used with a deque based one. The main differences here are that the list the worker thread uses to send to inara and the list that events are added to is the same, with the worker thread making a duplicate and clearing the original each time it sends events (losing events if it fails to upload three times). The format of the data has changed as well, from simple tuples to NamedTuple classes that provide some extra type safety and sanity when accessing fields. The event queue itself is actually multiple queues, one per API/FID/CMDR_name triplicate, thus allowing multiple commander switches while we're running without causing any weird issues --- plugins/inara.py | 695 +++++++++++++++++++++++++---------------------- 1 file changed, 377 insertions(+), 318 deletions(-) diff --git a/plugins/inara.py b/plugins/inara.py index 5cd381c0..1e252ac9 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -2,17 +2,18 @@ # Inara sync # -from collections import OrderedDict +from collections import OrderedDict, defaultdict import json -from typing import Any, AnyStr, Dict, List, Mapping, Optional, OrderedDict as OrderedDictT, \ - Sequence, TYPE_CHECKING, Union +from typing import Any, AnyStr, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional, OrderedDict as OrderedDictT, \ + Sequence, TYPE_CHECKING, Tuple, Union +import dataclasses import requests import sys import time from operator import itemgetter from queue import Queue -from threading import Thread +from threading import Lock, Thread import logging import tkinter as tk @@ -22,6 +23,11 @@ import myNotebook as nb from config import appname, applongname, appversion, config import plug import timeout_session + +# For new impl +from collections import deque + + logger = logging.getLogger(appname) if TYPE_CHECKING: @@ -42,6 +48,7 @@ this.lastship = None # eventData from the last addCommanderShip or setCommander # Cached Cmdr state this.events: List[OrderedDictT[str, Any]] = [] # Unsent events +this.event_lock = Lock this.cmdr: Optional[str] = None this.FID: Optional[str] = None # Frontier ID this.multicrew: bool = False # don't send captain's ship info to Inara while on a crew @@ -59,7 +66,9 @@ this.shipswap: bool = False # just swapped ship # last time we updated, if unset in config this is 0, which means an instant update LAST_UPDATE_CONF_KEY = 'inara_last_update' -FLOOD_LIMIT_SECONDS = 30 # minimum time between sending events +EVENT_COLLECT_TIME = 31 # Minimum time to take collecting events before requesting a send +WORKER_WAIT_TIME = 35 # Minimum time for worker to wait between sends + this.timer_run = True @@ -74,6 +83,50 @@ this.station_marketid = None STATION_UNDOCKED: str = '×' # "Station" name to display when not docked = U+00D7 +class Credentials(NamedTuple): + """ + Credentials holds an inara API payload + """ + cmdr: str + fid: str + api_key: str + + +EVENT_DATA = Union[Mapping[AnyStr, Any], Sequence[Mapping[AnyStr, Any]]] + + +class Event(NamedTuple): + """ + Event represents an event for the Inara API + """ + name: str + timestamp: str + data: EVENT_DATA + + +@dataclasses.dataclass +class NewThis: + events: Dict[Credentials, Deque[Event]] = dataclasses.field(default_factory=lambda: defaultdict(deque)) + event_lock: Lock = dataclasses.field(default_factory=Lock) # protects events, for use when rewriting events + + def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]): + """ + filter_events is the equivalent of running filter() on any event list in the events dict. + it will automatically handle locking, and replacing the event list with the filtered version. + + :param key: the key to filter + :param predicate: the predicate to use while filtering + """ + with self.event_lock: + tmp = self.events[key].copy() + self.events[key].clear() + self.events[key].extend(filter(predicate, tmp)) + + +new_this = NewThis() +TARGET_URL = 'https://inara.cz/inapi/v1/' + + def system_url(system_name: str): if this.system_address: return requests.utils.requote_uri(f'https://inara.cz/galaxy-starsystem/?search={this.system_address}') @@ -97,13 +150,10 @@ def station_url(system_name: Optional[str], station_name: Optional[str]): def plugin_start3(plugin_dir): - this.thread = Thread(target=worker, name='Inara worker') + this.thread = Thread(target=new_worker, name='Inara worker') this.thread.daemon = True this.thread.start() - this.timer_thread = Thread(target=call_timer, name='Inara timer') - this.timer_thread.daemon = True - this.timer_thread.start() return 'Inara' @@ -115,13 +165,10 @@ def plugin_app(parent: tk.Tk): def plugin_stop(): - # Send any unsent events - call() - time.sleep(0.1) # Sleep for 100ms to allow call to go out, and to force a context switch to our other threads # Signal thread to close and wait for it this.queue.put(None) - this.thread.join() - this.thread = None + # this.thread.join() + # this.thread = None this.timer_run = False @@ -227,8 +274,9 @@ def prefs_changed(cmdr: str, is_beta: bool): if this.log.get() and changed: this.newuser = True # Send basic info at next Journal event - add_event('getCommanderProfile', time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), {'searchName': cmdr}) - call() + new_add_event('getCommanderProfile', time.strftime( + '%Y-%m-%dT%H:%M:%SZ', time.gmtime()), {'searchName': cmdr}) + # call() def credentials(cmdr: str) -> Optional[str]: @@ -251,8 +299,8 @@ def credentials(cmdr: str) -> Optional[str]: def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Dict[str, Any], state: Dict[str, Any]): # Send any unsent events when switching accounts - if cmdr and cmdr != this.cmdr: - call(force=True) + # if cmdr and cmdr != this.cmdr: + # call(force=True) event_name = entry['event'] this.cmdr = cmdr @@ -309,6 +357,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di this.station_marketid = None if config.getint('inara_out') and not is_beta and not this.multicrew and credentials(cmdr): + current_creds = Credentials(this.cmdr, this.FID, str(credentials(this.cmdr))) try: # Dump starting state to Inara if (this.newuser or @@ -319,41 +368,44 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di this.newsession = False # Send rank info to Inara on startup - add_event( + new_add_event( 'setCommanderRankPilot', entry['timestamp'], [ - OrderedDict([ - ('rankName', k.lower()), - ('rankValue', v[0]), - ('rankProgress', v[1] / 100.0), - ]) for k, v in state['Rank'].items() if v is not None + {'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': v[1] / 100.0} + for k, v in state['Rank'].items() if v is not None ] ) - add_event( + new_add_event( 'setCommanderReputationMajorFaction', entry['timestamp'], [ - OrderedDict([('majorfactionName', k.lower()), ('majorfactionReputation', v / 100.0)]) + {'majorfactionName': k.lower(), 'majorfactionReputation': v / 100.0} for k, v in state['Reputation'].items() if v is not None ] ) if state['Engineers']: # Not populated < 3.3 - add_event( + to_send = [] + for k, v in state['Engineers'].items(): + e = {'engineerName': k} + if isinstance(v, tuple): + e['rankValue'] = v[0] + + else: + e['rankStage'] = v + + to_send.append(e) + + new_add_event( 'setCommanderRankEngineer', entry['timestamp'], - [ - OrderedDict( - [('engineerName', k), isinstance(v, tuple) and ('rankValue', v[0]) or ('rankStage', v)] - ) - for k, v in state['Engineers'].items() - ] + to_send, ) # Update location - add_event( + new_add_event( 'setCommanderTravelLocation', entry['timestamp'], OrderedDict([ @@ -364,104 +416,94 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di # Update ship if state['ShipID']: # Unknown if started in Fighter or SRV - data = OrderedDict([ - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ('shipName', state['ShipName']), # Can be None - ('shipIdent', state['ShipIdent']), # Can be None - ('isCurrentShip', True), - ]) + cur_ship = { + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + 'shipName': state['ShipName'], + 'shipIdent': state['ShipIdent'], + 'isCurrentShip': True, + + } if state['HullValue']: - data['shipHullValue'] = state['HullValue'] + cur_ship['shipHullValue'] = state['HullValue'] if state['ModulesValue']: - data['shipModulesValue'] = state['ModulesValue'] + cur_ship['shipModulesValue'] = state['ModulesValue'] - data['shipRebuyCost'] = state['Rebuy'] - add_event('setCommanderShip', entry['timestamp'], data) + cur_ship['shipRebuyCost'] = state['Rebuy'] + new_add_event('setCommanderShip', entry['timestamp'], cur_ship) this.loadout = make_loadout(state) - add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) - - call() # Call here just to be sure that if we can send, we do, otherwise it'll get it in the next tick + new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) # Promotions elif event_name == 'Promotion': for k, v in state['Rank'].items(): if k in entry: - add_event( + new_add_event( 'setCommanderRankPilot', entry['timestamp'], - OrderedDict([ - ('rankName', k.lower()), - ('rankValue', v[0]), - ('rankProgress', 0), - ]) + {'rankName': k.lower(), 'rankValue': v[0], 'rankProgress': 0} ) elif event_name == 'EngineerProgress' and 'Engineer' in entry: - add_event( + to_send = {'engineerName': entry['Engineer']} + if 'Rank' in entry: + to_send['rankValue'] = entry['Rank'] + + else: + to_send['rankStage'] = entry['Progress'] + + new_add_event( 'setCommanderRankEngineer', entry['timestamp'], - OrderedDict([ - ('engineerName', entry['Engineer']), - ('rankValue', entry['Rank']) if 'Rank' in entry else ('rankStage', entry['Progress']), - ]) + to_send ) # PowerPlay status change if event_name == 'PowerplayJoin': - add_event( + new_add_event( 'setCommanderRankPower', entry['timestamp'], - OrderedDict([ - ('powerName', entry['Power']), - ('rankValue', 1), - ]) + {'powerName': entry['Power'], 'rankValue': 1} ) elif event_name == 'PowerplayLeave': - add_event( + new_add_event( 'setCommanderRankPower', entry['timestamp'], - OrderedDict([ - ('powerName', entry['Power']), - ('rankValue', 0), - ]) + {'powerName': entry['Power'], 'rankValue': 0} ) elif event_name == 'PowerplayDefect': - add_event( + new_add_event( 'setCommanderRankPower', entry['timestamp'], - OrderedDict([ - ('powerName', entry['ToPower']), - ('rankValue', 1), - ]) + {'powerName': entry['ToPower'], 'rankValue': 1} ) # Ship change if event_name == 'Loadout' and this.shipswap: - data = OrderedDict([ - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ('shipName', state['ShipName']), # Can be None - ('shipIdent', state['ShipIdent']), # Can be None - ('isCurrentShip', True), - ]) + cur_ship = { + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + 'shipName': state['ShipName'], # Can be None + 'shipIdent': state['ShipIdent'], # Can be None + 'isCurrentShip': True, + } if state['HullValue']: - data['shipHullValue'] = state['HullValue'] + cur_ship['shipHullValue'] = state['HullValue'] if state['ModulesValue']: - data['shipModulesValue'] = state['ModulesValue'] + cur_ship['shipModulesValue'] = state['ModulesValue'] - data['shipRebuyCost'] = state['Rebuy'] - add_event('setCommanderShip', entry['timestamp'], data) + cur_ship['shipRebuyCost'] = state['Rebuy'] + new_add_event('setCommanderShip', entry['timestamp'], cur_ship) this.loadout = make_loadout(state) - add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) + new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) this.shipswap = False # Location change @@ -475,15 +517,15 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di this.suppress_docked = False else: - add_event( + new_add_event( 'addCommanderTravelDock', entry['timestamp'], - OrderedDict([ - ('starsystemName', system), - ('stationName', station), - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ]) + { + 'starsystemName': system, + 'stationName': station, + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + } ) elif event_name == 'Undocked': @@ -493,64 +535,60 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di elif event_name == 'SupercruiseEntry': if this.undocked: # Staying in system after undocking - send any pending events from in-station action - add_event( + new_add_event( 'setCommanderTravelLocation', entry['timestamp'], - OrderedDict([ - ('starsystemName', system), - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ]) + { + 'starsystemName': system, + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + } ) this.undocked = False elif event_name == 'FSDJump': this.undocked = False - add_event( + new_add_event( 'addCommanderTravelFSDJump', entry['timestamp'], - OrderedDict([ - ('starsystemName', entry['StarSystem']), - ('jumpDistance', entry['JumpDist']), - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ]) + { + 'starsystemName': entry['StarSystem'], + 'jumpDistance': entry['JumpDist'], + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + } ) if entry.get('Factions'): - add_event( + new_add_event( 'setCommanderReputationMinorFaction', entry['timestamp'], [ - OrderedDict( - [('minorfactionName', f['Name']), ('minorfactionReputation', f['MyReputation']/100.0)] - ) + {'minorfactionName': f['Name'], 'minorfactionReputation': f['MyReputation'] / 100.0} for f in entry['Factions'] ] ) elif event_name == 'CarrierJump': - add_event( + new_add_event( 'addCommanderTravelCarrierJump', entry['timestamp'], - OrderedDict([ - ('starsystemName', entry['StarSystem']), - ('stationName', entry['StationName']), - ('marketID', entry['MarketID']), - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ]) + { + 'starsystemName': entry['StarSystem'], + 'stationName': entry['StationName'], + 'marketID': entry['MarketID'], + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + } ) if entry.get('Factions'): - add_event( + new_add_event( 'setCommanderReputationMinorFaction', entry['timestamp'], [ - OrderedDict( - [('minorfactionName', f['Name']), ('minorfactionReputation', f['MyReputation']/100.0)] - ) + {'minorfactionName': f['Name'], 'minorfactionReputation': f['MyReputation'] / 100.0} for f in entry['Factions'] ] ) @@ -558,11 +596,11 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di # Ignore the following 'Docked' event this.suppress_docked = True - cargo = [OrderedDict([('itemName', k), ('itemCount', state['Cargo'][k])]) for k in sorted(state['Cargo'])] + cargo = [{'itemName': k, 'itemCount': state['Cargo'][k]} for k in sorted(state['Cargo'])] # Send cargo and materials if changed if this.cargo != cargo: - add_event('setCommanderInventoryCargo', entry['timestamp'], cargo) + new_add_event('setCommanderInventoryCargo', entry['timestamp'], cargo) this.cargo = cargo materials = [] @@ -572,7 +610,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di ) if this.materials != materials: - add_event('setCommanderInventoryMaterials', entry['timestamp'], materials) + new_add_event('setCommanderInventoryMaterials', entry['timestamp'], materials) this.materials = materials except Exception as e: @@ -581,23 +619,23 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di # Send credits and stats to Inara on startup only - otherwise may be out of date if event_name == 'LoadGame': - add_event( + new_add_event( 'setCommanderCredits', entry['timestamp'], - OrderedDict([('commanderCredits', state['Credits']), ('commanderLoan', state['Loan'])]) + {'commanderCredits': state['Credits'], 'commanderLoan': state['Loan']} ) this.lastcredits = state['Credits'] elif event_name == 'Statistics': - add_event('setCommanderGameStatistics', entry['timestamp'], state['Statistics']) # may be out of date + new_add_event('setCommanderGameStatistics', entry['timestamp'], state['Statistics']) # may be out of date # Selling / swapping ships if event_name == 'ShipyardNew': add_event( 'addCommanderShip', entry['timestamp'], - OrderedDict([('shipType', entry['ShipType']), ('shipGameID', entry['NewShipID'])]) + {'shipType': entry['ShipType'], 'shipGameID': entry['NewShipID']} ) this.shipswap = True # Want subsequent Loadout event to be sent immediately @@ -607,51 +645,51 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di this.shipswap = True # Don't know new ship name and ident 'til the following Loadout event if 'StoreShipID' in entry: - add_event( + new_add_event( 'setCommanderShip', entry['timestamp'], - OrderedDict([ - ('shipType', entry['StoreOldShip']), - ('shipGameID', entry['StoreShipID']), - ('starsystemName', system), - ('stationName', station), - ]) + { + 'shipType': entry['StoreOldShip'], + 'shipGameID': entry['StoreShipID'], + 'starsystemName': system, + 'stationName': station, + } ) elif 'SellShipID' in entry: - add_event( + new_add_event( 'delCommanderShip', entry['timestamp'], - OrderedDict([ - ('shipType', entry.get('SellOldShip', entry['ShipType'])), - ('shipGameID', entry['SellShipID']), - ]) + { + 'shipType': entry.get('SellOldShip', entry['ShipType']), + 'shipGameID': entry['SellShipID'], + } ) elif event_name == 'SetUserShipName': - add_event( + new_add_event( 'setCommanderShip', entry['timestamp'], - OrderedDict([ - ('shipType', state['ShipType']), - ('shipGameID', state['ShipID']), - ('shipName', state['ShipName']), # Can be None - ('shipIdent', state['ShipIdent']), # Can be None - ('isCurrentShip', True), - ]) + { + 'shipType': state['ShipType'], + 'shipGameID': state['ShipID'], + 'shipName': state['ShipName'], # Can be None + 'shipIdent': state['ShipIdent'], # Can be None + 'isCurrentShip': True, + } ) elif event_name == 'ShipyardTransfer': - add_event( + new_add_event( 'setCommanderShipTransfer', entry['timestamp'], - OrderedDict([ - ('shipType', entry['ShipType']), - ('shipGameID', entry['ShipID']), - ('starsystemName', system), - ('stationName', station), - ('transferTime', entry['TransferTime']), - ]) + { + 'shipType': entry['ShipType'], + 'shipGameID': entry['ShipID'], + 'starsystemName': system, + 'stationName': station, + 'transferTime': entry['TransferTime'], + } ) # Fleet @@ -679,22 +717,24 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di if this.fleet != fleet: this.fleet = fleet - this.events = [x for x in this.events if x['eventName'] != 'setCommanderShip'] # Remove any unsent + new_this.filter_events(current_creds, lambda e: e.name != 'setCommanderShip') + + # this.events = [x for x in this.events if x['eventName'] != 'setCommanderShip'] # Remove any unsent for ship in this.fleet: - add_event('setCommanderShip', entry['timestamp'], ship) + new_add_event('setCommanderShip', entry['timestamp'], ship) # Loadout if event_name == 'Loadout' and not this.newsession: loadout = make_loadout(state) if this.loadout != loadout: this.loadout = loadout - # Remove any unsent for this ship - this.events = [ - e for e in this.events - if e['eventName'] != 'setCommanderShipLoadout' or e['shipGameID'] != this.loadout['shipGameID'] - ] - add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) + new_this.filter_events( + current_creds, + lambda e: e.name != 'setCommanderShipLoadout' or e.data['shipGameID'] != this.loadout['shipGameID'] + ) + + new_add_event('setCommanderShipLoadout', entry['timestamp'], this.loadout) # Stored modules if event_name == 'StoredModules': @@ -729,8 +769,10 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di # Only send on change this.storedmodules = modules # Remove any unsent - this.events = list(filter(lambda e: e['eventName'] != 'setCommanderStorageModules', this.events)) - add_event('setCommanderStorageModules', entry['timestamp'], this.storedmodules) + new_this.filter_events(current_creds, lambda e: e.name != 'setCommanderStorageModules') + + # this.events = list(filter(lambda e: e['eventName'] != 'setCommanderStorageModules', this.events)) + new_add_event('setCommanderStorageModules', entry['timestamp'], this.storedmodules) # Missions if event_name == 'MissionAccepted': @@ -764,14 +806,14 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di if prop in entry: data[iprop] = entry[prop] - add_event('addCommanderMission', entry['timestamp'], data) + new_add_event('addCommanderMission', entry['timestamp'], data) elif event_name == 'MissionAbandoned': - add_event('setCommanderMissionAbandoned', entry['timestamp'], {'missionGameID': entry['MissionID']}) + new_add_event('setCommanderMissionAbandoned', entry['timestamp'], {'missionGameID': entry['MissionID']}) elif event_name == 'MissionCompleted': for x in entry.get('PermitsAwarded', []): - add_event('addCommanderPermit', entry['timestamp'], {'starsystemName': x}) + new_add_event('addCommanderPermit', entry['timestamp'], {'starsystemName': x}) data = OrderedDict([('missionGameID', entry['MissionID'])]) if 'Donation' in entry: @@ -810,10 +852,10 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di if factioneffects: data['minorfactionEffects'] = factioneffects - add_event('setCommanderMissionCompleted', entry['timestamp'], data) + new_add_event('setCommanderMissionCompleted', entry['timestamp'], data) elif event_name == 'MissionFailed': - add_event('setCommanderMissionFailed', entry['timestamp'], {'missionGameID': entry['MissionID']}) + new_add_event('setCommanderMissionFailed', entry['timestamp'], {'missionGameID': entry['MissionID']}) # Combat if event_name == 'Died': @@ -824,7 +866,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di elif 'KillerName' in entry: data['opponentName'] = entry['KillerName'] - add_event('addCommanderCombatDeath', entry['timestamp'], data) + new_add_event('addCommanderCombatDeath', entry['timestamp'], data) elif event_name == 'Interdicted': data = OrderedDict([('starsystemName', system), @@ -841,7 +883,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di elif 'Power' in entry: data['opponentName'] = entry['Power'] - add_event('addCommanderCombatInterdicted', entry['timestamp'], data) + new_add_event('addCommanderCombatInterdicted', entry['timestamp'], data) elif event_name == 'Interdiction': data: OrderedDictT[str, Any] = OrderedDict([ @@ -859,36 +901,40 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di elif 'Power' in entry: data['opponentName'] = entry['Power'] - add_event('addCommanderCombatInterdiction', entry['timestamp'], data) + new_add_event('addCommanderCombatInterdiction', entry['timestamp'], data) elif event_name == 'EscapeInterdiction': - add_event( + new_add_event( 'addCommanderCombatInterdictionEscape', entry['timestamp'], - OrderedDict([ - ('starsystemName', system), - ('opponentName', entry['Interdictor']), - ('isPlayer', entry['IsPlayer']), - ]) + { + 'starsystemName': system, + 'opponentName': entry['Interdictor'], + 'isPlayer': entry['IsPlayer'], + } ) elif event_name == 'PVPKill': - add_event( + new_add_event( 'addCommanderCombatKill', entry['timestamp'], - OrderedDict([ - ('starsystemName', system), - ('opponentName', entry['Victim']), - ]) + { + 'starsystemName': system, + 'opponentName': entry['Victim'], + } ) # Community Goals if event_name == 'CommunityGoal': # Remove any unsent - this.events = list(filter( - lambda e: e['eventName'] not in ('setCommunityGoal', 'setCommanderCommunityGoalProgress'), - this.events - )) + new_this.filter_events( + current_creds, lambda e: e.name not in ('setCommunityGoal', 'setCommanderCommunityGoalProgress') + ) + + # this.events = list(filter( + # lambda e: e['eventName'] not in ('setCommunityGoal', 'setCommanderCommunityGoalProgress'), + # this.events + # )) for goal in entry['CurrentGoals']: data = OrderedDict([ @@ -912,7 +958,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di data['tierMax'] = int(goal['TopTier']['Name'].split()[-1]) data['completionBonus'] = goal['TopTier']['Bonus'] - add_event('setCommunityGoal', entry['timestamp'], data) + new_add_event('setCommunityGoal', entry['timestamp'], data) data = OrderedDict([ ('communitygoalGameID', goal['CGID']), @@ -926,28 +972,28 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di if 'PlayerInTopRank' in goal: data['isTopRank'] = goal['PlayerInTopRank'] - add_event('setCommanderCommunityGoalProgress', entry['timestamp'], data) + new_add_event('setCommanderCommunityGoalProgress', entry['timestamp'], data) # Friends if event_name == 'Friends': if entry['Status'] in ['Added', 'Online']: - add_event( + new_add_event( 'addCommanderFriend', entry['timestamp'], - OrderedDict([ - ('commanderName', entry['Name']), - ('gamePlatform', 'pc'), - ]) + { + 'commanderName': entry['Name'], + 'gamePlatform': 'pc', + } ) elif entry['Status'] in ['Declined', 'Lost']: - add_event( + new_add_event( 'delCommanderFriend', entry['timestamp'], - OrderedDict([ - ('commanderName', entry['Name']), - ('gamePlatform', 'pc'), - ]) + { + 'commanderName': entry['Name'], + 'gamePlatform': 'pc', + } ) this.newuser = False @@ -1005,14 +1051,18 @@ def cmdr_data(data, is_beta): if config.getint('inara_out') and not is_beta and not this.multicrew and credentials(this.cmdr): if not (CREDIT_RATIO > this.lastcredits / data['commander']['credits'] > 1/CREDIT_RATIO): - this.events = [x for x in this.events if x['eventName'] != 'setCommanderCredits'] # Remove any unsent - add_event( + new_this.filter_events( + Credentials(this.cmdr, this.FID, str(credentials(this.cmdr))), + lambda e: e.name != 'setCommanderCredits') + + # this.events = [x for x in this.events if x['eventName'] != 'setCommanderCredits'] # Remove any unsent + new_add_event( 'setCommanderCredits', data['timestamp'], - OrderedDict([ - ('commanderCredits', data['commander']['credits']), - ('commanderLoan', data['commander'].get('debt', 0)), - ]) + { + 'commanderCredits': data['commander']['credits'], + 'commanderLoan': data['commander'].get('debt', 0), + } ) this.lastcredits = float(data['commander']['credits']) @@ -1078,137 +1128,146 @@ def make_loadout(state: Dict[str, Any]) -> OrderedDictT[str, Any]: ]) -EVENT_DATA = Mapping[AnyStr, Any] - - -def add_event(name: str, timestamp: str, data: Union[EVENT_DATA, Sequence[EVENT_DATA]]): +def new_add_event( + name: str, + timestamp: str, + data: EVENT_DATA, + cmdr: Optional[str] = None, + fid: Optional[str] = None +): """ - Add an event to the event queue + add a journal event to the queue, to be sent to inara at the next opportunity. If provided, use the given cmdr + name over the current one :param name: name of the event - :param timestamp: timestamp for the event - :param data: data to be sent in the payload + :param timestamp: timestamp of the event + :param data: payload for the event + :param cmdr: the commander to send as, defaults to the current commander """ + if cmdr is None: + cmdr = this.cmdr - this.events.append(OrderedDict([ - ('eventName', name), - ('eventTimestamp', timestamp), - ('eventData', data), - ])) + if fid is None: + fid = this.FID - -def call_timer(wait: int = FLOOD_LIMIT_SECONDS): - """ - call_timer runs in its own thread polling out to INARA once every FLOOD_LIMIT_SECONDS - - :param wait: time to wait between polls, defaults to FLOOD_LIMIT_SECONDS - """ - while this.timer_run: - time.sleep(wait) - if this.timer_run: # check again in here just in case we're closing and the stars align - call() - - -def call(callback=None, force=False): - """ - call queues a call out to the inara API - - Note that it will not allow a call more than once every FLOOD_LIMIT_SECONDS - unless the force parameter is True. - - :param callback: Unused and ignored. , defaults to None - :param force: Whether or not to ignore flood limits, defaults to False - """ - if not this.events: + api_key = credentials(this.cmdr) + if api_key is None: + logger.warn(f"cannot find an API key for cmdr {this.cmdr!r}") return - if (time.time() - config.getint(LAST_UPDATE_CONF_KEY)) <= FLOOD_LIMIT_SECONDS and not force: - return + key = Credentials(str(cmdr), str(fid), api_key) # this fails type checking due to `this` weirdness, hence str() - config.set(LAST_UPDATE_CONF_KEY, int(time.time())) - logger.info(f"queuing upload of {len(this.events)} events") - data = OrderedDict([ - ('header', OrderedDict([ - ('appName', applongname), - ('appVersion', appversion), - ('APIkey', credentials(this.cmdr)), - ('commanderName', this.cmdr), - ('commanderFrontierID', this.FID), - ])), - ('events', list(this.events)), # shallow copy - ]) - - this.events = [] - this.queue.put(('https://inara.cz/inapi/v1/', data, None)) - -# Worker thread + with new_this.event_lock: + new_this.events[key].append(Event(name, timestamp, data)) -def worker(): - """ - worker is the main thread worker and backbone of the plugin. - - As events are added to `this.queue`, the worker thread will push them to the API - """ - +def new_worker(): while True: - item = this.queue.get() - if not item: - return # Closing - else: - (url, data, callback) = item + events = get_events() + for creds, event_list in events.items(): + if not event_list: + continue - retrying = 0 - while retrying < 3: - try: - r = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT) - r.raise_for_status() - reply = r.json() - status = reply['header']['eventStatus'] - if callback: - callback(reply) + data = { + 'header': { + 'appName': applongname, + 'appVersion': appversion, + 'APIkey': creds.api_key, + 'commanderName': creds.cmdr, + 'commanderFrontierID': creds.fid + }, + 'events': [ + {'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list + ] + } + logger.info(f'sending {len(data["events"])} events for {creds.cmdr}') + try_send_data(TARGET_URL, data) - elif status // 100 != 2: # 2xx == OK (maybe with warnings) - # Log fatal errors - logger.warning(f'Inara\t{status} {reply["header"].get("eventStatusText", "")}') - logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}') - plug.show_error(_('Error: Inara {MSG}').format(MSG=reply['header'].get('eventStatusText', status))) + time.sleep(WORKER_WAIT_TIME) - else: - # Log individual errors and warnings - for data_event, reply_event in zip(data['events'], reply['events']): - if reply_event['eventStatus'] != 200: - logger.warning(f'Inara\t{status} {reply_event.get("eventStatusText", "")}') - logger.debug(f'JSON data:\n{json.dumps(data_event)}') - if reply_event['eventStatus'] // 100 != 2: - plug.show_error(_('Error: Inara {MSG}').format( - MSG=f'{data_event["eventName"]},' - f'{reply_event.get("eventStatusText", reply_event["eventStatus"])}')) - if data_event['eventName'] in ('addCommanderTravelCarrierJump', - 'addCommanderTravelDock', - 'addCommanderTravelFSDJump', - 'setCommanderTravelLocation'): - this.lastlocation = reply_event.get('eventData', {}) - # calls update_location in main thread - this.system_link.event_generate('<>', when="tail") +def get_events(clear=True) -> Dict[Credentials, List[Event]]: + """ + get_events fetches all events from the current queue and returns a frozen version of them - elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']: - this.lastship = reply_event.get('eventData', {}) - # calls update_ship in main thread - this.system_link.event_generate('<>', when="tail") + :param clear: whether or not to clear the queues as we go, defaults to True + :return: the frozen event list + """ + out: Dict[Credentials, List[Event]] = {} + with new_this.event_lock: + for key, events in new_this.events.items(): + out[key] = list(events) + if clear: + events.clear() + return out + + +def try_send_data(url: str, data: Mapping[str, Any]): + """ + attempt repeatedly to send the payload forward + + :param url: target URL for the payload + :param data: the payload + """ + for i in range(3): + logger.debug(f"sending data to API, retry #{i}") + try: + if send_data(url, data): break - except Exception as e: - logger.debug('Unable to send events', exc_info=e) - retrying += 1 - else: - if callback: - callback(None) + except Exception as e: + logger.debug('unable to send events', exc_info=e) + return - else: - plug.show_error(_("Error: Can't connect to Inara")) + +def send_data(url: str, data: Mapping[str, Any]) -> bool: + """ + write a set of events to the inara API + + :param url: the target URL to post to + :param data: the data to POST + :return: success state + """ + r = this.session.post(url, data=json.dumps(data, separators=(',', ':')), timeout=_TIMEOUT) + r.raise_for_status() + reply = r.json() + status = reply['header']['eventStatus'] + + if status // 100 != 2: # 2xx == OK (maybe with warnings) + # Log fatal errors + logger.warning(f'Inara\t{status} {reply["header"].get("eventStatusText", "")}') + logger.debug(f'JSON data:\n{json.dumps(data, indent=2, separators = (",", ": "))}') + plug.show_error(_('Error: Inara {MSG}').format(MSG=reply['header'].get('eventStatusText', status))) + + else: + # Log individual errors and warnings + for data_event, reply_event in zip(data['events'], reply['events']): + if reply_event['eventStatus'] != 200: + logger.warning(f'Inara\t{status} {reply_event.get("eventStatusText", "")}') + logger.debug(f'JSON data:\n{json.dumps(data_event)}') + if reply_event['eventStatus'] // 100 != 2: + plug.show_error(_('Error: Inara {MSG}').format( + MSG=f'{data_event["eventName"]},' + f'{reply_event.get("eventStatusText", reply_event["eventStatus"])}' + )) + + if data_event['eventName'] in ( + 'addCommanderTravelCarrierJump', + 'addCommanderTravelDock', + 'addCommanderTravelFSDJump', + 'setCommanderTravelLocation' + ): + this.lastlocation = reply_event.get('eventData', {}) + # calls update_location in main thread + this.system_link.event_generate('<>', when="tail") + + elif data_event['eventName'] in ['addCommanderShip', 'setCommanderShip']: + this.lastship = reply_event.get('eventData', {}) + # calls update_ship in main thread + this.system_link.event_generate('<>', when="tail") + + return True # regardless of errors above, we DID manage to send it, therefore inform our caller as such def update_location(event=None): From d5dd23ce3848dbb882dbfc9d0e8681c0483ea5b3 Mon Sep 17 00:00:00 2001 From: A_D Date: Tue, 4 Aug 2020 19:25:44 +0200 Subject: [PATCH 2/4] Missed an add_event call --- plugins/inara.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inara.py b/plugins/inara.py index 1e252ac9..f727cfec 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -632,7 +632,7 @@ def journal_entry(cmdr: str, is_beta: bool, system: str, station: str, entry: Di # Selling / swapping ships if event_name == 'ShipyardNew': - add_event( + new_add_event( 'addCommanderShip', entry['timestamp'], {'shipType': entry['ShipType'], 'shipGameID': entry['NewShipID']} From 3155b929fa4ae93a47a39b7ef66c933f70306ad3 Mon Sep 17 00:00:00 2001 From: A_D Date: Fri, 7 Aug 2020 13:37:19 +0200 Subject: [PATCH 3/4] Fixed docstring on Credentials, log wording --- plugins/inara.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inara.py b/plugins/inara.py index f727cfec..52813104 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -85,7 +85,7 @@ STATION_UNDOCKED: str = '×' # "Station" name to display when not docked = U+00 class Credentials(NamedTuple): """ - Credentials holds an inara API payload + Credentials holds the set of credentials required to identify an inara API payload to inara """ cmdr: str fid: str @@ -1211,7 +1211,7 @@ def try_send_data(url: str, data: Mapping[str, Any]): :param data: the payload """ for i in range(3): - logger.debug(f"sending data to API, retry #{i}") + logger.debug(f"sending data to API, attempt #{i}") try: if send_data(url, data): break From 265faf3cdd9dc02da3276601f202c75677ee1deb Mon Sep 17 00:00:00 2001 From: A_D Date: Fri, 7 Aug 2020 13:45:06 +0200 Subject: [PATCH 4/4] Reordered imports --- plugins/inara.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/plugins/inara.py b/plugins/inara.py index 52813104..608b8d61 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -2,31 +2,28 @@ # Inara sync # -from collections import OrderedDict, defaultdict -import json -from typing import Any, AnyStr, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional, OrderedDict as OrderedDictT, \ - Sequence, TYPE_CHECKING, Tuple, Union import dataclasses - -import requests +import json +import logging import sys import time +import tkinter as tk +# For new impl +from collections import OrderedDict, defaultdict, deque from operator import itemgetter from queue import Queue from threading import Lock, Thread -import logging +from typing import TYPE_CHECKING, Any, AnyStr, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional +from typing import OrderedDict as OrderedDictT +from typing import Sequence, Union -import tkinter as tk -from ttkHyperlinkLabel import HyperlinkLabel -import myNotebook as nb +import requests -from config import appname, applongname, appversion, config +import myNotebook as nb # noqa: N813 import plug import timeout_session - -# For new impl -from collections import deque - +from config import applongname, appname, appversion, config +from ttkHyperlinkLabel import HyperlinkLabel logger = logging.getLogger(appname)