Migrating to new DB schema (WIP)

This commit is contained in:
norohind 2022-04-03 23:41:13 +03:00
parent 8e82a6aa48
commit ee575a726a
Signed by: norohind
GPG Key ID: 01C3BECC26FB59E1
3 changed files with 315 additions and 116 deletions

View File

@ -116,25 +116,16 @@ class PostgresModel(AbstractModel):
action_id: int # not last, current that we will use
with self.db.cursor() as cursor:
cursor.execute(postgres_sql_requests.select_last_action_id)
action_id_fetch_one: typing.Union[None, dict[str, int]] = cursor.fetchone()
cursor.execute(postgres_sql_requests.create_new_action_id, {'LB_type': LB_type, 'platform': platform})
action_id: int = cursor.fetchone()[0]
if action_id_fetch_one is None:
# i.e. first launch
action_id = 1 # yep, not 0
# Patch for additional values
for squad in leaderboard:
squad.update({'action_id': action_id})
else:
action_id = action_id_fetch_one['action_id'] + 1
cursor.executemany(postgres_sql_requests.insert_leaderboard, leaderboard)
# Patch for additional values
for squad in leaderboard:
squad.update({'action_id': action_id, 'LB_type': LB_type, 'platform': platform})
with self.db:
with self.db.cursor() as cursor:
cursor.executemany(
postgres_sql_requests.insert_leader_board,
leaderboard)
self.db.commit()
cache.delete_all() # drop cache

View File

@ -1,129 +1,178 @@
schema_create = """create table if not exists squads_stats_states (
action_id integer,
schema_create = """create table if not exists squads_stats_states_action_info (
action_id serial primary key,
leaderboard_type text,
platform text,
timestamp timestamp default timezone('utc', now())
);
create table if not exists squads_stats_states_data (
action_id integer,
squadron_id integer,
score bigint,
percentile integer,
rank integer,
name text,
tag text,
timestamp timestamp default timezone('utc', now()));
create index if not exists idx_action_id_0 on squads_stats_states (action_id);
create index if not exists idx_platform_leaderboard_type_1 on squads_stats_states(platform, leaderboard_type);
create index if not exists idx_timestamp_0 on squads_stats_states(timestamp);
foreign key (action_id) references squads_stats_states_action_info(action_id)
);
"""
select_last_action_id = """select action_id
from squads_stats_states
# Registers a new leaderboard snapshot in squads_stats_states_action_info and
# returns the action_id generated for it as a single-column row.
# Named parameters: LB_type, platform.
# `returning` yields the id produced by this very insert; reading back
# "the highest action_id" in a second statement could pick up a row inserted
# by another session in between.
create_new_action_id = """
insert into squads_stats_states_action_info (leaderboard_type, platform)
values (%(LB_type)s, %(platform)s)
returning action_id;"""
insert_leader_board = """insert into squads_stats_states (action_id, leaderboard_type, platform, squadron_id, score,
percentile, rank, name, tag)
values
(%(action_id)s, %(LB_type)s, %(platform)s, %(squadron)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s);"""
select_activity_pretty_names = """select
sum_score::bigint as "sum_score",
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as "timestamp",
action_id::bigint as "action_id",
sum_score_old::bigint as "sum_score_old",
(sum_score - sum_score_old)::bigint as "diff"
from
(
select
sum_score,
min(timestamp) as timestamp,
action_id,
lag (sum_score, 1) over (order by action_id) sum_score_old
from (
select sum(score) as sum_score, min(timestamp) as timestamp, action_id
from squads_stats_states
where
leaderboard_type = %(LB_type)s and
platform = %(platform)s and
%(high_timestamp)s::timestamp >= timestamp and
timestamp >= %(low_timestamp)s::timestamp
group by action_id
) as foo
group by sum_score, action_id
order by timestamp desc
) as foo1
where (sum_score - sum_score_old) <> 0
# Inserts one squadron row of a leaderboard snapshot into squads_stats_states_data.
# Named parameters: action_id, squadron (stored as squadron_id), score,
# percentile, rank, name, tag.
insert_leaderboard = """
insert into squads_stats_states_data (action_id, squadron_id, score, percentile, rank, name, tag)
values
(%(action_id)s, %(squadron)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s);"""
# Activity history for one leaderboard: snapshots whose total score differs
# from the total of the preceding snapshot.
# Named parameters: LB_type, platform, low_timestamp, high_timestamp, limit.
#   action_ids                 - snapshots of the requested leaderboard type and
#                                platform inside the timestamp window
#   sum_history                - sum of score per snapshot (action_id)
#   sum_history_old_calculated - adds sum_score_old, the sum of the previous
#                                snapshot, via lag() ordered by action_id
# The final select keeps only rows with a non-zero difference and returns
# sum_score, a formatted timestamp, action_id, sum_score_old and diff.
select_activity_pretty_names = """
with action_ids as (
select action_id, timestamp
from squads_stats_states_action_info
where
leaderboard_type = %(LB_type)s and
platform = %(platform)s and
%(high_timestamp)s::timestamp >= timestamp
and timestamp >= %(low_timestamp)s::timestamp
order by action_id desc
),
sum_history as (
select sum(score) as sum_score, squads_stats_states_data.action_id as action_id, action_ids.timestamp as timestamp
from squads_stats_states_data, action_ids
where squads_stats_states_data.action_id = action_ids.action_id
group by squads_stats_states_data.action_id, action_ids.timestamp
),
sum_history_old_calculated as (select
sum_score,
min(timestamp) as timestamp,
action_id,
lag (sum_score, 1) over (order by action_id) sum_score_old
from sum_history
group by sum_score, action_id
order by timestamp desc
)
select
sum_score::bigint as "sum_score",
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as "timestamp",
action_id::bigint as "action_id",
sum_score_old::bigint as "sum_score_old",
(sum_score - sum_score_old)::bigint as "diff"
from sum_history_old_calculated
where (sum_score - sum_score_old) <> 0
limit %(limit)s;"""
select_diff_by_action_id = """select
select_diff_by_action_id = """
with origin_record as (
select action_id, timestamp, leaderboard_type, platform
from squads_stats_states_action_info
where action_id = %(action_id)s
),
prev_records_lag as (
select
squads_stats_states_action_info.action_id,
lag(squads_stats_states_action_info.action_id, 1) over (order by squads_stats_states_action_info.action_id) prev_action_id
from squads_stats_states_action_info, origin_record
where squads_stats_states_action_info.platform = origin_record.platform
and squads_stats_states_action_info.leaderboard_type = origin_record.leaderboard_type
),
prev_record as (
select prev_action_id
from prev_records_lag, origin_record
where prev_records_lag.action_id = origin_record.action_id
)
select
coalesce(new_stats.name, old_stats.name) as "squadron_name",
coalesce(new_stats.tag, old_stats.tag) as "tag",
coalesce(new_stats.score, 0) as "total_experience",
coalesce(old_stats.score, 0) as "total_experience_old",
coalesce(new_stats.score, 0) - coalesce(old_stats.score, 0) as "total_experience_diff",
coalesce(new_stats.leaderboard_type, old_stats.leaderboard_type) as "leaderboard_type",
coalesce(new_stats.platform, old_stats.platform) as "platform"
coalesce(new_stats.tag, old_stats.tag) as "tag",
coalesce(new_stats.score, 0) as "total_experience",
coalesce(old_stats.score, 0) as "total_experience_old",
coalesce(new_stats.score, 0) - coalesce(old_stats.score, 0) as "total_experience_diff"
from (
select *
from squads_stats_states
where action_id = %(action_id)s) new_stats
full join
(
select *
from squads_stats_states
where action_id in (
select distinct squads_stats_states.action_id
from squads_stats_states, (
select timestamp, platform, leaderboard_type, action_id
from squads_stats_states
where action_id = %(action_id)s limit 1) sub1
where
squads_stats_states.platform = sub1.platform and
squads_stats_states.leaderboard_type = sub1.leaderboard_type and
squads_stats_states.action_id < sub1.action_id
order by squads_stats_states.action_id desc
limit 1)) old_stats
on new_stats.squadron_id = old_stats.squadron_id
select *
from squads_stats_states_data
where action_id = %(action_id)s
) new_stats
full join (
select *
from squads_stats_states_data, prev_record
where action_id = prev_record.prev_action_id
) old_stats
on new_stats.squadron_id = old_stats.squadron_id
where coalesce(new_stats.score, 0) - coalesce(old_stats.score, 0) <> 0
order by coalesce(new_stats.score, 0) - coalesce(old_stats.score, 0) desc;"""
select_leaderboard_sum_history = """select
sum(score)::bigint as "Score Sum",
to_char(max(timestamp), 'YYYY-MM-DD HH24:MI:SS') as "Timestamp UTC"
from squads_stats_states
where leaderboard_type = %(LB_type)s and platform = %(platform)s
group by action_id
# Total score per snapshot for one leaderboard, newest first, at most 1000 rows.
# Named parameters: LB_type, platform.
# The filter is parameterized; with literal values every caller would get
# the same leaderboard regardless of the parameters it passes.
select_leaderboard_sum_history = """
select
    sum(score)::bigint as "Score Sum",
    to_char(max(timestamp), 'YYYY-MM-DD HH24:MI:SS') as "Timestamp UTC"
from squads_stats_states_data
    inner join squads_stats_states_action_info
    on squads_stats_states_action_info.action_id = squads_stats_states_data.action_id
where leaderboard_type = %(LB_type)s and platform = %(platform)s
group by squads_stats_states_data.action_id
order by "Timestamp UTC" desc
limit 1000;
"""
select_leaderboard_by_action_id = """select
name as squadron_name,
tag,
rank,
score,
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as timestamp,
leaderboard_type,
platform,
squadron_id
from squads_stats_states
where action_id = %(action_id)s
# Full leaderboard of one snapshot, highest score first.
# Named parameter: action_id.
# Squadron columns come from squads_stats_states_data; timestamp,
# leaderboard_type and platform come from squads_stats_states_action_info.
# The column set (squadron_name, tag, rank, score, ...) is kept identical to
# select_latest_leaderboard so both results can be consumed the same way.
select_leaderboard_by_action_id = """
select
    name as squadron_name,
    tag,
    rank,
    score,
    to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as timestamp,
    leaderboard_type,
    platform,
    squadron_id
from squads_stats_states_data
    inner join squads_stats_states_action_info
    on squads_stats_states_data.action_id = squads_stats_states_action_info.action_id
where squads_stats_states_action_info.action_id = %(action_id)s
order by score desc;
"""
select_latest_leaderboard = """select
name as squadron_name,
tag,
rank,
score,
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') as timestamp,
leaderboard_type,
platform,
squadron_id
from squads_stats_states
where action_id = (
select max(action_id) as action_id
from squads_stats_states
where leaderboard_type = %(LB_type)s and platform = %(platform)s)
order by score desc;
# Most recent snapshot of one leaderboard, highest score first.
# Named parameters: LB_type, platform.
#   max_action_id - newest action_id recorded for the requested leaderboard
#   leaderboard   - squadron rows that belong to that action_id
# The final select joins squads_stats_states_action_info back in to attach
# the snapshot's timestamp, leaderboard_type and platform to every row.
select_latest_leaderboard = """
with max_action_id as (
    select
        max(action_id) as action_id
    from squads_stats_states_action_info
    where
        leaderboard_type = %(LB_type)s and
        platform = %(platform)s
),
leaderboard as (
    select
        name as squadron_name,
        tag,
        rank,
        score,
        squadron_id,
        squads_stats_states_data.action_id as action_id
    from squads_stats_states_data, max_action_id
    where squads_stats_states_data.action_id = max_action_id.action_id
)
select
    squadron_name,
    tag,
    rank,
    score,
    to_char(squads_stats_states_action_info.timestamp, 'YYYY-MM-DD HH24:MI:SS') as timestamp,
    squads_stats_states_action_info.leaderboard_type as leaderboard_type,
    squads_stats_states_action_info.platform as platform,
    squadron_id
from leaderboard
    inner join squads_stats_states_action_info
    on squads_stats_states_action_info.action_id = leaderboard.action_id
order by score desc;
"""

View File

@ -0,0 +1,159 @@
"""
Script to migrate data from old flat schema to new one
"""
import psycopg2.extensions
import psycopg2.extras
import config
import signal
import time
# Stop request flag: flipped to True by exit_handler and polled by the
# migration loop, so the script can finish its current work before stopping.
exiting = False


def exit_handler(_, __):
    """Signal handler that requests a graceful stop.

    Both positional arguments (passed by the ``signal`` module when the
    handler is invoked) are ignored; the handler only sets the module-level
    ``exiting`` flag.
    """
    global exiting
    exiting = True
# Route SIGTERM and SIGINT to exit_handler, which only sets the `exiting`
# flag instead of terminating the process on the spot.
signal.signal(signal.SIGTERM, exit_handler)
signal.signal(signal.SIGINT, exit_handler)
# Connection to the PostgreSQL database described by the project's `config`
# module. DictCursor is used as the cursor factory so that rows can be read
# both by column name and by position.
_connection_settings = {
    'user': config.postgres_username,
    'password': config.postgres_password,
    'host': config.postgres_hostname,
    'port': config.postgres_port,
    'database': config.postgres_database_name,
    'cursor_factory': psycopg2.extras.DictCursor,
}
db: psycopg2.extensions.connection = psycopg2.connect(**_connection_settings)
# Table names: the flat table being migrated from, followed by the two
# tables of the new schema (snapshot info and per-squadron data).
old_table, action_info_table, data_table = (
    'squads_stats_states',
    'squads_stats_states_action_info',
    'squads_stats_states_data',
)
# DDL for the new schema; every statement is idempotent
# (`if not exists` / `on conflict do nothing`), so the script can be re-run.
#   squads_stats_states_action_info - one row per snapshot: generated action_id,
#                                     leaderboard type, platform, timestamp
#   squads_stats_states_data        - per-squadron rows, linked to a snapshot
#                                     through the action_id foreign key
#   squads_stats_states_metainfo    - key/value store; seeded with
#                                     'origin_offset' = 0, the shift this script
#                                     adds to old action_ids
schema_create = """create table if not exists squads_stats_states_action_info (
action_id serial primary key,
leaderboard_type text,
platform text,
timestamp timestamp default timezone('utc', now())
);
create table if not exists squads_stats_states_data (
action_id integer,
squadron_id integer,
score bigint,
percentile integer,
rank integer,
name text,
tag text,
foreign key (action_id) references squads_stats_states_action_info(action_id)
);
create table if not exists squads_stats_states_metainfo (
property text primary key,
int_value integer,
text_value text
);
insert into squads_stats_states_metainfo (property, int_value) values ('origin_offset', 0) on conflict do nothing;
"""
# Highest action_id already present in the new action-info table, minus the
# stored 'origin_offset'; yields NULL when the new table is still empty.
last_new_action_id_q = f"select max(action_id) - (select int_value from squads_stats_states_metainfo where property = 'origin_offset') from {action_info_table};"
# Distinct (action_id, leaderboard_type, platform) combinations of the old table
# with an action_id above %(action_id)s, ordered by action_id.
select_under_action_id = f'select distinct action_id, leaderboard_type, platform from {old_table} where action_id > %(action_id)s order by action_id;'
# A single old-table row (lowest rank) for one combination.
# Named parameters: action_id, LB_type, platform.
select_unique_leaderboard = f'select * from {old_table} where action_id = %(action_id)s and leaderboard_type = %(LB_type)s and platform = %(platform)s order by rank limit 1;'
# Copies every old-table row of one (action_id, leaderboard_type, platform)
# combination into squads_stats_states_data entirely on the database side,
# shifting action_id by the current 'origin_offset'.
# Named parameters: action_id, LB_type, platform.
db_side_insert_unique_leaderboard = """
insert into squads_stats_states_data (action_id, squadron_id, score, percentile, rank, name, tag)
select action_id + (select int_value from squads_stats_states_metainfo where property = 'origin_offset') as action_id, squadron_id, score, percentile, rank, name, tag
from squads_stats_states
where
action_id = %(action_id)s and
leaderboard_type = %(LB_type)s and
platform = %(platform)s
order by rank"""
# Inserts a snapshot row with an explicit action_id and timestamp.
# Named parameters: action_id, LB_type, platform, timestamp.
insert_new_action_id = """
insert into squads_stats_states_action_info (action_id, leaderboard_type, platform, timestamp) values (%(action_id)s, %(LB_type)s, %(platform)s, %(timestamp)s);
"""
# Inserts a single squadron row with values supplied by the client.
# Named parameters: action_id, squadron_id, score, percentile, rank, name, tag.
insert_new_leaderboard = """
insert into squads_stats_states_data (action_id, squadron_id, score, percentile, rank, name, tag)
values
(%(action_id)s, %(squadron_id)s, %(score)s, %(percentile)s, %(rank)s, %(name)s, %(tag)s);"""
# Migration driver.
#
# Every distinct (action_id, leaderboard_type, platform) combination of the old
# table becomes one row in squads_stats_states_action_info plus its squadron
# rows in squads_stats_states_data. An old action_id may be shared by several
# combinations; each additional combination bumps 'origin_offset' by one, and
# the new id is always `old action_id + origin_offset`, which keeps new ids
# unique.
#
# NOTE(review): the resume point is derived from max(new action_id) - offset.
# If the process dies abruptly between two combinations that share one old
# action_id, the remaining combinations of that id appear to be skipped on the
# next run - confirm before relying on resume after a crash.
cursor: psycopg2.extras.DictCursor = db.cursor()
try:
    cursor.execute(schema_create)

    # Resume point expressed in old-table numbering; NULL means nothing has
    # been migrated yet.
    cursor.execute(last_new_action_id_q)
    last_new_action_id = cursor.fetchall()
    if last_new_action_id[0][0] is None:
        last_action_id = 0
    else:
        last_action_id = last_new_action_id[0][0]

    print(f'{last_action_id=}')

    initial_q_timer = time.time()
    cursor.execute(select_under_action_id, {'action_id': last_action_id})
    print(f'select_under_action_id took: {time.time() - initial_q_timer}')

    prev_action_id = None
    records_counter = 0
    timer = time.time()

    # fetchall() materializes the work list up front, so the same cursor can
    # be reused for the per-item statements below.
    for unique_lb_platform in cursor.fetchall():
        action_id: int = unique_lb_platform['action_id']

        # Honour a stop request only before any write and only on an old
        # action_id boundary: stopping between combinations that share one
        # action_id would leave the offset and the migrated rows out of step
        # with the resume calculation.
        if exiting and prev_action_id != action_id:
            break

        LB_type = unique_lb_platform['leaderboard_type']
        platform = unique_lb_platform['platform']
        lb_key = {
            'action_id': action_id,
            'LB_type': LB_type,
            'platform': platform
        }

        # The snapshot timestamp is taken from one row of the old leaderboard.
        # Looked up before any write so that a missing leaderboard cannot
        # leave a bumped offset behind.
        cursor.execute(select_unique_leaderboard, lb_key)
        first_row = cursor.fetchone()
        if first_row is None:
            print(f'No rows for {action_id=} {LB_type=} {platform=}, skipping')
            continue

        timestamp = first_row['timestamp']

        if prev_action_id == action_id:
            # Same old action_id, different leaderboard/platform: shift all
            # following ids by one more.
            cursor.execute("update squads_stats_states_metainfo set int_value = int_value + 1 where property = 'origin_offset';")
            db.commit()

        cursor.execute("select int_value from squads_stats_states_metainfo where property = 'origin_offset';")
        offseted_action_id = cursor.fetchone()[0]

        cursor.execute(insert_new_action_id, {
            'action_id': action_id + offseted_action_id,
            'LB_type': LB_type,
            'platform': platform,
            'timestamp': timestamp
        })

        # Row copy happens inside the database; it applies the same offset.
        cursor.execute(db_side_insert_unique_leaderboard, lb_key)

        # Commit in batches of ten leaderboards.
        if records_counter % 10 == 0:
            db.commit()

        prev_action_id = action_id
        records_counter += 1

    db.commit()

finally:
    # Release the connection on errors too; uncommitted work is discarded.
    db.close()

timed = time.time() - timer
# Guard against a zero-length interval (e.g. nothing to migrate).
avg_rate = records_counter / timed if timed > 0 else 0.0
print(f'Inserted: {records_counter} for {timed} s, avg rate {avg_rate} records/s')