1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-17 17:42:20 +03:00

Merge pull request #1191 from A-UNDERSCORE-D/fix/1187/EDDN-Extend-killswitch-functionality-to-fields-within-events

Extend killswitch functionality to fields within events
This commit is contained in:
Athanasius 2021-08-23 11:03:57 +01:00 committed by GitHub
commit c886290ff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 980 additions and 169 deletions

View File

@ -1,62 +1,179 @@
# Kill Switches
EDMarketConnector implements a Kill Switch system that allows us to disable features based on a version mask. Meaning that we can stop major bugs from affecting the services we support, at the expense of disabling that support.
EDMarketConnector implements a Kill Switch system that allows us to disable
features based on a version mask. Meaning that we can stop major bugs from
affecting the services we support, at the expense of disabling that support.
## Format
Killswitches are stored in a JSON file that is queried by EDMC on startup. The format is as follows:
Killswitches are stored in a JSON file that is queried by EDMC on startup.
The format is as follows:
| Key | Type | Description |
| --------------: | :------: | :------------------------------------------------------------ |
| `version` | `int` | the version of the Kill Switch JSON file, always 1 |
| `last_updated` | `string` | When last the kill switches were updated (for human use only) |
| `kill_switches` | `array` | The kill switches this file contains (expanded below) |
| Key | Type | Description |
| --------------: | :------: | :------------------------------------------------------------------------------------------- |
| `version` | `int` | the version of the Kill Switch JSON file, always 2, 1 exists and will be upgraded if needed. |
| `last_updated` | `string` | When last the kill switches were updated (for human use only) |
| `kill_switches` | `array` | The kill switches this file contains (expanded below) |
The `kill_switches` array contains kill switch objects. Each contains two fields:
The `kill_switches` array contains kill switch objects. Each contains the
following fields:
| Key | Type | Description |
| --------: | :---------------------------: | :--------------------------------------------------------------------------- |
| `version` | `version spec (see Versions)` | The version of EDMC these kill switches apply to (Must be valid semver spec) |
| `kills` | `Dict[str, Dict[...]]` | The various keys disabled -> definition of the killswitch behaviour |
Each entry in `kills` must contain at least a `reason` field describing why the
killswitch was added. EDMC will show this to the user
(for internal killswitches, anyway).
| Key (* = required) | Type | Description |
| -----------------: | :--------------: | :-------------------------------------------------------------------------------------------- |
| `reason`* | `str` | The reason that this killswitch was added |
| `set_fields` | `Dict[str, Any]` | A map of key -> contents to update (or overwrite) existing data with |
| `redact_fields` | `List[str]` | A list of traversal paths to redact. This is the same as using set with a value of "REDACTED" |
| `delete_fields` | `List[str]` | A list of traversal paths in the matching event to be removed, if they exist. |
The order listed above is the precedence for actions. i.e. All set fields are
set, then all redact fields are redacted then all delete fields are deleted.
| Key | Type | Description |
| --------: | :--------------: | :--------------------------------------------------------------------------- |
| `version` | `version spec` | The version of EDMC these kill switches apply to (Must be valid semver spec) |
| `kills` | `Dict[str, str]` | The different keys to disable, and the reason for the disable |
An example follows:
```json
{
"version": 1,
"last_updated": "19 October 2020",
"version": 2,
"last_updated": "3 July 2021",
"kill_switches": [
{
"version": "1.0.0",
"kills": {
"plugins.eddn.send": "some reason"
"plugins.eddn.send": {
"reason": "some reason",
"delete_fields": ["world_domination_plans"],
"set_fields": {
"bad_bases_for_systems_of_government": ["Strange women lying in ponds distributing swords"],
"ruler_map": {"emperor": "scimitar"}
},
"redact_fields": ["relation_to_thargoids"]
},
"plugins.some_plugin.some_thing": {
"reason": "Some thing is disabled pending investigation of NMLA relations."
}
}
}
]
}
```
- `plugins.edsm.send` will have fields deleted, set, and redacted, and then
will *not* be halted, the send will continue with the modified data.
- `plugins.some_plugin.some_thing` will never be allowed to continue
(as all fields are blank)
Indexing of lists (and any other `Sequence`) is done with numbers as normal.
Negative numbers do work to reference the end of sequences.
### Traversal paths
`set_fields`, `delete_fields`, and `redact_fields` all accept a single traversal
path to target particular fields.
When following paths, there are some caveats one should know.
- If a field (for example `a.b`) exists in the data dict
(ie, `{'a.b': True}`) it will be the thing accessed by a killswitch that
includes `a.b` at the current "level". This means that if both a key `a.b`
and a nested key exist with the same name (`{'a.b': 0, 'a': { 'b': 1 }}`),
the former is always referenced. No backtracking takes place to attempt to
resolve conflicts, even if the dotted field name is not at the end of a key.
- Traversal can and will error, especially if you attempt to pass though keys
that do not exist (eg past the end of a `Sequence` or a key that does not
exist in a `Mapping`).
- If any exception occurs during your traversal, **the entire killswitch is**
**assumed to be faulty, and will be treated as a permanent killswitch**
### Individual action rules
All actions (as noted above) can fail during traversal to their targets. If this
happens, the entire killswitch is assumed to be a permanent one and execution
stops.
#### Deletes
For both `MutableMapping` and `MutableSequence` targets, deletes never fail.
If a key doesn't exist it simply is a no-op.
#### Sets
Sets always succeed for `MutableMapping` objects, either creating new keys
or updating existing ones. For `MutableSequence`s, however, a set can fail if
the index is not within `len(target)`. If the index is exactly `len(target)`,
`append` is used.
You can set values other than strings, but you are limited to what json itself
supports, and the translation thereof to python. In general this means that only
JSON primitives and their python equivalents
(`int`, `float`, `string`, `bool`, and `None` (json `null`)), and
json compound types (`object -- {}` and `array -- []`) may be set.
### Testing
Killswitch files can be tested using the script in `scripts/killswitch_test.py`.
Providing a file as an argument or `-` for stdin will output the behaviour of
the provided file, including indicating typos, if applicable.
### Versions
Versions are checked using contains checks on `semantic_version.SimpleSpec` instances.
Versions are checked using contains checks on `semantic_version.SimpleSpec`
instances. SimpleSpec supports both specific versions (`1.2.3`), non-specific
ranges (`1.0` will match `1.0.1` and `1.0.5` etc), wildcards (`1.2.*`),
and ranges (`<1.0.0`, `>=2.0.0`)
## Plugin support
Plugins may use the killswitch system simply by hosting their own version of the killswitch file, and fetching it
using `killswitch.get_kill_switches(target='https://example.com/myplugin_killswitches.json')`. The returned object can
be used to query the kill switch set, see the docstrings for more information on specifying versions.
Plugins may use the killswitch system simply by hosting their own version of the
killswitch file, and fetching it using
`killswitch.get_kill_switches(target='https://example.com/myplugin_killswitches.json')`.
The returned object can be used to query the kill switch set, see the docstrings
for more information on specifying versions.
A helper method `killswitch.get_kill_switch_thread` is provided to allow for
simple nonblocking requests for KillSwitches. It starts a new thread, performs
the HTTP request, and sends the results to the given callback.
**Note that your callback is invoked off-thread. Take precaution for locking**
**if required, and do _NOT_ use tkinter methods**
The version of the JSON file will be automatically upgraded if possible by the
code KillSwitch code. No behaviour changes will occur, any killswitches defined
in older versions will simply become unconditional kills in the new version.
## Currently supported killswitch strings
The current recognised (to EDMC and its internal plugins) killswitch strings are as follows:
| Kill Switch | Description |
| :--------------------------------------------------- | :------------------------------------------------------------------------------------------- |
| `plugins.eddn.send` | Disables all use of the send method on EDDN (effectively disables EDDN updates) |
| `plugins.(eddn\|inara\|edsm\|eddb).journal` | Disables all journal processing for EDDN/EDSM/INARA |
| `plugins.(edsm\|inara).worker` | Disables the EDSM/INARA worker thread (effectively disables updates) (does not close thread) |
| `plugins.(eddn\|inara\|edsm).journal.event.$eventname`| Specific events to disable processing for |
The current recognised (to EDMC and its internal plugins) killswitch strings are
as follows:
| Kill Switch | Supported Plugins | Description |
| :------------------------------------------- | :---------------------: | :---------------------------------------------------------------------------------------- |
| `plugins.eddn.send` | eddn | Disables all use of the send method on EDDN (effectively disables EDDN updates) |
| `plugins.<plugin>.journal` | eddn, inara, edsm, eddb | Disables all journal processing for the plugin |
| `plugins.<plugin>.worker` | edsm, *inara | Disables the plugins worker thread (effectively disables updates) (does not close thread) |
| `plugins.<plugin>.worker.<eventname>` | edsm, inara | Disables the plugin worker for the given eventname |
| `plugins.<plugin>.journal.event.<eventname>` | eddn, inara, edsm | Specific events to disable processing for. |
Killswitches marked with `*` do **not** support modification of their values via
set/redact/delete. And as such any match will simply stop processing.
For `plugin.inara.worker`, events are checked individually later by the
eventname version. Use that to modify individual inara events. This is due to
inara event batching meaning that the data that would be passed to `.worker`
would not be in a form that could be easily understood (except to blank it)
## File location
The main killswitch file is kept in the `releases` branch on the EDMC github repo. The file should NEVER be committed to
any other repos. In the case that the killswitch file is found in other repos, the one in releases should always
be taken as correct regardless of others.
The main killswitch file (`killswitches_v2.json`) is kept in the `releases`
branch on the EDMC github repo. The file should NEVER be committed to any other
repos. In the case that the killswitch file is found in other repos, the one in
releases should always be taken as correct regardless of others.

View File

@ -49,14 +49,14 @@ You will need several pieces of software installed, or the files from their
`requirements-dev.txt`.
1. You'll now need to 'pip install' several python modules.
1. Ensure you have `pip` installed. If needs be see
[Installing pip](https://pip.pypa.io/en/stable/installing/)
1. The easiest way is to utilise the `requirements-dev.txt` file:
`python -m pip install --user -r requirements-dev.txt`. This will install
all dependencies plus anything required for development.
1. Else check the contents of both `requirements.txt` and `requirements-dev.txt`,
and ensure the modules listed there are installed as per the version
requirements.
1. Ensure you have `pip` installed. If needs be see
[Installing pip](https://pip.pypa.io/en/stable/installing/)
1. The easiest way is to utilise the `requirements-dev.txt` file:
`python -m pip install --user -r requirements-dev.txt`. This will install
all dependencies plus anything required for development.
1. Else check the contents of both `requirements.txt` and `requirements-dev.txt`,
and ensure the modules listed there are installed as per the version
requirements.
If you are using different versions of any of these tools then please ensure
that the paths where they're installed match the associated lines in
@ -79,7 +79,7 @@ version strings.
a wider audience to test forthcoming changes.
1. `-rc<serial>`, i.e. `-rc1`. This is used when testing has shown this
code should be ready for full release, but you want even wider testing.
In both these cases simply increment `<serial>` for each new release. *Do*
start from `1` again when beginning `-rc` releases.
@ -88,6 +88,7 @@ version strings.
There are some things that you should always change before running your own
version of EDMC
1. The Frontier CAPI client ID. This is hardcoded in companion.py, but can be
overridden by setting a CLIENT_ID environment variable.
@ -109,29 +110,31 @@ that.
3. `appcmdname`: The CLI appname, e.g. 'EDMC'
4. `_static_appversion`: The current version, e.g. `4.0.2`. **You MUST
make this something like `4.0.2+<myversion>` to differentiate it from
upstream.** Whatever is in this field is what will be reported if
sending messages to EDDN, or any of the third-party website APIs.
This is utilising the 'build metadata' part of a Semantic version.
upstream.** Whatever is in this field is what will be reported if
sending messages to EDDN, or any of the third-party website APIs.
This is utilising the 'build metadata' part of a Semantic version.
5. `copyright`: The Copyright string.
6. `update_feed`: The URL where the application looks for current latest
version information. This URL should be hosting a renamed (so the full
URL doesn't change over application versions) version of the
appcast_win_<version>.xml file. The original upstream value is
`https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml`.
1. Location of release files. This needs to be cited correctly in the
version information. This URL should be hosting a renamed (so the full
URL doesn't change over application versions) version of the
appcast_win_<version>.xml file. The original upstream value is
`https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml`.
2. Location of release files. This needs to be cited correctly in the
`edmarketconnector.xml` file, which is what the application queries to
see if there is a newer version.
Look for the `url="...` line in the `<enclosure ...` that is like:
<enclosure
url="https://github.com/EDCD/EDMarketConnector/releases/download/Release/4.2.3/EDMarketConnector_win_4.2.3.msi"
sparkle:os="windows"
sparkle:installerArguments="/passive LAUNCH=yes"
sparkle:version="4.2.3"
length="11382784"
type="application/octet-stream"
/>
```xml
<enclosure
url="https://github.com/EDCD/EDMarketConnector/releases/download/Release/4.2.3/EDMarketConnector_win_4.2.3.msi"
sparkle:os="windows"
sparkle:installerArguments="/passive LAUNCH=yes"
sparkle:version="4.2.3"
length="11382784"
type="application/octet-stream"
/>
```
## Adding a new file
@ -150,28 +153,33 @@ You will *also* need to add the file to the `EDMarketConnector.wxs` file so
that it's actually included in the installer.
1. Location the the appropriate part of the:
```xml
<Directory Id="ProgramFilesFolder">
```
section and add a new sub-section:
```xml
<Component Id="<valid_component_id>" Guid=""*">
<File KeyPath="yes" Source="SourceDir\\<file name>" />
</Component>
```
Note that you only need `Id="<valid_component_id>"` if the filename itself
is not a valid Id, e.g. because it contains spaces.
If the new file is in a new sub-directory then you'll need to add that as
well. See the `L10n` example.
1. Now find the:
2. Now find the:
```xml
<Feature Id='Complete' Level='1'>
```
section and add an appropriate line to it. Remember to use either the
specific Id you set above or the filename (without directory) for this:
```xml
<ComponentRef Id="<valid_component_id>" />
```
@ -201,10 +209,10 @@ which branch (stable or beta) you'll be ultimately updating.
1. So as to make backing out any mistakes easier create a new branch for this
release, using a name like `release-4.0.2`. Do not use the tag
`Release/4.0.2` form, that could cause confusion.
1. `git checkout stable` # Or whichever other branch is appropriate.
1. `git pull origin` # Ensures local branch is up to date.
1. `git checkout -b release-4.0.2`
1. `git checkout stable` # Or whichever other branch is appropriate.
1. `git pull origin` # Ensures local branch is up to date.
1. `git checkout -b release-4.0.2`
1. Get all the relevant code changes into this branch. This might mean
merging from another branch, such as an issue-specific one, or possibly
cherry-picking commits. See [Contributing Guidelines](../Contributing.md)
@ -218,12 +226,12 @@ commit for this change.**
1. Prepare a changelog text for the release. You'll need this both for the
GitHub release and the contents of the `edmarketconnector.xml` file if making
a `stable` release, as well as any social media posts you make.
1. The primary location of the changelog is [Changelog.md](../Changelog.md) -
update this first.
1. To be sure you include all the changes look at the git log since the
prior appropriate (pre-)release.
1. As you're working in a version-specific branch, `release-4.0.2`, you
can safely commit these changes and push to GitHub.
1. The primary location of the changelog is [Changelog.md](../Changelog.md) -
update this first.
1. To be sure you include all the changes look at the git log since the
prior appropriate (pre-)release.
1. As you're working in a version-specific branch, `release-4.0.2`, you
can safely commit these changes and push to GitHub.
**Do not merge the branch with `releases` until the GitHub release is in place.**
If you're wondering, you needed to get the changelog prepared before building
@ -235,7 +243,10 @@ the .exe and .msi because ChangeLog.md is bundled with the install.
If anything in this new release addresses a bug that causes, e.g. bad data
to be sent over EDDN, then you should add an appropriate entry to the
killswitches.json file *in the `releases` branch*. That file **must only ever
be commited to the `releases` branch!!!** See [docs/Killswitches.md](Killswitches.md).
be committed to the `releases` branch!!!** See [docs/Killswitches.md](Killswitches.md).
Killswitch files can and should be verified using the `killswitch_test.py`
script in the `scripts` directory
# Packaging & Installer Generation
@ -244,36 +255,44 @@ a 'Git bash' window. The 'Terminal' tab of PyCharm works fine.
Assuming the correct python.exe is associated with .py files then simply run:
setup.py py2exe
```batch
setup.py py2exe
```
else you might need this, which assumes correct python.exe is in your PATH:
python.exe setup.py py2exe
```batch
python.exe setup.py py2exe
```
else you'll have to specify the path to python.exe, e.g.:
"C:\Program Files \(x86)\Python38-32\python.exe" setup.py py2exe
```batch
"C:\Program Files \(x86)\Python38-32\python.exe" setup.py py2exe
```
Output will be something like (`...` denoting parts elided for brevity):
running py2exe
...
Building 'dist.win32\EDMC.exe'.
Building 'dist.win32\EDMarketConnector.exe'.
Building shared code archive 'dist.win32\library.zip'.
...
Windows Installer XML Toolset Compiler version 3.11.1.2318
Copyright (c) .NET Foundation and contributors. All rights reserved.
...
Package language = 1033,1029,1031,1034,1035,1036,1038,1040,1041,1043,1045,1046,1049,1058,1062,2052,2070,2074,0, ProductLanguage = 1029, Database codepage = 0
MsiTran V 5.0
Copyright (c) Microsoft Corporation. All Rights Reserved
...
DonePackage language = 1033,1029,1031,1034,1035,1036,1038,1040,1041,1043,1045,1046,1049,1058,1062,2052,2070,2074,0, ProductLanguage = 0, Database codepage = 0
MsiTran V 5.0
Copyright (c) Microsoft Corporation. All Rights Reserved
```plaintext
running py2exe
...
Building 'dist.win32\EDMC.exe'.
Building 'dist.win32\EDMarketConnector.exe'.
Building shared code archive 'dist.win32\library.zip'.
...
Windows Installer XML Toolset Compiler version 3.11.1.2318
Copyright (c) .NET Foundation and contributors. All rights reserved.
...
Package language = 1033,1029,1031,1034,1035,1036,1038,1040,1041,1043,1045,1046,1049,1058,1062,2052,2070,2074,0, ProductLanguage = 1029, Database codepage = 0
MsiTran V 5.0
Copyright (c) Microsoft Corporation. All Rights Reserved
...
DonePackage language = 1033,1029,1031,1034,1035,1036,1038,1040,1041,1043,1045,1046,1049,1058,1062,2052,2070,2074,0, ProductLanguage = 0, Database codepage = 0
MsiTran V 5.0
Copyright (c) Microsoft Corporation. All Rights Reserved
Done
Done
```
**Do check the output** for things like not properly specifying extra files
to be included in the install. If they're not picked up by current rules in
@ -322,9 +341,9 @@ Once that is done then for manually built installers:
2. Create a matching `hashes.sum` file for your `.msi` file:
sha256sum EDMarketConnector_win*.msi > ./hashes.sum
and replace the one in the draft release with this.
But, **again, you should just be using the auto-build
mechanism**.
@ -353,7 +372,7 @@ the release-4.0.2 branch, as it's temporary.*
`[ ] This is a pre-release` box.** Not doing so will cause this release
to be pointed to by the 'latest' URL.
5. We always create a discussion for any new release, so tick the
`Create a discussion for this release ` box, and check it's targeted at
`Create a discussion for this release` box, and check it's targeted at
the `Announcement` category.
Once the release is created, then **only if making a `stable` release**
@ -364,8 +383,8 @@ changelog text to the correct section(s):
into `stable` above.
3. Use the following to generate HTML from the MarkDown (`pip
install grip` first if you need to):
`grip --export ChangeLog.md`
grip --export ChangeLog.md
4. Open `edmarketconnector.xml` in your editor.
5. If there's still a Mac OS section croll down past it to the Windows
section.
@ -379,7 +398,7 @@ changelog text to the correct section(s):
If, for instance, you fail to *update* this URL then upon running the 'new'
installer it will silently fail, because you made people try to install
the old version over the old version.
3. Yes, `sparkle:version` should be the Semantic Version string,
2. Yes, `sparkle:version` should be the Semantic Version string,
not the Windows A.B.C.D form.
8. `git push origin`
@ -387,7 +406,7 @@ changelog text to the correct section(s):
EDMC instances to pick up on 'Check for Updates'. The WinSparkle check for
updates specifically targets:
`https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml`
`https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml`
as per `config.py` `update_feed`.
@ -404,9 +423,9 @@ If you are making a pre-release then:
1. **DO NOT** Edit `edmarketconnector.xml` at all. No, not even if you
think you won't accidentally merge it into `releases`. Just don't change it
at all.
3. **DO NOT** merge into `releases`.
4. **DO NOT** merge into `stable`.
5. *Do* merge the code into `beta` after you have made a 'pre-release' on
1. **DO NOT** merge into `releases`.
1. **DO NOT** merge into `stable`.
1. *Do* merge the code into `beta` after you have made a 'pre-release' on
GitHub.
# Changing Python version
@ -419,11 +438,11 @@ When changing the Python version (Major.Minor.Patch) used:
1. `.github/workflows/windows-build.yml` needs updating to have the GitHub
based build use the correct version.
1. Major or Minor level changes:
1. `setup.py` will need its version check updating.
1. `EDMarketConnector.wxs` will need updating to reference the correct
2. `EDMarketConnector.wxs` will need updating to reference the correct
pythonXX.dll file.
1. `.pre-commit-config.yaml` will need the `default_language_version`
3. `.pre-commit-config.yaml` will need the `default_language_version`
section updated to the appropriate version.

View File

@ -1,54 +1,236 @@
"""Fetch kill switches from EDMC Repo."""
from typing import Dict, List, NamedTuple, Optional, Union, cast
from __future__ import annotations
import threading
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Sequence,
Tuple, TypedDict, TypeVar, Union, cast
)
import requests
import semantic_version
from semantic_version.base import Version
import config
import EDMCLogging
logger = EDMCLogging.get_main_logger()
DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json'
OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json'
DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json'
CURRENT_KILLSWITCH_VERSION = 2
UPDATABLE_DATA = Union[Mapping, Sequence]
_current_version: semantic_version.Version = config.appversion_nobuild()
T = TypeVar('T', bound=UPDATABLE_DATA)
class KillSwitch(NamedTuple):
class SingleKill(NamedTuple):
"""A single KillSwitch. Possibly with additional rules."""
match: str
reason: str
redact_fields: Optional[List[str]] = None
delete_fields: Optional[List[str]] = None
set_fields: Optional[Dict[str, Any]] = None
@property
def has_rules(self) -> bool:
"""Return whether or not this SingleKill can apply rules to a dict to make it safe to use."""
return any(x is not None for x in (self.redact_fields, self.delete_fields, self.set_fields))
def apply_rules(self, target: T) -> T:
"""
Apply the rules this SingleKill instance has to make some data okay to send.
Note that this MODIFIES DATA IN PLACE.
:param target: data to apply a rule to
:raises: Any and all exceptions _deep_apply and _apply can raise.
"""
for key, value in (self.set_fields if self .set_fields is not None else {}).items():
_deep_apply(target, key, value)
for key in (self.redact_fields if self.redact_fields is not None else []):
_deep_apply(target, key, "REDACTED")
for key in (self.delete_fields if self.delete_fields is not None else []):
_deep_apply(target, key, delete=True)
return target
def _apply(target: UPDATABLE_DATA, key: str, to_set: Any = None, delete: bool = False):
"""
Set or delete the given target key on the given target.
:param target: The thing to set data on
:param key: the key or index to set the data to
:param to_set: the data to set, if any, defaults to None
:param delete: whether or not to delete the key or index, defaults to False
:raises ValueError: when an unexpected target type is passed
:raises IndexError: when an invalid index is set or deleted
"""
if isinstance(target, MutableMapping):
if delete:
target.pop(key, None)
else:
target[key] = to_set
elif isinstance(target, MutableSequence):
idx = _get_int(key)
if idx is None:
raise ValueError(f'Cannot use string {key!r} as int for index into Sequence')
if delete and len(target) > 0:
length = len(target)
if idx in range(-length, length):
target.pop(idx)
elif len(target) == idx:
target.append(to_set)
else:
target[idx] = to_set # this can raise, that's fine
else:
raise ValueError(f'Dont know how to apply data to {type(target)} {target!r}')
def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False): # noqa: CCR001 # Recursive silliness.
"""
Set the given path to the given value, if it exists.
if the path has dots (ascii period -- '.'), it will be successively split
if possible for deeper indices into target
:param target: the dict to modify
:param to_set: the data to set, defaults to None
:param delete: whether or not to delete the key rather than set it
:raises IndexError: when an invalid index is traversed into
:raises KeyError: when an invalid key is traversed into
"""
current = target
key: str = ""
while '.' in path:
if path in current:
# it exists on this level, dont go further
break
elif isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()):
# there is a dotted key in here that can be used for this
# if theres a dotted key in here (must be a mapping), use that if we can
keys = current.keys()
for k in filter(lambda x: '.' in x, keys):
if path.startswith(k):
key = k
path = path.removeprefix(k)
# we assume that the `.` here is for "accessing" the next key.
if path[0] == '.':
path = path[1:]
if len(path) == 0:
path = key
break
else:
key, _, path = path.partition('.')
if isinstance(current, Mapping):
current = current[key] # type: ignore # I really dont know at this point what you want from me mypy.
elif isinstance(current, Sequence):
target_idx = _get_int(key) # mypy is broken. doesn't like := here.
if target_idx is not None:
current = current[target_idx]
else:
raise ValueError(f'Cannot index sequence with non-int key {key!r}')
else:
raise ValueError(f'Dont know how to index a {type(current)} ({current!r})')
_apply(current, path, to_set, delete)
def _get_int(s: str) -> Optional[int]:
try:
return int(s)
except ValueError:
return None
class KillSwitches(NamedTuple):
"""One version's set of kill switches."""
version: semantic_version.SimpleSpec
kills: Dict[str, str]
kills: Dict[str, SingleKill]
@staticmethod
def from_dict(data: KillSwitchSetJSON) -> KillSwitches:
"""Create a KillSwitches instance from a dictionary."""
ks = {}
for match, ks_data in data['kills'].items():
ks[match] = SingleKill(
match=match,
reason=ks_data['reason'],
redact_fields=ks_data.get('redact_fields'),
set_fields=ks_data.get('set_fields'),
delete_fields=ks_data.get('delete_fields')
)
return KillSwitches(version=semantic_version.SimpleSpec(data['version']), kills=ks)
class DisabledResult(NamedTuple):
"""DisabledResult is the result returned from various is_disabled calls."""
disabled: bool
reason: str
kill: Optional[SingleKill]
@property
def reason(self) -> str:
"""Reason provided for why this killswitch exists."""
return self.kill.reason if self.kill is not None else ""
def has_kill(self) -> bool:
"""Return whether or not this DisabledResult has a Kill associated with it."""
return self.kill is not None
def has_rules(self) -> bool:
"""Return whether or not the kill on this Result contains rules."""
# HACK: 2021-07-09 # Python/mypy/pyright does not support type guards like this yet. self.kill will always
# be non-None at the point it is evaluated
return self.has_kill() and self.kill.has_rules # type: ignore
class KillSwitchSet:
"""Queryable set of kill switches."""
def __init__(self, kill_switches: List[KillSwitch]) -> None:
def __init__(self, kill_switches: List[KillSwitches]) -> None:
self.kill_switches = kill_switches
def get_disabled(self, id: str, *, version=_current_version) -> DisabledResult:
def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult:
"""
Return whether or not the given feature ID is disabled by a killswitch for the given version.
:param id: The feature ID to check
:param version: The version to check killswitches for, defaults to the current EDMC version
:param version: The version to check killswitches for, defaults to the
current EDMC version
:return: a namedtuple indicating status and reason, if any
"""
if isinstance(version, str):
version = semantic_version.Version.coerce(version)
for ks in self.kill_switches:
if version not in ks.version:
continue
return DisabledResult(id in ks.kills, ks.kills.get(id, ""))
return DisabledResult(id in ks.kills, ks.kills.get(id, None))
return DisabledResult(False, "")
return DisabledResult(False, None)
def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool:
"""Return whether or not a given feature ID is disabled for the given version."""
@ -58,7 +240,7 @@ class KillSwitchSet:
"""Return a reason for why the given id is disabled for the given version, if any."""
return self.get_disabled(id, version=version).reason
def kills_for_version(self, version: semantic_version.Version = _current_version) -> List[KillSwitch]:
def kills_for_version(self, version: semantic_version.Version = _current_version) -> List[KillSwitches]:
"""
Get all killswitch entries that apply to the given version.
@ -67,6 +249,58 @@ class KillSwitchSet:
"""
return [k for k in self.kill_switches if version in k.version]
def check_killswitch(
self, name: str, data: T, log=logger, version=_current_version
) -> Tuple[bool, T]:
"""
Check whether or not a killswitch is enabled. If it is, apply rules if any.
:param name: The killswitch to check
:param data: The data to modify if needed
:return: A two tuple consisting of: A bool indicating if the caller should return, and either the
original data or a *COPY* that has been modified by rules
"""
res = self.get_disabled(name, version=version)
if not res.disabled:
return False, data
log.info(f'Killswitch {name} is enabled. Checking if rules exist to make use safe')
if not res.has_rules():
logger.info('No rules exist. Stopping processing')
return True, data
if TYPE_CHECKING: # pyright, mypy, please -_-
assert res.kill is not None
try:
new_data = res.kill.apply_rules(deepcopy(data))
except Exception as e:
log.exception(f'Exception occurred while attempting to apply rules! bailing out! {e=}')
return True, data
log.info('Rules applied successfully, allowing execution to continue')
return False, new_data
def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> Tuple[bool, T]:
"""
Check multiple killswitches in order.
Note that the names are applied in the order passed, and that the first true
return from check_killswitch causes this to return
:param data: the data to update
:param log: the logger to use, defaults to the standard EDMC main logger
:return: A two tuple of bool and updated data, where the bool is true when the caller _should_ halt processing
"""
for name in names:
should_return, data = self.check_killswitch(name=name, data=data, log=log, version=version)
if should_return:
return True, data
return False, data
def __str__(self) -> str:
"""Return a string representation of KillSwitchSet."""
return f'KillSwitchSet: {str(self.kill_switches)}'
@ -76,17 +310,28 @@ class KillSwitchSet:
return f'KillSwitchSet(kill_switches={self.kill_switches!r})'
KILL_SWITCH_JSON = List[Dict[str, Union[str, List[str]]]]
KILL_SWITCH_JSON_DICT = Dict[
str, Union[
str, # Last updated
int, # Version
KILL_SWITCH_JSON # kills
]
]
class BaseSingleKillSwitch(TypedDict): # noqa: D101
reason: str
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KILL_SWITCH_JSON_DICT]:
class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False): # noqa: D101
redact_fields: list[str] # set fields to "REDACTED"
delete_fields: list[str] # remove fields entirely
set_fields: dict[str, Any] # set fields to given data
class KillSwitchSetJSON(TypedDict): # noqa: D101
version: str
kills: Dict[str, SingleKillSwitchJSON]
class KillSwitchJSONFile(TypedDict): # noqa: D101
version: int
last_updated: str
kill_switches: List[KillSwitchSetJSON]
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchJSONFile]:
"""
Fetch the JSON representation of our kill switches.
@ -101,46 +346,81 @@ def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KILL_SWITCH_J
logger.warning(f"Failed to get kill switches, data was invalid: {e}")
return None
except (requests.exceptions.BaseHTTPError, requests.exceptions.ConnectionError) as e:
except (requests.exceptions.BaseHTTPError, requests.exceptions.ConnectionError) as e: # type: ignore
logger.warning(f"unable to connect to {target!r}: {e}")
return None
return data
def parse_kill_switches(data: KILL_SWITCH_JSON_DICT) -> List[KillSwitch]:
class _KillSwitchV1(TypedDict):
version: str
kills: Dict[str, str]
class _KillSwitchJSONFileV1(TypedDict):
version: int
last_updated: str
kill_switches: List[_KillSwitchV1]
def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
version = data['version']
if version == CURRENT_KILLSWITCH_VERSION:
return data
if version == 1:
logger.info('Got an old version killswitch file (v1) upgrading!')
to_return: KillSwitchJSONFile = deepcopy(data)
data_v1 = cast(_KillSwitchJSONFileV1, data)
to_return['kill_switches'] = [
cast(KillSwitchSetJSON, { # I need to cheat here a touch. It is this I promise
'version': d['version'],
'kills': {
match: {'reason': reason} for match, reason in d['kills'].items()
}
})
for d in data_v1['kill_switches']
]
to_return['version'] = CURRENT_KILLSWITCH_VERSION
return to_return
raise ValueError(f'Unknown Killswitch version {data["version"]}')
def parse_kill_switches(data: KillSwitchJSONFile) -> List[KillSwitches]:
"""
Parse kill switch dict to List of KillSwitches.
:param data: dict containing raw killswitch data
:return: a list of all provided killswitches
"""
data = _upgrade_kill_switch_dict(data)
last_updated = data['last_updated']
ks_version = data['version']
logger.info(f'Kill switches last updated {last_updated}')
if ks_version != 1:
logger.warning(f'Unknown killswitch version {ks_version}. Bailing out')
if ks_version != CURRENT_KILLSWITCH_VERSION:
logger.warning(f'Unknown killswitch version {ks_version} (expected {CURRENT_KILLSWITCH_VERSION}). Bailing out')
return []
kill_switches = cast(KILL_SWITCH_JSON, data['kill_switches'])
out: List[KillSwitch] = []
kill_switches = data['kill_switches']
out = []
for idx, ks_data in enumerate(kill_switches):
try:
ver = semantic_version.SimpleSpec(ks_data['version'])
ks = KillSwitches.from_dict(ks_data)
out.append(ks)
except ValueError as e:
logger.warning(f'could not parse killswitch idx {idx}: {e}')
continue
ks = KillSwitch(version=ver, kills=cast(Dict[str, str], ks_data['kills']))
out.append(ks)
except Exception as e:
logger.exception(f'Could not parse killswitch idx {idx}: {e}')
return out
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchSet]:
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: Optional[str] = None) -> Optional[KillSwitchSet]:
"""
Get a kill switch set object.
@ -148,12 +428,33 @@ def get_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> Optional[KillSwitchSet]:
:return: the KillSwitchSet for the URL, or None if there was an error
"""
if (data := fetch_kill_switches(target)) is None:
logger.warning('could not get killswitches')
return None
if fallback is not None:
logger.warning('could not get killswitches, trying fallback')
data = fetch_kill_switches(fallback)
if data is None:
logger.warning('Could not get killswitches.')
return None
return KillSwitchSet(parse_kill_switches(data))
def get_kill_switches_thread(
target, callback: Callable[[Optional[KillSwitchSet]], None], fallback: Optional[str] = None,
) -> None:
"""
Threaded version of get_kill_switches. Request is performed off thread, and callback is called when it is available.
:param target: Target killswitch file
:param callback: The callback to pass the newly created KillSwitchSet
:param fallback: Fallback killswitch file, if any, defaults to None
"""
def make_request():
callback(get_kill_switches(target, fallback=fallback))
threading.Thread(target=make_request, daemon=True).start()
active: KillSwitchSet = KillSwitchSet([])
@ -163,12 +464,15 @@ def setup_main_list():
Plugins should NOT call this EVER.
"""
if (data := fetch_kill_switches()) is None:
if (data := get_kill_switches(DEFAULT_KILLSWITCH_URL, OLD_KILLSWITCH_URL)) is None:
logger.warning("Unable to fetch kill switches. Setting global set to an empty set")
return
global active
active = KillSwitchSet(parse_kill_switches(data))
active = data
logger.trace(f'{len(active.kill_switches)} Active Killswitches:')
for v in active.kill_switches:
logger.trace(v)
def get_disabled(id: str, *, version: semantic_version.Version = _current_version) -> DisabledResult:
@ -180,6 +484,16 @@ def get_disabled(id: str, *, version: semantic_version.Version = _current_versio
return active.get_disabled(id, version=version)
def check_killswitch(name: str, data: T, log=logger) -> Tuple[bool, T]:
"""Query the global KillSwitchSet#check_killswitch method."""
return active.check_killswitch(name, data, log)
def check_multiple_killswitches(data: T, *names: str, log=logger) -> tuple[bool, T]:
"""Query the global KillSwitchSet#check_multiple method."""
return active.check_multiple_killswitches(data, *names, log=log)
def is_disabled(id: str, *, version: semantic_version.Version = _current_version) -> bool:
"""Query the global KillSwitchSet#is_disabled method."""
return active.is_disabled(id, version=version)
@ -190,6 +504,6 @@ def get_reason(id: str, *, version: semantic_version.Version = _current_version)
return active.get_reason(id, version=version)
def kills_for_version(version: semantic_version.Version = _current_version) -> List[KillSwitch]:
def kills_for_version(version: semantic_version.Version = _current_version) -> List[KillSwitches]:
"""Query the global KillSwitchSet for kills matching a particular version."""
return active.kills_for_version(version)

View File

@ -94,14 +94,14 @@ def prefs_changed(cmdr, is_beta):
def journal_entry(cmdr, is_beta, system, station, entry, state):
if (ks := killswitch.get_disabled('plugins.eddb.journal')).disabled:
logger.warning(f'Journal processing for EDDB has been disabled: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch('plugins.eddb.journal', entry)
if should_return:
# LANG: Journal Processing disabled due to an active killswitch
plug.show_error(_('EDDB Journal processing disabled. See Log.'))
return
elif (ks := killswitch.get_disabled(f'plugins.eddb.journal.event.{entry["event"]}')).disabled:
logger.warning(f'Processing of event {entry["event"]} has been disabled: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch(f'plugins.eddb.journal.event.{entry["event"]}', new_entry)
if should_return:
return
this.on_foot = state['OnFoot']

View File

@ -168,10 +168,13 @@ class EDDN:
:param cmdr: the CMDR to use as the uploader ID.
:param msg: the payload to send.
"""
if (res := killswitch.get_disabled('plugins.eddn.send')).disabled:
logger.warning(f"eddn.send has been disabled via killswitch. Returning. ({res.reason})")
should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', msg)
if should_return:
logger.warning('eddn.send has been disabled via killswitch. Returning.')
return
msg = new_data
uploader_id = cmdr
to_send: OrderedDictT[str, OrderedDict[str, Any]] = OrderedDict([
@ -787,15 +790,18 @@ def journal_entry( # noqa: C901, CCR001
:param state: `dict` - Current `monitor.state` data.
:return: `str` - Error message, or `None` if no errors.
"""
if (ks := killswitch.get_disabled("plugins.eddn.journal")).disabled:
logger.warning(f'EDDN journal handler has been disabled via killswitch: {ks.reason}')
should_return, new_data = killswitch.check_killswitch('plugins.eddn.journal', entry)
if should_return:
plug.show_error(_('EDDN journal handler disabled. See Log.')) # LANG: Killswitch disabled EDDN
return None
elif (ks := killswitch.get_disabled(f'plugins.eddn.journal.event.{entry["event"]}')).disabled:
logger.warning(f'Handling of event {entry["event"]} disabled via killswitch: {ks.reason}')
should_return, new_data = killswitch.check_killswitch(f'plugins.eddn.journal.event.{entry["event"]}', new_data)
if should_return:
return None
entry = new_data
# Recursively filter '*_Localised' keys from dict
def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]:
filtered: OrderedDictT[str, Any] = OrderedDict()

View File

@ -15,7 +15,7 @@ import tkinter as tk
from queue import Queue
from threading import Thread
from time import sleep
from typing import TYPE_CHECKING, Any, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, Any, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast
import requests
@ -395,16 +395,21 @@ def journal_entry( # noqa: C901, CCR001
cmdr: str, is_beta: bool, system: str, station: str, entry: MutableMapping[str, Any], state: Mapping[str, Any]
) -> None:
"""Journal Entry hook."""
if (ks := killswitch.get_disabled('plugins.edsm.journal')).disabled:
logger.warning(f'EDSM Journal handler disabled via killswitch: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch('plugins.edsm.journal', entry, logger)
if should_return:
# LANG: EDSM plugin - Journal handling disabled by killswitch
plug.show_error(_('EDSM Handler disabled. See Log.'))
return
elif (ks := killswitch.get_disabled(f'plugins.edsm.journal.event.{entry["event"]}')).disabled:
logger.warning(f'Handling of event {entry["event"]} has been disabled via killswitch: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch(
f'plugins.edsm.journal.event.{entry["event"]}', data=new_entry, log=logger
)
if should_return:
return
entry = new_entry
this.on_foot = state['OnFoot']
if entry['event'] in ('CarrierJump', 'FSDJump', 'Location', 'Docked'):
logger.trace_if(
@ -640,11 +645,18 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
retrying = 0
while retrying < 3:
if (res := killswitch.get_disabled("plugins.edsm.worker")).disabled:
logger.warning(
f'EDSM worker has been disabled via kill switch. Not uploading data. ({res.reason})'
)
should_skip, new_item = killswitch.check_killswitch(
'plugins.edsm.worker',
item if item is not None else cast(Tuple[str, Mapping[str, Any]], ("", {})),
logger
)
if should_skip:
break
if item is not None:
item = new_item
try:
if item and entry['event'] not in this.discarded_events:
logger.trace_if(
@ -652,6 +664,17 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
pending.append(entry)
# drop events if required by killswitch
new_pending = []
for e in pending:
skip, new = killswitch.check_killswitch(f'plugin.edsm.worker.{e["event"]}', e, logger)
if skip:
continue
new_pending.append(new)
pending = new_pending
if pending and should_send(pending, entry['event']):
logger.trace_if(CMDR_EVENTS, f'({cmdr=}, {entry["event"]=}): should_send() said True')
logger.trace_if(CMDR_EVENTS, f'pending contains:\n{chr(0x0A).join(str(p) for p in pending)}')
@ -667,7 +690,7 @@ def worker() -> None: # noqa: CCR001 C901 # Cant be broken up currently
f'"Location" event in pending passed should_send(),timestamp: {p["timestamp"]}'
)
creds = credentials(cmdr) # TODO: possibly unbound
creds = credentials(cmdr)
if creds is None:
raise ValueError("Unexpected lack of credentials")

View File

@ -5,6 +5,7 @@ import threading
import time
import tkinter as tk
from collections import OrderedDict, defaultdict, deque
from dataclasses import dataclass
from operator import itemgetter
from threading import Lock, Thread
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional
@ -47,7 +48,8 @@ class Credentials(NamedTuple):
EVENT_DATA = Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]
class Event(NamedTuple):
@dataclass
class Event:
"""Event represents an event for the Inara API."""
name: str
@ -334,16 +336,21 @@ def journal_entry( # noqa: C901, CCR001
:return: str - empty if no error, else error string.
"""
if (ks := killswitch.get_disabled('plugins.inara.journal')).disabled:
logger.warning(f'Inara support has been disabled via killswitch: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch('plugins.inara.journal', entry, logger)
if should_return:
plug.show_error(_('Inara disabled. See Log.')) # LANG: INARA support disabled via killswitch
logger.trace('returning due to killswitch match')
return ''
elif (ks := killswitch.get_disabled(f'plugins.inara.journal.event.{entry["event"]}')).disabled:
logger.warning(f'event {entry["event"]} processing has been disabled via killswitch: {ks.reason}')
should_return, new_entry = killswitch.check_killswitch(
f'plugins.inara.journal.event.{entry["event"]}', new_entry, logger
)
if should_return:
logger.trace('returning due to killswitch match')
# this can and WILL break state, but if we're concerned about it sending bad data, we'd disable globally anyway
return ''
entry = new_entry
this.on_foot = state['OnFoot']
event_name: str = entry['event']
this.cmdr = cmdr
@ -1461,6 +1468,21 @@ def new_add_event(
this.events[key].append(Event(name, timestamp, data))
def clean_event_list(event_list: List[Event]) -> List[Event]:
"""Check for killswitched events and remove or modify them as requested."""
out = []
for e in event_list:
bad, new_event = killswitch.check_killswitch(f'plugins.inara.worker.{e.name}', e.data, logger)
if bad:
continue
e.data = new_event
out.append(e)
return out
def new_worker():
"""
Handle sending events to the Inara API.
@ -1475,6 +1497,7 @@ def new_worker():
continue
for creds, event_list in events.items():
event_list = clean_event_list(event_list)
if not event_list:
continue
@ -1490,6 +1513,7 @@ def new_worker():
{'eventName': e.name, 'eventTimestamp': e.timestamp, 'eventData': e.data} for e in event_list
]
}
logger.info(f'sending {len(data["events"])} events for {creds.cmdr}')
logger.trace_if('plugin.inara.events', f'Events:\n{json.dumps(data)}\n')

125
scripts/killswitch_test.py Normal file
View File

@ -0,0 +1,125 @@
"""Print information about killswitch json files."""
import json
import sys
# Yes this is gross. No I cant fix it. EDMC doesn't use python modules currently and changing that would be messy.
sys.path.append('.')
from killswitch import KillSwitchSet, SingleKill, parse_kill_switches # noqa: E402
KNOWN_KILLSWITCH_NAMES: list[str] = [
# edsm
'plugins.edsm.worker',
'plugins.edsm.worker.$event',
'plugins.edsm.journal',
'plugins.edsm.journal.event.$event',
# inara
'plugins.inara.journal',
'plugins.inara.journal.event.$event',
'plugins.inara.worker',
'plugins.inara.worker.$event',
# eddn
'plugins.eddn.send',
'plugins.eddn.journal',
'plugins.eddn.journal.event.$event',
# eddb
'plugins.eddb.journal',
'plugins.eddb.journal.event.$event'
]
SPLIT_KNOWN_NAMES = list(map(lambda x: x.split('.'), KNOWN_KILLSWITCH_NAMES))
def match_exists(match: str) -> tuple[bool, str]:
"""Check that a match matching the above defined known list exists."""
split_match = match.split('.')
highest_match = 0
closest = []
for known_split in SPLIT_KNOWN_NAMES:
if len(known_split) != len(split_match):
continue # couldn't possibly match this
if known_split == split_match:
return True, ""
matched_fields = sum(len(k) for k, s in zip(known_split, split_match) if k == s or k[0] == '$')
if highest_match < matched_fields:
matched_fields = highest_match
closest = list(known_split)
if matched_fields == len(known_split):
return True, ""
return False, ".".join(closest)
def show_killswitch_set_info(ks: KillSwitchSet) -> None:
"""Show information about the given KillSwitchSet."""
for kill_version in ks.kill_switches:
print(f'Kills matching version mask {kill_version.version}')
for kill in kill_version.kills.values():
print_singlekill_info(kill)
def print_singlekill_info(s: SingleKill):
"""Print info about a single SingleKill instance."""
ok, closest_match = match_exists(s.match)
if ok:
print(f'\t- {s.match}')
else:
print(
f'\t- {s.match} -- Does not match existing killswitches! '
f'Typo or out of date script? (closest: {closest_match!r})'
)
print(f'\t\tReason specified is: {s.reason!r}')
print()
if not s.has_rules:
print(f'\t\tDoes not set, redact, or delete fields. This will always stop execution for {s.match}')
return
print(f'\t\tThe folowing changes are required for {s.match} execution to continue')
if s.set_fields:
max_field_len = max(len(f) for f in s.set_fields) + 3
print(f'\t\tSets {len(s.set_fields)} fields:')
for f, c in s.set_fields.items():
print(f'\t\t\t- {f.ljust(max_field_len)} -> {c}')
print()
if s.redact_fields:
max_field_len = max(len(f) for f in s.redact_fields) + 3
print(f'\t\tRedacts {len(s.redact_fields)} fields:')
for f in s.redact_fields:
print(f'\t\t\t- {f.ljust(max_field_len)} -> "REDACTED"')
print()
if s.delete_fields:
print(f'\t\tDeletes {len(s.delete_fields)} fields:')
for f in s.delete_fields:
print(f'\t\t\t- {f}')
print()
if __name__ == '__main__':
if len(sys.argv) == 1:
print("killswitch_test.py [file or - for stdin]")
sys.exit(1)
file_name = sys.argv[1]
if file_name == '-':
file = sys.stdin
else:
file = open(file_name)
res = json.load(file)
file.close()
show_killswitch_set_info(KillSwitchSet(parse_kill_switches(res)))

View File

@ -0,0 +1,90 @@
"""Test the apply functions used by killswitch to modify data."""
import copy
from typing import Any, Optional
import pytest
import killswitch
from killswitch import UPDATABLE_DATA
@pytest.mark.parametrize(
('source', 'key', 'action', 'to_set', 'result'),
[
(['this', 'is', 'a', 'test'], '1', 'delete', None, ['this', 'a', 'test']),
(['this', 'is', 'a', 'test'], '1', '', None, ['this', None, 'a', 'test']),
({'now': 'with', 'a': 'dict'}, 'now', 'delete', None, {'a': 'dict'}),
({'now': 'with', 'a': 'dict'}, 'now', '', None, {'now': None, 'a': 'dict'}),
(['test append'], '1', '', 'yay', ['test append', 'yay']),
(['test neg del'], '-1', 'delete', None, []),
(['test neg del'], '-1337', 'delete', None, ['test neg del']),
(['test neg del'], '-2', 'delete', None, ['test neg del']),
(['test too high del'], '30', 'delete', None, ['test too high del']),
]
)
def test_apply(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, result: UPDATABLE_DATA) -> None:
"""Test that a single level apply works as expected."""
cpy = copy.deepcopy(source)
killswitch._apply(target=cpy, key=key, to_set=to_set, delete=action == 'delete')
assert cpy == result
def test_apply_errors() -> None:
"""_apply should fail when passed something that isn't a Sequence or MutableMapping."""
with pytest.raises(ValueError, match=r'Dont know how to'):
killswitch._apply(set(), '0', None, False) # type: ignore # Its intentional that its broken
killswitch._apply(None, '', None) # type: ignore # Its intentional that its broken
with pytest.raises(ValueError, match=r'Cannot use string'):
killswitch._apply([], 'test', None, False)
def test_apply_no_error() -> None:
"""
_apply should not raise when deleting keys that dont exist, nor should it raise on setting keys that dont exist.
The only exception here is for lists. if a list is malformed to what a killswitch expects, it SHOULD explode,
thus causing the killswitch to fail and eat the entire message.
"""
killswitch._apply([], '0', None, True)
killswitch._apply({}, '0', None, True)
killswitch._apply({}, "this doesn't exist", None, True)
with pytest.raises(IndexError):
killswitch._apply([], '1', 'bang?')
@pytest.mark.parametrize(
('input', 'expected'),
[
('1', 1), ('1337', 1337), ('no.', None), ('0x10', None), ('010', 10),
(False, 0), (str((1 << 63)-1), (1 << 63)-1), (True, 1), (str(1 << 1337), 1 << 1337)
]
)
def test_get_int(input: str, expected: Optional[int]) -> None:
"""Check that _get_int doesn't throw when handed bad data."""
assert expected == killswitch._get_int(input)
@pytest.mark.parametrize(
('source', 'key', 'action', 'to_set', 'result'),
[
(['this', 'is', 'a', 'test'], '1', 'delete', None, ['this', 'a', 'test']),
(['this', 'is', 'a', 'test'], '1', '', None, ['this', None, 'a', 'test']),
({'now': 'with', 'a': 'dict'}, 'now', 'delete', None, {'a': 'dict'}),
({'now': 'with', 'a': 'dict'}, 'now', '', None, {'now': None, 'a': 'dict'}),
({'depth': {'is': 'important'}}, 'depth.is', '', 'nonexistent', {'depth': {'is': 'nonexistent'}}),
([{'test': ['stuff']}], '0.test.0', '', 'things', [{'test': ['things']}]),
(({'test': {'with': ['a', 'tuple']}},), '0.test.with.0', 'delete', '', ({'test': {'with': ['tuple']}},)),
({'test': ['with a', {'set', 'of', 'stuff'}]}, 'test.1', 'delete', '', {'test': ['with a']}),
({'keys.can.have.': 'dots!'}, 'keys.can.have.', '', '.s!', {'keys.can.have.': '.s!'}),
({'multilevel.keys': {'with.dots': False}}, 'multilevel.keys.with.dots',
'', True, {'multilevel.keys': {'with.dots': True}}),
({'dotted.key.one.level': False}, 'dotted.key.one.level', '', True, {'dotted.key.one.level': True}),
],
)
def test_deep_get(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, result: UPDATABLE_DATA) -> None:
"""Test _deep_get behaves as expected."""
cpy = copy.deepcopy(source)
killswitch._deep_apply(target=cpy, path=key, to_set=to_set, delete=action == 'delete')
assert cpy == result

View File

@ -0,0 +1,93 @@
"""Tests of killswitch behaviour."""
import copy
from typing import Optional
import pytest
import semantic_version
import killswitch
TEST_SET = killswitch.KillSwitchSet([
killswitch.KillSwitches(
version=semantic_version.SimpleSpec('1.0.0'),
kills={
'no-actions': killswitch.SingleKill('no-actions', 'test'),
'delete-action': killswitch.SingleKill('delete-action', 'remove stuff', delete_fields=['a', 'b.c']),
'delete-action-l': killswitch.SingleKill('delete-action-l', 'remove stuff', delete_fields=['2', '0']),
'set-action': killswitch.SingleKill('set-action', 'set stuff', set_fields={'a': False, 'b.c': True}),
'redact-action': killswitch.SingleKill('redact-action', 'redact stuff', redact_fields=['a', 'b.c'])
}
)
])
@pytest.mark.parametrize(
('input', 'kill', 'should_pass', 'result', 'version'),
[
([], 'doesnt-exist', True, None, '1.0.0'),
# should fail, attempts to use 'a' to index a list
(['a', 'b', 'c'], 'delete-action', False, ['a', 'b', 'c'], '1.0.0'),
(['a', 'b', 'c'], 'delete-action-l', True, ['b'], '1.0.0'),
(set(), 'delete-action-l', False, None, '1.0.0'), # set should be thrown out because it cant be indext
(['a', 'b'], 'delete-action-l', True, ['b'], '1.0.0'), # has a missing value, but that's fine for delete
(['a', 'b'], 'delete-action-l', True, ['a', 'b'], '1.1.0'), # wrong version
],
)
def test_killswitch(
input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: Optional[killswitch.UPDATABLE_DATA],
version: str
) -> None:
"""Simple killswitch tests."""
should_return, res = TEST_SET.check_killswitch(kill, input, version=version)
assert (not should_return) == should_pass, (
f'expected to {"pass" if should_pass else "fail"}, but {"passed" if not should_pass else "failed"}'
)
if result is None:
return # we didn't expect any result
assert res == result
@pytest.mark.parametrize(
('kill_dict', 'input', 'result'),
[
({'set_fields': {'test': None}}, {}, {'test': None}),
({'set_fields': {'test': None}, 'delete_fields': ['test']}, {}, {}),
({'set_fields': {'test': None}, 'redact_fields': ['test']}, {}, {'test': 'REDACTED'}),
({'set_fields': {'test': None}, 'redact_fields': ['test'], 'delete_fields': ['test']}, {}, {}),
],
)
def test_operator_precedence(
kill_dict: killswitch.SingleKillSwitchJSON, input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA
) -> None:
"""Ensure that operators are being applied in the correct order."""
kill = killswitch.SingleKill(
"", "", kill_dict.get('redact_fields'), kill_dict.get('delete_fields'), kill_dict.get('set_fields')
)
cpy = copy.deepcopy(input)
kill.apply_rules(cpy)
assert cpy == result
@pytest.mark.parametrize(
('names', 'input', 'result', 'expected_return'),
[
(['no-actions', 'delete-action'], {'a': 1}, {'a': 1}, True),
# this is true because delete-action keyerrors, thus causing failsafe
(['delete-action'], {'a': 1}, {'a': 1}, True),
(['delete-action'], {'a': 1, 'b': {'c': 2}}, {'b': {}}, False),
]
)
def test_check_multiple(
names: list[str], input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA, expected_return: bool
) -> None:
"""Check that order is correct when checking multiple killswitches."""
should_return, data = TEST_SET.check_multiple_killswitches(input, *names, version='1.0.0')
assert should_return == expected_return
assert data == result