From 5b1216dd4147f7c2a7906755c1dfc73d13600219 Mon Sep 17 00:00:00 2001 From: norohind <60548839+norohind@users.noreply.github.com> Date: Fri, 19 Nov 2021 00:10:20 +0300 Subject: [PATCH] init --- EDMCLogging.py | 448 +++++++++++++++++++++++++++++++++++++++++++++++ main.py | 183 +++++++++++++++++++ model.py | 23 +++ requirements.txt | Bin 0 -> 284 bytes sql_requests.py | 58 ++++++ utils.py | 145 +++++++++++++++ web.py | 36 ++++ 7 files changed, 893 insertions(+) create mode 100644 EDMCLogging.py create mode 100644 main.py create mode 100644 model.py create mode 100644 requirements.txt create mode 100644 sql_requests.py create mode 100644 utils.py create mode 100644 web.py diff --git a/EDMCLogging.py b/EDMCLogging.py new file mode 100644 index 0000000..c54b060 --- /dev/null +++ b/EDMCLogging.py @@ -0,0 +1,448 @@ +""" +Set up required logging for the application. + +This module provides for a common logging-powered log facility. +Mostly it implements a logging.Filter() in order to get two extra +members on the logging.LogRecord instance for use in logging.Formatter() +strings. + +If type checking, e.g. mypy, objects to `logging.trace(...)` then include this +stanza: + + # See EDMCLogging.py docs. + # isort: off + if TYPE_CHECKING: + from logging import trace, TRACE # type: ignore # noqa: F401 + # isort: on + +This is needed because we add the TRACE level and the trace() function +ourselves at runtime. + +To utilise logging in core code, or internal plugins, include this: + + from EDMCLogging import get_main_logger + + logger = get_main_logger() + +To utilise logging in a 'found' (third-party) plugin, include this: + + import os + import logging + + plugin_name = os.path.basename(os.path.dirname(__file__)) + # plugin_name here *must* be the name of the folder the plugin resides in + # See, plug.py:load_plugins() + logger = logging.getLogger(f'{appname}.{plugin_name}') +""" + +import inspect +import logging +import logging.handlers +import os +from contextlib import suppress +from fnmatch import fnmatch +# So that any warning about accessing a protected member is only in one place. +from sys import _getframe as getframe +from threading import get_native_id as thread_native_id +from traceback import print_exc +from typing import TYPE_CHECKING, Tuple, cast + + +# TODO: Tests: +# +# 1. Call from bare function in file. +# 2. Call from `if __name__ == "__main__":` section +# +# 3. Call from 1st level function in 1st level Class in file +# 4. Call from 2nd level function in 1st level Class in file +# 5. Call from 3rd level function in 1st level Class in file +# +# 6. Call from 1st level function in 2nd level Class in file +# 7. Call from 2nd level function in 2nd level Class in file +# 8. Call from 3rd level function in 2nd level Class in file +# +# 9. Call from 1st level function in 3rd level Class in file +# 10. Call from 2nd level function in 3rd level Class in file +# 11. Call from 3rd level function in 3rd level Class in file +# +# 12. Call from 2nd level file, all as above. +# +# 13. Call from *module* +# +# 14. Call from *package* + +_default_loglevel = logging.DEBUG + +# Define a TRACE level +LEVEL_TRACE = 5 +LEVEL_TRACE_ALL = 3 +logging.addLevelName(LEVEL_TRACE, "TRACE") +logging.addLevelName(LEVEL_TRACE_ALL, "TRACE_ALL") +logging.TRACE = LEVEL_TRACE # type: ignore +logging.TRACE_ALL = LEVEL_TRACE_ALL # type: ignore +logging.Logger.trace = lambda self, message, *args, **kwargs: self._log( # type: ignore + logging.TRACE, # type: ignore + message, + args, + **kwargs +) + + +def _trace_if(self: logging.Logger, condition: str, message: str, *args, **kwargs) -> None: + if any(fnmatch(condition, p) for p in []): + self._log(logging.TRACE, message, args, **kwargs) # type: ignore # we added it + return + + self._log(logging.TRACE_ALL, message, args, **kwargs) # type: ignore # we added it + + +logging.Logger.trace_if = _trace_if # type: ignore + +# we cant hide this from `from xxx` imports and I'd really rather no-one other than `logging` had access to it +del _trace_if + +if TYPE_CHECKING: + from types import FrameType + + # Fake type that we can use here to tell type checkers that trace exists + + class LoggerMixin(logging.Logger): + """LoggerMixin is a fake class that tells type checkers that trace exists on a given type.""" + + def trace(self, message, *args, **kwargs) -> None: + """See implementation above.""" + ... + + def trace_if(self, condition: str, message, *args, **kwargs) -> None: + """ + Fake trace if method, traces only if condition exists in trace_on. + + See implementation above. + """ + ... + + +class Logger: + """ + Wrapper class for all logging configuration and code. + + Class instantiation requires the 'logger name' and optional loglevel. + It is intended that this 'logger name' be re-used in all files/modules + that need to log. + + Users of this class should then call getLogger() to get the + logging.Logger instance. + """ + + def __init__(self, logger_name: str, loglevel: int = _default_loglevel): + """ + Set up a `logging.Logger` with our preferred configuration. + + This includes using an EDMCContextFilter to add 'class' and 'qualname' + expansions for logging.Formatter(). + """ + self.logger = logging.getLogger(logger_name) + # Configure the logging.Logger + # This needs to always be TRACE in order to let TRACE level messages + # through to check the *handler* levels. + self.logger.setLevel(logging.TRACE) # type: ignore + + # Set up filter for adding class name + self.logger_filter = EDMCContextFilter() + self.logger.addFilter(self.logger_filter) + + # Our basic channel handling stdout + self.logger_channel = logging.StreamHandler() + # This should be affected by the user configured log level + self.logger_channel.setLevel(loglevel) + + self.logger_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(process)d:%(thread)d:%(osthreadid)d %(module)s.%(qualname)s:%(lineno)d: %(message)s') # noqa: E501 + self.logger_formatter.default_time_format = '%Y-%m-%d %H:%M:%S' + self.logger_formatter.default_msec_format = '%s.%03d' + + self.logger_channel.setFormatter(self.logger_formatter) + self.logger.addHandler(self.logger_channel) + + def get_logger(self) -> 'LoggerMixin': + """ + Obtain the self.logger of the class instance. + + Not to be confused with logging.getLogger(). + """ + return cast('LoggerMixin', self.logger) + + def get_streamhandler(self) -> logging.Handler: + """ + Obtain the self.logger_channel StreamHandler instance. + + :return: logging.StreamHandler + """ + return self.logger_channel + + def set_channels_loglevel(self, level: int) -> None: + """ + Set the specified log level on the channels. + + :param level: A valid `logging` level. + :return: None + """ + self.logger_channel.setLevel(level) + + def set_console_loglevel(self, level: int) -> None: + """ + Set the specified log level on the console channel. + + :param level: A valid `logging` level. + :return: None + """ + if self.logger_channel.level != logging.TRACE: # type: ignore + self.logger_channel.setLevel(level) + else: + logger.trace("Not changing log level because it's TRACE") # type: ignore + + +class EDMCContextFilter(logging.Filter): + """ + Implements filtering to add extra format specifiers, and tweak others. + + logging.Filter sub-class to place extra attributes of the calling site + into the record. + """ + + def filter(self, record: logging.LogRecord) -> bool: + """ + Attempt to set/change fields in the LogRecord. + + 1. class = class name(s) of the call site, if applicable + 2. qualname = __qualname__ of the call site. This simplifies + logging.Formatter() as you can use just this no matter if there is + a class involved or not, so you get a nice clean: + .[.classB....]. + 3. osthreadid = OS level thread ID. + + If we fail to be able to properly set either then: + + 1. Use print() to alert, to be SURE a message is seen. + 2. But also return strings noting the error, so there'll be + something in the log output if it happens. + + :param record: The LogRecord we're "filtering" + :return: bool - Always true in order for this record to be logged. + """ + (class_name, qualname, module_name) = self.caller_attributes(module_name=getattr(record, 'module')) + + # Only set if we got a useful value + if module_name: + setattr(record, 'module', module_name) + + # Only set if not already provided by logging itself + if getattr(record, 'class', None) is None: + setattr(record, 'class', class_name) + + # Only set if not already provided by logging itself + if getattr(record, 'qualname', None) is None: + setattr(record, 'qualname', qualname) + + setattr(record, 'osthreadid', thread_native_id()) + + return True + + @classmethod + def caller_attributes(cls, module_name: str = '') -> Tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible + """ + Determine extra or changed fields for the caller. + + 1. qualname finds the relevant object and its __qualname__ + 2. caller_class_names is just the full class names of the calling + class if relevant. + 3. module is munged if we detect the caller is an EDMC plugin, + whether internal or found. + + :param module_name: The name of the calling module. + :return: Tuple[str, str, str] - class_name, qualname, module_name + """ + frame = cls.find_caller_frame() + + caller_qualname = caller_class_names = '' + if frame: + # + try: + frame_info = inspect.getframeinfo(frame) + # raise(IndexError) # TODO: Remove, only for testing + + except Exception: + # Separate from the print below to guarantee we see at least this much. + print('EDMCLogging:EDMCContextFilter:caller_attributes(): Failed in `inspect.getframinfo(frame)`') + + # We want to *attempt* to show something about the nature of 'frame', + # but at this point we can't trust it will work. + try: + print(f'frame: {frame}') + + except Exception: + pass + + # We've given up, so just return '??' to signal we couldn't get the info + return '??', '??', module_name + try: + args, _, _, value_dict = inspect.getargvalues(frame) + if len(args) and args[0] in ('self', 'cls'): + frame_class: 'object' = value_dict[args[0]] + + if frame_class: + # See https://en.wikipedia.org/wiki/Name_mangling#Python for how name mangling works. + # For more detail, see _Py_Mangle in CPython's Python/compile.c. + name = frame_info.function + class_name = frame_class.__class__.__name__.lstrip("_") + if name.startswith("__") and not name.endswith("__") and class_name: + name = f'_{class_name}{frame_info.function}' + + # Find __qualname__ of the caller + fn = inspect.getattr_static(frame_class, name, None) + if fn is None: + # For some reason getattr_static cant grab this. Try and grab it with getattr, bail out + # if we get a RecursionError indicating a property + try: + fn = getattr(frame_class, name, None) + except RecursionError: + print( + "EDMCLogging:EDMCContextFilter:caller_attributes():" + "Failed to get attribute for function info. Bailing out" + ) + # class_name is better than nothing for __qualname__ + return class_name, class_name, module_name + + if fn is not None: + if isinstance(fn, property): + class_name = str(frame_class) + # If somehow you make your __class__ or __class__.__qualname__ recursive, + # I'll be impressed. + if hasattr(frame_class, '__class__') and hasattr(frame_class.__class__, "__qualname__"): + class_name = frame_class.__class__.__qualname__ + caller_qualname = f"{class_name}.{name}(property)" + + else: + caller_qualname = f"" + + elif not hasattr(fn, '__qualname__'): + caller_qualname = name + + elif hasattr(fn, '__qualname__') and fn.__qualname__: + caller_qualname = fn.__qualname__ + + # Find containing class name(s) of caller, if any + if ( + frame_class.__class__ and hasattr(frame_class.__class__, '__qualname__') + and frame_class.__class__.__qualname__ + ): + caller_class_names = frame_class.__class__.__qualname__ + + # It's a call from the top level module file + elif frame_info.function == '': + caller_class_names = '' + caller_qualname = value_dict['__name__'] + + elif frame_info.function != '': + caller_class_names = '' + caller_qualname = frame_info.function + + module_name = cls.munge_module_name(frame_info, module_name) + + except Exception as e: + print('ALERT! Something went VERY wrong in handling finding info to log') + print('ALERT! Information is as follows') + with suppress(Exception): + + print(f'ALERT! {e=}') + print_exc() + print(f'ALERT! {frame=}') + with suppress(Exception): + print(f'ALERT! {fn=}') # type: ignore + with suppress(Exception): + print(f'ALERT! {cls=}') + + finally: # Ensure this always happens + # https://docs.python.org/3.7/library/inspect.html#the-interpreter-stack + del frame + + if caller_qualname == '': + print('ALERT! Something went wrong with finding caller qualname for logging!') + caller_qualname = '' + + if caller_class_names == '': + print('ALERT! Something went wrong with finding caller class name(s) for logging!') + caller_class_names = '' + + return caller_class_names, caller_qualname, module_name + + @classmethod + def find_caller_frame(cls): + """ + Find the stack frame of the logging caller. + + :returns: 'frame' object such as from sys._getframe() + """ + # Go up through stack frames until we find the first with a + # type(f_locals.self) of logging.Logger. This should be the start + # of the frames internal to logging. + frame: 'FrameType' = getframe(0) + while frame: + if isinstance(frame.f_locals.get('self'), logging.Logger): + frame = cast('FrameType', frame.f_back) # Want to start on the next frame below + break + frame = cast('FrameType', frame.f_back) + # Now continue up through frames until we find the next one where + # that is *not* true, as it should be the call site of the logger + # call + while frame: + if not isinstance(frame.f_locals.get('self'), logging.Logger): + break # We've found the frame we want + frame = cast('FrameType', frame.f_back) + return frame + + @classmethod + def munge_module_name(cls, frame_info: inspect.Traceback, module_name: str) -> str: + """ + Adjust module_name based on the file path for the given frame. + + We want to distinguish between other code and both our internal plugins + and the 'found' ones. + + For internal plugins we want "plugins.". + For 'found' plugins we want "....". + + :param frame_info: The frame_info of the caller. + :param module_name: The module_name string to munge. + :return: The munged module_name. + """ + # file_name = pathlib.Path(frame_info.filename).expanduser() + # plugin_dir = pathlib.Path(config.plugin_dir_path).expanduser() + # internal_plugin_dir = pathlib.Path(config.internal_plugin_dir_path).expanduser() + # if internal_plugin_dir in file_name.parents: + # # its an internal plugin + # return f'plugins.{".".join(file_name.relative_to(internal_plugin_dir).parent.parts)}' + + # elif plugin_dir in file_name.parents: + # return f'.{".".join(file_name.relative_to(plugin_dir).parent.parts)}' + + return module_name + + +def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin': + """Return the correct logger for how the program is being run. (outdated)""" + # if not os.getenv("EDMC_NO_UI"): + # # GUI app being run + # return cast('LoggerMixin', logging.getLogger(appname)) + # else: + # # Must be the CLI + # return cast('LoggerMixin', logging.getLogger(appcmdname)) + return cast('LoggerMixin', logging.getLogger(__name__)) + + +# Singleton +loglevel = logging._nameToLevel.get(os.getenv('LOG_LEVEL', 'DEBUG').upper(), logging.DEBUG) # noqa: + +base_logger_name = __name__ + +edmclogger = Logger(base_logger_name, loglevel=loglevel) +logger: 'LoggerMixin' = edmclogger.get_logger() diff --git a/main.py b/main.py new file mode 100644 index 0000000..bb0b294 --- /dev/null +++ b/main.py @@ -0,0 +1,183 @@ +""" +Run by third side (i.e. cron) +Request all or specified combination of platform + activity +Write results to sqlite DB in journal like format + + +AP - appropriate field from json +DB structure +states: +action_id integer - id of insertion, all records from one request will have the same action_id +leaderboard_type string - (AP) +platform string - platform of data +squadron_id integer - (squadron field) +score integer - (AP) +percentile integer - (AP) +rank integer - (AP) +name string - (AP) +tag string - (AP) +timestamp - inserts by DB, default TIMESTAMP + +""" +import typing + +import requests +import sqlite3 + +import sql_requests +# from EDMCLogging import get_main_logger +import utils + +db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3') + +db.executescript(sql_requests.schema_create) + + +def request_leaderboard(platform_enum: utils.Platform, leaderboard_type_enum: utils.LeaderboardTypes) -> dict: + """ + Requests specified leaderboard and returns list of squads in specified leaderboard + + :param platform_enum: leaderboard platform + :param leaderboard_type_enum: leaderboard type + :return: list of squads from leaderboard + """ + + platform: str = platform_enum.value + leaderboard_type = leaderboard_type_enum.value + + SAPIRequest: requests.Response = utils.proxied_request( + 'https://api.orerve.net/2.0/website/squadron/season/leaderboards', + params={'leaderboardType': leaderboard_type, 'platform': platform}) + + return { + 'leaderboard': SAPIRequest.json()['leaderboards'][leaderboard_type], # list + 'platform': platform, # str + 'type': leaderboard_type # str + } + + # return SAPIRequest.json()['leaderboards'][leaderboard_type] + + +def insert_leaderboard_db(db_conn: sqlite3.Connection, leaderboard_list: dict) -> None: + """ + Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB + + :param db_conn: Connection to DB + :param leaderboard_list: list from request_leaderboard + :return: + """ + + platform: str = leaderboard_list['platform'] + LB_type: str = leaderboard_list['type'] + leaderboard: list = leaderboard_list['leaderboard'] + + action_id: int # not last, current that we will use + + sql_req_action_id: sqlite3.Cursor = db_conn.execute(sql_requests.select_last_action_id) + action_id_fetch_one: typing.Union[None, tuple[int]] = sql_req_action_id.fetchone() + if action_id_fetch_one is None: + # i.e. first launch + action_id = 1 # yep, not 0 + + else: + action_id = action_id_fetch_one[0] + 1 + + # Patch for additional values + for squad in leaderboard: + squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform}) + + with db_conn: + db_conn.executemany( + sql_requests.insert_leader_board, + leaderboard) + + +def get_and_save_leaderboard(platform_enum: utils.Platform, + leaderboard_type_enum: utils.LeaderboardTypes, + db_conn: sqlite3.Connection) -> None: + """ + High logic function to get and save information about specified for type and platform leaderboard + + :param platform_enum: + :param leaderboard_type_enum: + :param db_conn: + :return: + """ + + req = request_leaderboard(platform_enum, leaderboard_type_enum) + insert_leaderboard_db(db_conn, req) + + +def main(): + """ + Run in specified mode from command line + + main.py update all + - make all 21 requests (7 leaderboards * 3 platforms) + + main.py update + - update specified leaderboard for all platforms (3 requests) + + main.py update + - update specified leaderboard for specified platform + + :return: + """ + + from sys import argv + + help_msg: str = main.__doc__[46:-19] + + def failed_args(exit_code: int = 0): + print(help_msg) + exit(exit_code) + + if len(argv) == 3: # update all + if argv[1] == 'update' and argv[2] == 'all': + # main.py update all + for platform_enum in utils.Platform: + for LB_type_enum in utils.LeaderboardTypes: + get_and_save_leaderboard(platform_enum, LB_type_enum, db) + + exit(0) + + elif argv[1] == 'update': + # main.py update + leaderboard: str = argv[2].lower() + try: + leaderboard_enum: utils.LeaderboardTypes = utils.LeaderboardTypes(leaderboard) + + for platform_enum in utils.Platform: + get_and_save_leaderboard(platform_enum, leaderboard_enum, db) + + exit(0) + + except ValueError: + print('leaderboard must be correct leaderboard type') + exit(1) + + else: + failed_args(1) + + elif len(argv) == 4: + # main.py update + if argv[1] == 'update': + leaderboard = argv[2].lower() + platform = argv[3].upper() + + try: + leaderboard_enum: utils.LeaderboardTypes = utils.LeaderboardTypes(leaderboard) + platform_enum: utils.Platform = utils.Platform(platform) + + get_and_save_leaderboard(platform_enum, leaderboard_enum, db) + exit(0) + + except ValueError: + print('leaderboard must be correct leaderboard type, platform must be correct platform') + exit(0) + else: + failed_args(1) + + +if __name__ == '__main__': + main() diff --git a/model.py b/model.py new file mode 100644 index 0000000..c75bacb --- /dev/null +++ b/model.py @@ -0,0 +1,23 @@ +import sqlite3 +import sql_requests +import utils + + +db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False) + +# thx https://stackoverflow.com/a/48789604 +db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) + +cur = db.cursor() + + +def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp) -> list: + sql_req: sqlite3.Cursor = db.execute(sql_requests.select_activity, { + 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, + 'platform': utils.Platform(platform.upper()).value, + 'limit': limit, + 'high_timestamp': high_timestamp, + 'low_timestamp': low_timestamp + }) + + return sql_req.fetchall() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..e1c499c993d20fe0ccfda8068692b4630751e022 GIT binary patch literal 284 zcmXw!TMEK35Jm5|;8HAU3)Ua^Ah?1WEf#CVJ`nwI>FSvY5+=~(-gDCLC+eg@lUg;* zmQ^S)xiTiOZ?x1xQD@a8NAiUHDm%2`M6<&@OUyj6Lt)sbZF+!9vlr#n5hryq5J zXDzc;ZZy{b?XH=9#FY{Sw}sa1!5K5D&fN#7e@03qw~-ouU&PhxP0HS2|CBr9!EKAd Md?U*e=CMT34+>u@HUIzs literal 0 HcmV?d00001 diff --git a/sql_requests.py b/sql_requests.py new file mode 100644 index 0000000..d8c9661 --- /dev/null +++ b/sql_requests.py @@ -0,0 +1,58 @@ +schema_create = """create table if not exists squads_stats_states ( +action_id integer, +leaderboard_type string, +platform string, +squadron_id integer, +score integer, +percentile integer, +rank integer, +name string, +tag string, +timestamp default current_timestamp); + +create view if not exists current_cqc_pc as +select * from squads_stats_states where action_id in +(select distinct action_id +from squads_stats_states +where leaderboard_type = 'cqc' and platform = 'PC' +order by action_id desc limit 1) and platform = 'PC'; + +create view if not exists prev_cqc_pc as +select * +from squads_stats_states +where action_id in +(select distinct action_id +from squads_stats_states +where leaderboard_type = 'cqc' and platform = 'PC' order by action_id desc limit 1, 1) and platform = 'PC'; + +create index if not exists idx_action_id_0 on squads_stats_states (action_id); +create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type); + +create view if not exists diff_pc_cqc as +select current_cqc_pc.name, current_cqc_pc.score, prev_cqc_pc.score, current_cqc_pc.score - prev_cqc_pc.score as diff +from current_cqc_pc left outer join prev_cqc_pc on prev_cqc_pc.squadron_id = current_cqc_pc.squadron_id;""" + +select_last_action_id = """select action_id +from squads_stats_states +order by action_id desc +limit 1;""" + +insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score, +percentile, rank, name, tag) +values (:action_id, :LB_type, :platform, :squadron, :score, :percentile, :rank, :name, :tag);""" + +select_activity = """select *, sum_score - sum_score_old as diff from +(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old +from ( + select sum(score) as sum_score, timestamp, action_id + from squads_stats_states + where + leaderboard_type = :LB_type and + platform = :platform and + :high_timestamp >= timestamp and + timestamp >= :low_timestamp + group by action_id + ) +group by sum_score +order by timestamp desc +limit :limit);""" diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..d33dc7f --- /dev/null +++ b/utils.py @@ -0,0 +1,145 @@ +import json +import os +import time +import enum + +import requests + + +from EDMCLogging import get_main_logger + +logger = get_main_logger() + +BASE_URL = 'https://api.orerve.net/2.0/website/squadron/' +INFO_ENDPOINT = 'info' +NEWS_ENDPOINT = 'news/list' + +TIME_BETWEEN_REQUESTS: float = 3.0 +if os.getenv("JUBILANT_TIME_BETWEEN_REQUESTS") is not None: + try: + TIME_BETWEEN_REQUESTS = float(os.getenv("JUBILANT_TIME_BETWEEN_REQUESTS")) + + except TypeError: # env doesn't contain a float + pass + +logger.debug(f'TIME_BETWEEN_REQUESTS = {TIME_BETWEEN_REQUESTS} {type(TIME_BETWEEN_REQUESTS)}') + +# proxy: last request time +# ssh -C2 -T -n -N -D 2081 +try: + PROXIES_DICT: list[dict] = json.load(open('proxies.json', 'r')) + +except FileNotFoundError: + PROXIES_DICT: list[dict] = [{'url': None, 'last_try': 0}] + + +# ofc I could do it without enums, I just wanted to try them +class Platform(enum.Enum): + """ + Enumeration for platforms + """ + PC = 'PC' + PS4 = 'PS4' + XBOX = 'XBOX' + + +class LeaderboardTypes(enum.Enum): + EXPLORATION = 'exploration' + CQC = 'cqc' + COMBAT = 'combat' + AEGIS = 'aegis' + BGS = 'bgs' + POWERPLAY = 'powerplay' + TRADE = 'trade' + + +class FAPIDownForMaintenance(Exception): + pass + + +def proxied_request(url: str, method: str = 'get', **kwargs) -> requests.Response: + """Makes request through one of proxies in round robin manner, respects fdev request kd for every proxy + + :param url: url to request + :param method: method to use in request + :param kwargs: kwargs + :return: requests.Response object + + detect oldest used proxy + if selected proxy is banned, then switch to next + detect how many we have to sleep to respect it 3 sec timeout for each proxy + sleep it + perform request with it + if request failed -> write last_try for current proxy and try next proxy + """ + + global PROXIES_DICT + + while True: + + selected_proxy = min(PROXIES_DICT, key=lambda x: x['last_try']) + logger.debug(f'Requesting {method.upper()} {url!r}, kwargs: {kwargs}; Using {selected_proxy["url"]} proxy') + + # let's detect how much we have to wait + time_to_sleep: float = (selected_proxy['last_try'] + TIME_BETWEEN_REQUESTS) - time.time() + + if 0 < time_to_sleep <= TIME_BETWEEN_REQUESTS: + logger.debug(f'Sleeping {time_to_sleep} s') + time.sleep(time_to_sleep) + + if selected_proxy['url'] is None: + proxies: dict = None # noqa + + else: + proxies: dict = {'https': selected_proxy['url']} + + try: + proxiedFapiRequest: requests.Response = requests.request( + method=method, + url=url, + proxies=proxies, + headers={'Authorization': f'Bearer {_get_bearer()}'}, + **kwargs + ) + + logger.debug(f'Request complete, code {proxiedFapiRequest.status_code!r}, len ' + f'{len(proxiedFapiRequest.content)}') + + except requests.exceptions.ConnectionError as e: + logger.error(f'Proxy {selected_proxy["url"]} is invalid: {str(e.__class__.__name__)}') + selected_proxy['last_try'] = time.time() # because link, lol + continue + + selected_proxy['last_try'] = time.time() # because link, lol + + if proxiedFapiRequest.status_code == 418: # FAPI is on maintenance + logger.warning(f'{method.upper()} {proxiedFapiRequest.url} returned 418, content dump:\n' + f'{proxiedFapiRequest.content}') + + raise FAPIDownForMaintenance + + elif proxiedFapiRequest.status_code != 200: + logger.warning(f"Request to {method.upper()} {url!r} with kwargs: {kwargs}, using {selected_proxy['url']} " + f"proxy ends with {proxiedFapiRequest.status_code} status code, content: " + f"{proxiedFapiRequest.content}") + + return proxiedFapiRequest + + +def _get_bearer() -> str: + """Gets bearer token from capi.demb.design (companion-api project, I will upload it on GH one day...) + + :return: bearer token as str + """ + bearer_request: requests.Response = requests.get( + url='https://capi.demb.design/random_token', headers={'auth': os.environ['DEMB_CAPI_AUTH']}) + + try: + bearer: str = bearer_request.json()['access_token'] + + except Exception as e: + logger.exception(f'Unable to parse capi.demb.design answer\nrequested: {bearer_request.url!r}\n' + f'code: {bearer_request.status_code!r}\nresponse: {bearer_request.content!r}', exc_info=e) + raise e + + return bearer diff --git a/web.py b/web.py new file mode 100644 index 0000000..5b0c6c2 --- /dev/null +++ b/web.py @@ -0,0 +1,36 @@ +import waitress + +import model +import json +import falcon + +""" +Request /activity/cqc?platform=pc[&limit=50&after=&before] + +""" + + +class Activity: + def on_get(self, req: falcon.request.Request, resp: falcon.response.Response, leaderboard: str) -> None: + resp.content_type = falcon.MEDIA_JSON + + args_activity_changes = { + 'platform': req.params.get('platform', 'pc'), + 'leaderboard_type': leaderboard, + 'limit': req.params.get('limit', 10), + 'high_timestamp': req.params.get('before', 'a'), + 'low_timestamp': req.params.get('after', 0) + } + + try: + resp.text = json.dumps(model.get_activity_changes(**args_activity_changes)) + + except Exception as e: + resp.text = json.dumps({'status': 'error', 'msg': str(e)}) + + +app = falcon.App() +app.add_route('/activity/{leaderboard}', Activity()) + +if __name__ == '__main__': + waitress.serve(app, host='127.0.0.1', port=9485)