norohind 2021-11-19 00:10:20 +03:00
commit 5b1216dd41
Signed by: norohind
GPG Key ID: 01C3BECC26FB59E1
7 changed files with 893 additions and 0 deletions

EDMCLogging.py Normal file

@@ -0,0 +1,448 @@
"""
Set up required logging for the application.
This module provides for a common logging-powered log facility.
Mostly it implements a logging.Filter() in order to get two extra
members on the logging.LogRecord instance for use in logging.Formatter()
strings.
If type checking, e.g. mypy, objects to `logging.trace(...)` then include this
stanza:
# See EDMCLogging.py docs.
# isort: off
if TYPE_CHECKING:
from logging import trace, TRACE # type: ignore # noqa: F401
# isort: on
This is needed because we add the TRACE level and the trace() function
ourselves at runtime.
To utilise logging in core code, or internal plugins, include this:
from EDMCLogging import get_main_logger
logger = get_main_logger()
To utilise logging in a 'found' (third-party) plugin, include this:
import os
import logging
plugin_name = os.path.basename(os.path.dirname(__file__))
# plugin_name here *must* be the name of the folder the plugin resides in
# See, plug.py:load_plugins()
logger = logging.getLogger(f'{appname}.{plugin_name}')
"""
import inspect
import logging
import logging.handlers
import os
from contextlib import suppress
from fnmatch import fnmatch
# So that any warning about accessing a protected member is only in one place.
from sys import _getframe as getframe
from threading import get_native_id as thread_native_id
from traceback import print_exc
from typing import TYPE_CHECKING, Tuple, cast
# TODO: Tests:
#
# 1. Call from bare function in file.
# 2. Call from `if __name__ == "__main__":` section
#
# 3. Call from 1st level function in 1st level Class in file
# 4. Call from 2nd level function in 1st level Class in file
# 5. Call from 3rd level function in 1st level Class in file
#
# 6. Call from 1st level function in 2nd level Class in file
# 7. Call from 2nd level function in 2nd level Class in file
# 8. Call from 3rd level function in 2nd level Class in file
#
# 9. Call from 1st level function in 3rd level Class in file
# 10. Call from 2nd level function in 3rd level Class in file
# 11. Call from 3rd level function in 3rd level Class in file
#
# 12. Call from 2nd level file, all as above.
#
# 13. Call from *module*
#
# 14. Call from *package*
_default_loglevel = logging.DEBUG
# Define a TRACE level
LEVEL_TRACE = 5
LEVEL_TRACE_ALL = 3
logging.addLevelName(LEVEL_TRACE, "TRACE")
logging.addLevelName(LEVEL_TRACE_ALL, "TRACE_ALL")
logging.TRACE = LEVEL_TRACE # type: ignore
logging.TRACE_ALL = LEVEL_TRACE_ALL # type: ignore
logging.Logger.trace = lambda self, message, *args, **kwargs: self._log( # type: ignore
logging.TRACE, # type: ignore
message,
args,
**kwargs
)
def _trace_if(self: logging.Logger, condition: str, message: str, *args, **kwargs) -> None:
# The pattern list below is empty in this standalone copy, so every trace_if() call
# falls through to the TRACE_ALL level.
if any(fnmatch(condition, p) for p in []):
self._log(logging.TRACE, message, args, **kwargs)  # type: ignore # we added it
return
self._log(logging.TRACE_ALL, message, args, **kwargs)  # type: ignore # we added it
logging.Logger.trace_if = _trace_if  # type: ignore
# We can't hide this from `from xxx` imports and I'd really rather no-one other than `logging` had access to it
del _trace_if
if TYPE_CHECKING:
from types import FrameType
# Fake type that we can use here to tell type checkers that trace exists
class LoggerMixin(logging.Logger):
"""LoggerMixin is a fake class that tells type checkers that trace exists on a given type."""
def trace(self, message, *args, **kwargs) -> None:
"""See implementation above."""
...
def trace_if(self, condition: str, message, *args, **kwargs) -> None:
"""
Fake trace if method, traces only if condition exists in trace_on.
See implementation above.
"""
...
class Logger:
"""
Wrapper class for all logging configuration and code.
Class instantiation requires the 'logger name' and optional loglevel.
It is intended that this 'logger name' be re-used in all files/modules
that need to log.
Users of this class should then call get_logger() to get the
logging.Logger instance.
"""
def __init__(self, logger_name: str, loglevel: int = _default_loglevel):
"""
Set up a `logging.Logger` with our preferred configuration.
This includes using an EDMCContextFilter to add 'class' and 'qualname'
expansions for logging.Formatter().
"""
self.logger = logging.getLogger(logger_name)
# Configure the logging.Logger
# This needs to always be TRACE in order to let TRACE level messages
# through to check the *handler* levels.
self.logger.setLevel(logging.TRACE) # type: ignore
# Set up filter for adding class name
self.logger_filter = EDMCContextFilter()
self.logger.addFilter(self.logger_filter)
# Our basic channel handling stdout
self.logger_channel = logging.StreamHandler()
# This should be affected by the user configured log level
self.logger_channel.setLevel(loglevel)
self.logger_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(process)d:%(thread)d:%(osthreadid)d %(module)s.%(qualname)s:%(lineno)d: %(message)s') # noqa: E501
self.logger_formatter.default_time_format = '%Y-%m-%d %H:%M:%S'
self.logger_formatter.default_msec_format = '%s.%03d'
self.logger_channel.setFormatter(self.logger_formatter)
self.logger.addHandler(self.logger_channel)
def get_logger(self) -> 'LoggerMixin':
"""
Obtain the self.logger of the class instance.
Not to be confused with logging.getLogger().
"""
return cast('LoggerMixin', self.logger)
def get_streamhandler(self) -> logging.Handler:
"""
Obtain the self.logger_channel StreamHandler instance.
:return: logging.StreamHandler
"""
return self.logger_channel
def set_channels_loglevel(self, level: int) -> None:
"""
Set the specified log level on the channels.
:param level: A valid `logging` level.
:return: None
"""
self.logger_channel.setLevel(level)
def set_console_loglevel(self, level: int) -> None:
"""
Set the specified log level on the console channel.
:param level: A valid `logging` level.
:return: None
"""
if self.logger_channel.level != logging.TRACE: # type: ignore
self.logger_channel.setLevel(level)
else:
logger.trace("Not changing log level because it's TRACE") # type: ignore
class EDMCContextFilter(logging.Filter):
"""
Implements filtering to add extra format specifiers, and tweak others.
logging.Filter sub-class to place extra attributes of the calling site
into the record.
"""
def filter(self, record: logging.LogRecord) -> bool:
"""
Attempt to set/change fields in the LogRecord.
1. class = class name(s) of the call site, if applicable
2. qualname = __qualname__ of the call site. This simplifies
logging.Formatter() as you can use just this no matter if there is
a class involved or not, so you get a nice clean:
<file/module>.<classA>[.classB....].<function>
3. osthreadid = OS level thread ID.
If we fail to be able to properly set either then:
1. Use print() to alert, to be SURE a message is seen.
2. But also return strings noting the error, so there'll be
something in the log output if it happens.
:param record: The LogRecord we're "filtering"
:return: bool - Always true in order for this record to be logged.
"""
(class_name, qualname, module_name) = self.caller_attributes(module_name=getattr(record, 'module'))
# Only set if we got a useful value
if module_name:
setattr(record, 'module', module_name)
# Only set if not already provided by logging itself
if getattr(record, 'class', None) is None:
setattr(record, 'class', class_name)
# Only set if not already provided by logging itself
if getattr(record, 'qualname', None) is None:
setattr(record, 'qualname', qualname)
setattr(record, 'osthreadid', thread_native_id())
return True
@classmethod
def caller_attributes(cls, module_name: str = '') -> Tuple[str, str, str]: # noqa: CCR001, E501, C901 # this is as refactored as is sensible
"""
Determine extra or changed fields for the caller.
1. qualname finds the relevant object and its __qualname__
2. caller_class_names is just the full class names of the calling
class if relevant.
3. module is munged if we detect the caller is an EDMC plugin,
whether internal or found.
:param module_name: The name of the calling module.
:return: Tuple[str, str, str] - class_name, qualname, module_name
"""
frame = cls.find_caller_frame()
caller_qualname = caller_class_names = ''
if frame:
# <https://stackoverflow.com/questions/2203424/python-how-to-retrieve-class-information-from-a-frame-object#2220759>
try:
frame_info = inspect.getframeinfo(frame)
# raise(IndexError) # TODO: Remove, only for testing
except Exception:
# Separate from the print below to guarantee we see at least this much.
print('EDMCLogging:EDMCContextFilter:caller_attributes(): Failed in `inspect.getframeinfo(frame)`')
# We want to *attempt* to show something about the nature of 'frame',
# but at this point we can't trust it will work.
try:
print(f'frame: {frame}')
except Exception:
pass
# We've given up, so just return '??' to signal we couldn't get the info
return '??', '??', module_name
try:
args, _, _, value_dict = inspect.getargvalues(frame)
if len(args) and args[0] in ('self', 'cls'):
frame_class: 'object' = value_dict[args[0]]
if frame_class:
# See https://en.wikipedia.org/wiki/Name_mangling#Python for how name mangling works.
# For more detail, see _Py_Mangle in CPython's Python/compile.c.
name = frame_info.function
class_name = frame_class.__class__.__name__.lstrip("_")
if name.startswith("__") and not name.endswith("__") and class_name:
name = f'_{class_name}{frame_info.function}'
# Find __qualname__ of the caller
fn = inspect.getattr_static(frame_class, name, None)
if fn is None:
# For some reason getattr_static can't grab this. Try to grab it with getattr, and bail out
# if we get a RecursionError, which indicates a property
try:
fn = getattr(frame_class, name, None)
except RecursionError:
print(
"EDMCLogging:EDMCContextFilter:caller_attributes():"
"Failed to get attribute for function info. Bailing out"
)
# class_name is better than nothing for __qualname__
return class_name, class_name, module_name
if fn is not None:
if isinstance(fn, property):
class_name = str(frame_class)
# If somehow you make your __class__ or __class__.__qualname__ recursive,
# I'll be impressed.
if hasattr(frame_class, '__class__') and hasattr(frame_class.__class__, "__qualname__"):
class_name = frame_class.__class__.__qualname__
caller_qualname = f"{class_name}.{name}(property)"
else:
caller_qualname = f"<property {name} on {class_name}>"
elif not hasattr(fn, '__qualname__'):
caller_qualname = name
elif hasattr(fn, '__qualname__') and fn.__qualname__:
caller_qualname = fn.__qualname__
# Find containing class name(s) of caller, if any
if (
frame_class.__class__ and hasattr(frame_class.__class__, '__qualname__')
and frame_class.__class__.__qualname__
):
caller_class_names = frame_class.__class__.__qualname__
# It's a call from the top level module file
elif frame_info.function == '<module>':
caller_class_names = '<none>'
caller_qualname = value_dict['__name__']
elif frame_info.function != '':
caller_class_names = '<none>'
caller_qualname = frame_info.function
module_name = cls.munge_module_name(frame_info, module_name)
except Exception as e:
print('ALERT! Something went VERY wrong in handling finding info to log')
print('ALERT! Information is as follows')
with suppress(Exception):
print(f'ALERT! {e=}')
print_exc()
print(f'ALERT! {frame=}')
with suppress(Exception):
print(f'ALERT! {fn=}') # type: ignore
with suppress(Exception):
print(f'ALERT! {cls=}')
finally: # Ensure this always happens
# https://docs.python.org/3.7/library/inspect.html#the-interpreter-stack
del frame
if caller_qualname == '':
print('ALERT! Something went wrong with finding caller qualname for logging!')
caller_qualname = '<ERROR in EDMCLogging.caller_class_and_qualname() for "qualname">'
if caller_class_names == '':
print('ALERT! Something went wrong with finding caller class name(s) for logging!')
caller_class_names = '<ERROR in EDMCLogging.caller_class_and_qualname() for "class">'
return caller_class_names, caller_qualname, module_name
@classmethod
def find_caller_frame(cls):
"""
Find the stack frame of the logging caller.
:returns: 'frame' object such as from sys._getframe()
"""
# Go up through stack frames until we find the first with a
# type(f_locals.self) of logging.Logger. This should be the start
# of the frames internal to logging.
frame: 'FrameType' = getframe(0)
while frame:
if isinstance(frame.f_locals.get('self'), logging.Logger):
frame = cast('FrameType', frame.f_back) # Want to start on the next frame below
break
frame = cast('FrameType', frame.f_back)
# Now continue up through frames until we find the next one where
# that is *not* true, as it should be the call site of the logger
# call
while frame:
if not isinstance(frame.f_locals.get('self'), logging.Logger):
break # We've found the frame we want
frame = cast('FrameType', frame.f_back)
return frame
@classmethod
def munge_module_name(cls, frame_info: inspect.Traceback, module_name: str) -> str:
"""
Adjust module_name based on the file path for the given frame.
We want to distinguish between other code and both our internal plugins
and the 'found' ones.
For internal plugins we want "plugins.<filename>".
For 'found' plugins we want "<plugins>.<plugin_name>...".
:param frame_info: The frame_info of the caller.
:param module_name: The module_name string to munge.
:return: The munged module_name.
"""
# file_name = pathlib.Path(frame_info.filename).expanduser()
# plugin_dir = pathlib.Path(config.plugin_dir_path).expanduser()
# internal_plugin_dir = pathlib.Path(config.internal_plugin_dir_path).expanduser()
# if internal_plugin_dir in file_name.parents:
# # its an internal plugin
# return f'plugins.{".".join(file_name.relative_to(internal_plugin_dir).parent.parts)}'
# elif plugin_dir in file_name.parents:
# return f'<plugin>.{".".join(file_name.relative_to(plugin_dir).parent.parts)}'
return module_name
def get_main_logger(sublogger_name: str = '') -> 'LoggerMixin':
"""Return the correct logger for how the program is being run. (outdated)"""
# if not os.getenv("EDMC_NO_UI"):
# # GUI app being run
# return cast('LoggerMixin', logging.getLogger(appname))
# else:
# # Must be the CLI
# return cast('LoggerMixin', logging.getLogger(appcmdname))
return cast('LoggerMixin', logging.getLogger(__name__))
# Singleton
loglevel = logging._nameToLevel.get(os.getenv('LOG_LEVEL', 'DEBUG').upper(), logging.DEBUG) # noqa:
base_logger_name = __name__
edmclogger = Logger(base_logger_name, loglevel=loglevel)
logger: 'LoggerMixin' = edmclogger.get_logger()

main.py Normal file

@@ -0,0 +1,183 @@
"""
Run externally (e.g. by cron).
Request all, or a specified combination of, platform + leaderboard type,
and write the results to an SQLite DB in a journal-like format.
(AP) - value taken as-is from the corresponding field of the JSON response.
DB structure
squads_stats_states:
action_id integer - id of the insertion; all records from one request share the same action_id
leaderboard_type string - (AP)
platform string - platform of the data
squadron_id integer - (squadron field)
score integer - (AP)
percentile integer - (AP)
rank integer - (AP)
name string - (AP)
tag string - (AP)
timestamp - set by the DB, defaults to CURRENT_TIMESTAMP
"""
import typing
import requests
import sqlite3
import sql_requests
# from EDMCLogging import get_main_logger
import utils
db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3')
db.executescript(sql_requests.schema_create)
def request_leaderboard(platform_enum: utils.Platform, leaderboard_type_enum: utils.LeaderboardTypes) -> dict:
"""
Request the specified leaderboard and return it together with its platform and type.
:param platform_enum: leaderboard platform
:param leaderboard_type_enum: leaderboard type
:return: dict with 'leaderboard' (list of squads), 'platform' and 'type' keys
"""
platform: str = platform_enum.value
leaderboard_type = leaderboard_type_enum.value
SAPIRequest: requests.Response = utils.proxied_request(
'https://api.orerve.net/2.0/website/squadron/season/leaderboards',
params={'leaderboardType': leaderboard_type, 'platform': platform})
return {
'leaderboard': SAPIRequest.json()['leaderboards'][leaderboard_type], # list
'platform': platform, # str
'type': leaderboard_type # str
}
# return SAPIRequest.json()['leaderboards'][leaderboard_type]
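# Shape of the returned dict (squad keys inferred from the insert statement in sql_requests.py;
# values are made up for the example):
#   {'leaderboard': [{'squadron': 12345, 'score': 1000, 'percentile': 25, 'rank': 1,
#                     'name': 'Some Squadron', 'tag': 'ABCD'}, ...],
#    'platform': 'PC', 'type': 'cqc'}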
def insert_leaderboard_db(db_conn: sqlite3.Connection, leaderboard_list: dict) -> None:
"""
Take the dict produced by request_leaderboard() (leaderboard list, its platform and type) and insert it into the DB.
:param db_conn: Connection to DB
:param leaderboard_list: dict from request_leaderboard()
:return:
"""
platform: str = leaderboard_list['platform']
LB_type: str = leaderboard_list['type']
leaderboard: list = leaderboard_list['leaderboard']
action_id: int  # not the last used action_id but the one we will use for this insertion
sql_req_action_id: sqlite3.Cursor = db_conn.execute(sql_requests.select_last_action_id)
action_id_fetch_one: typing.Union[None, tuple[int]] = sql_req_action_id.fetchone()
if action_id_fetch_one is None:
# i.e. first launch
action_id = 1 # yep, not 0
else:
action_id = action_id_fetch_one[0] + 1
# Patch for additional values
for squad in leaderboard:
squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform})
with db_conn:
db_conn.executemany(
sql_requests.insert_leader_board,
leaderboard)
def get_and_save_leaderboard(platform_enum: utils.Platform,
leaderboard_type_enum: utils.LeaderboardTypes,
db_conn: sqlite3.Connection) -> None:
"""
High-level function: fetch the leaderboard for the specified type and platform and save it.
:param platform_enum:
:param leaderboard_type_enum:
:param db_conn:
:return:
"""
req = request_leaderboard(platform_enum, leaderboard_type_enum)
insert_leaderboard_db(db_conn, req)
def main():
"""
Run in specified mode from command line
main.py update all
- make all 21 requests (7 leaderboards * 3 platforms)
main.py update <leaderboard: string>
- update specified leaderboard for all platforms (3 requests)
main.py update <leaderboard: string> <platform: string>
- update specified leaderboard for specified platform
:return:
"""
from sys import argv
help_msg: str = main.__doc__[46:-19]  # crude slice: drop the docstring's first line and trailing ':return:'
def failed_args(exit_code: int = 0):
print(help_msg)
exit(exit_code)
if len(argv) == 3: # update all
if argv[1] == 'update' and argv[2] == 'all':
# main.py update all
for platform_enum in utils.Platform:
for LB_type_enum in utils.LeaderboardTypes:
get_and_save_leaderboard(platform_enum, LB_type_enum, db)
exit(0)
elif argv[1] == 'update':
# main.py update <leaderboard: string>
leaderboard: str = argv[2].lower()
try:
leaderboard_enum: utils.LeaderboardTypes = utils.LeaderboardTypes(leaderboard)
for platform_enum in utils.Platform:
get_and_save_leaderboard(platform_enum, leaderboard_enum, db)
exit(0)
except ValueError:
print('leaderboard must be correct leaderboard type')
exit(1)
else:
failed_args(1)
elif len(argv) == 4:
# main.py update <leaderboard: string> <platform: string>
if argv[1] == 'update':
leaderboard = argv[2].lower()
platform = argv[3].upper()
try:
leaderboard_enum: utils.LeaderboardTypes = utils.LeaderboardTypes(leaderboard)
platform_enum: utils.Platform = utils.Platform(platform)
get_and_save_leaderboard(platform_enum, leaderboard_enum, db)
exit(0)
except ValueError:
print('leaderboard must be correct leaderboard type, platform must be correct platform')
exit(1)
else:
failed_args(1)
if __name__ == '__main__':
main()

model.py Normal file

@@ -0,0 +1,23 @@
import sqlite3
import sql_requests
import utils
db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False)
# thx https://stackoverflow.com/a/48789604
db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
cur = db.cursor()
def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp) -> list:
sql_req: sqlite3.Cursor = db.execute(sql_requests.select_activity, {
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
'platform': utils.Platform(platform.upper()).value,
'limit': limit,
'high_timestamp': high_timestamp,
'low_timestamp': low_timestamp
})
return sql_req.fetchall()
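# Example call, as made by web.py (parameter values are illustrative):
#   get_activity_changes(platform='pc', leaderboard_type='cqc', limit=10,
#                        low_timestamp=0, high_timestamp='a')
# returns a list of dicts, one per stored snapshot, e.g.:
#   [{'sum_score': 123456, 'timestamp': '2021-11-18 21:10:20', 'action_id': 42,
#     'sum_score_old': 120000, 'diff': 3456}, ...]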

requirements.txt Normal file

Binary file not shown.

sql_requests.py Normal file

@@ -0,0 +1,58 @@
schema_create = """create table if not exists squads_stats_states (
action_id integer,
leaderboard_type string,
platform string,
squadron_id integer,
score integer,
percentile integer,
rank integer,
name string,
tag string,
timestamp default current_timestamp);
create view if not exists current_cqc_pc as
select * from squads_stats_states where action_id in
(select distinct action_id
from squads_stats_states
where leaderboard_type = 'cqc' and platform = 'PC'
order by action_id desc limit 1) and platform = 'PC';
create view if not exists prev_cqc_pc as
select *
from squads_stats_states
where action_id in
(select distinct action_id
from squads_stats_states
where leaderboard_type = 'cqc' and platform = 'PC' order by action_id desc limit 1, 1) and platform = 'PC';
create index if not exists idx_action_id_0 on squads_stats_states (action_id);
create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);
create view if not exists diff_pc_cqc as
select current_cqc_pc.name, current_cqc_pc.score, prev_cqc_pc.score, current_cqc_pc.score - prev_cqc_pc.score as diff
from current_cqc_pc left outer join prev_cqc_pc on prev_cqc_pc.squadron_id = current_cqc_pc.squadron_id;"""
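# Reading of the views above (my interpretation of the SQL, for reference):
#   current_cqc_pc - rows of the most recent CQC/PC snapshot (latest action_id)
#   prev_cqc_pc    - rows of the snapshot before that ('limit 1, 1' == LIMIT 1 OFFSET 1)
#   diff_pc_cqc    - per-squadron score change between those two snapshots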
select_last_action_id = """select action_id
from squads_stats_states
order by action_id desc
limit 1;"""
insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score,
percentile, rank, name, tag)
values (:action_id, :LB_type, :platform, :squadron, :score, :percentile, :rank, :name, :tag);"""
select_activity = """select *, sum_score - sum_score_old as diff from
(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old
from (
select sum(score) as sum_score, timestamp, action_id
from squads_stats_states
where
leaderboard_type = :LB_type and
platform = :platform and
:high_timestamp >= timestamp and
timestamp >= :low_timestamp
group by action_id
)
group by sum_score
order by timestamp desc
limit :limit);"""
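# select_activity, as I read it: for one leaderboard/platform, total the squad scores per action_id
# (one snapshot per request), use lag() to pull the previous total as sum_score_old, and report
# diff = sum_score - sum_score_old for the :limit most recent snapshots
# between :low_timestamp and :high_timestamp.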

utils.py Normal file

@@ -0,0 +1,145 @@
import json
import os
import time
import enum
import requests
from EDMCLogging import get_main_logger
logger = get_main_logger()
BASE_URL = 'https://api.orerve.net/2.0/website/squadron/'
INFO_ENDPOINT = 'info'
NEWS_ENDPOINT = 'news/list'
TIME_BETWEEN_REQUESTS: float = 3.0
if os.getenv("JUBILANT_TIME_BETWEEN_REQUESTS") is not None:
try:
TIME_BETWEEN_REQUESTS = float(os.getenv("JUBILANT_TIME_BETWEEN_REQUESTS"))
except ValueError:  # env var doesn't contain a valid float
pass
logger.debug(f'TIME_BETWEEN_REQUESTS = {TIME_BETWEEN_REQUESTS} {type(TIME_BETWEEN_REQUESTS)}')
# proxy: last request time
# ssh -C2 -T -n -N -D 2081 <a server>
try:
PROXIES_DICT: list[dict] = json.load(open('proxies.json', 'r'))
except FileNotFoundError:
PROXIES_DICT: list[dict] = [{'url': None, 'last_try': 0}]
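# Example proxies.json (hypothetical values; e.g. a SOCKS proxy opened with the ssh command above):
# [
#     {"url": "socks5://127.0.0.1:2081", "last_try": 0},
#     {"url": null, "last_try": 0}
# ]
# "url": null (None) means the request is made directly, without a proxy.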
# ofc I could do it without enums, I just wanted to try them
class Platform(enum.Enum):
"""
Enumeration for platforms
"""
PC = 'PC'
PS4 = 'PS4'
XBOX = 'XBOX'
class LeaderboardTypes(enum.Enum):
EXPLORATION = 'exploration'
CQC = 'cqc'
COMBAT = 'combat'
AEGIS = 'aegis'
BGS = 'bgs'
POWERPLAY = 'powerplay'
TRADE = 'trade'
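# The enums are constructed from raw strings elsewhere, e.g. (values for illustration):
#   utils.Platform('PC'), utils.LeaderboardTypes('cqc')
# An unknown value raises ValueError, which main.py relies on for argument validation.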
class FAPIDownForMaintenance(Exception):
pass
def proxied_request(url: str, method: str = 'get', **kwargs) -> requests.Response:
"""Makes request through one of proxies in round robin manner, respects fdev request kd for every proxy
:param url: url to request
:param method: method to use in request
:param kwargs: kwargs
:return: requests.Response object
detect oldest used proxy
if selected proxy is banned, then switch to next
detect how many we have to sleep to respect it 3 sec timeout for each proxy
sleep it
perform request with it
if request failed -> write last_try for current proxy and try next proxy
"""
global PROXIES_DICT
while True:
selected_proxy = min(PROXIES_DICT, key=lambda x: x['last_try'])
logger.debug(f'Requesting {method.upper()} {url!r}, kwargs: {kwargs}; Using {selected_proxy["url"]} proxy')
# let's detect how much we have to wait
time_to_sleep: float = (selected_proxy['last_try'] + TIME_BETWEEN_REQUESTS) - time.time()
if 0 < time_to_sleep <= TIME_BETWEEN_REQUESTS:
logger.debug(f'Sleeping {time_to_sleep} s')
time.sleep(time_to_sleep)
if selected_proxy['url'] is None:
proxies: dict = None # noqa
else:
proxies: dict = {'https': selected_proxy['url']}
try:
proxiedFapiRequest: requests.Response = requests.request(
method=method,
url=url,
proxies=proxies,
headers={'Authorization': f'Bearer {_get_bearer()}'},
**kwargs
)
logger.debug(f'Request complete, code {proxiedFapiRequest.status_code!r}, len '
f'{len(proxiedFapiRequest.content)}')
except requests.exceptions.ConnectionError as e:
logger.error(f'Proxy {selected_proxy["url"]} is invalid: {str(e.__class__.__name__)}')
selected_proxy['last_try'] = time.time()  # selected_proxy references the entry in PROXIES_DICT, so this updates it in place
continue
selected_proxy['last_try'] = time.time()  # selected_proxy references the entry in PROXIES_DICT, so this updates it in place
if proxiedFapiRequest.status_code == 418: # FAPI is on maintenance
logger.warning(f'{method.upper()} {proxiedFapiRequest.url} returned 418, content dump:\n'
f'{proxiedFapiRequest.content}')
raise FAPIDownForMaintenance
elif proxiedFapiRequest.status_code != 200:
logger.warning(f"Request to {method.upper()} {url!r} with kwargs: {kwargs}, using {selected_proxy['url']} "
f"proxy ends with {proxiedFapiRequest.status_code} status code, content: "
f"{proxiedFapiRequest.content}")
return proxiedFapiRequest
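# Example usage (see main.py; parameter values here are illustrative):
#   resp = proxied_request('https://api.orerve.net/2.0/website/squadron/season/leaderboards',
#                          params={'leaderboardType': 'cqc', 'platform': 'PC'})
#   leaderboard = resp.json()['leaderboards']['cqc']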
def _get_bearer() -> str:
"""Gets bearer token from capi.demb.design (companion-api project, I will upload it on GH one day...)
:return: bearer token as str
"""
bearer_request: requests.Response = requests.get(
url='https://capi.demb.design/random_token', headers={'auth': os.environ['DEMB_CAPI_AUTH']})
try:
bearer: str = bearer_request.json()['access_token']
except Exception as e:
logger.exception(f'Unable to parse capi.demb.design answer\nrequested: {bearer_request.url!r}\n'
f'code: {bearer_request.status_code!r}\nresponse: {bearer_request.content!r}', exc_info=e)
raise e
return bearer

web.py Normal file

@@ -0,0 +1,36 @@
import waitress
import model
import json
import falcon
"""
GET /activity/{leaderboard}?platform=pc[&limit=50&after=<timestamp>&before=<timestamp>]
e.g. /activity/cqc?platform=pc&limit=50
"""
class Activity:
def on_get(self, req: falcon.request.Request, resp: falcon.response.Response, leaderboard: str) -> None:
resp.content_type = falcon.MEDIA_JSON
args_activity_changes = {
'platform': req.params.get('platform', 'pc'),
'leaderboard_type': leaderboard,
'limit': req.params.get('limit', 10),
'high_timestamp': req.params.get('before', 'a'),  # 'a' sorts after any 'YYYY-MM-DD ...' text timestamp -> no upper bound
'low_timestamp': req.params.get('after', 0)  # an integer sorts below any text value in SQLite -> no lower bound
}
try:
resp.text = json.dumps(model.get_activity_changes(**args_activity_changes))
except Exception as e:
resp.text = json.dumps({'status': 'error', 'msg': str(e)})
app = falcon.App()
app.add_route('/activity/{leaderboard}', Activity())
if __name__ == '__main__':
waitress.serve(app, host='127.0.0.1', port=9485)