# Mirror of https://github.com/norohind/FDEV-CAPI-Handler.git
# Synced 2025-04-20 00:57:35 +03:00
# 211 lines, 7.4 KiB, Python
import requests
|
|
|
|
from . import model
|
|
from . import utils
|
|
from . import exceptions
|
|
|
|
import base64
|
|
import os
|
|
import time
|
|
from typing import Union
|
|
|
|
import config
|
|
from EDMCLogging import get_main_logger
|
|
|
|
# Module-level logger, obtained from EDMCLogging's get_main_logger().
logger = get_main_logger()
|
|
|
|
|
|
class CAPIAuthorizer:
    """
    Runs the FDEV OAuth2 authorization flow (code + PKCE S256 challenge) and keeps
    the resulting tokens in the backing ``model``.

    Every record in the model is addressed by the ``state`` string generated in
    :meth:`auth_init`.
    """

    # Tokens are refreshed this many seconds before the lifetime stated by FDEV ends, for safety.
    _REFRESH_MARGIN_SECONDS = 400

    # Once a row has this many recorded failed refreshes it is deleted
    # (only when failures are not tolerated, see `refresh_by_state`).
    _MAX_REFRESH_TRIES = 5

    def __init__(self, _model: model.Model):
        """
        :param _model: storage backend used for verifiers, codes, tokens, nicknames and FIDs
        """

        self.model: model.Model = _model

    def auth_init(self) -> str:
        """
        Generates initial url for fdev auth

        A fresh code verifier and state string are generated from ``os.urandom`` and
        stored in the model, so that :meth:`fdev_callback` can look the verifier up by state.

        :return: URL the user has to be redirected to in order to authorize
        """

        code_verifier = base64.urlsafe_b64encode(os.urandom(32))
        code_challenge = utils.generate_challenge(code_verifier)
        state_string = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8")

        self.model.auth_init(code_verifier.decode('utf-8'), state_string)

        redirect_user_to_fdev = (
            f"{config.AUTH_URL}?"
            f"audience=all&"
            f"scope=auth%20capi&"
            f"response_type=code&"
            f"client_id={config.CLIENT_ID}&"
            f"code_challenge={code_challenge}&"
            f"code_challenge_method=S256&"
            f"state={state_string}&"
            f"redirect_uri={config.REDIRECT_URL}"
        )

        return redirect_user_to_fdev

    def fdev_callback(self, code: str, state: str) -> dict[str, str]:
        """
        Handles the redirect back from FDEV: exchanges ``code`` for tokens and stores
        tokens, nickname and FID for ``state``.

        :param code: authorization code received from FDEV
        :param state: state string received from FDEV, used to look up the code verifier
        :return: dict with keys ``status``, ``description`` and ``state``
        :raises TypeError: if ``code`` or ``state`` is not a string
        :raises exceptions.NoFID: if no FID could be obtained with the new access token
        :raises Exception: whatever ``raise_for_status()`` raises when the token request failed
        """

        if not isinstance(code, str) or not isinstance(state, str):
            raise TypeError('code and state must be strings')

        code_verifier = self.model.get_verifier(state)

        self.model.set_code(code, state)

        token_request = utils.get_tokens_request(code, code_verifier)

        try:
            token_request.raise_for_status()

        except Exception:
            logger.exception(
                f'token_request failed. Status: {token_request.status_code!r}, text: {token_request.text!r}'
            )
            # The row for `state` is left in the DB here (deleting it is disabled).
            raise

        tokens = token_request.json()

        access_token = tokens["access_token"]
        refresh_token = tokens["refresh_token"]
        expires_in = tokens["expires_in"]
        timestamp_got_expires_in = int(time.time())

        self.model.set_tokens(access_token, refresh_token, expires_in, timestamp_got_expires_in, state)

        # A missing nickname is not fatal: it is stored as-is and only a warning is logged.
        nickname = utils.get_nickname(access_token)
        self.model.set_nickname(nickname, state)
        if nickname is None:
            logger.warning(f"Couldn't get or set nickname for state: {state!r}")  # msg inform: no nickname

        # A missing FID is fatal for the callback.
        fid = utils.get_fid(access_token)
        if fid is None:
            logger.error(f"Couldn't get FID for state: {state!r}")
            raise exceptions.NoFID(f'No FID for {state!r}')

        msg = {'status': 'ok', 'description': '', 'state': ''}

        # NOTE(review): a falsy result of set_fid is reported as an update of existing
        # tokens, a truthy one as a first save -- confirm against model.set_fid.
        if not self.model.set_fid(fid, state):
            msg['description'] = 'Tokens updated'

        else:
            msg['description'] = 'Tokens saved'

        # The state reported back is the one the model holds for this FID.
        msg['state'] = self.model.get_state_by_fid(fid)

        return msg

    def get_token_by_state(self, state: str) -> Union[dict, None]:
        """
        Returns the token record for ``state``, refreshing the token first if required.

        :param state: state string identifying the record
        :return: the row from the model with an added ``expires_over`` key
                 (``expires_on`` minus current time, in seconds), or None if the
                 refresh raised ``exceptions.CAPIException`` or the model has no such row
        """

        try:
            self.refresh_by_state(state)

        except exceptions.CAPIException:
            return None

        row = self.model.get_token_for_user(state)

        if row is None:
            return None

        row['expires_over'] = int(row['expires_on']) - int(time.time())

        return row

    def refresh_by_state(
            self,
            state: str,
            force_refresh: bool = False,
            failure_tolerance=config.default_failure_tolerance
    ) -> dict:
        """
        Refreshes the tokens stored for ``state`` when they are about to expire.

        :param state: state string identifying the record
        :param force_refresh: if we should update token when its time hasn't come
        :param failure_tolerance: if we shouldn't remove row in case of update's failure
        :return: dict with keys ``status``, ``description`` and ``state``
        :raises exceptions.RefreshFail: if there is no such state, FDEV is on maintenance (418),
                                        the connection failed, or the refresh failed otherwise
        """

        msg = {'status': '', 'description': '', 'state': ''}
        row = self.model.get_row(state)

        if row is None:
            # No such state in DB
            msg['status'] = 'error'
            msg['description'] = 'No such state in DB'
            raise exceptions.RefreshFail(msg['description'], msg['status'], state)

        msg['state'] = state

        # Refresh a bit earlier than the lifetime stated by FDEV, for safety.
        refresh_due_at = (
            int(row['timestamp_got_expires_in']) + int(row['expires_in']) - self._REFRESH_MARGIN_SECONDS
        )
        if int(time.time()) < refresh_due_at and not force_refresh:
            msg['status'] = 'ok'
            msg['description'] = "Didn't refresh since it isn't required"

            return msg  # token isn't expired and we don't force updating

        # Stays None when the request itself raised, so the generic handler below can still log.
        refresh_request = None

        try:
            refresh_request = utils.refresh_request(row['refresh_token'])
            if refresh_request.status_code == 418:  # Server's maintenance
                logger.warning(f'FDEV maintenance 418, text: {refresh_request.text!r}')
                msg['status'] = 'error'
                msg['description'] = 'FDEV on maintenance'
                raise exceptions.RefreshFail(msg['description'], msg['status'], state)

            refresh_request.raise_for_status()

            tokens = refresh_request.json()
            self.model.set_tokens(
                access_token=tokens["access_token"],
                refresh_token=tokens["refresh_token"],
                expires_in=tokens["expires_in"],
                timestamp_got_expires_in=int(time.time()),
                state=state
            )
            msg['status'] = 'ok'
            msg['description'] = 'Token were successfully updated'
            return msg

        except exceptions.RefreshFail:
            # Maintenance is not the user's fault: let it through without counting it as a
            # failed refresh try (the generic handler below would otherwise swallow it).
            raise

        except requests.ConnectionError as e:
            logger.warning(f'Connection problem on refresh for {state!r}', exc_info=e)
            msg['status'] = 'error'
            msg['description'] = 'Connection Error'
            raise exceptions.RefreshFail(msg['description'], msg['status'], state) from e

        except Exception as e:
            # probably here something don't work
            try:
                text = refresh_request.text

            except Exception as text_error:
                text = f"can't get text because {text_error}"

            # NOTE(review): `row` holds the refresh token (it is read from row above),
            # so this line writes it to the log -- consider redacting.
            logger.warning(
                f'Fail on refreshing token for {state!r}, row:{row!r}, text: {text}', exc_info=e)
            msg['status'] = 'error'

            self.model.increment_refresh_tries(state)

            # `row` was fetched before the increment above, so the comparison uses
            # the number of tries recorded before this failure.
            if not failure_tolerance and row['refresh_tries'] >= self._MAX_REFRESH_TRIES:
                msg['description'] = "Refresh failed. You were removed from DB due to refresh rate limitings"
                self.model.delete_row(state)
                raise exceptions.RefreshFail(msg['description'], msg['status'], state) from e

            msg['description'] = "Refresh failed. Try later"
            raise exceptions.RefreshFail(msg['description'], msg['status'], state) from e

    def delete_by_state(self, state: str) -> None:
        """
        Deletes the row stored for ``state`` from the model.

        :param state: state string identifying the record
        """

        self.model.delete_row(state)

    def list_all_valid_users(self) -> list[dict]:
        """
        :return: whatever ``model.list_all_valid_records()`` returns
        """

        return self.model.list_all_valid_records()

    def cleanup_orphans(self) -> None:
        """
        Delegates to ``model.cleanup_orphans_records()``.
        """

        self.model.cleanup_orphans_records()

    def list_all_records(self) -> list[dict]:
        """
        :return: whatever ``model.list_all_records()`` returns
        """

        return self.model.list_all_records()
|
|
|
|
|
|
# Module-level CAPIAuthorizer instance, backed by a model opened at config.db_location.
capi_authorizer = CAPIAuthorizer(model.Model(config.db_location))
|