From b5d0c43b7e348cb9c0f5cdb3336b869c930e4458 Mon Sep 17 00:00:00 2001 From: norohind <60548839+norohind@users.noreply.github.com> Date: Sun, 28 Nov 2021 02:28:31 +0300 Subject: [PATCH] WIP: migration from sqlite to postgres --- main.py | 5 +- model/__init__.py | 25 ++++ model/postgres_model.py | 119 ++++++++++++++++++ model/postgres_sql_requests.py | 103 +++++++++++++++ model/sqlite_cache.py | 61 +++++++++ model.py => model/sqlite_model.py | 66 ++-------- .../sqlite_sql_requests.py | 21 +--- requirements.txt | Bin 284 -> 132 bytes sqlite2postgres.py | 50 ++++++++ web.py | 7 +- 10 files changed, 377 insertions(+), 80 deletions(-) create mode 100644 model/__init__.py create mode 100644 model/postgres_model.py create mode 100644 model/postgres_sql_requests.py create mode 100644 model/sqlite_cache.py rename model.py => model/sqlite_model.py (58%) rename sql_requests.py => model/sqlite_sql_requests.py (78%) create mode 100644 sqlite2postgres.py diff --git a/main.py b/main.py index 139b8a0..8345407 100644 --- a/main.py +++ b/main.py @@ -21,13 +21,10 @@ timestamp - inserts by DB, default TIMESTAMP """ import requests -import sqlite3 -import model +from model import model # from EDMCLogging import get_main_logger import utils -db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3') - def request_leaderboard(platform_enum: utils.Platform, leaderboard_type_enum: utils.LeaderboardTypes) -> dict: """ diff --git a/model/__init__.py b/model/__init__.py new file mode 100644 index 0000000..dd1cc4f --- /dev/null +++ b/model/__init__.py @@ -0,0 +1,25 @@ +import model.postgres_model +import utils +from EDMCLogging import get_main_logger +import os + +logger = get_main_logger() + +env_choose = os.getenv('DB_NAME') + +env_choose = 'sqlite' # TODO: remove + +if env_choose == 'postgres': + logger.info('Using postgres DB') + from .import postgres_model as model + +elif env_choose == 'sqlite': + logger.info('Using sqlite DB') + from .import sqlite_model as model + +else: + logger.info('Using sqlite DB') + from . import sqlite_model as model + +model.get_diff_action_id = utils.measure(model.get_diff_action_id) +model.get_activity_changes = utils.measure(model.get_activity_changes) diff --git a/model/postgres_model.py b/model/postgres_model.py new file mode 100644 index 0000000..2678914 --- /dev/null +++ b/model/postgres_model.py @@ -0,0 +1,119 @@ +import json +import sqlite3 +import typing + +import psycopg2.extensions +import psycopg2.extras + +from .sqlite_cache import cache +from . import postgres_sql_requests +import utils +from EDMCLogging import get_main_logger + +logger = get_main_logger() +logger.propagate = False + +db: psycopg2.extensions.connection = psycopg2.connect( + user='user2', + password='1', + host='192.168.1.68', + port='5432', + database='test0', + cursor_factory=psycopg2.extras.DictCursor) + +with db: + with db.cursor() as cursor: + cursor.execute(postgres_sql_requests.schema_create) # schema creation + + +def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp) -> list: + cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}' + cached_result: typing.Union[str, None] = cache.get(cache_key) + + if cached_result is not None: + logger.debug(f'Cached result for {cache_key}') + return json.loads(cached_result) + + logger.debug(f'Not cached result for {cache_key}') + + with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute(postgres_sql_requests.select_activity_pretty_names, { + 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, + 'platform': utils.Platform(platform.upper()).value, + 'limit': limit, + 'high_timestamp': high_timestamp, + 'low_timestamp': low_timestamp + }) + + result: list = cursor.fetchall() + + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) + + return result + + +def insert_leaderboard_db(leaderboard_list: dict) -> None: + """ + Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB + + :param leaderboard_list: list from request_leaderboard + :return: + """ + + platform: str = leaderboard_list['platform'] + LB_type: str = leaderboard_list['type'] + leaderboard: list = leaderboard_list['leaderboard'] + + action_id: int # not last, current that we will use + + with db.cursor() as cursor: + cursor.execute(postgres_sql_requests.select_last_action_id) + action_id_fetch_one: typing.Union[None, dict[str, int]] = cursor.fetchone() + + if action_id_fetch_one is None: + # i.e. first launch + action_id = 1 # yep, not 0 + + else: + action_id = action_id_fetch_one['action_id'] + 1 + + # Patch for additional values + for squad in leaderboard: + squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform}) + + with db: + with db.cursor() as cursor: + cursor.executemany( + postgres_sql_requests.insert_leader_board, + leaderboard) + + cache.delete_all() # drop cache + + +@utils.measure +def get_diff_action_id(action_id: int) -> list: + """ + Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and + experience they got in compassion to action_id - 1 for the same leaderboard and platform + + :param action_id: + :return: + """ + cache_key: str = f'{action_id}' + cached_result: typing.Union[str, None] = cache.get(cache_key) + + if cached_result is not None: + logger.debug(f'Cached result for {cache_key}') + return json.loads(cached_result) + + logger.debug(f'Not cached result for {cache_key}') + + with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id}) + result: list = cursor.fetchall() + + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) + + return result diff --git a/model/postgres_sql_requests.py b/model/postgres_sql_requests.py new file mode 100644 index 0000000..9d14315 --- /dev/null +++ b/model/postgres_sql_requests.py @@ -0,0 +1,103 @@ +schema_create = """create table if not exists squads_stats_states ( +action_id integer, +leaderboard_type text, +platform text, +squadron_id integer, +score bigint, +percentile integer, +rank integer, +name text, +tag text, +timestamp timestamp default timezone('utc', now())); + +create index if not exists idx_action_id_0 on squads_stats_states (action_id); +create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type); +""" + +select_last_action_id = """select action_id +from squads_stats_states +order by action_id desc +limit 1;""" + +insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score, +percentile, rank, name, tag) +values +(%(action_id)s, %(LB_type)s, %(platform)s, %(squadron)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s);""" + +select_activity = """select *, sum_score - sum_score_old as diff from +(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old +from ( + select sum(score) as sum_score, timestamp, action_id + from squads_stats_states + where + leaderboard_type = :LB_type and + platform = :platform and + :high_timestamp >= timestamp and + timestamp >= :low_timestamp + group by action_id + ) +group by sum_score +order by timestamp desc +limit :limit);""" + +select_activity_pretty_names = """select +sum_score::bigint as "TotalExperience", +to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as "Timestamp UTC", +action_id::bigint as "ActionId", +sum_score_old::bigint as "TotalExperienceOld", +(sum_score - sum_score_old)::bigint as "Diff" +from + ( + select + sum_score, + min(timestamp) as timestamp, + action_id, + lag (sum_score, 1) over (order by sum_score) sum_score_old + from ( + select sum(score) as sum_score, min(timestamp) as timestamp, action_id + from squads_stats_states + where + leaderboard_type = %(LB_type)s and + platform = %(platform)s and + %(high_timestamp)s::timestamp >= timestamp and + timestamp >= %(low_timestamp)s::timestamp + group by action_id + ) as foo + group by sum_score, action_id + order by timestamp desc + + ) as foo1 +where (sum_score - sum_score_old) > 0 +limit %(limit)s;""" + +select_diff_by_action_id = """select + new_stats.name as SquadronName, + new_stats.tag, + new_stats.score as TotalExperience, + old_stats.score as TotalExperienceOld, + new_stats.score - old_stats.score as TotalExperienceDiff, + new_stats.leaderboard_type as LeaderBoardType, + new_stats.platform as Platform +from ( + select * + from squads_stats_states + where action_id = %(action_id)s) new_stats +inner join + ( + select * + from squads_stats_states + where action_id in ( + select distinct squads_stats_states.action_id + from squads_stats_states, ( + select timestamp, platform, leaderboard_type, action_id + from squads_stats_states + where action_id = %(action_id)s limit 1) sub1 + where + squads_stats_states.platform = sub1.platform and + squads_stats_states.leaderboard_type = sub1.leaderboard_type and + squads_stats_states.action_id < sub1.action_id + order by squads_stats_states.action_id desc + limit 1)) old_stats +on new_stats.squadron_id = old_stats.squadron_id +where new_stats.score - old_stats.score > 0 +order by new_stats.score - old_stats.score desc;""" \ No newline at end of file diff --git a/model/sqlite_cache.py b/model/sqlite_cache.py new file mode 100644 index 0000000..e672bf2 --- /dev/null +++ b/model/sqlite_cache.py @@ -0,0 +1,61 @@ +import sqlite3 +from EDMCLogging import get_main_logger + +logger = get_main_logger() + + +class Cache: + def __init__(self, disabled: bool = False): + self.disabled = disabled + + if disabled: + return + + try: + self.db: sqlite3.Connection = sqlite3.connect('squads_stat_cache.sqlite3', check_same_thread=False) + + except Exception as e: + logger.warning('Cannot create cache DB due to', exc_info=e) + logger.warning('Cache is disabled') + self.disabled = True + return + + self.db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) + + with self.db: + self.db.execute("create table if not exists cache (key text unique, value text);") + + def set(self, key, value) -> None: + if self.disabled: + return + + with self.db: + if self.db.execute('select count(key) as count from cache where key = ?;', [key]).fetchone()['count'] == 1: + + # key exists, just need to update value + self.db.execute('update cache set value = ? where key = ?;', [value, key]) + else: + + # key doesn't exists, need to insert new row + self.db.execute('insert into cache (key, value) values (?, ?);', [key, value]) + + def get(self, key, default=None): + if self.disabled: + return + + res = self.db.execute('select value from cache where key = ?;', [key]).fetchone() + if res is None: + return default + + return res['value'] + + def delete(self, key): + self.db.execute('delete from cache where key = ?;', [key]) + + def delete_all(self): + logger.debug('Dropping cache') + with self.db: + self.db.execute('delete from cache;') + + +cache = Cache(True) diff --git a/model.py b/model/sqlite_model.py similarity index 58% rename from model.py rename to model/sqlite_model.py index 2953b2b..60bf588 100644 --- a/model.py +++ b/model/sqlite_model.py @@ -2,7 +2,8 @@ import sqlite3 import typing import json -import sql_requests +from . import sqlite_sql_requests +from .sqlite_cache import cache import utils from EDMCLogging import get_main_logger @@ -11,7 +12,7 @@ logger.propagate = False db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False) -db.executescript(sql_requests.schema_create) # schema creation +db.executescript(sqlite_sql_requests.schema_create) # schema creation # thx https://stackoverflow.com/a/48789604 db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) @@ -27,7 +28,7 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t logger.debug(f'Not cached result for {cache_key}') - sql_req: sqlite3.Cursor = db.execute(sql_requests.select_activity_pretty_names, { + sql_req: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_activity_pretty_names, { 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, 'platform': utils.Platform(platform.upper()).value, 'limit': limit, @@ -36,7 +37,9 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t }) result: list = sql_req.fetchall() - cache.set(cache_key, json.dumps(result)) + + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) return result @@ -44,7 +47,6 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t def insert_leaderboard_db(leaderboard_list: dict) -> None: """ Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB - :param leaderboard_list: list from request_leaderboard :return: """ @@ -55,7 +57,7 @@ def insert_leaderboard_db(leaderboard_list: dict) -> None: action_id: int # not last, current that we will use - sql_req_action_id: sqlite3.Cursor = db.execute(sql_requests.select_last_action_id) + sql_req_action_id: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_last_action_id) action_id_fetch_one: typing.Union[None, dict[str, int]] = sql_req_action_id.fetchone() if action_id_fetch_one is None: # i.e. first launch @@ -70,7 +72,7 @@ def insert_leaderboard_db(leaderboard_list: dict) -> None: with db: db.executemany( - sql_requests.insert_leader_board, + sqlite_sql_requests.insert_leader_board, leaderboard) cache.delete_all() # drop cache @@ -80,7 +82,6 @@ def get_diff_action_id(action_id: int) -> list: """ Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and experience they got in compassion to action_id - 1 for the same leaderboard and platform - :param action_id: :return: """ @@ -92,51 +93,10 @@ def get_diff_action_id(action_id: int) -> list: return json.loads(cached_result) logger.debug(f'Not cached result for {cache_key}') - sql_req: sqlite3.Cursor = db.execute(sql_requests.select_diff_by_action_id, {'action_id': action_id}) + sql_req: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_diff_by_action_id, {'action_id': action_id}) result: list = sql_req.fetchall() - cache.set(cache_key, json.dumps(result)) + + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) return result - - -class Cache: - def __init__(self, db_conn: sqlite3.Connection, disabled: bool = False): - self.disabled = disabled - self.db: sqlite3.Connection = db_conn - with self.db: - self.db.execute("create table if not exists cache (key text unique, value text);") - - def set(self, key, value) -> None: - if self.disabled: - return - - with db: - if self.db.execute('select count(key) as count from cache where key = ?;', [key]).fetchone()['count'] == 1: - - # key exists, just need to update value - self.db.execute('update cache set value = ? where key = ?;', [value, key]) - else: - - # key doesn't exists, need to insert new row - self.db.execute('insert into cache (key, value) values (?, ?);', [key, value]) - - def get(self, key, default=None): - if self.disabled: - return - - res = self.db.execute('select value from cache where key = ?;', [key]).fetchone() - if res is None: - return default - - return res['value'] - - def delete(self, key): - self.db.execute('delete from cache where key = ?;', [key]) - - def delete_all(self): - logger.debug('Dropping cache') - with self.db: - self.db.execute('delete from cache;') - - -cache = Cache(db) diff --git a/sql_requests.py b/model/sqlite_sql_requests.py similarity index 78% rename from sql_requests.py rename to model/sqlite_sql_requests.py index 3e59197..501c09f 100644 --- a/sql_requests.py +++ b/model/sqlite_sql_requests.py @@ -10,27 +10,8 @@ name string, tag string, timestamp default current_timestamp); -create view if not exists current_cqc_pc as -select * from squads_stats_states where action_id in -(select distinct action_id -from squads_stats_states -where leaderboard_type = 'cqc' and platform = 'PC' -order by action_id desc limit 1) and platform = 'PC'; - -create view if not exists prev_cqc_pc as -select * -from squads_stats_states -where action_id in -(select distinct action_id -from squads_stats_states -where leaderboard_type = 'cqc' and platform = 'PC' order by action_id desc limit 1, 1) and platform = 'PC'; - create index if not exists idx_action_id_0 on squads_stats_states (action_id); -create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type); - -create view if not exists diff_pc_cqc as -select current_cqc_pc.name, current_cqc_pc.score, prev_cqc_pc.score, current_cqc_pc.score - prev_cqc_pc.score as diff -from current_cqc_pc left outer join prev_cqc_pc on prev_cqc_pc.squadron_id = current_cqc_pc.squadron_id;""" +create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);""" select_last_action_id = """select action_id from squads_stats_states diff --git a/requirements.txt b/requirements.txt index e1c499c993d20fe0ccfda8068692b4630751e022..68abc2a349e53f559980a99cb3c48253e6b6c78a 100644 GIT binary patch delta 59 zcmbQk)WRtL|6c(^F+(LoGDAK?0Yf^25knn=Es)e>umobGiMJgmP7)D-i5W4N0YwcM E0L&u|00000 literal 284 zcmXw!TMEK35Jm5|;8HAU3)Ua^Ah?1WEf#CVJ`nwI>FSvY5+=~(-gDCLC+eg@lUg;* zmQ^S)xiTiOZ?x1xQD@a8NAiUHDm%2`M6<&@OUyj6Lt)sbZF+!9vlr#n5hryq5J zXDzc;ZZy{b?XH=9#FY{Sw}sa1!5K5D&fN#7e@03qw~-ouU&PhxP0HS2|CBr9!EKAd Md?U*e=CMT34+>u@HUIzs diff --git a/sqlite2postgres.py b/sqlite2postgres.py new file mode 100644 index 0000000..259790d --- /dev/null +++ b/sqlite2postgres.py @@ -0,0 +1,50 @@ +""" +Script to transfer data from sqlite DB to postgres DB +""" + +import psycopg2.extensions +import psycopg2.extras +import sqlite3 +from model import postgres_sql_requests +import time + +insert_pg = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score, +percentile, rank, name, tag, timestamp) +values +(%(action_id)s, %(leaderboard_type)s, %(platform)s, %(squadron_id)s, %(score)s, +%(percentile)s, %(rank)s, %(name)s, %(tag)s, %(timestamp)s);""" + +sqlite_conn: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False) +sqlite_conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r)) + +pg_conn: psycopg2.extensions.connection = psycopg2.connect( + user='user2', + password='1', + host='192.168.1.68', + port='5432', + database='test0' + ) + +with pg_conn: + with pg_conn.cursor() as cursor: + cursor.execute(postgres_sql_requests.schema_create) + +with pg_conn: + with pg_conn.cursor() as cursor: + start = time.time() + + insert_statement = '' + sqlite_content = sqlite_conn.execute('select * from squads_stats_states;').fetchall() + # for row in sqlite_content: + # # cursor.execute(insert_pg, row) + # insert_statement += cursor.mogrify(insert_pg, row).decode('utf-8') + + # print((time.time() - start) * 100, 'ms for creating insert_statement') + # print('inserting') + cursor.executemany(insert_pg, sqlite_content) + # 149619.26856040955 ms total + print((time.time() - start) * 100, 'ms total') + +pg_conn.close() +sqlite_conn.close() + diff --git a/web.py b/web.py index c679d86..e750f80 100644 --- a/web.py +++ b/web.py @@ -1,4 +1,4 @@ -import model +from model import model import json import falcon import os @@ -20,8 +20,8 @@ class Activity: 'platform': platform, 'leaderboard_type': leaderboard, 'limit': req.params.get('limit', 10), - 'high_timestamp': req.params.get('before', 'a'), - 'low_timestamp': req.params.get('after', 0) + 'high_timestamp': req.params.get('before', '3307-12-12'), + 'low_timestamp': req.params.get('after', '0001-01-01') } try: @@ -37,6 +37,7 @@ class ActivityHtml: Activity().on_get(req, resp, leaderboard, platform) table_in_json: str = resp.text resp.content_type = falcon.MEDIA_HTML + resp.text = utils.activity_table_html_template.replace('{items}', json.dumps(table_in_json)) # what? f-strings? .format? never heard about them