FDEV-CAPI-Handler/capi/__init__.py
2021-12-05 23:21:07 +03:00

176 lines
6.1 KiB
Python

from . import model
from . import utils
from . import exceptions
import base64
import os
import time
from typing import Union
import config
from EDMCLogging import get_main_logger
logger = get_main_logger()
class CAPIAuthorizer:
def __init__(self, _model: model.Model):
self.model: model.Model = _model
def auth_init(self) -> str:
"""
Generates initial url for fdev auth
:return:
"""
code_verifier = base64.urlsafe_b64encode(os.urandom(32))
code_challenge = utils.generate_challenge(code_verifier)
state_string = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8")
self.model.auth_init(code_verifier.decode('utf-8'), state_string)
redirect_user_to_fdev = f"{config.AUTH_URL}?" \
f"audience=all&" \
f"scope=capi&" \
f"response_type=code&" \
f"client_id={config.CLIENT_ID}&" \
f"code_challenge={code_challenge}&" \
f"code_challenge_method=S256&" \
f"state={state_string}&" \
f"redirect_uri={config.REDIRECT_URL}"
return redirect_user_to_fdev
def fdev_callback(self, code: str, state: str) -> dict[str, str]:
if type(code) is not str or type(state) != str:
raise TypeError('code and state must be strings')
code_verifier = self.model.get_verifier(state)
self.model.set_code(code, state)
token_request = utils.get_tokens_request(code, code_verifier)
try:
token_request.raise_for_status()
except Exception as e:
logger.exception(
f'token_request failed. Status: {token_request.status_code!r}, text: {token_request.text!r}',
exc_info=e
)
# self.model.delete_row(state)
raise e
tokens = token_request.json()
access_token = tokens["access_token"]
refresh_token = tokens["refresh_token"]
expires_in = tokens["expires_in"]
timestamp_got_expires_in = int(time.time())
self.model.set_tokens(access_token, refresh_token, expires_in, timestamp_got_expires_in, state)
try:
nickname = utils.get_nickname(access_token)
except Exception as e:
logger.warning(f"Couldn't get nickname for state: {state!r}", exc_info=e)
raise KeyError(f"Couldn't get nickname for state: {state!r}")
msg = {'status': 'ok', 'description': '', 'state': ''}
if not self.model.set_nickname(nickname, state):
msg['description'] = 'Tokens updated'
else:
msg['description'] = 'Tokens saved'
msg['state'] = self.model.get_state_by_nickname(nickname)
return msg
def get_token_by_state(self, state: str) -> Union[dict, None]:
self.refresh_by_state(state)
row = self.model.get_token_for_user(state)
if row is None:
return None
row['expires_over'] = int(row['expires_on']) - int(time.time())
return row
def refresh_by_state(self, state: str, force_refresh=False, failure_tolerance=True) -> dict:
"""
:param state:
:param force_refresh: if we should update token when its time hasn't come
:param failure_tolerance: if we shouldn't remove row in case of update's failure
:return:
"""
msg = {'status': '', 'description': '', 'state': ''}
row = self.model.get_row(state)
if row is None:
# No such state in DB
msg['status'] = 'error'
msg['description'] = 'No such state in DB'
raise exceptions.RefreshFail(msg['description'], msg['status'], state)
msg['state'] = state
if int(time.time()) < int(row['timestamp_got_expires_in']) + int(row['expires_in']) and not force_refresh:
msg['status'] = 'ok'
msg['description'] = "Didn't refresh since it isn't required"
return msg # token isn't expired and we don't force updating
try:
refresh_request = utils.refresh_request(row['refresh_token'])
if refresh_request.status_code == 418: # Server's maintenance
logger.warning(f'FDEV maintenance 418, text: {refresh_request.text!r}')
msg['status'] = 'error'
msg['description'] = 'FDEV on maintenance'
raise exceptions.RefreshFail(msg['message'], msg['status'], state)
refresh_request.raise_for_status()
tokens = refresh_request.json()
tokens['timestamp_got_expires_in'] = int(time.time())
self.model.set_tokens(*tokens, state)
msg['status'] = 'ok'
msg['description'] = 'Token were successfully updated'
return msg
except Exception as e:
# probably here something don't work
logger.warning(f'Fail on refreshing token for {state}, row:{row}')
msg['status'] = 'error'
self.model.increment_refresh_tries(state)
if not failure_tolerance:
if row['refresh_tries'] >= 5: # hardcoded limit
msg['description'] = "Refresh failed. You were removed from DB due to refresh rate limitings"
self.model.delete_row(state)
raise exceptions.RefreshFail(msg['description'], msg['status'], state)
msg['description'] = "Refresh failed. Try later"
raise exceptions.RefreshFail(msg['description'], msg['status'], state)
def delete_by_state(self, state: str) -> None:
self.model.delete_row(state)
def list_all_users(self) -> list[dict]:
return self.model.list_all_records()
def cleanup_orphans(self) -> None:
self.model.cleanup_orphans_records()
capi_authorizer = CAPIAuthorizer(model.Model())