WIP: migration from sqlite to postgres

This commit is contained in:
norohind 2021-11-28 02:28:31 +03:00
parent a5218db824
commit b5d0c43b7e
Signed by: norohind
GPG Key ID: 01C3BECC26FB59E1
10 changed files with 377 additions and 80 deletions

View File

@ -21,13 +21,10 @@ timestamp - inserts by DB, default TIMESTAMP
"""
import requests
import sqlite3
import model
from model import model
# from EDMCLogging import get_main_logger
import utils
db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3')
def request_leaderboard(platform_enum: utils.Platform, leaderboard_type_enum: utils.LeaderboardTypes) -> dict:
"""

25
model/__init__.py Normal file
View File

@ -0,0 +1,25 @@
import model.postgres_model
import utils
from EDMCLogging import get_main_logger
import os
logger = get_main_logger()

# The backend is nominally chosen by the DB_NAME environment variable, but the
# value is overridden on the next line, so sqlite is what actually gets used.
env_choose = os.getenv('DB_NAME')
env_choose = 'sqlite'  # TODO: remove

if env_choose == 'postgres':
    logger.info('Using postgres DB')
    from . import postgres_model as model
else:
    # An explicit 'sqlite' and any unrecognised/missing value both end up here.
    logger.info('Using sqlite DB')
    from . import sqlite_model as model

# Wrap the query entry points of the selected backend with utils.measure.
for _name in ('get_diff_action_id', 'get_activity_changes'):
    setattr(model, _name, utils.measure(getattr(model, _name)))

119
model/postgres_model.py Normal file
View File

@ -0,0 +1,119 @@
import json
import sqlite3
import typing
import psycopg2.extensions
import psycopg2.extras
from .sqlite_cache import cache
from . import postgres_sql_requests
import utils
from EDMCLogging import get_main_logger
import os

logger = get_main_logger()
logger.propagate = False

# Connection settings can be overridden through environment variables; the
# fallbacks are the values that used to be hard-coded here, so an unconfigured
# environment connects exactly as before.
db: psycopg2.extensions.connection = psycopg2.connect(
    user=os.getenv('DB_USERNAME', 'user2'),
    password=os.getenv('DB_PASSWORD', '1'),
    host=os.getenv('DB_HOSTNAME', '192.168.1.68'),
    port=os.getenv('DB_PORT', '5432'),
    database=os.getenv('DB_DATABASE', 'test0'),
    cursor_factory=psycopg2.extras.DictCursor)

# `with db` commits on success and rolls back on error; it does not close
# the connection, which stays open for the functions below.
with db:
    with db.cursor() as cursor:
        cursor.execute(postgres_sql_requests.schema_create)  # schema creation
def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp) -> list:
    """
    Return activity changes for one leaderboard on one platform.

    The result is looked up in the cache first; on a miss the
    `select_activity_pretty_names` query is executed and its rows are
    stored in the cache (when the cache is enabled).

    :param platform: platform name, upper-cased and validated via utils.Platform
    :param leaderboard_type: leaderboard name, lower-cased and validated via utils.LeaderboardTypes
    :param limit: maximum number of rows to return
    :param low_timestamp: lower bound of the time window (inclusive)
    :param high_timestamp: upper bound of the time window (inclusive)
    :return: list of row dicts
    """
    cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}'
    cached_result: typing.Union[str, None] = cache.get(cache_key)

    if cached_result is not None:
        logger.debug(f'Cached result for {cache_key}')
        return json.loads(cached_result)

    logger.debug(f'Not cached result for {cache_key}')

    query_params: dict = {
        'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
        'platform': utils.Platform(platform.upper()).value,
        'limit': limit,
        'high_timestamp': high_timestamp,
        'low_timestamp': low_timestamp
    }

    # psycopg2 opens a transaction on the first execute. Running the query
    # inside `with db` ends that transaction (commit, or rollback on error),
    # so a failing query cannot leave the shared connection in the aborted
    # state where every later statement is rejected.
    with db:
        with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute(postgres_sql_requests.select_activity_pretty_names, query_params)
            # Plain dicts keep the return type the same as the cached path.
            result: list = [dict(row) for row in cursor.fetchall()]

    if not cache.disabled:
        cache.set(cache_key, json.dumps(result))

    return result
def insert_leaderboard_db(leaderboard_list: dict) -> None:
    """
    Insert one fetched leaderboard into the DB under a new action_id.

    Every squad dict in ``leaderboard_list['leaderboard']`` is updated in
    place with the ``action_id``, ``LB_type`` and ``platform`` keys that the
    insert statement needs. The cache is dropped afterwards.

    :param leaderboard_list: dict with 'platform', 'type' and 'leaderboard' keys
    :return:
    """
    platform: str = leaderboard_list['platform']
    LB_type: str = leaderboard_list['type']
    leaderboard: list = leaderboard_list['leaderboard']

    action_id: int  # not last, current that we will use

    # One transaction block for both the lookup and the insert: it is committed
    # on success and rolled back on any error, so a failure never leaves the
    # shared connection with an open or aborted transaction.
    with db:
        with db.cursor() as cursor:
            cursor.execute(postgres_sql_requests.select_last_action_id)
            action_id_fetch_one: typing.Union[None, dict[str, int]] = cursor.fetchone()

            if action_id_fetch_one is None:
                # i.e. first launch
                action_id = 1  # yep, not 0
            else:
                action_id = action_id_fetch_one['action_id'] + 1

            # Patch for additional values
            for squad in leaderboard:
                squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform})

            cursor.executemany(
                postgres_sql_requests.insert_leader_board,
                leaderboard)

    cache.delete_all()  # drop cache
@utils.measure
def get_diff_action_id(action_id: int) -> list:
    """
    Takes action_id and returns which squadrons have changed in the leaderboard
    as of action_id, and the experience they gained in comparison to the
    previous action for the same leaderboard and platform.

    The result is looked up in the cache first and stored there after a miss
    (when the cache is enabled).

    :param action_id: action to compare against its predecessor
    :return: list of row dicts
    """
    cache_key: str = f'{action_id}'
    cached_result: typing.Union[str, None] = cache.get(cache_key)

    if cached_result is not None:
        logger.debug(f'Cached result for {cache_key}')
        return json.loads(cached_result)

    logger.debug(f'Not cached result for {cache_key}')

    # `with db` ends the transaction psycopg2 opens implicitly (commit, or
    # rollback on error) so a failed query cannot poison the shared connection.
    with db:
        with db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id})
            # Plain dicts keep the return type the same as the cached path.
            result: list = [dict(row) for row in cursor.fetchall()]

    if not cache.disabled:
        cache.set(cache_key, json.dumps(result))

    return result

View File

@ -0,0 +1,103 @@
# Idempotent schema setup: creates the squads_stats_states table and its two
# indexes only if they do not exist yet. The `timestamp` column defaults to
# the current time expressed in UTC.
schema_create = """create table if not exists squads_stats_states (
action_id integer,
leaderboard_type text,
platform text,
squadron_id integer,
score bigint,
percentile integer,
rank integer,
name text,
tag text,
timestamp timestamp default timezone('utc', now()));
create index if not exists idx_action_id_0 on squads_stats_states (action_id);
create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);
"""
# Returns the single highest action_id stored so far (no row when the table is empty).
select_last_action_id = """select action_id
from squads_stats_states
order by action_id desc
limit 1;"""
# Inserts one leaderboard row from a dict of named parameters. Note the key
# names differ from the column names in two places: `LB_type` fills
# leaderboard_type and `squadron` fills squadron_id. The timestamp column is
# not listed, so the table default applies.
insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score,
percentile, rank, name, tag)
values
(%(action_id)s, %(LB_type)s, %(platform)s, %(squadron)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s);"""
# Per-action score totals with the difference to the previous total.
# NOTE(review): this query uses `:name` placeholders, whereas every other
# parameterised query in this module uses the `%(name)s` form, and its
# subqueries carry no aliases. It looks like a leftover from the sqlite
# version - confirm it is unused (or port it) before calling it via psycopg2.
select_activity = """select *, sum_score - sum_score_old as diff from
(select sum_score, min(timestamp) as timestamp, action_id, lag (sum_score, 1, 0) over (order by sum_score) sum_score_old
from (
select sum(score) as sum_score, timestamp, action_id
from squads_stats_states
where
leaderboard_type = :LB_type and
platform = :platform and
:high_timestamp >= timestamp and
timestamp >= :low_timestamp
group by action_id
)
group by sum_score
order by timestamp desc
limit :limit);"""
# Activity report for one leaderboard/platform inside a time window.
# Innermost query: total score and earliest timestamp per action_id.
# Middle query: pairs each total with the preceding total in sum_score order
# (lag), newest rows first.
# Outer query: keeps only rows whose total grew, renames the columns to the
# quoted, case-preserving names below, and applies the row limit.
# Parameters: LB_type, platform, high_timestamp, low_timestamp, limit.
select_activity_pretty_names = """select
sum_score::bigint as "TotalExperience",
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as "Timestamp UTC",
action_id::bigint as "ActionId",
sum_score_old::bigint as "TotalExperienceOld",
(sum_score - sum_score_old)::bigint as "Diff"
from
(
select
sum_score,
min(timestamp) as timestamp,
action_id,
lag (sum_score, 1) over (order by sum_score) sum_score_old
from (
select sum(score) as sum_score, min(timestamp) as timestamp, action_id
from squads_stats_states
where
leaderboard_type = %(LB_type)s and
platform = %(platform)s and
%(high_timestamp)s::timestamp >= timestamp and
timestamp >= %(low_timestamp)s::timestamp
group by action_id
) as foo
group by sum_score, action_id
order by timestamp desc
) as foo1
where (sum_score - sum_score_old) > 0
limit %(limit)s;"""
# Squadrons whose score grew between the given action_id and the closest
# earlier action for the same platform and leaderboard type, ordered by the
# gain (largest first). Parameter: action_id.
#
# The CamelCase aliases are double-quoted on purpose: PostgreSQL folds
# unquoted identifiers to lower case, which would turn the result keys into
# `squadronname`, `totalexperience`, ... - unlike the quoted aliases used by
# select_activity_pretty_names.
select_diff_by_action_id = """select
    new_stats.name as "SquadronName",
    new_stats.tag,
    new_stats.score as "TotalExperience",
    old_stats.score as "TotalExperienceOld",
    new_stats.score - old_stats.score as "TotalExperienceDiff",
    new_stats.leaderboard_type as "LeaderBoardType",
    new_stats.platform as "Platform"
from (
    select *
    from squads_stats_states
    where action_id = %(action_id)s) new_stats
inner join
    (
    select *
    from squads_stats_states
    where action_id in (
        select distinct squads_stats_states.action_id
        from squads_stats_states, (
            select timestamp, platform, leaderboard_type, action_id
            from squads_stats_states
            where action_id = %(action_id)s limit 1) sub1
        where
            squads_stats_states.platform = sub1.platform and
            squads_stats_states.leaderboard_type = sub1.leaderboard_type and
            squads_stats_states.action_id < sub1.action_id
        order by squads_stats_states.action_id desc
        limit 1)) old_stats
on new_stats.squadron_id = old_stats.squadron_id
where new_stats.score - old_stats.score > 0
order by new_stats.score - old_stats.score desc;"""

61
model/sqlite_cache.py Normal file
View File

@ -0,0 +1,61 @@
import sqlite3
from EDMCLogging import get_main_logger
logger = get_main_logger()
class Cache:
    """
    Small key/value cache stored in the `squads_stat_cache.sqlite3` sqlite file.

    When the cache is disabled (explicitly, or because the cache DB could not
    be opened) no connection exists and every method is a no-op: `get`
    returns the supplied default, the mutating methods do nothing.
    """

    def __init__(self, disabled: bool = False):
        """
        :param disabled: when True no DB is opened and the cache does nothing
        """
        self.disabled = disabled

        if disabled:
            return

        try:
            self.db: sqlite3.Connection = sqlite3.connect('squads_stat_cache.sqlite3', check_same_thread=False)

        except Exception as e:
            # Caching is best-effort: fall back to a disabled cache.
            logger.warning('Cannot create cache DB due to', exc_info=e)
            logger.warning('Cache is disabled')
            self.disabled = True
            return

        # Rows come back as dicts keyed by column name.
        self.db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))

        with self.db:
            self.db.execute("create table if not exists cache (key text unique, value text);")

    def set(self, key, value) -> None:
        """Store `value` under `key`, replacing any previous value."""
        if self.disabled:
            return

        with self.db:
            # `key` is unique, so a single statement covers both the insert
            # and the update case.
            self.db.execute('insert or replace into cache (key, value) values (?, ?);', [key, value])

    def get(self, key, default=None):
        """Return the value stored under `key`, or `default` when absent or disabled."""
        if self.disabled:
            return default

        res = self.db.execute('select value from cache where key = ?;', [key]).fetchone()
        if res is None:
            return default

        return res['value']

    def delete(self, key):
        """Remove `key` from the cache (no-op when disabled)."""
        if self.disabled:
            return  # no connection exists in this state

        with self.db:  # commit the delete instead of leaving it pending
            self.db.execute('delete from cache where key = ?;', [key])

    def delete_all(self):
        """Remove every entry from the cache (no-op when disabled)."""
        if self.disabled:
            return  # no connection exists in this state

        logger.debug('Dropping cache')
        with self.db:
            self.db.execute('delete from cache;')


cache = Cache(True)

View File

@ -2,7 +2,8 @@ import sqlite3
import typing
import json
import sql_requests
from . import sqlite_sql_requests
from .sqlite_cache import cache
import utils
from EDMCLogging import get_main_logger
@ -11,7 +12,7 @@ logger.propagate = False
db: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False)
db.executescript(sql_requests.schema_create) # schema creation
db.executescript(sqlite_sql_requests.schema_create) # schema creation
# thx https://stackoverflow.com/a/48789604
db.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
@ -27,7 +28,7 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t
logger.debug(f'Not cached result for {cache_key}')
sql_req: sqlite3.Cursor = db.execute(sql_requests.select_activity_pretty_names, {
sql_req: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_activity_pretty_names, {
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
'platform': utils.Platform(platform.upper()).value,
'limit': limit,
@ -36,7 +37,9 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t
})
result: list = sql_req.fetchall()
cache.set(cache_key, json.dumps(result))
if not cache.disabled:
cache.set(cache_key, json.dumps(result))
return result
@ -44,7 +47,6 @@ def get_activity_changes(platform: str, leaderboard_type: str, limit: int, low_t
def insert_leaderboard_db(leaderboard_list: dict) -> None:
"""
Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB
:param leaderboard_list: list from request_leaderboard
:return:
"""
@ -55,7 +57,7 @@ def insert_leaderboard_db(leaderboard_list: dict) -> None:
action_id: int # not last, current that we will use
sql_req_action_id: sqlite3.Cursor = db.execute(sql_requests.select_last_action_id)
sql_req_action_id: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_last_action_id)
action_id_fetch_one: typing.Union[None, dict[str, int]] = sql_req_action_id.fetchone()
if action_id_fetch_one is None:
# i.e. first launch
@ -70,7 +72,7 @@ def insert_leaderboard_db(leaderboard_list: dict) -> None:
with db:
db.executemany(
sql_requests.insert_leader_board,
sqlite_sql_requests.insert_leader_board,
leaderboard)
cache.delete_all() # drop cache
@ -80,7 +82,6 @@ def get_diff_action_id(action_id: int) -> list:
"""
Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and
experience they got in compassion to action_id - 1 for the same leaderboard and platform
:param action_id:
:return:
"""
@ -92,51 +93,10 @@ def get_diff_action_id(action_id: int) -> list:
return json.loads(cached_result)
logger.debug(f'Not cached result for {cache_key}')
sql_req: sqlite3.Cursor = db.execute(sql_requests.select_diff_by_action_id, {'action_id': action_id})
sql_req: sqlite3.Cursor = db.execute(sqlite_sql_requests.select_diff_by_action_id, {'action_id': action_id})
result: list = sql_req.fetchall()
cache.set(cache_key, json.dumps(result))
if not cache.disabled:
cache.set(cache_key, json.dumps(result))
return result
class Cache:
    """
    Small key/value cache kept in a `cache` table on the given sqlite connection.

    The connection is expected to return rows as dicts (the methods read
    columns by name). When `disabled` is set, `set` does nothing and `get`
    returns the supplied default.
    """

    def __init__(self, db_conn: sqlite3.Connection, disabled: bool = False):
        """
        :param db_conn: sqlite connection that holds the cache table
        :param disabled: when True, lookups miss and stores are skipped
        """
        self.disabled = disabled
        self.db: sqlite3.Connection = db_conn

        with self.db:
            self.db.execute("create table if not exists cache (key text unique, value text);")

    def set(self, key, value) -> None:
        """Store `value` under `key`, replacing any previous value."""
        if self.disabled:
            return

        # Use the connection this cache was given, not a module-level one.
        with self.db:
            if self.db.execute('select count(key) as count from cache where key = ?;', [key]).fetchone()['count'] == 1:
                # key exists, just need to update value
                self.db.execute('update cache set value = ? where key = ?;', [value, key])

            else:
                # key doesn't exist, need to insert new row
                self.db.execute('insert into cache (key, value) values (?, ?);', [key, value])

    def get(self, key, default=None):
        """Return the value stored under `key`, or `default` when absent or disabled."""
        if self.disabled:
            return default

        res = self.db.execute('select value from cache where key = ?;', [key]).fetchone()
        if res is None:
            return default

        return res['value']

    def delete(self, key):
        """Remove `key` from the cache."""
        with self.db:  # commit the delete instead of leaving it pending
            self.db.execute('delete from cache where key = ?;', [key])

    def delete_all(self):
        """Remove every entry from the cache."""
        logger.debug('Dropping cache')
        with self.db:
            self.db.execute('delete from cache;')
cache = Cache(db)

View File

@ -10,27 +10,8 @@ name string,
tag string,
timestamp default current_timestamp);
create view if not exists current_cqc_pc as
select * from squads_stats_states where action_id in
(select distinct action_id
from squads_stats_states
where leaderboard_type = 'cqc' and platform = 'PC'
order by action_id desc limit 1) and platform = 'PC';
create view if not exists prev_cqc_pc as
select *
from squads_stats_states
where action_id in
(select distinct action_id
from squads_stats_states
where leaderboard_type = 'cqc' and platform = 'PC' order by action_id desc limit 1, 1) and platform = 'PC';
create index if not exists idx_action_id_0 on squads_stats_states (action_id);
create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);
create view if not exists diff_pc_cqc as
select current_cqc_pc.name, current_cqc_pc.score, prev_cqc_pc.score, current_cqc_pc.score - prev_cqc_pc.score as diff
from current_cqc_pc left outer join prev_cqc_pc on prev_cqc_pc.squadron_id = current_cqc_pc.squadron_id;"""
create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);"""
select_last_action_id = """select action_id
from squads_stats_states

Binary file not shown.

50
sqlite2postgres.py Normal file
View File

@ -0,0 +1,50 @@
"""
Script to transfer data from sqlite DB to postgres DB
"""
import psycopg2.extensions
import psycopg2.extras
import sqlite3
from model import postgres_sql_requests
import time
import os

# Same columns as the regular leaderboard insert plus `timestamp`, so the
# original insertion time of every row is carried over from sqlite.
insert_pg = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score,
percentile, rank, name, tag, timestamp)
values
(%(action_id)s, %(leaderboard_type)s, %(platform)s, %(squadron_id)s, %(score)s,
%(percentile)s, %(rank)s, %(name)s, %(tag)s, %(timestamp)s);"""

sqlite_conn: sqlite3.Connection = sqlite3.connect('squads_stat.sqlite3', check_same_thread=False)
# Rows as dicts keyed by column name, matching the named placeholders above.
sqlite_conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))

# Connection settings can be overridden through environment variables; the
# fallbacks are the values that used to be hard-coded here.
pg_conn: psycopg2.extensions.connection = psycopg2.connect(
    user=os.getenv('DB_USERNAME', 'user2'),
    password=os.getenv('DB_PASSWORD', '1'),
    host=os.getenv('DB_HOSTNAME', '192.168.1.68'),
    port=os.getenv('DB_PORT', '5432'),
    database=os.getenv('DB_DATABASE', 'test0')
)

try:
    # Make sure the target table and indexes exist.
    with pg_conn:
        with pg_conn.cursor() as cursor:
            cursor.execute(postgres_sql_requests.schema_create)

    # Copy everything in a single transaction: either all rows land or none.
    with pg_conn:
        with pg_conn.cursor() as cursor:
            start = time.time()

            sqlite_content = sqlite_conn.execute('select * from squads_stats_states;').fetchall()

            # execute_batch sends the inserts in pages instead of one round
            # trip per row, which is what makes plain executemany slow.
            psycopg2.extras.execute_batch(cursor, insert_pg, sqlite_content)

            # seconds -> milliseconds
            print((time.time() - start) * 1000, 'ms total')

finally:
    # Release both connections even when the transfer fails.
    pg_conn.close()
    sqlite_conn.close()

7
web.py
View File

@ -1,4 +1,4 @@
import model
from model import model
import json
import falcon
import os
@ -20,8 +20,8 @@ class Activity:
'platform': platform,
'leaderboard_type': leaderboard,
'limit': req.params.get('limit', 10),
'high_timestamp': req.params.get('before', 'a'),
'low_timestamp': req.params.get('after', 0)
'high_timestamp': req.params.get('before', '3307-12-12'),
'low_timestamp': req.params.get('after', '0001-01-01')
}
try:
@ -37,6 +37,7 @@ class ActivityHtml:
Activity().on_get(req, resp, leaderboard, platform)
table_in_json: str = resp.text
resp.content_type = falcon.MEDIA_HTML
resp.text = utils.activity_table_html_template.replace('{items}', json.dumps(table_in_json))
# what? f-strings? .format? never heard about them