diff --git a/model/postgres_model.py b/model/postgres_model.py index 4fe12d4..10b1df0 100644 --- a/model/postgres_model.py +++ b/model/postgres_model.py @@ -4,6 +4,7 @@ import datetime import psycopg2.extensions import psycopg2.extras +from psycopg2 import pool import config from .sqlite_cache import cache @@ -16,31 +17,13 @@ logger = get_main_logger() logger.propagate = False -def errors_catcher(func: callable) -> callable: - def decorated(*args, **kwargs): - self: PostgresModel = args[0] - try: - result = func(*args, **kwargs) - - except psycopg2.InterfaceError: - self.open_model() # args[0] - self - result = func(*args, **kwargs) - - except Exception: - # just reset transaction at least - self.db.rollback() - raise - - return result - - return decorated - - class PostgresModel(AbstractModel): - db: psycopg2.extensions.connection + db_pool: psycopg2.extensions.connection def open_model(self): - self.db: psycopg2.extensions.connection = psycopg2.connect( + self.db_pool: pool.ThreadedConnectionPool = pool.ThreadedConnectionPool( + minconn=1, + maxconn=10, user=config.postgres_username, password=config.postgres_password, host=config.postgres_hostname, @@ -48,17 +31,17 @@ class PostgresModel(AbstractModel): database=config.postgres_database_name, cursor_factory=psycopg2.extras.DictCursor) - logger.info(f'Connected to {self.db.dsn}') + conn: psycopg2.extensions.connection = self.db_pool.getconn() + logger.info(f'Connected to {conn.dsn}') - with self.db: - with self.db.cursor() as cursor: + with conn: + with conn.cursor() as cursor: cursor.execute(postgres_sql_requests.schema_create) # schema creation def close_model(self): - self.db.close() - logger.info(f'Connection to {self.db.dsn} closed successfully') + self.db_pool.closeall() + logger.info(f'Connection to db closed successfully') - @errors_catcher def get_activity_changes(self, platform: str, leaderboard_type: str, limit: int, low_timestamp, high_timestamp)\ -> list: cache_key: str = f'{platform}_{leaderboard_type}_{limit}_{low_timestamp}_{high_timestamp}' @@ -84,23 +67,27 @@ class PostgresModel(AbstractModel): if limit is None: limit = 10 - with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: - cursor.execute(postgres_sql_requests.select_activity_pretty_names, { - 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, - 'platform': utils.Platform(platform.upper()).value, - 'limit': limit, - 'high_timestamp': high_timestamp, - 'low_timestamp': low_timestamp - }) + conn = self.db_pool.getconn() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute(postgres_sql_requests.select_activity_pretty_names, { + 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, + 'platform': utils.Platform(platform.upper()).value, + 'limit': limit, + 'high_timestamp': high_timestamp, + 'low_timestamp': low_timestamp + }) - result: list = cursor.fetchall() + result: list = cursor.fetchall() - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) - return result + return result + + finally: + self.db_pool.putconn(conn) - @errors_catcher def insert_leaderboard_db(self, leaderboard_list: dict) -> int: """ Takes leaderboard as list, it platform, type, db connection and insert leaderboard to DB @@ -115,22 +102,26 @@ class PostgresModel(AbstractModel): action_id: int # not last, current that we will use - with self.db.cursor() as cursor: - cursor.execute(postgres_sql_requests.create_new_action_id, {'LB_type': LB_type, 'platform': platform}) - action_id: int = cursor.fetchone()[0] + conn = self.db_pool.getconn() + try: + with conn.cursor() as cursor: + cursor.execute(postgres_sql_requests.create_new_action_id, {'LB_type': LB_type, 'platform': platform}) + action_id: int = cursor.fetchone()[0] - # Patch for additional values - for squad in leaderboard: - squad.update({'action_id': action_id}) + # Patch for additional values + for squad in leaderboard: + squad.update({'action_id': action_id}) - cursor.executemany(postgres_sql_requests.insert_leaderboard, leaderboard) + cursor.executemany(postgres_sql_requests.insert_leaderboard, leaderboard) - self.db.commit() + conn.commit() - cache.delete_all() # drop cache - return action_id + cache.delete_all() # drop cache + return action_id + + finally: + self.db_pool.putconn(conn) - @errors_catcher def get_diff_action_id(self, action_id: int) -> list: """ Takes action_id and returns which squadrons has been changed in leaderboard as in action_id and @@ -148,16 +139,20 @@ class PostgresModel(AbstractModel): logger.debug(f'Not cached result for {cache_key}') - with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: - cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id}) - result: list = cursor.fetchall() + conn = self.db_pool.getconn() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute(postgres_sql_requests.select_diff_by_action_id, {'action_id': action_id}) + result: list = cursor.fetchall() - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) - return result + return result + + finally: + self.db_pool.putconn(conn) - @errors_catcher def get_leaderboard_sum_history(self, platform: str, leaderboard_type: str) -> list[dict]: cache_key = f'sum_history_{platform}_{leaderboard_type}' cached_result: typing.Union[str, None] = cache.get(cache_key) @@ -166,21 +161,25 @@ class PostgresModel(AbstractModel): logger.debug(f'Cached result for {cache_key}') return json.loads(cached_result) - with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: - cursor.execute( - postgres_sql_requests.select_leaderboard_sum_history, - { - 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, - 'platform': utils.Platform(platform.upper()).value} - ) - result: list = cursor.fetchall() + conn = self.db_pool.getconn() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute( + postgres_sql_requests.select_leaderboard_sum_history, + { + 'LB_type': utils.LeaderboardTypes(leaderboard_type.lower()).value, + 'platform': utils.Platform(platform.upper()).value} + ) + result: list = cursor.fetchall() - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) - return result + return result + + finally: + self.db_pool.putconn(conn) - @errors_catcher def get_leaderboard_by_action_id(self, action_id: int) -> list[dict]: cache_key = f'leaderboard_by_action_id_{action_id}' @@ -190,22 +189,26 @@ class PostgresModel(AbstractModel): logger.debug(f'Cached result for {cache_key}') return json.loads(cached_result) - with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: - cursor.execute( - postgres_sql_requests.select_leaderboard_by_action_id, - { - 'action_id': action_id - } - ) + conn = self.db_pool.getconn() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute( + postgres_sql_requests.select_leaderboard_by_action_id, + { + 'action_id': action_id + } + ) - result: list[dict] = cursor.fetchall() + result: list[dict] = cursor.fetchall() - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) - return result + return result + + finally: + self.db_pool.putconn(conn) - @errors_catcher def get_latest_leaderboard(self, platform: str, leaderboard_type: str) -> list[dict]: cache_key = f'latest_leaderboard_{platform}_{leaderboard_type}' cached_result: typing.Union[str, None] = cache.get(cache_key) @@ -216,17 +219,22 @@ class PostgresModel(AbstractModel): logger.debug(f'Not cached result for {cache_key}') - with self.db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: - cursor.execute( - postgres_sql_requests.select_latest_leaderboard, - { - 'platform': platform.upper(), - 'LB_type': leaderboard_type.lower() - } - ) - result: list[dict] = cursor.fetchall() + conn = self.db_pool.getconn() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: + cursor.execute( + postgres_sql_requests.select_latest_leaderboard, + { + 'platform': platform.upper(), + 'LB_type': leaderboard_type.lower() + } + ) + result: list[dict] = cursor.fetchall() - if not cache.disabled: - cache.set(cache_key, json.dumps(result)) + if not cache.disabled: + cache.set(cache_key, json.dumps(result)) - return result + return result + + finally: + self.db_pool.putconn(conn)