mirror of
https://github.com/norohind/SquadsActivityMonitor.git
synced 2025-04-12 13:00:02 +03:00
use pool
This commit is contained in:
parent
9ffff121ff
commit
6711b2dd80
@ -4,6 +4,7 @@ import datetime
|
|||||||
|
|
||||||
import psycopg2.extensions
|
import psycopg2.extensions
|
||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
|
from psycopg2 import pool
|
||||||
import config
|
import config
|
||||||
|
|
||||||
from .sqlite_cache import cache
|
from .sqlite_cache import cache
|
||||||
@ -16,31 +17,13 @@ logger = get_main_logger()
|
|||||||
logger.propagate = False
|
logger.propagate = False
|
||||||
|
|
||||||
|
|
||||||
def errors_catcher(func: callable) -> callable:
|
|
||||||
def decorated(*args, **kwargs):
|
|
||||||
self: PostgresModel = args[0]
|
|
||||||
try:
|
|
||||||
result = func(*args, **kwargs)
|
|
||||||
|
|
||||||
except psycopg2.InterfaceError:
|
|
||||||
self.open_model() # args[0] - self
|
|
||||||
result = func(*args, **kwargs)
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
# just reset transaction at least
|
|
||||||
self.db.rollback()
|
|
||||||
raise
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
return decorated
|
|
||||||
|
|
||||||
|
|
||||||
class PostgresModel(AbstractModel):
|
class PostgresModel(AbstractModel):
|
||||||
db: psycopg2.extensions.connection
|
db_pool: psycopg2.extensions.connection
|
||||||
|
|
||||||
def open_model(self):
|
def open_model(self):
|
||||||
self.db: psycopg2.extensions.connection = psycopg2.connect(
|
self.db_pool: pool.ThreadedConnectionPool = pool.ThreadedConnectionPool(
|
||||||
|
minconn=1,
|
||||||
|
maxconn=10,
|
||||||
user=config.postgres_username,
|
user=config.postgres_username,
|
||||||
password=config.postgres_password,
|
password=config.postgres_password,
|
||||||
host=config.postgres_hostname,
|
host=config.postgres_hostname,
|
||||||
@ -48,17 +31,17 @@ class PostgresModel(AbstractModel):
|
|||||||
database=config.postgres_database_name,
|
database=config.postgres_database_name,
|
||||||
cursor_factory=psycopg2.extras.DictCursor)
|
cursor_factory=psycopg2.extras.DictCursor)
|
||||||
|
|
||||||
logger.info(f'Connected to {self.db.dsn}')
|
conn: psycopg2.extensions.connection = self.db_pool.getconn()
|
||||||
|
logger.info(f'Connected to {conn.dsn}')
|
||||||
|
|
||||||
with self.db:
|
with conn:
|
||||||
with self.db.cursor() as cursor:
|
with conn.cursor() as cursor:
|
||||||
cursor.execute(postgres_sql_requests.schema_create) # schema creation
|
cursor.execute(postgres_sql_requests.schema_create) # schema creation
|
||||||
|
|
||||||
def close_model(self):
|
def close_model(self):
|
||||||
self.db.close()
|
self.db_pool.closeall()
|
||||||
logger.info(f'Connection to {self.db.dsn} closed successfully')
|
logger.info(f'Connection to db closed successfully')
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def get_activity_changes(self, platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp)\
|
def get_activity_changes(self, platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp)\
|
||||||
-> list:
|
-> list:
|
||||||
cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}'
|
cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}'
|
||||||
@ -84,23 +67,27 @@ class PostgresModel(AbstractModel):
|
|||||||
if limit is None:
|
if limit is None:
|
||||||
limit = 10
|
limit = 10
|
||||||
|
|
||||||
with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(postgres_sql_requests.select_activity_pretty_names, {
|
try:
|
||||||
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
||||||
'platform': utils.Platform(platform.upper()).value,
|
cursor.execute(postgres_sql_requests.select_activity_pretty_names, {
|
||||||
'limit': limit,
|
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
|
||||||
'high_timestamp': high_timestamp,
|
'platform': utils.Platform(platform.upper()).value,
|
||||||
'low_timestamp': low_timestamp
|
'limit': limit,
|
||||||
})
|
'high_timestamp': high_timestamp,
|
||||||
|
'low_timestamp': low_timestamp
|
||||||
|
})
|
||||||
|
|
||||||
result: list = cursor.fetchall()
|
result: list = cursor.fetchall()
|
||||||
|
|
||||||
if not cache.disabled:
|
if not cache.disabled:
|
||||||
cache.set(cache_key, json.dumps(result))
|
cache.set(cache_key, json.dumps(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def insert_leaderboard_db(self, leaderboard_list: dict) -> int:
|
def insert_leaderboard_db(self, leaderboard_list: dict) -> int:
|
||||||
"""
|
"""
|
||||||
Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB
|
Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB
|
||||||
@ -115,22 +102,26 @@ class PostgresModel(AbstractModel):
|
|||||||
|
|
||||||
action_id: int # not last, current that we will use
|
action_id: int # not last, current that we will use
|
||||||
|
|
||||||
with self.db.cursor() as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(postgres_sql_requests.create_new_action_id, {'LB_type': LB_type, 'platform': platform})
|
try:
|
||||||
action_id: int = cursor.fetchone()[0]
|
with conn.cursor() as cursor:
|
||||||
|
cursor.execute(postgres_sql_requests.create_new_action_id, {'LB_type': LB_type, 'platform': platform})
|
||||||
|
action_id: int = cursor.fetchone()[0]
|
||||||
|
|
||||||
# Patch for additional values
|
# Patch for additional values
|
||||||
for squad in leaderboard:
|
for squad in leaderboard:
|
||||||
squad.update({'action_id': action_id})
|
squad.update({'action_id': action_id})
|
||||||
|
|
||||||
cursor.executemany(postgres_sql_requests.insert_leaderboard, leaderboard)
|
cursor.executemany(postgres_sql_requests.insert_leaderboard, leaderboard)
|
||||||
|
|
||||||
self.db.commit()
|
conn.commit()
|
||||||
|
|
||||||
cache.delete_all() # drop cache
|
cache.delete_all() # drop cache
|
||||||
return action_id
|
return action_id
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def get_diff_action_id(self, action_id: int) -> list:
|
def get_diff_action_id(self, action_id: int) -> list:
|
||||||
"""
|
"""
|
||||||
Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and
|
Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and
|
||||||
@ -148,16 +139,20 @@ class PostgresModel(AbstractModel):
|
|||||||
|
|
||||||
logger.debug(f'Not cached result for {cache_key}')
|
logger.debug(f'Not cached result for {cache_key}')
|
||||||
|
|
||||||
with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id})
|
try:
|
||||||
result: list = cursor.fetchall()
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
||||||
|
cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id})
|
||||||
|
result: list = cursor.fetchall()
|
||||||
|
|
||||||
if not cache.disabled:
|
if not cache.disabled:
|
||||||
cache.set(cache_key, json.dumps(result))
|
cache.set(cache_key, json.dumps(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def get_leaderboard_sum_history(self, platform: str, leaderboard_type: str) -> list[dict]:
|
def get_leaderboard_sum_history(self, platform: str, leaderboard_type: str) -> list[dict]:
|
||||||
cache_key = f'sum_history_{platform}_{leaderboard_type}'
|
cache_key = f'sum_history_{platform}_{leaderboard_type}'
|
||||||
cached_result: typing.Union[str, None] = cache.get(cache_key)
|
cached_result: typing.Union[str, None] = cache.get(cache_key)
|
||||||
@ -166,21 +161,25 @@ class PostgresModel(AbstractModel):
|
|||||||
logger.debug(f'Cached result for {cache_key}')
|
logger.debug(f'Cached result for {cache_key}')
|
||||||
return json.loads(cached_result)
|
return json.loads(cached_result)
|
||||||
|
|
||||||
with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(
|
try:
|
||||||
postgres_sql_requests.select_leaderboard_sum_history,
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
||||||
{
|
cursor.execute(
|
||||||
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
|
postgres_sql_requests.select_leaderboard_sum_history,
|
||||||
'platform': utils.Platform(platform.upper()).value}
|
{
|
||||||
)
|
'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value,
|
||||||
result: list = cursor.fetchall()
|
'platform': utils.Platform(platform.upper()).value}
|
||||||
|
)
|
||||||
|
result: list = cursor.fetchall()
|
||||||
|
|
||||||
if not cache.disabled:
|
if not cache.disabled:
|
||||||
cache.set(cache_key, json.dumps(result))
|
cache.set(cache_key, json.dumps(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def get_leaderboard_by_action_id(self, action_id: int) -> list[dict]:
|
def get_leaderboard_by_action_id(self, action_id: int) -> list[dict]:
|
||||||
cache_key = f'leaderboard_by_action_id_{action_id}'
|
cache_key = f'leaderboard_by_action_id_{action_id}'
|
||||||
|
|
||||||
@ -190,22 +189,26 @@ class PostgresModel(AbstractModel):
|
|||||||
logger.debug(f'Cached result for {cache_key}')
|
logger.debug(f'Cached result for {cache_key}')
|
||||||
return json.loads(cached_result)
|
return json.loads(cached_result)
|
||||||
|
|
||||||
with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(
|
try:
|
||||||
postgres_sql_requests.select_leaderboard_by_action_id,
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
||||||
{
|
cursor.execute(
|
||||||
'action_id': action_id
|
postgres_sql_requests.select_leaderboard_by_action_id,
|
||||||
}
|
{
|
||||||
)
|
'action_id': action_id
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
result: list[dict] = cursor.fetchall()
|
result: list[dict] = cursor.fetchall()
|
||||||
|
|
||||||
if not cache.disabled:
|
if not cache.disabled:
|
||||||
cache.set(cache_key, json.dumps(result))
|
cache.set(cache_key, json.dumps(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
@errors_catcher
|
|
||||||
def get_latest_leaderboard(self, platform: str, leaderboard_type: str) -> list[dict]:
|
def get_latest_leaderboard(self, platform: str, leaderboard_type: str) -> list[dict]:
|
||||||
cache_key = f'latest_leaderboard_{platform}_{leaderboard_type}'
|
cache_key = f'latest_leaderboard_{platform}_{leaderboard_type}'
|
||||||
cached_result: typing.Union[str, None] = cache.get(cache_key)
|
cached_result: typing.Union[str, None] = cache.get(cache_key)
|
||||||
@ -216,17 +219,22 @@ class PostgresModel(AbstractModel):
|
|||||||
|
|
||||||
logger.debug(f'Not cached result for {cache_key}')
|
logger.debug(f'Not cached result for {cache_key}')
|
||||||
|
|
||||||
with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
conn = self.db_pool.getconn()
|
||||||
cursor.execute(
|
try:
|
||||||
postgres_sql_requests.select_latest_leaderboard,
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
||||||
{
|
cursor.execute(
|
||||||
'platform': platform.upper(),
|
postgres_sql_requests.select_latest_leaderboard,
|
||||||
'LB_type': leaderboard_type.lower()
|
{
|
||||||
}
|
'platform': platform.upper(),
|
||||||
)
|
'LB_type': leaderboard_type.lower()
|
||||||
result: list[dict] = cursor.fetchall()
|
}
|
||||||
|
)
|
||||||
|
result: list[dict] = cursor.fetchall()
|
||||||
|
|
||||||
if not cache.disabled:
|
if not cache.disabled:
|
||||||
cache.set(cache_key, json.dumps(result))
|
cache.set(cache_key, json.dumps(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user