mirror of
https://github.com/norohind/jubilant-system-core.git
synced 2025-04-12 09:50:01 +03:00
204 lines
6.6 KiB
Python
204 lines
6.6 KiB
Python
import dataclasses
|
|
from collections import defaultdict
|
|
from loguru import logger
|
|
import requests
|
|
import os
|
|
import json
|
|
|
|
with open('available.json', 'r', encoding='utf-8') as available_file:
|
|
TAG_COLLECTIONS: dict = json.load(available_file)['SquadronTagData']['SquadronTagCollections']
|
|
|
|
del available_file
|
|
|
|
|
|
def notify_discord(message: str) -> None:
|
|
"""Just sends message to discord, without rate limits respect"""
|
|
logger.debug('Sending discord message')
|
|
|
|
if len(message) >= 2000: # discord limitation
|
|
logger.warning(f'Refuse to send len={len(message)}, content dump:\n{message}')
|
|
message = 'Len > 2000, check logs'
|
|
|
|
hookURL: str = os.environ['DISCORD_NOTIFICATIONS_HOOK']
|
|
content: bytes = f'content={requests.utils.quote(message)}'.encode('utf-8')
|
|
|
|
discord_request: requests.Response = requests.post(
|
|
url=hookURL,
|
|
data=content,
|
|
headers={'Content-Type': 'application/x-www-form-urlencoded'}
|
|
)
|
|
|
|
try:
|
|
discord_request.raise_for_status()
|
|
|
|
except Exception as e:
|
|
logger.exception(f'Fail on sending message to discord ({"/".join(hookURL.split("/")[-2:])})'
|
|
f'\n{discord_request.content}', exc_info=e)
|
|
return
|
|
|
|
logger.debug('Sending successful')
|
|
return
|
|
|
|
|
|
class SQL_REQUESTS:
|
|
GET_HISTORICAL_INFO = """
|
|
select *
|
|
from squadrons_historical_data
|
|
inner join operations_info on squadrons_historical_data.operation_id = operations_info.operation_id
|
|
left join squadrons_news_historical snh on operations_info.operation_id = snh.operation_id
|
|
where
|
|
squad_id = (select squad_id from operations_info where operation_id = :operation_id) and
|
|
operations_info.operation_id <= :operation_id
|
|
order by operations_info.operation_id desc
|
|
limit :limit;
|
|
"""
|
|
|
|
|
|
def resolve_user_tag(single_user_tag: int) -> [str, str]:
|
|
"""
|
|
Resolves one tag to category and tag itself
|
|
|
|
:param single_user_tag:
|
|
:return:
|
|
"""
|
|
for tag_collection in TAG_COLLECTIONS:
|
|
for tag in tag_collection['SquadronTags']:
|
|
if tag['ServerUniqueId'] == single_user_tag:
|
|
return tag_collection['localisedCollectionName'], tag['LocalisedString']
|
|
|
|
|
|
def resolve_user_tags(user_tags: list[int]) -> dict[str, list[str]]:
|
|
"""Function to resolve user_tags list of ints to dict with tag collections as keys and list of tags as value
|
|
|
|
:param user_tags: list of ints of tags to resolve
|
|
:return: dict of tags
|
|
"""
|
|
|
|
_resolved_tags: dict[str, list[str]] = dict()
|
|
|
|
for user_tag in user_tags:
|
|
collection_name, tag_name = resolve_user_tag(user_tag)
|
|
if collection_name in _resolved_tags: # if key in dict
|
|
_resolved_tags[collection_name].append(tag_name)
|
|
|
|
else:
|
|
_resolved_tags.update({collection_name: [tag_name]})
|
|
|
|
return _resolved_tags
|
|
|
|
|
|
def humanify_resolved_user_tags(user_tags: dict[str, list[str]], do_tabulate=True) -> str:
|
|
"""Function to make result of resolve_user_tags more human-readable
|
|
|
|
:param do_tabulate: if we should insert tabulation or you already did it in source data, default to True
|
|
:param user_tags: result of resolve_user_tags function
|
|
:return: string with human-friendly tags list
|
|
"""
|
|
|
|
result_str: str = str()
|
|
if do_tabulate:
|
|
tab = ' '
|
|
|
|
else:
|
|
tab = str()
|
|
|
|
for tag_collection_name in user_tags:
|
|
result_str += f"{tag_collection_name}:\n"
|
|
|
|
for tag in user_tags[tag_collection_name]:
|
|
result_str += f"{tab}{tag}\n"
|
|
|
|
return result_str
|
|
|
|
|
|
def tags_diff2str(new_tags_ids: list, old_tags_ids: list) -> str:
|
|
"""Compares two list of tags, new and old, and returns it in diff like str
|
|
|
|
:param new_tags_ids: list ids of new tags
|
|
:param old_tags_ids: list ids of old tags
|
|
:return: diff as str
|
|
"""
|
|
|
|
resolved_tags: dict[str, list[str]] = defaultdict(list)
|
|
|
|
removed_tags_ids: list = list(set(old_tags_ids) - set(new_tags_ids))
|
|
added_tags_ids: list = list(set(new_tags_ids) - set(old_tags_ids))
|
|
|
|
tags_union_ids: list = list(set(new_tags_ids).union(set(old_tags_ids)))
|
|
|
|
for tag_id in tags_union_ids:
|
|
collection_name, tag_name = resolve_user_tag(tag_id)
|
|
|
|
if tag_id in removed_tags_ids:
|
|
resolved_tags[collection_name].append(f'- {tag_name}')
|
|
|
|
elif tag_id in added_tags_ids:
|
|
resolved_tags[collection_name].append(f'+ {tag_name}')
|
|
|
|
else: # tag_id not in added_tags_ids and not in removed_tags_ids - nothing changed
|
|
resolved_tags[collection_name].append(f' {tag_name}')
|
|
|
|
return humanify_resolved_user_tags(resolved_tags, do_tabulate=False)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Column:
|
|
""" For diff_columns"""
|
|
name: str
|
|
label: str
|
|
is_screen_values: bool = False
|
|
is_user_tags: bool = False
|
|
screening_value_start: str = dataclasses.field(init=False)
|
|
screening_value_end: str = dataclasses.field(init=False)
|
|
|
|
def __post_init__(self):
|
|
if self.is_screen_values and not self.is_user_tags:
|
|
self.screening_value_start = self.screening_value_end = '`'
|
|
|
|
elif self.is_user_tags:
|
|
self.is_screen_values = True
|
|
self.screening_value_start = '```diff\n'
|
|
self.screening_value_end = f'```\n'
|
|
|
|
else:
|
|
self.screening_value_start = self.screening_value_end = ''
|
|
|
|
def screen(self, value: str) -> str:
|
|
return f'{self.screening_value_start}{value}{self.screening_value_end}'
|
|
|
|
|
|
def diff_columns(info: list[dict], columns_to_diff: list[Column]) -> str:
|
|
"""
|
|
Takes list of two history records about squad and returns diff in str for specified column
|
|
|
|
:param info: must be two list of two records
|
|
:param columns_to_diff: list of columns to diff
|
|
:return:
|
|
"""
|
|
|
|
msg = ''
|
|
new_record = info[0]
|
|
old_record = info[1]
|
|
for column in columns_to_diff:
|
|
new_column = new_record[column.name]
|
|
old_column = old_record[column.name]
|
|
|
|
if new_column != old_column:
|
|
if column.is_user_tags:
|
|
msg += column.screen(tags_diff2str(new_column, old_column))
|
|
|
|
else:
|
|
msg += f'{column.label} {column.screen(old_column)} -> {column.screen(new_column)}\n'
|
|
|
|
return msg
|
|
|
|
|
|
def generate_message_header(record: dict, squadron_type: str) -> str:
|
|
return f"""
|
|
State changed for {squadron_type} squad `{record['name']}` [{record['tag']}]
|
|
platform: {record['platform']}
|
|
members: {record['member_count']}
|
|
created: {record['created']}
|
|
owner: `{record['owner_name']}`
|
|
"""
|