diff --git a/config.py b/config.py index 03c7f20..3cd5bf5 100644 --- a/config.py +++ b/config.py @@ -2,8 +2,6 @@ import os cache_disabled: bool = os.getenv('CACHE_DISABLED', 'True').lower() == 'true' -DBMS_name = os.getenv('DB_NAME') - postgres_username = os.getenv('DB_USERNAME') postgres_password = os.getenv('DB_PASSWORD') postgres_hostname = os.getenv('DB_HOSTNAME') diff --git a/model/__init__.py b/model/__init__.py index e0db6e0..6e10f20 100644 --- a/model/__init__.py +++ b/model/__init__.py @@ -1,5 +1,4 @@ from model.postgres_model import PostgresModel -from model.sqlite_model import Sqlite3Model from model.abstract_model import AbstractModel import config import utils @@ -7,22 +6,7 @@ from EDMCLogging import get_main_logger logger = get_main_logger() -env_choose = config.DBMS_name - -model: AbstractModel - -if env_choose == 'postgres': - logger.info('Using postgres DB') - model = PostgresModel() - -elif env_choose == 'sqlite': - logger.info('Using sqlite DB') - model = Sqlite3Model() - -else: - logger.error(f'Unknown DB {env_choose!r}') - - assert False, 'env variable DB_NAME must be "postgres" or "sqlite"' +model: AbstractModel = PostgresModel() if config.log_level == 'DEBUG': model.get_diff_action_id = utils.measure(model.get_diff_action_id) diff --git a/model/sqlite_model.py b/model/sqlite_model.py deleted file mode 100644 index f7175fb..0000000 --- a/model/sqlite_model.py +++ /dev/null @@ -1,114 +0,0 @@ -import sqlite3 -import typing -import json - -import config -from . import sqlite_sql_requests -from .abstract_model import AbstractModel -from .sqlite_cache import cache -import utils -from EDMCLogging import get_main_logger - -logger = get_main_logger() -logger.propagate = False - - -class Sqlite3Model(AbstractModel): - db: sqlite3.Connection - - def open_model(self): - self.db = sqlite3.connect(config.sqlite_db_path, check_same_thread=False) - - logger.debug(f'Connected to squads_stat.sqlite3') - - self.db.executescript(sqlite_sql_requests.schema_create) # schema creation - - # thx https://stackoverflow.com/a/48789604 - self.db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) - - def close_model(self): - self.db.close() - logger.info(f'Connection to squads_stat.sqlite3 closed successfully') - - def get_activity_changes(self, platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp)\ - -> list: - cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}' - cached_result: typing.Union[str, None] = cache.get(cache_key) - - if cached_result is not None: - logger.debug(f'Cached result for {cache_key}') - return json.loads(cached_result) - - logger.debug(f'Not cached result for {cache_key}') - - sql_req: sqlite3.Cursor = self.db.execute(sqlite_sql_requests.select_activity_pretty_names, { - 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, - 'platform': utils.Platform(platform.upper()).value, - 'limit': limit, - 'high_timestamp': high_timestamp, - 'low_timestamp': low_timestamp - }) - - result: list = sql_req.fetchall() - - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) - - return result - - def insert_leaderboard_db(self, leaderboard_list: dict) -> None: - """ - Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB - :param leaderboard_list: list from request_leaderboard - :return: - """ - - platform: str = leaderboard_list['platform'] - LB_type: str = leaderboard_list['type'] - leaderboard: list = leaderboard_list['leaderboard'] - - action_id: int # not last, current that we will use - - sql_req_action_id: sqlite3.Cursor = self.db.execute(sqlite_sql_requests.select_last_action_id) - action_id_fetch_one: typing.Union[None, dict[str, int]] = sql_req_action_id.fetchone() - if action_id_fetch_one is None: - # i.e. first launch - action_id = 1 # yep, not 0 - - else: - action_id = action_id_fetch_one['action_id'] + 1 - - # Patch for additional values - for squad in leaderboard: - squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform}) - - with self.db: - self.db.executemany( - sqlite_sql_requests.insert_leader_board, - leaderboard) - - cache.delete_all() # drop cache - - def get_diff_action_id(self, action_id: int) -> list: - """ - Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and - experience they got in compassion to action_id - 1 for the same leaderboard and platform - :param action_id: - :return: - """ - cache_key: str = f'{action_id}' - cached_result: typing.Union[str, None] = cache.get(cache_key) - - if cached_result is not None: - logger.debug(f'Cached result for {cache_key}') - return json.loads(cached_result) - - logger.debug(f'Not cached result for {cache_key}') - sql_req: sqlite3.Cursor = self.db.execute(sqlite_sql_requests.select_diff_by_action_id, - {'action_id': action_id}) - result: list = sql_req.fetchall() - - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) - - return result diff --git a/model/sqlite_sql_requests.py b/model/sqlite_sql_requests.py deleted file mode 100644 index 501c09f..0000000 --- a/model/sqlite_sql_requests.py +++ /dev/null @@ -1,93 +0,0 @@ -schema_create = """create table if not exists squads_stats_states ( -action_id integer, -leaderboard_type string, -platform string, -squadron_id integer, -score integer, -percentile integer, -rank integer, -name string, -tag string, -timestamp default current_timestamp); - -create index if not exists idx_action_id_0 on squads_stats_states (action_id); -create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);""" - -select_last_action_id = """select action_id -from squads_stats_states -order by action_id desc -limit 1;""" - -insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score, -percentile, rank, name, tag) -values (:action_id, :LB_type, :platform, :squadron, :score, :percentile, :rank, :name, :tag);""" - -select_activity = """select *, sum_score - sum_score_old as diff from -(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old -from ( - select sum(score) as sum_score, timestamp, action_id - from squads_stats_states - where - leaderboard_type = :LB_type and - platform = :platform and - :high_timestamp >= timestamp and - timestamp >= :low_timestamp - group by action_id - ) -group by sum_score -order by timestamp desc -limit :limit);""" - -select_activity_pretty_names = """select -sum_score as TotalExperience, -timestamp as 'Timestamp UTC', -action_id as ActionId, -sum_score_old as TotalExperienceOld, -sum_score - sum_score_old as Diff -from -(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old -from ( - select sum(score) as sum_score, timestamp, action_id - from squads_stats_states - where - leaderboard_type = :LB_type and - platform = :platform and - :high_timestamp >= timestamp and - timestamp >= :low_timestamp - group by action_id - ) -group by sum_score -order by timestamp desc -limit :limit);""" - -select_diff_by_action_id = """select - new_stats.name as SquadronName, - new_stats.tag, - new_stats.score as TotalExperience, - old_stats.score as TotalExperienceOld, - new_stats.score - old_stats.score as TotalExperienceDiff, - new_stats.leaderboard_type as LeaderBoardType, - new_stats.platform as Platform -from ( - select * - from squads_stats_states - where action_id = :action_id) new_stats -inner join - ( - select * - from squads_stats_states - where action_id in ( - select distinct squads_stats_states.action_id - from squads_stats_states, ( - select timestamp, platform, leaderboard_type, action_id - from squads_stats_states - where action_id = :action_id limit 1) sub1 - where - squads_stats_states.platform = sub1.platform and - squads_stats_states.leaderboard_type = sub1.leaderboard_type and - squads_stats_states.action_id < sub1.action_id - order by squads_stats_states.action_id desc - limit 1)) old_stats -on new_stats.squadron_id = old_stats.squadron_id -where TotalExperienceDiff > 0 -order by TotalExperienceDiff desc;""" \ No newline at end of file diff --git a/sqlite2postgres.py b/sqlite2postgres.py index c78d159..7d40691 100644 --- a/sqlite2postgres.py +++ b/sqlite2postgres.py @@ -6,7 +6,6 @@ import psycopg2.extensions import psycopg2.extras import sqlite3 from model import postgres_sql_requests -import time import config import sys @@ -16,6 +15,12 @@ values (%(action_id)s, %(leaderboard_type)s, %(platform)s, %(squadron_id)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s, %(timestamp)s);""" +insert_sqlite = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score, +percentile, rank, name, tag, timestamp) +values +(:action_id, :leaderboard_type, :platform, :squadron_id, :score, +:percentile, :rank, :name, :tag, :timestamp);""" + sqlite_conn: sqlite3.Connection = sqlite3.connect(config.sqlite2postgres_sqlite_location, check_same_thread=False) sqlite_conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) @@ -41,19 +46,9 @@ def initial_pull_sqlite_to_postgres() -> None: with pg_conn: with pg_conn.cursor() as cursor: - start = time.time() - insert_statement = '' sqlite_content = sqlite_conn.execute('select * from squads_stats_states;').fetchall() - # for row in sqlite_content: - # # cursor.execute(insert_pg, row) - # insert_statement += cursor.mogrify(insert_pg, row).decode('utf-8') - - # print((time.time() - start) * 100, 'ms for creating insert_statement') - # print('inserting') cursor.executemany(insert_pg, sqlite_content) - # 149619.26856040955 ms total - print((time.time() - start) * 100, 'ms total') def continues_pull_sqlite_to_postgres() -> None: @@ -75,6 +70,21 @@ def continues_pull_sqlite_to_postgres() -> None: cursor.executemany(insert_pg, new_records) +def continues_pull_postgres_to_sqlite() -> None: + with pg_conn: + with pg_conn.cursor() as cursor: + cursor: psycopg2.extensions.cursor + l_act_id = sqlite_conn.execute('select action_id from squads_stats_states order by action_id desc limit 1;') + last_action_id_sqlite: int = l_act_id.fetchone()['action_id'] + + cursor.execute('select * from squads_stats_states where action_id > %(action_id)s;', + {'action_id': last_action_id_sqlite}) + new_records = cursor.fetchall() + + with sqlite_conn: + sqlite_conn.executemany(insert_sqlite, new_records) + + cli_error = 'argument must be "initial" or "continues"' if len(sys.argv) != 2: @@ -93,6 +103,11 @@ elif sys.argv[1] == 'continues': pg_conn.close() sqlite_conn.close() +elif sys.argv[1] == 'continues_to_sqlite': + continues_pull_postgres_to_sqlite() + pg_conn.close() + sqlite_conn.close() + else: print(cli_error)