mirror of
https://github.com/norohind/jubilant-system-core.git
synced 2025-05-30 07:09:13 +03:00
HookUtils: add various functions-helper for hooks
This commit is contained in:
parent
dd8b20ede0
commit
27faaeee05
203
HookUtils.py
Normal file
203
HookUtils.py
Normal file
@ -0,0 +1,203 @@
|
||||
import dataclasses
|
||||
from collections import defaultdict
|
||||
from loguru import logger
|
||||
import requests
|
||||
import os
|
||||
import json
|
||||
|
||||
with open('available.json', 'r', encoding='utf-8') as available_file:
|
||||
TAG_COLLECTIONS: dict = json.load(available_file)['SquadronTagData']['SquadronTagCollections']
|
||||
|
||||
del available_file
|
||||
|
||||
|
||||
def notify_discord(message: str) -> None:
|
||||
"""Just sends message to discord, without rate limits respect"""
|
||||
logger.debug('Sending discord message')
|
||||
|
||||
if len(message) >= 2000: # discord limitation
|
||||
logger.warning(f'Refuse to send len={len(message)}, content dump:\n{message}')
|
||||
message = 'Len > 2000, check logs'
|
||||
|
||||
hookURL: str = os.environ['DISCORD_NOTIFICATIONS_HOOK']
|
||||
content: bytes = f'content={requests.utils.quote(message)}'.encode('utf-8')
|
||||
|
||||
discord_request: requests.Response = requests.post(
|
||||
url=hookURL,
|
||||
data=content,
|
||||
headers={'Content-Type': 'application/x-www-form-urlencoded'}
|
||||
)
|
||||
|
||||
try:
|
||||
discord_request.raise_for_status()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f'Fail on sending message to discord ({"/".join(hookURL.split("/")[-2:])})'
|
||||
f'\n{discord_request.content}', exc_info=e)
|
||||
return
|
||||
|
||||
logger.debug('Sending successful')
|
||||
return
|
||||
|
||||
|
||||
class SQL_REQUESTS:
|
||||
GET_HISTORICAL_INFO = """
|
||||
select *
|
||||
from squadrons_historical_data
|
||||
inner join operations_info on squadrons_historical_data.operation_id = operations_info.operation_id
|
||||
left join squadrons_news_historical snh on operations_info.operation_id = snh.operation_id
|
||||
where
|
||||
squad_id = (select squad_id from operations_info where operation_id = :operation_id) and
|
||||
operations_info.operation_id <= :operation_id
|
||||
order by operations_info.operation_id desc
|
||||
limit :limit;
|
||||
"""
|
||||
|
||||
|
||||
def resolve_user_tag(single_user_tag: int) -> [str, str]:
|
||||
"""
|
||||
Resolves one tag to category and tag itself
|
||||
|
||||
:param single_user_tag:
|
||||
:return:
|
||||
"""
|
||||
for tag_collection in TAG_COLLECTIONS:
|
||||
for tag in tag_collection['SquadronTags']:
|
||||
if tag['ServerUniqueId'] == single_user_tag:
|
||||
return tag_collection['localisedCollectionName'], tag['LocalisedString']
|
||||
|
||||
|
||||
def resolve_user_tags(user_tags: list[int]) -> dict[str, list[str]]:
|
||||
"""Function to resolve user_tags list of ints to dict with tag collections as keys and list of tags as value
|
||||
|
||||
:param user_tags: list of ints of tags to resolve
|
||||
:return: dict of tags
|
||||
"""
|
||||
|
||||
_resolved_tags: dict[str, list[str]] = dict()
|
||||
|
||||
for user_tag in user_tags:
|
||||
collection_name, tag_name = resolve_user_tag(user_tag)
|
||||
if collection_name in _resolved_tags: # if key in dict
|
||||
_resolved_tags[collection_name].append(tag_name)
|
||||
|
||||
else:
|
||||
_resolved_tags.update({collection_name: [tag_name]})
|
||||
|
||||
return _resolved_tags
|
||||
|
||||
|
||||
def humanify_resolved_user_tags(user_tags: dict[str, list[str]], do_tabulate=True) -> str:
|
||||
"""Function to make result of resolve_user_tags more human-readable
|
||||
|
||||
:param do_tabulate: if we should insert tabulation or you already did it in source data, default to True
|
||||
:param user_tags: result of resolve_user_tags function
|
||||
:return: string with human-friendly tags list
|
||||
"""
|
||||
|
||||
result_str: str = str()
|
||||
if do_tabulate:
|
||||
tab = ' '
|
||||
|
||||
else:
|
||||
tab = str()
|
||||
|
||||
for tag_collection_name in user_tags:
|
||||
result_str += f"{tag_collection_name}:\n"
|
||||
|
||||
for tag in user_tags[tag_collection_name]:
|
||||
result_str += f"{tab}{tag}\n"
|
||||
|
||||
return result_str
|
||||
|
||||
|
||||
def tags_diff2str(new_tags_ids: list, old_tags_ids: list) -> str:
|
||||
"""Compares two list of tags, new and old, and returns it in diff like str
|
||||
|
||||
:param new_tags_ids: list ids of new tags
|
||||
:param old_tags_ids: list ids of old tags
|
||||
:return: diff as str
|
||||
"""
|
||||
|
||||
resolved_tags: dict[str, list[str]] = defaultdict(list)
|
||||
|
||||
removed_tags_ids: list = list(set(old_tags_ids) - set(new_tags_ids))
|
||||
added_tags_ids: list = list(set(new_tags_ids) - set(old_tags_ids))
|
||||
|
||||
tags_union_ids: list = list(set(new_tags_ids).union(set(old_tags_ids)))
|
||||
|
||||
for tag_id in tags_union_ids:
|
||||
collection_name, tag_name = resolve_user_tag(tag_id)
|
||||
|
||||
if tag_id in removed_tags_ids:
|
||||
resolved_tags[collection_name].append(f'- {tag_name}')
|
||||
|
||||
elif tag_id in added_tags_ids:
|
||||
resolved_tags[collection_name].append(f'+ {tag_name}')
|
||||
|
||||
else: # tag_id not in added_tags_ids and not in removed_tags_ids - nothing changed
|
||||
resolved_tags[collection_name].append(f' {tag_name}')
|
||||
|
||||
return humanify_resolved_user_tags(resolved_tags, do_tabulate=False)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Column:
|
||||
""" For diff_columns"""
|
||||
name: str
|
||||
label: str
|
||||
is_screen_values: bool = False
|
||||
is_user_tags: bool = False
|
||||
screening_value_start: str = dataclasses.field(init=False)
|
||||
screening_value_end: str = dataclasses.field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.is_screen_values and not self.is_user_tags:
|
||||
self.screening_value_start = self.screening_value_end = '`'
|
||||
|
||||
elif self.is_user_tags:
|
||||
self.is_screen_values = True
|
||||
self.screening_value_start = '```diff\n'
|
||||
self.screening_value_end = f'```\n'
|
||||
|
||||
else:
|
||||
self.screening_value_start = self.screening_value_end = ''
|
||||
|
||||
def screen(self, value: str) -> str:
|
||||
return f'{self.screening_value_start}{value}{self.screening_value_end}'
|
||||
|
||||
|
||||
def diff_columns(info: list[dict], columns_to_diff: list[Column]) -> str:
|
||||
"""
|
||||
Takes list of two history records about squad and returns diff in str for specified column
|
||||
|
||||
:param info: must be two list of two records
|
||||
:param columns_to_diff: list of columns to diff
|
||||
:return:
|
||||
"""
|
||||
|
||||
msg = ''
|
||||
new_record = info[0]
|
||||
old_record = info[1]
|
||||
for column in columns_to_diff:
|
||||
new_column = new_record[column.name]
|
||||
old_column = old_record[column.name]
|
||||
|
||||
if new_column != old_column:
|
||||
if column.is_user_tags:
|
||||
msg += column.screen(tags_diff2str(new_column, old_column))
|
||||
|
||||
else:
|
||||
msg += f'{column.label} {column.screen(old_column)} -> {column.screen(new_column)}\n'
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def generate_message_header(record: dict, squadron_type: str) -> str:
|
||||
return f"""
|
||||
State changed for {squadron_type} squad `{record['name']}` [{record['tag']}]
|
||||
platform: {record['platform']}
|
||||
members: {record['member_count']}
|
||||
created: {record['created']}
|
||||
owner: `{record['owner_name']}`
|
||||
"""
|
Loading…
x
Reference in New Issue
Block a user