1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-13 07:47:14 +03:00

Merge branch 'develop' into main

This commit is contained in:
Athanasius 2020-08-27 12:53:03 +01:00
commit 364aaf2aef
21 changed files with 2950 additions and 1490 deletions

View File

@ -1,7 +1,9 @@
# This workflow will:
#
# install Python dependencies
# Run flake8 to add annotations to the PR
# 1. Store some github context in env vars.
# 2. Only checkout if base and head ref owners are the same.
# 3. Check if any *.py files are in the diff.
# 4. Only then install python and perform the annotation with flake8.
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: PR-annotate-flake8
@ -11,19 +13,68 @@ on:
branches: [ develop ]
jobs:
build:
flake8_annotate:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up Python 3.7
uses: actions/setup-python@v2
with:
python-version: 3.7
- name: Annotate with Flake8
uses: "tayfun/flake8-your-pr@master"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
##################################################################
# Check if the base and head repos are the same, if not we won't
# be able to add annotations.
##################################################################
- name: Check head and base repo same
env:
BASE_REPO_OWNER: ${{github.event.pull_request.base.repo.owner.login}}
HEAD_REPO_OWNER: ${{github.event.pull_request.head.repo.owner.login}}
BASE_REF: ${{github.base_ref}}
run: |
# Just to be running something
env
##################################################################
##################################################################
# Perform the checkout
##################################################################
- name: Checkout
if: ${{ github.event.pull_request.base.repo.owner.login == github.event.pull_request.head.repo.owner.login }}
uses: actions/checkout@v2
with:
fetch-depth: 0
##################################################################
##################################################################
# flake8-your-pr 'fails' if no *.py files changed
# So we check if any were affected and store that in env
##################################################################
- name: Check for PY files
if: ${{ github.event.pull_request.base.repo.owner.login == github.event.pull_request.head.repo.owner.login }}
run: |
# Checkout might not have happened.
if [ ! -d ".git" ]; then exit 0 ; fi
# Get a list of files ending with ".py", stuff in environment.
# We don't rely on exit status because Workflows run with
# "set -e" so any failure in a pipe is total failure.
PYFILES=$(git diff --name-only "refs/remotes/origin/${BASE_REF}" -- | egrep '.py$' || true)
# Use magic output to store in env for rest of workflow
echo "::set-env name=PYFILES::${PYFILES}"
##################################################################
##################################################################
# Get Python set up
##################################################################
- name: Set up Python 3.7
if: ${{ env.PYFILES != '' }}
uses: actions/setup-python@v2
with:
python-version: 3.7
##################################################################
##################################################################
# Perform the annotation
##################################################################
- name: Annotate with Flake8
# Only if at least one *.py file was affected
if: ${{ env.PYFILES != '' }}
uses: "tayfun/flake8-your-pr@master"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
##################################################################

87
.github/workflows/pr-checks.yml vendored Normal file
View File

@ -0,0 +1,87 @@
# This workflow will:
#
# * install Python dependencies
# * lint with a single version of Python
#
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: PR-Checks
on:
pull_request:
branches: [ develop ]
jobs:
flake8:
runs-on: ubuntu-18.04
steps:
####################################################################
# Checkout the necessary commits
####################################################################
# We need the repo from the 'head' of the PR, not what it's
# based on.
- name: Checkout head commits
# https://github.com/actions/checkout
uses: actions/checkout@v2
with:
repository: ${{github.event.pull_request.head.repo.full_name}}
fetch-depth: 0
# But we do need the base references
- name: Fetch base commits
env:
BASE_REPO_URL: ${{github.event.pull_request.base.repo.svn_url}}
BASE_REPO_OWNER: ${{github.event.pull_request.base.repo.owner.login}}
run: |
# Add the 'base' repo as a new remote
git remote add ${BASE_REPO_OWNER} ${BASE_REPO_URL}
# And then fetch its references
git fetch ${BASE_REPO_OWNER}
####################################################################
####################################################################
# Get Python set up
####################################################################
- name: Set up Python 3.7
uses: actions/setup-python@v2
with:
python-version: 3.7
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8
if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; elif [ -f requirements.txt ]; then pip install -r requirements.txt ; fi
####################################################################
####################################################################
# Show full github context to aid debugging.
####################################################################
- name: Show github context
run: |
env
cat $GITHUB_EVENT_PATH
####################################################################
####################################################################
# Lint with flake8
####################################################################
- name: Lint with flake8
env:
BASE_REPO_URL: ${{github.event.pull_request.base.repo.svn_url}}
BASE_REPO_OWNER: ${{github.event.pull_request.base.repo.owner.login}}
BASE_REF: ${{github.base_ref}}
run: |
# Explicitly check for some errors
# E9 - Runtime (syntax and the like)
# F63 - 'tests' checking
# F7 - syntax errors
# F82 - undefined checking
git diff "refs/remotes/${BASE_REPO_OWNER}/${BASE_REF}" -- | flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --diff
# Can optionally add `--exit-zero` to the flake8 arguments so that
# this doesn't fail the build.
git diff "refs/remotes/${BASE_REPO_OWNER}/${BASE_REF}" -- | flake8 . --count --statistics --diff
####################################################################

4
.gitignore vendored
View File

@ -13,3 +13,7 @@ appcast_win_*.xml
appcast_mac_*.xml
EDMarketConnector.VisualElementsManifest.xml
*.zip
.idea
.vscode
venv

View File

@ -1,2 +0,0 @@
[tool.autopep8]
max_line_length = 120

View File

@ -184,14 +184,14 @@ Coding Conventions
Yes:
```python
if somethingTrue:
Things_we_then_do()
if something_true:
one_thing_we_do()
```
No:
```python
if somethingTrue: One_thing_we_do()
if something_true: one_thing_we_do()
```
Yes, some existing code still flouts this rule.
@ -224,6 +224,43 @@ No:
* Going forwards please do place [type hints](https://docs.python.org/3/library/typing.html) on the declarations of your functions, both their arguments and return
types.
* Use `logging` not `print()`, and definitely not `sys.stdout.write()`!
`EDMarketConnector.py` sets up a `logging.Logger` for this under the
`appname`, so:
import logging
from config import appname
logger = logging.getLogger(appname)
logger.info(f'Some message with a {variable}')
try:
something
except Exception as e: # Try to be more specific
logger.error(f'Error in ... with ...', exc_info=e)
**DO NOT** use the following, as you might cause a circular import:
from EDMarketConnector import logger
We have implemented a `logging.Filter` that adds support for the following
in `logging.Formatter()` strings:
1. `%(qualname)s` which gets the full `<module>.ClassA(.ClassB...).func`
of the calling function.
1. `%(class)s` which gets just the enclosing class name(s) of the calling
function.
If you want to see how we did this, check `EDMCLogging.py`.
So don't worry about adding anything about the class or function you're
logging from, it's taken care of.
*Do use a pertinent message, even when using `exc_info=...` to log an
exception*. e.g. Logging will know you were in your `get_foo()` function
but you should still tell it what actually (failed to have) happened
in there.
* In general, please follow [PEP8](https://www.python.org/dev/peps/pep-0008/).
---

412
EDMC.py
View File

@ -5,9 +5,9 @@
import argparse
import json
import requests
import sys
import os
from typing import Any, Optional
# workaround for https://github.com/EDCD/EDMarketConnector/issues/568
os.environ["EDMC_NO_UI"] = "1"
@ -36,199 +36,267 @@ sys.path.append(config.internal_plugin_dir)
import eddn
SERVER_RETRY = 5 # retry pause for Companion servers [s]
SERVER_RETRY = 5 # retry pause for Companion servers [s]
EXIT_SUCCESS, EXIT_SERVER, EXIT_CREDENTIALS, EXIT_VERIFICATION, EXIT_LAGGING, EXIT_SYS_ERR = range(6)
JOURNAL_RE = re.compile(r'^Journal(Beta)?\.[0-9]{12}\.[0-9]{2}\.log$')
# quick and dirty version comparison assuming "strict" numeric only version numbers
def versioncmp(versionstring):
return list(map(int, versionstring.split('.')))
try:
# arg parsing
parser = argparse.ArgumentParser(prog=appcmdname, description='Prints the current system and station (if docked) to stdout and optionally writes player status, ship locations, ship loadout and/or station data to file. Requires prior setup through the accompanying GUI app.')
parser.add_argument('-v', '--version', help='print program version and exit', action='store_const', const=True)
parser.add_argument('-a', metavar='FILE', help='write ship loadout to FILE in Companion API json format')
parser.add_argument('-e', metavar='FILE', help='write ship loadout to FILE in E:D Shipyard plain text format')
parser.add_argument('-l', metavar='FILE', help='write ship locations to FILE in CSV format')
parser.add_argument('-m', metavar='FILE', help='write station commodity market data to FILE in CSV format')
parser.add_argument('-o', metavar='FILE', help='write station outfitting data to FILE in CSV format')
parser.add_argument('-s', metavar='FILE', help='write station shipyard data to FILE in CSV format')
parser.add_argument('-t', metavar='FILE', help='write player status to FILE in CSV format')
parser.add_argument('-d', metavar='FILE', help='write raw JSON data to FILE')
parser.add_argument('-n', action='store_true', help='send data to EDDN')
parser.add_argument('-p', metavar='CMDR', help='Returns data from the specified player account')
parser.add_argument('-j', help=argparse.SUPPRESS) # Import JSON dump
args = parser.parse_args()
def deep_get(target: dict, *args: str, default=None) -> Any:
if not hasattr(target, 'get'):
raise ValueError(f"Cannot call get on {target} ({type(target)})")
if args.version:
updater = Updater(provider='internal')
newversion: EDMCVersion = updater.check_appcast()
if newversion:
print('{CURRENT} ("{UPDATE}" is available)'.format(
CURRENT=appversion,
UPDATE=newversion.title))
else:
print(appversion)
sys.exit(EXIT_SUCCESS)
current = target
for arg in args:
res = current.get(arg)
if res is None:
return default
if args.j:
# Import and collate from JSON dump
data = json.load(open(args.j))
config.set('querytime', int(getmtime(args.j)))
else:
# Get state from latest Journal file
try:
logdir = config.get('journaldir') or config.default_journal_dir
logfiles = sorted([x for x in os.listdir(logdir) if re.search('^Journal(Beta)?\.[0-9]{12}\.[0-9]{2}\.log$', x)],
key=lambda x: x.split('.')[1:])
logfile = join(logdir, logfiles[-1])
with open(logfile, 'r') as loghandle:
for line in loghandle:
try:
monitor.parse_entry(line)
except:
if __debug__:
print('Invalid journal entry "%s"' % repr(line))
except Exception as e:
print("Can't read Journal file: {}\n".format(str(e)), file=sys.stderr)
sys.exit(EXIT_SYS_ERR)
current = res
if not monitor.cmdr:
sys.stderr.write('Not available while E:D is at the main menu\n')
sys.exit(EXIT_SYS_ERR)
return current
# Get data from Companion API
if args.p:
cmdrs = config.get('cmdrs') or []
if args.p in cmdrs:
idx = cmdrs.index(args.p)
def main():
try:
# arg parsing
parser = argparse.ArgumentParser(
prog=appcmdname,
description='Prints the current system and station (if docked) to stdout and optionally writes player '
'status, ship locations, ship loadout and/or station data to file. '
'Requires prior setup through the accompanying GUI app.'
)
parser.add_argument('-v', '--version', help='print program version and exit', action='store_const', const=True)
parser.add_argument('-a', metavar='FILE', help='write ship loadout to FILE in Companion API json format')
parser.add_argument('-e', metavar='FILE', help='write ship loadout to FILE in E:D Shipyard plain text format')
parser.add_argument('-l', metavar='FILE', help='write ship locations to FILE in CSV format')
parser.add_argument('-m', metavar='FILE', help='write station commodity market data to FILE in CSV format')
parser.add_argument('-o', metavar='FILE', help='write station outfitting data to FILE in CSV format')
parser.add_argument('-s', metavar='FILE', help='write station shipyard data to FILE in CSV format')
parser.add_argument('-t', metavar='FILE', help='write player status to FILE in CSV format')
parser.add_argument('-d', metavar='FILE', help='write raw JSON data to FILE')
parser.add_argument('-n', action='store_true', help='send data to EDDN')
parser.add_argument('-p', metavar='CMDR', help='Returns data from the specified player account')
parser.add_argument('-j', help=argparse.SUPPRESS) # Import JSON dump
args = parser.parse_args()
if args.version:
updater = Updater(provider='internal')
newversion: Optional[EDMCVersion] = updater.check_appcast()
if newversion:
print(f'{appversion} ({newversion.title!r} is available)')
else:
for idx, cmdr in enumerate(cmdrs):
if cmdr.lower() == args.p.lower():
break
else:
raise companion.CredentialsError()
companion.session.login(cmdrs[idx], monitor.is_beta)
print(appversion)
sys.exit(EXIT_SUCCESS)
if args.j:
# Import and collate from JSON dump
data = json.load(open(args.j))
config.set('querytime', int(getmtime(args.j)))
else:
cmdrs = config.get('cmdrs') or []
if monitor.cmdr not in cmdrs:
raise companion.CredentialsError()
companion.session.login(monitor.cmdr, monitor.is_beta)
querytime = int(time())
data = companion.session.station()
config.set('querytime', querytime)
# Get state from latest Journal file
try:
logdir = config.get('journaldir') or config.default_journal_dir
logfiles = sorted((x for x in os.listdir(logdir) if JOURNAL_RE.search(x)), key=lambda x: x.split('.')[1:])
# Validation
if not data.get('commander') or not data['commander'].get('name','').strip():
sys.stderr.write('Who are you?!\n')
sys.exit(EXIT_SERVER)
elif (not data.get('lastSystem', {}).get('name') or
(data['commander'].get('docked') and not data.get('lastStarport', {}).get('name'))): # Only care if docked
sys.stderr.write('Where are you?!\n') # Shouldn't happen
sys.exit(EXIT_SERVER)
elif not data.get('ship') or not data['ship'].get('modules') or not data['ship'].get('name','').strip():
sys.stderr.write('What are you flying?!\n') # Shouldn't happen
sys.exit(EXIT_SERVER)
elif args.j:
pass # Skip further validation
elif data['commander']['name'] != monitor.cmdr:
sys.stderr.write('Wrong Cmdr\n') # Companion API return doesn't match Journal
sys.exit(EXIT_CREDENTIALS)
elif ((data['lastSystem']['name'] != monitor.system) or
((data['commander']['docked'] and data['lastStarport']['name'] or None) != monitor.station) or
(data['ship']['id'] != monitor.state['ShipID']) or
(data['ship']['name'].lower() != monitor.state['ShipType'])):
sys.stderr.write('Frontier server is lagging\n')
sys.exit(EXIT_LAGGING)
logfile = join(logdir, logfiles[-1])
# stuff we can do when not docked
if args.d:
with open(args.d, 'wb') as h:
h.write(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True, separators=(',', ': ')).encode('utf-8'))
if args.a:
loadout.export(data, args.a)
if args.e:
edshipyard.export(data, args.e)
if args.l:
stats.export_ships(data, args.l)
if args.t:
stats.export_status(data, args.t)
with open(logfile, 'r') as loghandle:
for line in loghandle:
try:
monitor.parse_entry(line)
except Exception:
if __debug__:
print(f'Invalid journal entry {line!r}')
if data['commander'].get('docked'):
print('%s,%s' % (data.get('lastSystem', {}).get('name', 'Unknown'), data.get('lastStarport', {}).get('name', 'Unknown')))
else:
print(data.get('lastSystem', {}).get('name', 'Unknown'))
except Exception as e:
print(f"Can't read Journal file: {str(e)}", file=sys.stderr)
sys.exit(EXIT_SYS_ERR)
if (args.m or args.o or args.s or args.n or args.j):
if not data['commander'].get('docked'):
sys.stderr.write("You're not docked at a station!\n")
sys.exit(EXIT_SUCCESS)
elif not data.get('lastStarport', {}).get('name'):
sys.stderr.write("Unknown station!\n")
if not monitor.cmdr:
print('Not available while E:D is at the main menu', file=sys.stderr)
sys.exit(EXIT_SYS_ERR)
# Get data from Companion API
if args.p:
cmdrs = config.get('cmdrs') or []
if args.p in cmdrs:
idx = cmdrs.index(args.p)
else:
for idx, cmdr in enumerate(cmdrs):
if cmdr.lower() == args.p.lower():
break
else:
raise companion.CredentialsError()
companion.session.login(cmdrs[idx], monitor.is_beta)
else:
cmdrs = config.get('cmdrs') or []
if monitor.cmdr not in cmdrs:
raise companion.CredentialsError()
companion.session.login(monitor.cmdr, monitor.is_beta)
querytime = int(time())
data = companion.session.station()
config.set('querytime', querytime)
# Validation
if not deep_get(data, 'commander', 'name', default='').strip():
print('Who are you?!', file=sys.stderr)
sys.exit(EXIT_SERVER)
elif not deep_get(data, 'lastSystem', 'name') or \
data['commander'].get('docked') and not \
deep_get(data, 'lastStarport', 'name'): # Only care if docked
print('Where are you?!', file=sys.stderr) # Shouldn't happen
sys.exit(EXIT_SERVER)
elif not deep_get(data, 'ship', 'modules') or not deep_get(data, 'ship', 'name', default=''):
print('What are you flying?!', file=sys.stderr) # Shouldn't happen
sys.exit(EXIT_SERVER)
elif args.j:
pass # Skip further validation
elif data['commander']['name'] != monitor.cmdr:
print('Wrong Cmdr', file=sys.stderr) # Companion API return doesn't match Journal
sys.exit(EXIT_CREDENTIALS)
elif data['lastSystem']['name'] != monitor.system or \
((data['commander']['docked'] and data['lastStarport']['name'] or None) != monitor.station) or \
data['ship']['id'] != monitor.state['ShipID'] or \
data['ship']['name'].lower() != monitor.state['ShipType']:
print('Frontier server is lagging', file=sys.stderr)
sys.exit(EXIT_LAGGING)
elif not (data['lastStarport'].get('commodities') or data['lastStarport'].get('modules')): # Ignore possibly missing shipyard info
sys.stderr.write("Station doesn't have anything!\n")
# stuff we can do when not docked
if args.d:
out = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True, separators=(',', ': '))
with open(args.d, 'wb') as f:
f.write(out.encode("utf-8"))
if args.a:
loadout.export(data, args.a)
if args.e:
edshipyard.export(data, args.e)
if args.l:
stats.export_ships(data, args.l)
if args.t:
stats.export_status(data, args.t)
if data['commander'].get('docked'):
print('{},{}'.format(
deep_get(data, 'lastSystem', 'name', default='Unknown'),
deep_get(data, 'lastStarport', 'name', default='Unknown')
))
else:
print(deep_get(data, 'lastSystem', 'name', default='Unknown'))
if (args.m or args.o or args.s or args.n or args.j):
if not data['commander'].get('docked'):
print("You're not docked at a station!", file=sys.stderr)
sys.exit(EXIT_SUCCESS)
elif not deep_get(data, 'lastStarport', 'name'):
print("Unknown station!", file=sys.stderr)
sys.exit(EXIT_LAGGING)
# Ignore possibly missing shipyard info
elif not data['lastStarport'].get('commodities') or data['lastStarport'].get('modules'):
print("Station doesn't have anything!", file=sys.stderr)
sys.exit(EXIT_SUCCESS)
else:
sys.exit(EXIT_SUCCESS)
else:
# Finally - the data looks sane and we're docked at a station
if args.j:
# Collate from JSON dump
collate.addcommodities(data)
collate.addmodules(data)
collate.addships(data)
if args.m:
if data['lastStarport'].get('commodities'):
# Fixup anomalies in the commodity data
fixed = companion.fixup(data)
commodity.export(fixed, COMMODITY_DEFAULT, args.m)
else:
print("Station doesn't have a market", file=sys.stderr)
if args.o:
if data['lastStarport'].get('modules'):
outfitting.export(data, args.o)
else:
print("Station doesn't supply outfitting", file=sys.stderr)
if (args.s or args.n) and not args.j and not \
data['lastStarport'].get('ships') and data['lastStarport']['services'].get('shipyard'):
# Retry for shipyard
sleep(SERVER_RETRY)
new_data = companion.session.station()
# might have undocked while we were waiting for retry in which case station data is unreliable
if new_data['commander'].get('docked') and \
deep_get(new_data, 'lastSystem', 'name') == monitor.system and \
deep_get(new_data, 'lastStarport', 'name') == monitor.station:
data = new_data
if args.s:
if deep_get(data, 'lastStarport', 'ships', 'shipyard_list'):
shipyard.export(data, args.s)
elif not args.j and monitor.stationservices and 'Shipyard' in monitor.stationservices:
print("Failed to get shipyard data", file=sys.stderr)
else:
print("Station doesn't have a shipyard", file=sys.stderr)
if args.n:
try:
eddn_sender = eddn.EDDN(None)
eddn_sender.export_commodities(data, monitor.is_beta)
eddn_sender.export_outfitting(data, monitor.is_beta)
eddn_sender.export_shipyard(data, monitor.is_beta)
except Exception as e:
print(f"Failed to send data to EDDN: {str(e)}", file=sys.stderr)
sys.exit(EXIT_SUCCESS)
# Finally - the data looks sane and we're docked at a station
except companion.ServerError:
print('Server is down', file=sys.stderr)
sys.exit(EXIT_SERVER)
if args.j:
# Collate from JSON dump
collate.addcommodities(data)
collate.addmodules(data)
collate.addships(data)
except companion.SKUError:
print('Server SKU problem', file=sys.stderr)
sys.exit(EXIT_SERVER)
if args.m:
if data['lastStarport'].get('commodities'):
# Fixup anomalies in the commodity data
fixed = companion.fixup(data)
commodity.export(fixed, COMMODITY_DEFAULT, args.m)
else:
sys.stderr.write("Station doesn't have a market\n")
except companion.CredentialsError:
print('Invalid Credentials', file=sys.stderr)
sys.exit(EXIT_CREDENTIALS)
if args.o:
if data['lastStarport'].get('modules'):
outfitting.export(data, args.o)
else:
sys.stderr.write("Station doesn't supply outfitting\n")
if (args.s or args.n) and not args.j and not data['lastStarport'].get('ships') and data['lastStarport']['services'].get('shipyard'):
# Retry for shipyard
sleep(SERVER_RETRY)
data2 = companion.session.station()
if (data2['commander'].get('docked') and # might have undocked while we were waiting for retry in which case station data is unreliable
data2.get('lastSystem', {}).get('name') == monitor.system and
data2.get('lastStarport', {}).get('name') == monitor.station):
data = data2
if args.s:
if data['lastStarport'].get('ships', {}).get('shipyard_list'):
shipyard.export(data, args.s)
elif not args.j and monitor.stationservices and 'Shipyard' in monitor.stationservices:
sys.stderr.write("Failed to get shipyard data\n")
else:
sys.stderr.write("Station doesn't have a shipyard\n")
if args.n:
try:
eddn_sender = eddn.EDDN(None)
eddn_sender.export_commodities(data, monitor.is_beta)
eddn_sender.export_outfitting(data, monitor.is_beta)
eddn_sender.export_shipyard(data, monitor.is_beta)
except Exception as e:
sys.stderr.write("Failed to send data to EDDN: %s\n" % unicode(e).encode('ascii', 'replace'))
sys.exit(EXIT_SUCCESS)
except companion.ServerError as e:
sys.stderr.write('Server is down\n')
sys.exit(EXIT_SERVER)
except companion.SKUError as e:
sys.stderr.write('Server SKU problem\n')
sys.exit(EXIT_SERVER)
except companion.CredentialsError as e:
sys.stderr.write('Invalid Credentials\n')
sys.exit(EXIT_CREDENTIALS)
if __name__ == '__main__':
main()

293
EDMCLogging.py Normal file
View File

@ -0,0 +1,293 @@
"""
Set up required logging for the application.
This module provides for a common logging-powered log facility.
Mostly it implements a logging.Filter() in order to get two extra
members on the logging.LogRecord instance for use in logging.Formatter()
strings.
"""
# So that any warning about accessing a protected member is only in one place.
from sys import _getframe as getframe
import inspect
import logging
import pathlib
from typing import Tuple
from config import config
# TODO: Tests:
#
# 1. Call from bare function in file.
# 2. Call from `if __name__ == "__main__":` section
#
# 3. Call from 1st level function in 1st level Class in file
# 4. Call from 2nd level function in 1st level Class in file
# 5. Call from 3rd level function in 1st level Class in file
#
# 6. Call from 1st level function in 2nd level Class in file
# 7. Call from 2nd level function in 2nd level Class in file
# 8. Call from 3rd level function in 2nd level Class in file
#
# 9. Call from 1st level function in 3rd level Class in file
# 10. Call from 2nd level function in 3rd level Class in file
# 11. Call from 3rd level function in 3rd level Class in file
#
# 12. Call from 2nd level file, all as above.
#
# 13. Call from *module*
#
# 14. Call from *package*
_default_loglevel = logging.DEBUG
class Logger:
"""
Wrapper class for all logging configuration and code.
Class instantiation requires the 'logger name' and optional loglevel.
It is intended that this 'logger name' be re-used in all files/modules
that need to log.
Users of this class should then call getLogger() to get the
logging.Logger instance.
"""
def __init__(self, logger_name: str, loglevel: int = _default_loglevel):
"""
Set up a `logging.Logger` with our preferred configuration.
This includes using an EDMCContextFilter to add 'class' and 'qualname'
expansions for logging.Formatter().
"""
self.logger = logging.getLogger(logger_name)
# Configure the logging.Logger
self.logger.setLevel(loglevel)
# Set up filter for adding class name
self.logger_filter = EDMCContextFilter()
self.logger.addFilter(self.logger_filter)
self.logger_channel = logging.StreamHandler()
self.logger_channel.setLevel(loglevel)
self.logger_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s.%(qualname)s:%(lineno)d: %(message)s') # noqa: E501
self.logger_formatter.default_time_format = '%Y-%m-%d %H:%M:%S'
self.logger_formatter.default_msec_format = '%s.%03d'
self.logger_channel.setFormatter(self.logger_formatter)
self.logger.addHandler(self.logger_channel)
def get_logger(self) -> logging.Logger:
"""
Obtain the self.logger of the class instance.
Not to be confused with logging.getLogger().
"""
return self.logger
def get_plugin_logger(name: str, loglevel: int = _default_loglevel) -> logging.Logger:
"""
Return a logger suitable for a plugin.
'Found' plugins need their own logger to call out where the logging is
coming from, but we don't need to set up *everything* for them.
The name will be '{config.appname}.{plugin.name}', e.g.
'EDMarketConnector.miggytest'. This means that any logging sent through
there *also* goes to the channels defined in the 'EDMarketConnector'
logger, so we can let that take care of the formatting.
If we add our own channel then the output gets duplicated (assuming same
logLevel set).
However we do need to attach our filter to this still. That's not at
the channel level.
:param name: Name of this Logger.
:param loglevel: Optional logLevel for this Logger.
:return: logging.Logger instance, all set up.
"""
plugin_logger = logging.getLogger(name)
plugin_logger.setLevel(loglevel)
plugin_logger.addFilter(EDMCContextFilter())
return plugin_logger
class EDMCContextFilter(logging.Filter):
"""
Implements filtering to add extra format specifiers, and tweak others.
logging.Filter sub-class to place extra attributes of the calling site
into the record.
"""
def filter(self, record: logging.LogRecord) -> bool:
"""
Attempt to set/change fields in the LogRecord.
1. class = class name(s) of the call site, if applicable
2. qualname = __qualname__ of the call site. This simplifies
logging.Formatter() as you can use just this no matter if there is
a class involved or not, so you get a nice clean:
<file/module>.<classA>[.classB....].<function>
If we fail to be able to properly set either then:
1. Use print() to alert, to be SURE a message is seen.
2. But also return strings noting the error, so there'll be
something in the log output if it happens.
:param record: The LogRecord we're "filtering"
:return: bool - Always true in order for this record to be logged.
"""
(class_name, qualname, module_name) = self.caller_attributes(module_name=getattr(record, 'module'))
# Only set if we got a useful value
if module_name:
setattr(record, 'module', module_name)
# Only set if not already provided by logging itself
if getattr(record, 'class', None) is None:
setattr(record, 'class', class_name)
# Only set if not already provided by logging itself
if getattr(record, 'qualname', None) is None:
setattr(record, 'qualname', qualname)
return True
@classmethod # noqa: CCR001 - this is as refactored as is sensible
def caller_attributes(cls, module_name: str = '') -> Tuple[str, str, str]:
"""
Determine extra or changed fields for the caller.
1. qualname finds the relevant object and its __qualname__
2. caller_class_names is just the full class names of the calling
class if relevant.
3. module is munged if we detect the caller is an EDMC plugin,
whether internal or found.
"""
frame = cls.find_caller_frame()
caller_qualname = caller_class_names = ''
if frame:
# <https://stackoverflow.com/questions/2203424/python-how-to-retrieve-class-information-from-a-frame-object#2220759>
frame_info = inspect.getframeinfo(frame)
args, _, _, value_dict = inspect.getargvalues(frame)
if len(args) and args[0] in ('self', 'cls'):
frame_class = value_dict[args[0]]
if frame_class:
# Find __qualname__ of the caller
fn = getattr(frame_class, frame_info.function)
if fn and fn.__qualname__:
caller_qualname = fn.__qualname__
# Find containing class name(s) of caller, if any
if frame_class.__class__ and frame_class.__class__.__qualname__:
caller_class_names = frame_class.__class__.__qualname__
# It's a call from the top level module file
elif frame_info.function == '<module>':
caller_class_names = '<none>'
caller_qualname = value_dict['__name__']
elif frame_info.function != '':
caller_class_names = '<none>'
caller_qualname = frame_info.function
module_name = cls.munge_module_name(frame_info, module_name)
# https://docs.python.org/3.7/library/inspect.html#the-interpreter-stack
del frame
if caller_qualname == '':
print('ALERT! Something went wrong with finding caller qualname for logging!')
caller_qualname = '<ERROR in EDMCLogging.caller_class_and_qualname() for "qualname">'
if caller_class_names == '':
print('ALERT! Something went wrong with finding caller class name(s) for logging!')
caller_class_names = '<ERROR in EDMCLogging.caller_class_and_qualname() for "class">'
return caller_class_names, caller_qualname, module_name
@classmethod
def find_caller_frame(cls):
"""
Find the stack frame of the logging caller.
:returns: 'frame' object such as from sys._getframe()
"""
# Go up through stack frames until we find the first with a
# type(f_locals.self) of logging.Logger. This should be the start
# of the frames internal to logging.
frame: 'frame' = getframe(0)
while frame:
if isinstance(frame.f_locals.get('self'), logging.Logger):
frame = frame.f_back # Want to start on the next frame below
break
frame = frame.f_back
# Now continue up through frames until we find the next one where
# that is *not* true, as it should be the call site of the logger
# call
while frame:
if not isinstance(frame.f_locals.get('self'), logging.Logger):
break # We've found the frame we want
frame = frame.f_back
return frame
@classmethod
def munge_module_name(cls, frame_info: inspect.Traceback, module_name: str) -> str:
"""
Adjust module_name based on the file path for the given frame.
We want to distinguish between other code and both our internal plugins
and the 'found' ones.
For internal plugins we want "plugins.<filename>".
For 'found' plugins we want "<plugins>.<plugin_name>...".
:param frame_info: The frame_info of the caller.
:param module_name: The module_name string to munge.
:return: The munged module_name.
"""
file_name = pathlib.Path(frame_info.filename).expanduser()
plugin_dir = pathlib.Path(config.plugin_dir).expanduser()
internal_plugin_dir = pathlib.Path(config.internal_plugin_dir).expanduser()
# Find the first parent called 'plugins'
plugin_top = file_name
while plugin_top and plugin_top.name != '':
if plugin_top.parent.name == 'plugins':
break
plugin_top = plugin_top.parent
# Check we didn't walk up to the root/anchor
if plugin_top.name != '':
# Check we're still inside config.plugin_dir
if plugin_top.parent == plugin_dir:
# In case of deeper callers we need a range of the file_name
pt_len = len(plugin_top.parts)
name_path = '.'.join(file_name.parts[(pt_len - 1):-1])
module_name = f'<plugins>.{name_path}.{module_name}'
# Check we're still inside the installation folder.
elif file_name.parent == internal_plugin_dir:
# Is this a deeper caller ?
pt_len = len(plugin_top.parts)
name_path = '.'.join(file_name.parts[(pt_len - 1):-1])
# Pre-pend 'plugins.<plugin folder>.' to module
if name_path == '':
# No sub-folder involved so module_name is sufficient
module_name = f'plugins.{module_name}'
else:
# Sub-folder(s) involved, so include them
module_name = f'plugins.{name_path}.{module_name}'
return module_name

File diff suppressed because it is too large Load Diff

View File

@ -236,6 +236,7 @@
"Language" = "Idioma";
/* [EDMarketConnector.py] - Leave '%H:%M:%S' as-is */
>>>>>>> develop
"Last updated at %H:%M:%S" = "Última actualización: %H:%M:%S";
/* Federation rank. [stats.py] */

View File

@ -47,6 +47,9 @@ breaking with future code changes.**
`from monitor import gamerunning` - in case a plugin needs to know if we
think the game is running.
`import timeout_session` - provides a method called `new_session` that creates a requests.session with a default timeout
on all requests. Recommended to reduce noise in HTTP requests
```python

View File

@ -4,9 +4,10 @@ from builtins import object
import base64
import csv
import requests
from typing import TYPE_CHECKING
# TODO: see https://github.com/EDCD/EDMarketConnector/issues/569
from http.cookiejar import LWPCookieJar # No longer needed but retained in case plugins use it
from http.cookiejar import LWPCookieJar # noqa: F401 - No longer needed but retained in case plugins use it
from email.utils import parsedate
import hashlib
import numbers
@ -14,14 +15,15 @@ import os
from os.path import join
import random
import time
from traceback import print_exc
import urllib.parse
import webbrowser
from config import appname, appversion, config
from protocol import protocolhandler
import logging
logger = logging.getLogger(appname)
from typing import TYPE_CHECKING
if TYPE_CHECKING:
_ = lambda x: x # noqa # to make flake8 stop complaining that the hacked in _ method doesnt exist
@ -196,29 +198,28 @@ class Auth(object):
return data.get('access_token')
else:
print('Auth\tCan\'t refresh token for {}'.format(self.cmdr))
logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
self.dump(r)
except Exception:
print('Auth\tCan\'t refresh token for {}'.format(self.cmdr))
print_exc()
except Exception as e:
logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"")
else:
print('Auth\tNo token for {}'.format(self.cmdr))
logger.error(f"Frontier CAPI Auth: No token for \"{self.cmdr}\"")
# New request
print('Auth\tNew authorization request')
logger.info('Frontier CAPI Auth: New authorization request')
v = random.SystemRandom().getrandbits(8 * 32)
self.verifier = self.base64URLEncode(v.to_bytes(32, byteorder='big')).encode('utf-8')
self.verifier = self.base64_url_encode(v.to_bytes(32, byteorder='big')).encode('utf-8')
s = random.SystemRandom().getrandbits(8 * 32)
self.state = self.base64URLEncode(s.to_bytes(32, byteorder='big'))
self.state = self.base64_url_encode(s.to_bytes(32, byteorder='big'))
# Won't work under IE: https://blogs.msdn.microsoft.com/ieinternals/2011/07/13/understanding-protocols/
webbrowser.open(
'{server_auth}{url_auth}?response_type=code&audience=frontier&scope=capi&client_id={client_id}&code_challenge={challenge}&code_challenge_method=S256&state={state}&redirect_uri={redirect}'.format( # noqa: E501 # I cant make this any shorter
server_auth=SERVER_AUTH,
url_auth=URL_AUTH,
client_id=CLIENT_ID,
challenge=self.base64URLEncode(hashlib.sha256(self.verifier).digest()),
challenge=self.base64_url_encode(hashlib.sha256(self.verifier).digest()),
state=self.state,
redirect=protocolhandler.redirect
)
@ -228,16 +229,16 @@ class Auth(object):
# Handle OAuth authorization code callback.
# Returns access token if successful, otherwise raises CredentialsError
if '?' not in payload:
print('Auth\tMalformed response {!r}'.format(payload))
logger.error(f'Frontier CAPI Auth: Malformed response\n{payload}\n')
raise CredentialsError('malformed payload') # Not well formed
data = urllib.parse.parse_qs(payload[(payload.index('?') + 1):])
if not self.state or not data.get('state') or data['state'][0] != self.state:
print('Auth\tUnexpected response {!r}'.format(payload))
logger.error(f'Frontier CAPI Auth: Unexpected response\n{payload}\n')
raise CredentialsError('Unexpected response from authorization {!r}'.format(payload)) # Unexpected reply
if not data.get('code'):
print('Auth\tNegative response {!r}'.format(payload))
logger.error(f'Frontier CAPI Auth: Negative response\n{payload}\n')
error = next(
(data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',)
)
@ -256,7 +257,7 @@ class Auth(object):
r = self.session.post(SERVER_AUTH + URL_TOKEN, data=data, timeout=auth_timeout)
data = r.json()
if r.status_code == requests.codes.ok:
print('Auth\tNew token for {}'.format(self.cmdr))
logger.info(f'Frontier CAPI Auth: New token for \"{self.cmdr}\"')
cmdrs = config.get('cmdrs')
idx = cmdrs.index(self.cmdr)
tokens = config.get('fdev_apikeys') or []
@ -268,21 +269,20 @@ class Auth(object):
return data.get('access_token')
except Exception as e:
print('Auth\tCan\'t get token for {}'.format(self.cmdr))
print_exc()
logger.exception(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
if r:
self.dump(r)
raise CredentialsError('unable to get token') from e
print('Auth\tCan\'t get token for {}'.format(self.cmdr))
logger.error(f"Frontier CAPI Auth: Can't get token for \"{self.cmdr}\"")
self.dump(r)
error = next((data[k] for k in ('error_description', 'error', 'message') if k in data), ('<unknown error>',))
raise CredentialsError('Error: {!r}'.format(error)[0])
@staticmethod
def invalidate(cmdr):
print('Auth\tInvalidated token for {}'.format(cmdr))
logger.info(f'Frontier CAPI Auth: Invalidated token for "{cmdr}"')
cmdrs = config.get('cmdrs')
idx = cmdrs.index(cmdr)
tokens = config.get('fdev_apikeys') or []
@ -292,9 +292,9 @@ class Auth(object):
config.save() # Save settings now for use by command-line app
def dump(self, r):
print('Auth\t' + r.url, r.status_code, r.reason if r.reason else 'None', r.text)
logger.debug(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason if r.reason else "None"} {r.text}')
def base64URLEncode(self, text):
def base64_url_encode(self, text):
return base64.urlsafe_b64encode(text).decode().replace('=', '')
@ -379,10 +379,8 @@ class Session(object):
r = self.session.get(self.server + endpoint, timeout=timeout)
except Exception as e:
if __debug__:
print_exc()
raise ServerError('unable to get endpoint {}'.format(endpoint)) from e
logger.debug('Attempting GET', exc_info=e)
raise ServerError(f'unable to get endpoint {endpoint}') from e
if r.url.startswith(SERVER_AUTH):
# Redirected back to Auth server - force full re-authentication
@ -402,7 +400,7 @@ class Session(object):
data = r.json() # May also fail here if token expired since response is empty
except (requests.HTTPError, ValueError) as e:
print_exc()
logger.exception('Frontier CAPI Auth: GET ')
self.dump(r)
self.close()
@ -410,6 +408,7 @@ class Session(object):
self.invalidate()
self.retrying = False
self.login()
logger.error('Frontier CAPI Auth: query failed after refresh')
raise CredentialsError('query failed after refresh') from e
elif self.login(): # Maybe our token expired. Re-authorize in any case
@ -418,6 +417,7 @@ class Session(object):
else:
self.retrying = False
logger.error('Frontier CAPI Auth: HTTP error or invalid JSON')
raise CredentialsError('HTTP error or invalid JSON') from e
self.retrying = False
@ -461,9 +461,8 @@ class Session(object):
try:
self.session.close()
except Exception:
if __debug__:
print_exc()
except Exception as e:
logger.debug('Frontier CAPI Auth: closing', exc_info=e)
self.session = None
@ -473,7 +472,7 @@ class Session(object):
Auth.invalidate(self.credentials['cmdr'])
def dump(self, r):
print('cAPI\t' + r.url, r.status_code, r.reason and r.reason or 'None', r.text)
logger.error(f'Frontier CAPI Auth: {r.url} {r.status_code} {r.reason and r.reason or "None"} {r.text}')
# Returns a shallow copy of the received data suitable for export to older tools
# English commodity names and anomalies fixed up
@ -497,15 +496,7 @@ def fixup(data):
# But also see https://github.com/Marginal/EDMarketConnector/issues/32
for thing in ('buyPrice', 'sellPrice', 'demand', 'demandBracket', 'stock', 'stockBracket'):
if not isinstance(commodity.get(thing), numbers.Number):
if __debug__:
print(
'Invalid {!r}:{!r} ({}) for {!r}'.format(
thing,
commodity.get(thing),
type(commodity.get(thing)),
commodity.get('name', '')
)
)
logger.debug(f'Invalid {thing}:{commodity.get(thing)} ({type(commodity.get(thing))}) for {commodity.get("name", "")}') # noqa: E501
break
else:
@ -520,20 +511,16 @@ def fixup(data):
pass
elif not commodity.get('categoryname'):
if __debug__:
print('Missing "categoryname" for {!r}'.format(commodity.get('name', '')))
logger.debug(f'Missing "categoryname" for {commodity.get("name", "")}')
elif not commodity.get('name'):
if __debug__:
print('Missing "name" for a commodity in {!r}'.format(commodity.get('categoryname', '')))
logger.debug(f'Missing "name" for a commodity in {commodity.get("categoryname", "")}')
elif not commodity['demandBracket'] in range(4):
if __debug__:
print('Invalid "demandBracket":{!r} for {!r}'.format(commodity['demandBracket'], commodity['name']))
logger.debug(f'Invalid "demandBracket":{commodity["demandBracket"]} for {commodity["name"]}')
elif not commodity['stockBracket'] in range(4):
if __debug__:
print('Invalid "stockBracket":{!r} for {!r}'.format(commodity['stockBracket'], commodity['name']))
logger.debug(f'Invalid "stockBracket":{commodity["stockBracket"]} for {commodity["name"]}')
else:
# Rewrite text fields

File diff suppressed because it is too large Load Diff

160
plug.py
View File

@ -7,49 +7,51 @@ import os
import importlib
import sys
import operator
import threading # We don't use it, but plugins might
from traceback import print_exc
import threading # noqa: F401 - We don't use it, but plugins might
from typing import Optional
import logging
import tkinter as tk
import myNotebook as nb
from config import config
from time import time as time
import myNotebook as nb # noqa: N813
from config import config, appname
import EDMCLogging
logger = logging.getLogger(appname)
# Dashboard Flags constants
FlagsDocked = 1<<0 # on a landing pad
FlagsLanded = 1<<1 # on planet surface
FlagsLandingGearDown = 1<<2
FlagsShieldsUp = 1<<3
FlagsSupercruise = 1<<4
FlagsFlightAssistOff = 1<<5
FlagsHardpointsDeployed = 1<<6
FlagsInWing = 1<<7
FlagsLightsOn = 1<<8
FlagsCargoScoopDeployed = 1<<9
FlagsSilentRunning = 1<<10
FlagsScoopingFuel = 1<<11
FlagsSrvHandbrake = 1<<12
FlagsSrvTurret = 1<<13 # using turret view
FlagsSrvUnderShip = 1<<14 # turret retracted
FlagsSrvDriveAssist = 1<<15
FlagsFsdMassLocked = 1<<16
FlagsFsdCharging = 1<<17
FlagsFsdCooldown = 1<<18
FlagsLowFuel = 1<<19 # <25%
FlagsOverHeating = 1<<20 # > 100%
FlagsHasLatLong = 1<<21
FlagsIsInDanger = 1<<22
FlagsBeingInterdicted = 1<<23
FlagsInMainShip = 1<<24
FlagsInFighter = 1<<25
FlagsInSRV = 1<<26
FlagsAnalysisMode = 1<<27 # Hud in Analysis mode
FlagsNightVision = 1<<28
FlagsAverageAltitude = 1<<29 # Altitude from Average radius
FlagsFsdJump = 1<<30
FlagsSrvHighBeam = 1<<31
FlagsDocked = 1 << 0 # on a landing pad
FlagsLanded = 1 << 1 # on planet surface
FlagsLandingGearDown = 1 << 2
FlagsShieldsUp = 1 << 3
FlagsSupercruise = 1 << 4
FlagsFlightAssistOff = 1 << 5
FlagsHardpointsDeployed = 1 << 6
FlagsInWing = 1 << 7
FlagsLightsOn = 1 << 8
FlagsCargoScoopDeployed = 1 << 9
FlagsSilentRunning = 1 << 10
FlagsScoopingFuel = 1 << 11
FlagsSrvHandbrake = 1 << 12
FlagsSrvTurret = 1 << 13 # using turret view
FlagsSrvUnderShip = 1 << 14 # turret retracted
FlagsSrvDriveAssist = 1 << 15
FlagsFsdMassLocked = 1 << 16
FlagsFsdCharging = 1 << 17
FlagsFsdCooldown = 1 << 18
FlagsLowFuel = 1 << 19 # <25%
FlagsOverHeating = 1 << 20 # > 100%
FlagsHasLatLong = 1 << 21
FlagsIsInDanger = 1 << 22
FlagsBeingInterdicted = 1 << 23
FlagsInMainShip = 1 << 24
FlagsInFighter = 1 << 25
FlagsInSRV = 1 << 26
FlagsAnalysisMode = 1 << 27 # Hud in Analysis mode
FlagsNightVision = 1 << 28
FlagsAverageAltitude = 1 << 29 # Altitude from Average radius
FlagsFsdJump = 1 << 30
FlagsSrvHighBeam = 1 << 31
# Dashboard GuiFocus constants
GuiFocusNoFocus = 0
@ -79,7 +81,7 @@ last_error = {
class Plugin(object):
def __init__(self, name, loadfile):
def __init__(self, name: str, loadfile: str, plugin_logger: Optional[logging.Logger]):
"""
Load a single plugin
:param name: module name
@ -87,28 +89,31 @@ class Plugin(object):
:raises Exception: Typically ImportError or OSError
"""
self.name = name # Display name.
self.folder = name # basename of plugin folder. None for internal plugins.
self.module = None # None for disabled plugins.
self.name = name # Display name.
self.folder = name # basename of plugin folder. None for internal plugins.
self.module = None # None for disabled plugins.
self.logger = plugin_logger
if loadfile:
sys.stdout.write('loading plugin {} from "{}"\n'.format(name.replace('.', '_'), loadfile))
logger.info(f'loading plugin "{name.replace(".", "_")}" from "{loadfile}"')
try:
module = importlib.machinery.SourceFileLoader('plugin_{}'.format(name.encode(encoding='ascii', errors='replace').decode('utf-8').replace('.', '_')), loadfile).load_module()
module = importlib.machinery.SourceFileLoader('plugin_{}'.format(
name.encode(encoding='ascii', errors='replace').decode('utf-8').replace('.', '_')),
loadfile).load_module()
if getattr(module, 'plugin_start3', None):
newname = module.plugin_start3(os.path.dirname(loadfile))
self.name = newname and str(newname) or name
self.module = module
elif getattr(module, 'plugin_start', None):
sys.stdout.write('plugin %s needs migrating\n' % name)
logger.warning(f'plugin {name} needs migrating\n')
PLUGINS_not_py3.append(self)
else:
sys.stdout.write('plugin %s has no plugin_start3() function\n' % name)
except:
print_exc()
logger.error(f'plugin {name} has no plugin_start3() function')
except Exception as e:
logger.exception(f': Failed for Plugin "{name}"')
raise
else:
sys.stdout.write('plugin %s disabled\n' % name)
logger.info(f'plugin {name} disabled')
def _get_func(self, funcname):
"""
@ -136,8 +141,8 @@ class Plugin(object):
elif not isinstance(appitem, tk.Widget):
raise AssertionError
return appitem
except:
print_exc()
except Exception as e:
logger.exception(f'Failed for Plugin "{self.name}"')
return None
def get_prefs(self, parent, cmdr, is_beta):
@ -156,8 +161,8 @@ class Plugin(object):
if not isinstance(frame, nb.Frame):
raise AssertionError
return frame
except:
print_exc()
except Exception as e:
logger.exception(f'Failed for Plugin "{self.name}"')
return None
@ -171,12 +176,12 @@ def load_plugins(master):
for name in sorted(os.listdir(config.internal_plugin_dir)):
if name.endswith('.py') and not name[0] in ['.', '_']:
try:
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir, name))
plugin.folder = None # Suppress listing in Plugins prefs tab
plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir, name), logger)
plugin.folder = None # Suppress listing in Plugins prefs tab
internal.append(plugin)
except:
pass
PLUGINS.extend(sorted(internal, key = lambda p: operator.attrgetter('name')(p).lower()))
except Exception as e:
logger.exception(f'Failure loading internal Plugin "{name}"')
PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower()))
# Add plugin folder to load path so packages can be loaded from plugin folder
sys.path.append(config.plugin_dir)
@ -189,15 +194,22 @@ def load_plugins(master):
pass
elif name.endswith('.disabled'):
name, discard = name.rsplit('.', 1)
found.append(Plugin(name, None))
found.append(Plugin(name, None, logger))
else:
try:
# Add plugin's folder to load path in case plugin has internal package dependencies
sys.path.append(os.path.join(config.plugin_dir, name))
found.append(Plugin(name, os.path.join(config.plugin_dir, name, 'load.py')))
except:
# Create a logger for this 'found' plugin. Must be before the
# load.py is loaded.
plugin_logger = EDMCLogging.get_plugin_logger(f'{appname}.{name}')
found.append(Plugin(name, os.path.join(config.plugin_dir, name, 'load.py'), plugin_logger))
except Exception as e:
logger.exception(f'Failure loading found Plugin "{name}"')
pass
PLUGINS.extend(sorted(found, key = lambda p: operator.attrgetter('name')(p).lower()))
PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower()))
def provides(fn_name):
"""
@ -240,8 +252,8 @@ def notify_stop():
try:
newerror = plugin_stop()
error = error or newerror
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
return error
@ -257,8 +269,8 @@ def notify_prefs_cmdr_changed(cmdr, is_beta):
if prefs_cmdr_changed:
try:
prefs_cmdr_changed(cmdr, is_beta)
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_prefs_changed(cmdr, is_beta):
@ -275,8 +287,8 @@ def notify_prefs_changed(cmdr, is_beta):
if prefs_changed:
try:
prefs_changed(cmdr, is_beta)
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
def notify_journal_entry(cmdr, is_beta, system, station, entry, state):
@ -298,8 +310,8 @@ def notify_journal_entry(cmdr, is_beta, system, station, entry, state):
# Pass a copy of the journal entry in case the callee modifies it
newerror = journal_entry(cmdr, is_beta, system, station, dict(entry), dict(state))
error = error or newerror
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
return error
@ -319,8 +331,8 @@ def notify_dashboard_entry(cmdr, is_beta, entry):
# Pass a copy of the status entry in case the callee modifies it
newerror = status(cmdr, is_beta, dict(entry))
error = error or newerror
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
return error
@ -338,8 +350,8 @@ def notify_newdata(data, is_beta):
try:
newerror = cmdr_data(data, is_beta)
error = error or newerror
except:
print_exc()
except Exception as e:
logger.exception(f'Plugin "{plugin.name}" failed')
return error

View File

@ -7,17 +7,21 @@ import io
# Migrate settings from <= 3.01
from config import config
if not config.get('shipyard_provider') and config.getint('shipyard'):
config.set('shipyard_provider', 'Coriolis')
config.delete('shipyard')
def plugin_start3(plugin_dir):
def plugin_start3(_):
return 'Coriolis'
# Return a URL for the current ship
def shipyard_url(loadout, is_beta):
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') # most compact representation
"""Return a URL for the current ship"""
# most compact representation
string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8')
if not string:
return False
@ -25,4 +29,7 @@ def shipyard_url(loadout, is_beta):
with gzip.GzipFile(fileobj=out, mode='w') as f:
f.write(string)
return (is_beta and 'https://beta.coriolis.io/import?data=' or 'https://coriolis.io/import?data=') + base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
encoded = base64.urlsafe_b64encode(out.getvalue()).decode().replace('=', '%3D')
url = 'https://beta.coriolis.io/import?data=' if is_beta else 'https://coriolis.io/import?data='
return f'{url}{encoded}'

View File

@ -24,23 +24,27 @@
import sys
from typing import Any, Optional, TYPE_CHECKING
import requests
from config import config
if TYPE_CHECKING:
from tkinter import Tk
STATION_UNDOCKED: str = u'×' # "Station" name to display when not docked = U+00D7
this = sys.modules[__name__] # For holding module globals
STATION_UNDOCKED: str = '×' # "Station" name to display when not docked = U+00D7
this: Any = sys.modules[__name__] # For holding module globals
# Main window clicks
this.system_link = None
this.system = None
this.system_address = None
this.system_population = None
this.station_link = None
this.station = None
this.station_marketid = None
this.system_link: Optional[str] = None
this.system: Optional[str] = None
this.system_address: Optional[str] = None
this.system_population: Optional[int] = None
this.station_link: 'Optional[Tk]' = None
this.station: Optional[str] = None
this.station_marketid: Optional[int] = None
def system_url(system_name: str) -> str:
@ -61,21 +65,21 @@ def station_url(system_name: str, station_name: str) -> str:
def plugin_start3(plugin_dir):
return 'eddb'
def plugin_app(parent):
this.system_link = parent.children['system'] # system label in main window
def plugin_app(parent: 'Tk'):
this.system_link = parent.children['system'] # system label in main window
this.system = None
this.system_address = None
this.station = None
this.station_marketid = None # Frontier MarketID
this.station_link = parent.children['station'] # station label in main window
this.station_link.configure(popup_copy = lambda x: x != STATION_UNDOCKED)
this.station_link.configure(popup_copy=lambda x: x != STATION_UNDOCKED)
def prefs_changed(cmdr, is_beta):
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
pass
def journal_entry(cmdr, is_beta, system, station, entry, state):
# Always update our system address even if we're not currently the provider for system or station, but dont update
# on events that contain "future" data, such as FSDTarget
@ -105,7 +109,15 @@ def journal_entry(cmdr, is_beta, system, station, entry, state):
# But only actually change the URL if we are current station provider.
if config.get('station_provider') == 'eddb':
this.station_link['text'] = this.station or (this.system_population and this.system_population > 0 and STATION_UNDOCKED or '')
text = this.station
if not text:
if this.system_population is not None and this.system_population > 0:
text = STATION_UNDOCKED
else:
text = ''
this.station_link['text'] = text
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.station_link.update_idletasks()
@ -113,11 +125,15 @@ def journal_entry(cmdr, is_beta, system, station, entry, state):
def cmdr_data(data, is_beta):
# Always store initially, even if we're not the *current* system provider.
if not this.station_marketid:
this.station_marketid = data['commander']['docked'] and data['lastStarport']['id']
if not this.station_marketid and data['commander']['docked']:
this.station_marketid = data['lastStarport']['id']
# Only trust CAPI if these aren't yet set
this.system = this.system or data['lastSystem']['name']
this.station = this.station or data['commander']['docked'] and data['lastStarport']['name']
if not this.system:
this.system = data['lastSystem']['name']
if not this.station and data['commander']['docked']:
this.station = data['lastStarport']['name']
# Override standard URL functions
if config.get('system_provider') == 'eddb':
@ -125,15 +141,17 @@ def cmdr_data(data, is_beta):
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.system_link.update_idletasks()
if config.get('station_provider') == 'eddb':
if data['commander']['docked']:
this.station_link['text'] = this.station
elif data['lastStarport']['name'] and data['lastStarport']['name'] != "":
this.station_link['text'] = STATION_UNDOCKED
else:
this.station_link['text'] = ''
# Do *NOT* set 'url' here, as it's set to a function that will call
# through correctly. We don't want a static string.
this.station_link.update_idletasks()

View File

@ -1,32 +1,40 @@
# Export to EDDN
from collections import OrderedDict
import itertools
import json
from os import SEEK_SET, SEEK_CUR, SEEK_END
import logging
import pathlib
import re
import sys
import tkinter as tk
from collections import OrderedDict
from os import SEEK_SET
from os.path import exists, join
from platform import system
import re
from typing import TYPE_CHECKING, Any, AnyStr, Dict, Iterator, List, Mapping, MutableMapping, Optional
from typing import OrderedDict as OrderedDictT
from typing import Sequence, TextIO, Tuple, Union
import requests
import sys
import uuid
import tkinter as tk
from ttkHyperlinkLabel import HyperlinkLabel
import myNotebook as nb
import myNotebook as nb # noqa: N813
from companion import category_map
from config import applongname, appname, appversion, config
from myNotebook import Frame
from prefs import prefsVersion
from ttkHyperlinkLabel import HyperlinkLabel
if sys.platform != 'win32':
from fcntl import lockf, LOCK_EX, LOCK_NB
if __debug__:
from traceback import print_exc
from config import applongname, appversion, config
from companion import category_map
if TYPE_CHECKING:
def _(x: str) -> str:
return x
logger = logging.getLogger(appname)
this = sys.modules[__name__] # For holding module globals
this: Any = sys.modules[__name__] # For holding module globals
# Track location to add to Journal events
this.systemaddress = None
@ -35,130 +43,180 @@ this.planet = None
# Avoid duplicates
this.marketId = None
this.commodities = this.outfitting = this.shipyard = None
this.commodities = None
this.outfitting: Optional[Tuple[bool, MutableMapping[str, Any]]] = None
this.shipyard = None
HORIZ_SKU = 'ELITE_HORIZONS_V_PLANETARY_LANDINGS'
class EDDN(object):
# TODO: a good few of these methods are static or could be classmethods. they should be created as such.
### SERVER = 'http://localhost:8081' # testing
class EDDN:
# SERVER = 'http://localhost:8081' # testing
SERVER = 'https://eddn.edcd.io:4430'
UPLOAD = '%s/upload/' % SERVER
REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms]
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
TIMEOUT= 10 # requests timeout
MODULE_RE = re.compile('^Hpt_|^Int_|Armour_', re.IGNORECASE)
UPLOAD = f'{SERVER}/upload/'
REPLAYPERIOD = 400 # Roughly two messages per second, accounting for send delays [ms]
REPLAYFLUSH = 20 # Update log on disk roughly every 10 seconds
TIMEOUT = 10 # requests timeout
MODULE_RE = re.compile(r'^Hpt_|^Int_|Armour_', re.IGNORECASE)
CANONICALISE_RE = re.compile(r'\$(.+)_name;')
def __init__(self, parent):
self.parent = parent
def __init__(self, parent: tk.Tk):
self.parent: tk.Tk = parent
self.session = requests.Session()
self.replayfile = None # For delayed messages
self.replaylog = []
self.replayfile: Optional[TextIO] = None # For delayed messages
self.replaylog: List[str] = []
def load(self):
def load_journal_replay(self) -> bool:
"""
Load cached journal entries from disk
:return: a bool indicating success
"""
# Try to obtain exclusive access to the journal cache
filename = join(config.app_dir, 'replay.jsonl')
try:
try:
# Try to open existing file
self.replayfile = open(filename, 'r+', buffering=1)
except:
except Exception:
if exists(filename):
raise # Couldn't open existing file
raise # Couldn't open existing file
else:
self.replayfile = open(filename, 'w+', buffering=1) # Create file
if sys.platform != 'win32': # open for writing is automatically exclusive on Windows
lockf(self.replayfile, LOCK_EX|LOCK_NB)
except:
if __debug__: print_exc()
self.replayfile = open(filename, 'w+', buffering=1) # Create file
if sys.platform != 'win32': # open for writing is automatically exclusive on Windows
lockf(self.replayfile, LOCK_EX | LOCK_NB)
except Exception as e:
logger.debug('Failed opening "replay.jsonl"', exc_info=e)
if self.replayfile:
self.replayfile.close()
self.replayfile = None
return False
self.replaylog = [line.strip() for line in self.replayfile]
return True
def flush(self):
"""
flush flushes the replay file, clearing any data currently there that is not in the replaylog list
"""
self.replayfile.seek(0, SEEK_SET)
self.replayfile.truncate()
for line in self.replaylog:
self.replayfile.write('%s\n' % line)
self.replayfile.write(f'{line}\n')
self.replayfile.flush()
def close(self):
"""
close closes the replay file
"""
if self.replayfile:
self.replayfile.close()
self.replayfile = None
def send(self, cmdr, msg):
uploaderID = cmdr
def send(self, cmdr: str, msg: Mapping[str, Any]) -> None:
"""
Send sends an update to EDDN
msg = OrderedDict([
:param cmdr: the CMDR to use as the uploader ID
:param msg: the payload to send
"""
uploader_id = cmdr
to_send: OrderedDictT[str, str] = OrderedDict([
('$schemaRef', msg['$schemaRef']),
('header', OrderedDict([
('softwareName', '%s [%s]' % (applongname, sys.platform=='darwin' and "Mac OS" or system())),
('header', OrderedDict([
('softwareName', f'{applongname} [{system() if sys.platform != "darwin" else "Mac OS"}]'),
('softwareVersion', appversion),
('uploaderID', uploaderID),
('uploaderID', uploader_id),
])),
('message', msg['message']),
('message', msg['message']),
])
r = self.session.post(self.UPLOAD, data=json.dumps(msg), timeout=self.TIMEOUT)
if __debug__ and r.status_code != requests.codes.ok:
print('Status\t%s' % r.status_code)
print('URL\t%s' % r.url)
print('Headers\t%s' % r.headers)
print('Content:\n%s' % r.text)
r = self.session.post(self.UPLOAD, data=json.dumps(to_send), timeout=self.TIMEOUT)
if r.status_code != requests.codes.ok:
logger.debug(f':\nStatus\t{r.status_code}URL\t{r.url}Headers\t{r.headers}Content:\n{r.text}')
r.raise_for_status()
def sendreplay(self):
def sendreplay(self) -> None:
"""
sendreplay updates EDDN with cached journal lines
"""
if not self.replayfile:
return # Probably closing app
return # Probably closing app
status = self.parent.children['status']
status: Dict[str, Any] = self.parent.children['status']
if not self.replaylog:
status['text'] = ''
return
localized: str = _('Sending data to EDDN...')
if len(self.replaylog) == 1:
status['text'] = _('Sending data to EDDN...')
status['text'] = localized
else:
status['text'] = '%s [%d]' % (_('Sending data to EDDN...').replace('...',''), len(self.replaylog))
status['text'] = f'{localized.replace("...", "")} [{len(self.replaylog)}]'
self.parent.update_idletasks()
try:
cmdr, msg = json.loads(self.replaylog[0], object_pairs_hook=OrderedDict)
except:
except json.JSONDecodeError as e:
# Couldn't decode - shouldn't happen!
if __debug__:
print(self.replaylog[0])
print_exc()
self.replaylog.pop(0) # Discard and continue
logger.debug(f'\n{self.replaylog[0]}\n', exc_info=e)
# Discard and continue
self.replaylog.pop(0)
else:
# Rewrite old schema name
if msg['$schemaRef'].startswith('http://schemas.elite-markets.net/eddn/'):
msg['$schemaRef'] = 'https://eddn.edcd.io/schemas/' + msg['$schemaRef'][38:]
msg['$schemaRef'] = str(msg['$schemaRef']).replace(
'http://schemas.elite-markets.net/eddn/',
'https://eddn.edcd.io/schemas/'
)
try:
self.send(cmdr, msg)
self.replaylog.pop(0)
if not len(self.replaylog) % self.REPLAYFLUSH:
self.flush()
except requests.exceptions.RequestException as e:
if __debug__: print_exc()
logger.debug('Failed sending', exc_info=e)
status['text'] = _("Error: Can't connect to EDDN")
return # stop sending
return # stop sending
except Exception as e:
if __debug__: print_exc()
logger.debug('Failed sending', exc_info=e)
status['text'] = str(e)
return # stop sending
return # stop sending
self.parent.after(self.REPLAYPERIOD, self.sendreplay)
def export_commodities(self, data, is_beta):
commodities = []
def export_commodities(self, data: Mapping[str, Any], is_beta: bool) -> None:
"""
export_commodities updates EDDN with the commodities on the current (lastStarport) station.
Once the send is complete, this.commodities is updated with the new data.
:param data: a dict containing the starport data
:param is_beta: whether or not we're currently in beta mode
"""
commodities: List[OrderedDictT[str, Any]] = []
for commodity in data['lastStarport'].get('commodities') or []:
if (category_map.get(commodity['categoryname'], True) and # Check marketable
not commodity.get('legality')): # check not prohibited
# Check 'marketable' and 'not prohibited'
if (category_map.get(commodity['categoryname'], True)
and not commodity.get('legality')):
commodities.append(OrderedDict([
('name', commodity['name'].lower()),
('meanPrice', int(commodity['meanPrice'])),
@ -169,41 +227,68 @@ class EDDN(object):
('demand', int(commodity['demand'])),
('demandBracket', commodity['demandBracket']),
]))
if commodity['statusFlags']:
commodities[-1]['statusFlags'] = commodity['statusFlags']
commodities.sort(key = lambda c: c['name'])
if commodities and this.commodities != commodities: # Don't send empty commodities list - schema won't allow it
message = OrderedDict([
commodities.sort(key=lambda c: c['name'])
if commodities and this.commodities != commodities: # Don't send empty commodities list - schema won't allow it
message: OrderedDictT[str, Any] = OrderedDict([
('timestamp', data['timestamp']),
('systemName', data['lastSystem']['name']),
('stationName', data['lastStarport']['name']),
('marketId', data['lastStarport']['id']),
('commodities', commodities),
])
if 'economies' in data['lastStarport']:
message['economies'] = sorted(list([x for x in (data['lastStarport']['economies'] or {}).values()]), key=lambda x: x['name'])
message['economies'] = sorted(
(x for x in (data['lastStarport']['economies'] or {}).values()), key=lambda x: x['name']
)
if 'prohibited' in data['lastStarport']:
message['prohibited'] = sorted(list([x for x in (data['lastStarport']['prohibited'] or {}).values()]))
message['prohibited'] = sorted(x for x in (data['lastStarport']['prohibited'] or {}).values())
self.send(data['commander']['name'], {
'$schemaRef' : 'https://eddn.edcd.io/schemas/commodity/3' + (is_beta and '/test' or ''),
'message' : message,
'$schemaRef': f'https://eddn.edcd.io/schemas/commodity/3{"/test" if is_beta else ""}',
'message': message,
})
this.commodities = commodities
def export_outfitting(self, data, is_beta):
economies = data['lastStarport'].get('economies') or {}
modules = data['lastStarport'].get('modules') or {}
ships = data['lastStarport'].get('ships') or { 'shipyard_list': {}, 'unavailable_list': [] }
# Horizons flag - will hit at least Int_PlanetApproachSuite other than at engineer bases ("Colony"), prison or rescue Megaships, or under Pirate Attack etc
horizons = (any(economy['name'] == 'Colony' for economy in economies.values()) or
any(module.get('sku') == 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' for module in modules.values()) or
any(ship.get('sku') == 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' for ship in list((ships['shipyard_list'] or {}).values())))
outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['name'].lower()) for module in modules.values() if self.MODULE_RE.search(module['name']) and module.get('sku') in [None, 'ELITE_HORIZONS_V_PLANETARY_LANDINGS'] and module['name'] != 'Int_PlanetApproachSuite'])
if outfitting and this.outfitting != (horizons, outfitting): # Don't send empty modules list - schema won't allow it
def export_outfitting(self, data: Mapping[str, Any], is_beta: bool) -> None:
"""
export_outfitting updates EDDN with the current (lastStarport) station's outfitting options, if any.
Once the send is complete, this.outfitting is updated with the given data.
:param data: dict containing the outfitting data
:param is_beta: whether or not we're currently in beta mode
"""
modules: Dict[str, Any] = data['lastStarport'].get('modules') or {}
# Horizons flag - will hit at least Int_PlanetApproachSuite other than at engineer bases ("Colony"),
# prison or rescue Megaships, or under Pirate Attack etc
horizons: bool = is_horizons(
data['lastStarport'].get('economies', {}),
modules,
data['lastStarport'].get('ships', {'shipyard_list': {}, 'unavailable_list': []})
)
to_search: Iterator[Mapping[str, Any]] = filter(
lambda m: self.MODULE_RE.search(m['name']) and m.get('sku') in (None, HORIZ_SKU) and
m['name'] != 'Int_PlanetApproachSuite',
modules.values()
)
outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search
)
# Don't send empty modules list - schema won't allow it
if outfitting and this.outfitting != (horizons, outfitting):
self.send(data['commander']['name'], {
'$schemaRef' : 'https://eddn.edcd.io/schemas/outfitting/2' + (is_beta and '/test' or ''),
'message' : OrderedDict([
'$schemaRef': f'https://eddn.edcd.io/schemas/outfitting/2{"/test" if is_beta else ""}',
'message': OrderedDict([
('timestamp', data['timestamp']),
('systemName', data['lastSystem']['name']),
('stationName', data['lastStarport']['name']),
@ -212,20 +297,35 @@ class EDDN(object):
('modules', outfitting),
]),
})
this.outfitting = (horizons, outfitting)
def export_shipyard(self, data, is_beta):
economies = data['lastStarport'].get('economies') or {}
modules = data['lastStarport'].get('modules') or {}
ships = data['lastStarport'].get('ships') or { 'shipyard_list': {}, 'unavailable_list': [] }
horizons = (any(economy['name'] == 'Colony' for economy in economies.values()) or
any(module.get('sku') == 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' for module in modules.values()) or
any(ship.get('sku') == 'ELITE_HORIZONS_V_PLANETARY_LANDINGS' for ship in list((ships['shipyard_list'] or {}).values())))
shipyard = sorted([ship['name'].lower() for ship in list((ships['shipyard_list'] or {}).values()) + ships['unavailable_list']])
if shipyard and this.shipyard != (horizons, shipyard): # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
def export_shipyard(self, data: Dict[str, Any], is_beta: bool) -> None:
"""
export_shipyard updates EDDN with the current (lastStarport) station's outfitting options, if any.
once the send is complete, this.shipyard is updated to the new data.
:param data: dict containing the shipyard data
:param is_beta: whether or not we are in beta mode
"""
ships: Dict[str, Any] = data['lastStarport'].get('ships', {'shipyard_list': {}, 'unavailable_list': []})
horizons: bool = is_horizons(
data['lastStarport'].get('economies', {}),
data['lastStarport'].get('modules', {}),
ships
)
shipyard: List[Mapping[str, Any]] = sorted(
itertools.chain(
(ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()),
ships['unavailable_list']
)
)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
if shipyard and this.shipyard != (horizons, shipyard):
self.send(data['commander']['name'], {
'$schemaRef' : 'https://eddn.edcd.io/schemas/shipyard/2' + (is_beta and '/test' or ''),
'message' : OrderedDict([
'$schemaRef': f'https://eddn.edcd.io/schemas/shipyard/2{"/test" if is_beta else ""}',
'message': OrderedDict([
('timestamp', data['timestamp']),
('systemName', data['lastSystem']['name']),
('stationName', data['lastStarport']['name']),
@ -234,11 +334,20 @@ class EDDN(object):
('ships', shipyard),
]),
})
this.shipyard = (horizons, shipyard)
def export_journal_commodities(self, cmdr, is_beta, entry):
items = entry.get('Items') or []
commodities = sorted([OrderedDict([
def export_journal_commodities(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
"""
export_journal_commodities updates EDDN with the commodities list on the current station (lastStarport) from
data in the journal. As a side effect, it also updates this.commodities with the data
:param cmdr: The commander to send data under
:param is_beta: whether or not we're in beta mode
:param entry: the journal entry containing the commodities data
"""
items: List[Mapping[str, Any]] = entry.get('Items') or []
commodities: Sequence[OrderedDictT[AnyStr, Any]] = sorted((OrderedDict([
('name', self.canonicalise(commodity['Name'])),
('meanPrice', commodity['MeanPrice']),
('buyPrice', commodity['BuyPrice']),
@ -247,12 +356,12 @@ class EDDN(object):
('sellPrice', commodity['SellPrice']),
('demand', commodity['Demand']),
('demandBracket', commodity['DemandBracket']),
]) for commodity in items], key = lambda c: c['name'])
]) for commodity in items), key=lambda c: c['name'])
if commodities and this.commodities != commodities: # Don't send empty commodities list - schema won't allow it
if commodities and this.commodities != commodities: # Don't send empty commodities list - schema won't allow it
self.send(cmdr, {
'$schemaRef' : 'https://eddn.edcd.io/schemas/commodity/3' + (is_beta and '/test' or ''),
'message' : OrderedDict([
'$schemaRef': f'https://eddn.edcd.io/schemas/commodity/3{"/test" if is_beta else ""}',
'message': OrderedDict([
('timestamp', entry['timestamp']),
('systemName', entry['StarSystem']),
('stationName', entry['StationName']),
@ -260,16 +369,31 @@ class EDDN(object):
('commodities', commodities),
]),
})
this.commodities = commodities
def export_journal_outfitting(self, cmdr, is_beta, entry):
modules = entry.get('Items') or []
horizons = entry.get('Horizons', False)
outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) for module in modules if module['Name'] != 'int_planetapproachsuite'])
if outfitting and this.outfitting != (horizons, outfitting): # Don't send empty modules list - schema won't allow it
this.commodities: OrderedDictT[str, Any] = commodities
def export_journal_outfitting(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
"""
export_journal_outfitting updates EDDN with station outfitting based on a journal entry. As a side effect,
it also updates this.outfitting with the data
:param cmdr: The commander to send data under
:param is_beta: Whether or not we're in beta mode
:param entry: The relevant journal entry
"""
modules: List[Mapping[str, Any]] = entry.get('Items', [])
horizons: bool = entry.get('Horizons', False)
# outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name'])
# for module in modules if module['Name'] != 'int_planetapproachsuite'])
outfitting: List[str] = sorted(
self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in
filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules)
)
# Don't send empty modules list - schema won't allow it
if outfitting and this.outfitting != (horizons, outfitting):
self.send(cmdr, {
'$schemaRef' : 'https://eddn.edcd.io/schemas/outfitting/2' + (is_beta and '/test' or ''),
'message' : OrderedDict([
'$schemaRef': f'https://eddn.edcd.io/schemas/outfitting/2{"/test" if is_beta else ""}',
'message': OrderedDict([
('timestamp', entry['timestamp']),
('systemName', entry['StarSystem']),
('stationName', entry['StationName']),
@ -278,16 +402,26 @@ class EDDN(object):
('modules', outfitting),
]),
})
this.outfitting = (horizons, outfitting)
def export_journal_shipyard(self, cmdr, is_beta, entry):
ships = entry.get('PriceList') or []
horizons = entry.get('Horizons', False)
shipyard = sorted([ship['ShipType'] for ship in ships])
if shipyard and this.shipyard != (horizons, shipyard): # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
def export_journal_shipyard(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
"""
export_journal_shipyard updates EDDN with station shipyard data based on a journal entry. As a side effect,
this.shipyard is updated with the data.
:param cmdr: the commander to send this update under
:param is_beta: Whether or not we're in beta mode
:param entry: the relevant journal entry
"""
ships: List[Mapping[str, Any]] = entry.get('PriceList') or []
horizons: bool = entry.get('Horizons', False)
shipyard = sorted(ship['ShipType'] for ship in ships)
# Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard.
if shipyard and this.shipyard != (horizons, shipyard):
self.send(cmdr, {
'$schemaRef' : 'https://eddn.edcd.io/schemas/shipyard/2' + (is_beta and '/test' or ''),
'message' : OrderedDict([
'$schemaRef': f'https://eddn.edcd.io/schemas/shipyard/2{"/test" if is_beta else ""}',
'message': OrderedDict([
('timestamp', entry['timestamp']),
('systemName', entry['StarSystem']),
('stationName', entry['StationName']),
@ -296,130 +430,206 @@ class EDDN(object):
('ships', shipyard),
]),
})
this.shipyard = (horizons, shipyard)
def export_journal_entry(self, cmdr, is_beta, entry):
def export_journal_entry(self, cmdr: str, is_beta: bool, entry: Mapping[str, Any]) -> None:
"""
export_journal_entry updates EDDN with a line from the journal. Additionally if additional lines are cached,
it may send those as well.
:param cmdr: the commander under which this upload is made
:param is_beta: whether or not we are in beta mode
:param entry: the journal entry to send
"""
msg = {
'$schemaRef' : 'https://eddn.edcd.io/schemas/journal/1' + (is_beta and '/test' or ''),
'message' : entry
'$schemaRef': f'https://eddn.edcd.io/schemas/journal/1{"/test" if is_beta else ""}',
'message': entry
}
if self.replayfile or self.load():
if self.replayfile or self.load_journal_replay():
# Store the entry
self.replaylog.append(json.dumps([cmdr, msg]))
self.replayfile.write('%s\n' % self.replaylog[-1])
self.replayfile.write(f'{self.replaylog[-1]}\n')
if (
entry['event'] == 'Docked' or (entry['event'] == 'Location' and entry['Docked']) or not
(config.getint('output') & config.OUT_SYS_DELAY)
):
self.parent.after(self.REPLAYPERIOD, self.sendreplay) # Try to send this and previous entries
if (entry['event'] == 'Docked' or
(entry['event'] == 'Location' and entry['Docked']) or
not (config.getint('output') & config.OUT_SYS_DELAY)):
self.parent.after(self.REPLAYPERIOD, self.sendreplay) # Try to send this and previous entries
else:
# Can't access replay file! Send immediately.
status = self.parent.children['status']
status: MutableMapping[str, str] = self.parent.children['status']
status['text'] = _('Sending data to EDDN...')
self.parent.update_idletasks()
self.send(cmdr, msg)
status['text'] = ''
def canonicalise(self, item):
def canonicalise(self, item: str) -> str:
match = self.CANONICALISE_RE.match(item)
return match and match.group(1) or item
# Plugin callbacks
def plugin_start3(plugin_dir):
def plugin_start3(plugin_dir: str) -> str:
return 'EDDN'
def plugin_app(parent):
def plugin_app(parent: tk.Tk) -> None:
this.parent = parent
this.eddn = EDDN(parent)
# Try to obtain exclusive lock on journal cache, even if we don't need it yet
if not this.eddn.load():
this.status['text'] = 'Error: Is another copy of this app already running?' # Shouldn't happen - don't bother localizing
if not this.eddn.load_journal_replay():
# Shouldn't happen - don't bother localizing
this.status['text'] = 'Error: Is another copy of this app already running?'
def plugin_prefs(parent, cmdr, is_beta):
PADX = 10
BUTTONX = 12 # indent Checkbuttons and Radiobuttons
PADY = 2 # close spacing
def plugin_prefs(parent, cmdr: str, is_beta: bool) -> Frame:
PADX = 10 # noqa: N806
BUTTONX = 12 # noqa: N806 # indent Checkbuttons and Radiobuttons
if prefsVersion.shouldSetDefaults('0.0.0.0', not bool(config.getint('output'))):
output = (config.OUT_MKT_EDDN | config.OUT_SYS_EDDN) # default settings
output: int = (config.OUT_MKT_EDDN | config.OUT_SYS_EDDN) # default settings
else:
output = config.getint('output')
output: int = config.getint('output')
eddnframe = nb.Frame(parent)
HyperlinkLabel(eddnframe, text='Elite Dangerous Data Network', background=nb.Label().cget('background'), url='https://github.com/EDSM-NET/EDDN/wiki', underline=True).grid(padx=PADX, sticky=tk.W) # Don't translate
this.eddn_station= tk.IntVar(value = (output & config.OUT_MKT_EDDN) and 1)
this.eddn_station_button = nb.Checkbutton(eddnframe, text=_('Send station data to the Elite Dangerous Data Network'), variable=this.eddn_station, command=prefsvarchanged) # Output setting
this.eddn_station_button.grid(padx=BUTTONX, pady=(5,0), sticky=tk.W)
this.eddn_system = tk.IntVar(value = (output & config.OUT_SYS_EDDN) and 1)
this.eddn_system_button = nb.Checkbutton(eddnframe, text=_('Send system and scan data to the Elite Dangerous Data Network'), variable=this.eddn_system, command=prefsvarchanged) # Output setting new in E:D 2.2
this.eddn_system_button.grid(padx=BUTTONX, pady=(5,0), sticky=tk.W)
this.eddn_delay= tk.IntVar(value = (output & config.OUT_SYS_DELAY) and 1)
this.eddn_delay_button = nb.Checkbutton(eddnframe, text=_('Delay sending until docked'), variable=this.eddn_delay) # Output setting under 'Send system and scan data to the Elite Dangerous Data Network' new in E:D 2.2
HyperlinkLabel(
eddnframe,
text='Elite Dangerous Data Network',
background=nb.Label().cget('background'),
url='https://github.com/EDSM-NET/EDDN/wiki',
underline=True
).grid(padx=PADX, sticky=tk.W) # Don't translate
this.eddn_station = tk.IntVar(value=(output & config.OUT_MKT_EDDN) and 1)
this.eddn_station_button = nb.Checkbutton(
eddnframe,
text=_('Send station data to the Elite Dangerous Data Network'),
variable=this.eddn_station,
command=prefsvarchanged
) # Output setting
this.eddn_station_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W)
this.eddn_system = tk.IntVar(value=(output & config.OUT_SYS_EDDN) and 1)
# Output setting new in E:D 2.2
this.eddn_system_button = nb.Checkbutton(
eddnframe,
text=_('Send system and scan data to the Elite Dangerous Data Network'),
variable=this.eddn_system,
command=prefsvarchanged
)
this.eddn_system_button.grid(padx=BUTTONX, pady=(5, 0), sticky=tk.W)
this.eddn_delay = tk.IntVar(value=(output & config.OUT_SYS_DELAY) and 1)
# Output setting under 'Send system and scan data to the Elite Dangerous Data Network' new in E:D 2.2
this.eddn_delay_button = nb.Checkbutton(
eddnframe,
text=_('Delay sending until docked'),
variable=this.eddn_delay
)
this.eddn_delay_button.grid(padx=BUTTONX, sticky=tk.W)
return eddnframe
def prefsvarchanged(event=None):
def prefsvarchanged(event=None) -> None:
this.eddn_station_button['state'] = tk.NORMAL
this.eddn_system_button['state']= tk.NORMAL
this.eddn_system_button['state'] = tk.NORMAL
this.eddn_delay_button['state'] = this.eddn.replayfile and this.eddn_system.get() and tk.NORMAL or tk.DISABLED
def prefs_changed(cmdr, is_beta):
config.set('output',
(config.getint('output') & (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP |config.OUT_MKT_MANUAL)) +
(this.eddn_station.get() and config.OUT_MKT_EDDN) +
(this.eddn_system.get() and config.OUT_SYS_EDDN) +
(this.eddn_delay.get() and config.OUT_SYS_DELAY))
def plugin_stop():
def prefs_changed(cmdr: str, is_beta: bool) -> None:
config.set(
'output',
(config.getint('output') & (config.OUT_MKT_TD | config.OUT_MKT_CSV | config.OUT_SHIP | config.OUT_MKT_MANUAL)) +
(this.eddn_station.get() and config.OUT_MKT_EDDN) +
(this.eddn_system.get() and config.OUT_SYS_EDDN) +
(this.eddn_delay.get() and config.OUT_SYS_DELAY)
)
def plugin_stop() -> None:
this.eddn.close()
def journal_entry(cmdr, is_beta, system, station, entry, state):
def journal_entry(
cmdr: str, is_beta: bool, system: str, station: str, entry: MutableMapping[str, Any], state: Mapping[str, Any]
) -> Optional[str]:
# Recursively filter '*_Localised' keys from dict
def filter_localised(d):
filtered = OrderedDict()
def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]:
filtered: OrderedDictT[str, Any] = OrderedDict()
for k, v in d.items():
if k.endswith('_Localised'):
pass
elif hasattr(v, 'items'): # dict -> recurse
elif hasattr(v, 'items'): # dict -> recurse
filtered[k] = filter_localised(v)
elif isinstance(v, list): # list of dicts -> recurse
elif isinstance(v, list): # list of dicts -> recurse
filtered[k] = [filter_localised(x) if hasattr(x, 'items') else x for x in v]
else:
filtered[k] = v
return filtered
# Track location
if entry['event'] in ['Location', 'FSDJump', 'Docked', 'CarrierJump']:
if entry['event'] in ('Location', 'FSDJump', 'Docked', 'CarrierJump'):
if entry['event'] in ('Location', 'CarrierJump'):
this.planet = entry.get('Body') if entry.get('BodyType') == 'Planet' else None
this.planet: Optional[str] = entry.get('Body') if entry.get('BodyType') == 'Planet' else None
elif entry['event'] == 'FSDJump':
this.planet = None
this.planet: Optional[str] = None
if 'StarPos' in entry:
this.coordinates = tuple(entry['StarPos'])
this.coordinates: Optional[Tuple[int, int, int]] = tuple(entry['StarPos'])
elif this.systemaddress != entry.get('SystemAddress'):
this.coordinates = None # Docked event doesn't include coordinates
this.systemaddress = entry.get('SystemAddress')
this.coordinates: Optional[Tuple[int, int, int]] = None # Docked event doesn't include coordinates
this.systemaddress: Optional[str] = entry.get('SystemAddress')
elif entry['event'] == 'ApproachBody':
this.planet = entry['Body']
elif entry['event'] in ['LeaveBody', 'SupercruiseEntry']:
elif entry['event'] in ('LeaveBody', 'SupercruiseEntry'):
this.planet = None
# Send interesting events to EDDN, but not when on a crew
if (config.getint('output') & config.OUT_SYS_EDDN and not state['Captain'] and
(entry['event'] in ('Location', 'FSDJump', 'Docked', 'Scan', 'SAASignalsFound', 'CarrierJump')) and
('StarPos' in entry or this.coordinates)):
('StarPos' in entry or this.coordinates)):
# strip out properties disallowed by the schema
for thing in ['ActiveFine', 'CockpitBreach', 'BoostUsed', 'FuelLevel', 'FuelUsed', 'JumpDist', 'Latitude', 'Longitude', 'Wanted']:
for thing in (
'ActiveFine',
'CockpitBreach',
'BoostUsed',
'FuelLevel',
'FuelUsed',
'JumpDist',
'Latitude',
'Longitude',
'Wanted'
):
entry.pop(thing, None)
if 'Factions' in entry:
# Filter faction state. `entry` is a shallow copy so replace 'Factions' value rather than modify in-place.
entry['Factions'] = [ {k: v for k, v in f.items() if k not in ['HappiestSystem', 'HomeSystem', 'MyReputation', 'SquadronFaction']} for f in entry['Factions']]
# Filter faction state to comply with schema restrictions regarding personal data. `entry` is a shallow copy
# so replace 'Factions' value rather than modify in-place.
entry['Factions'] = [
{
k: v for k, v in f.items() if k not in (
'HappiestSystem', 'HomeSystem', 'MyReputation', 'SquadronFaction'
)
}
for f in entry['Factions']
]
# add planet to Docked event for planetary stations if known
if entry['event'] == 'Docked' and this.planet:
@ -429,50 +639,66 @@ def journal_entry(cmdr, is_beta, system, station, entry, state):
# add mandatory StarSystem, StarPos and SystemAddress properties to Scan events
if 'StarSystem' not in entry:
if not system:
return("system is None, can't add StarSystem")
logger.warn("system is None, can't add StarSystem")
return "system is None, can't add StarSystem"
entry['StarSystem'] = system
if 'StarPos' not in entry:
if not this.coordinates:
return("this.coordinates is None, can't add StarPos")
logger.warn("this.coordinates is None, can't add StarPos")
return "this.coordinates is None, can't add StarPos"
entry['StarPos'] = list(this.coordinates)
if 'SystemAddress' not in entry:
if not this.systemaddress:
return("this.systemaddress is None, can't add SystemAddress")
logger.warn("this.systemaddress is None, can't add SystemAddress")
return "this.systemaddress is None, can't add SystemAddress"
entry['SystemAddress'] = this.systemaddress
try:
this.eddn.export_journal_entry(cmdr, is_beta, filter_localised(entry))
except requests.exceptions.RequestException as e:
if __debug__: print_exc()
logger.debug('Failed in export_journal_entry', exc_info=e)
return _("Error: Can't connect to EDDN")
except Exception as e:
if __debug__: print_exc()
logger.debug('Failed in export_journal_entry', exc_info=e)
return str(e)
elif (config.getint('output') & config.OUT_MKT_EDDN and not state['Captain'] and
entry['event'] in ['Market', 'Outfitting', 'Shipyard']):
entry['event'] in ('Market', 'Outfitting', 'Shipyard')):
try:
if this.marketId != entry['MarketID']:
this.commodities = this.outfitting = this.shipyard = None
this.marketId = entry['MarketID']
with open(join(config.get('journaldir') or config.default_journal_dir, '%s.json' % entry['event']), 'rb') as h:
entry = json.load(h)
path = pathlib.Path(str(config.get('journaldir') or config.default_journal_dir)) / f'{entry["event"]}.json'
with path.open('rb') as f:
entry = json.load(f)
if entry['event'] == 'Market':
this.eddn.export_journal_commodities(cmdr, is_beta, entry)
elif entry['event'] == 'Outfitting':
this.eddn.export_journal_outfitting(cmdr, is_beta, entry)
elif entry['event'] == 'Shipyard':
this.eddn.export_journal_shipyard(cmdr, is_beta, entry)
except requests.exceptions.RequestException as e:
if __debug__: print_exc()
logger.debug(f'Failed exporting {entry["event"]}', exc_info=e)
return _("Error: Can't connect to EDDN")
except Exception as e:
if __debug__: print_exc()
logger.debug(f'Failed exporting {entry["event"]}', exc_info=e)
return str(e)
def cmdr_data(data, is_beta):
def cmdr_data(data: Mapping[str, Any], is_beta: bool) -> str:
if data['commander'].get('docked') and config.getint('output') & config.OUT_MKT_EDDN:
try:
if this.marketId != data['lastStarport']['id']:
@ -484,6 +710,7 @@ def cmdr_data(data, is_beta):
if not old_status:
status['text'] = _('Sending data to EDDN...')
status.update_idletasks()
this.eddn.export_commodities(data, is_beta)
this.eddn.export_outfitting(data, is_beta)
this.eddn.export_shipyard(data, is_beta)
@ -492,9 +719,20 @@ def cmdr_data(data, is_beta):
status.update_idletasks()
except requests.RequestException as e:
if __debug__: print_exc()
logger.debug('Failed exporting data', exc_info=e)
return _("Error: Can't connect to EDDN")
except Exception as e:
if __debug__: print_exc()
logger.debug('Failed exporting data', exc_info=e)
return str(e)
MAP_STR_ANY = Mapping[str, Any]
def is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_STR_ANY) -> bool:
return (
any(economy['name'] == 'Colony' for economy in economies.values()) or
any(module.get('sku') == HORIZ_SKU for module in modules.values()) or
any(ship.get('sku') == HORIZ_SKU for ship in (ships['shipyard_list'] or {}).values())
)

View File

@ -16,16 +16,16 @@ import requests
import sys
from queue import Queue
from threading import Thread
import logging
import tkinter as tk
from ttkHyperlinkLabel import HyperlinkLabel
import myNotebook as nb
import myNotebook as nb # noqa: N813
from config import appname, applongname, appversion, config
import plug
if __debug__:
from traceback import print_exc
logger = logging.getLogger(appname)
EDSM_POLL = 0.1
_TIMEOUT = 20
@ -371,23 +371,28 @@ def worker():
r.raise_for_status()
reply = r.json()
(msgnum, msg) = reply['msgnum'], reply['msg']
# 1xx = OK, 2xx = fatal error, 3&4xx not generated at top-level, 5xx = error but events saved for later processing
# 1xx = OK
# 2xx = fatal error
# 3&4xx not generated at top-level
# 5xx = error but events saved for later processing
if msgnum // 100 == 2:
print('EDSM\t%s %s\t%s' % (msgnum, msg, json.dumps(pending, separators = (',', ': '))))
logger.warning(f'EDSM\t{msgnum} {msg}\t{json.dumps(pending, separators = (",", ": "))}')
plug.show_error(_('Error: EDSM {MSG}').format(MSG=msg))
else:
for e, r in zip(pending, reply['events']):
if not closing and e['event'] in ['StartUp', 'Location', 'FSDJump', 'CarrierJump']:
# Update main window's system status
this.lastlookup = r
this.system_link.event_generate('<<EDSMStatus>>', when="tail") # calls update_status in main thread
# calls update_status in main thread
this.system_link.event_generate('<<EDSMStatus>>', when="tail")
elif r['msgnum'] // 100 != 1:
print('EDSM\t%s %s\t%s' % (r['msgnum'], r['msg'], json.dumps(e, separators = (',', ': '))))
logger.warning(f'EDSM\t{r["msgnum"]} {r["msg"]}\t'
f'{json.dumps(e, separators = (",", ": "))}')
pending = []
break
except:
if __debug__: print_exc()
except Exception as e:
logger.debug('Sending API events', exc_info=e)
retrying += 1
else:
plug.show_error(_("Error: Can't connect to EDSM"))

File diff suppressed because it is too large Load Diff

6
pyproject.toml Normal file
View File

@ -0,0 +1,6 @@
[tool.autopep8]
max_line_length = 120
[tool.isort]
multi_line_output = 5
line_length = 119

View File

@ -6,6 +6,7 @@ flake8-comprehensions==3.2.3
flake8-pep3101==1.3.0
flake8-polyfill==1.0.2
flake8-json
flake8-isort==3.0.1
pep8-naming==0.11.1
# Code formatting tools

40
timeout_session.py Normal file
View File

@ -0,0 +1,40 @@
import requests
from requests.adapters import HTTPAdapter
REQUEST_TIMEOUT = 10 # reasonable timeout that all HTTP requests should use
class TimeoutAdapter(HTTPAdapter):
"""
TimeoutAdapter is an HTTP Adapter that enforces an overridable default timeout on HTTP requests.
"""
def __init__(self, timeout, *args, **kwargs):
self.default_timeout = timeout
if kwargs.get("timeout") is not None:
del kwargs["timeout"]
super().__init__(*args, **kwargs)
def send(self, *args, **kwargs):
if kwargs["timeout"] is None:
kwargs["timeout"] = self.default_timeout
return super().send(*args, **kwargs)
def new_session(timeout: int = REQUEST_TIMEOUT, session: requests.Session = None) -> requests.Session:
"""
new_session creates a new requests.Session and overrides the default HTTPAdapter with a TimeoutAdapter.
:param timeout: the timeout to set the TimeoutAdapter to, defaults to REQUEST_TIMEOUT
:param session: the Session object to attach the Adapter to, defaults to a new session
:return: The created Session
"""
if session is None:
session = requests.Session()
adapter = TimeoutAdapter(timeout)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session