137 lines
5.2 KiB
Python
137 lines
5.2 KiB
Python
import sqlite3
|
|
from datetime import datetime
|
|
from loguru import logger
|
|
from datetime_utils import to_unix
|
|
|
|
|
|
def get_db(db_path) -> sqlite3.Connection:
|
|
SCHEMA = """
|
|
create table if not exists activities (
|
|
id integer primary key autoincrement,
|
|
name text not null unique
|
|
);
|
|
|
|
create table if not exists users_cache (
|
|
user_id integer primary key,
|
|
nickname text not null
|
|
);
|
|
|
|
create table if not exists presence_journal (
|
|
user_id integer not null,
|
|
start_time integer not null, -- In unix timestamp
|
|
end_time integer not null,
|
|
activity_name_id integer not null,
|
|
primary key (user_id, start_time),
|
|
foreign key (activity_name_id) references activities(id)
|
|
);"""
|
|
|
|
db = sqlite3.connect(db_path)
|
|
db.executescript(SCHEMA)
|
|
return db
|
|
|
|
|
|
class PresenceTracker:
|
|
def __init__(self, db: sqlite3.Connection):
|
|
self.db = db
|
|
|
|
def log_activity(self, user_id: int, activity_name: str, start_time: datetime, end_time: datetime | None = None):
|
|
"""Create record or update uncommited end_time"""
|
|
|
|
# end_time behaves like "last seen with this activity"
|
|
|
|
logger.info(f'Logging {user_id=}, {activity_name=}, {start_time.isoformat()=}, {end_time=}')
|
|
|
|
activity_id = self.get_activity_id(activity_name)
|
|
_end_time = self.to_unix_default(end_time) # Defaulted end_time
|
|
|
|
unix_start_time = to_unix(start_time)
|
|
|
|
if unix_start_time > _end_time:
|
|
logger.warning(f'Got {unix_start_time=} > {_end_time}, {_end_time - unix_start_time=}, {user_id=}, {activity_name=}')
|
|
|
|
|
|
with self.db:
|
|
self.db.execute(
|
|
"""insert into presence_journal (user_id, start_time, activity_name_id, end_time)
|
|
values (:user_id, :start_time, :activity_name_id, :end_time)
|
|
on conflict do update set end_time = :end_time;""",
|
|
{
|
|
'user_id': user_id,
|
|
'start_time': to_unix(start_time),
|
|
'activity_name_id': activity_id,
|
|
'end_time': _end_time
|
|
}
|
|
)
|
|
|
|
def get_activity_id(self, activity_name: str) -> int:
|
|
"""Get id for given activity, for insert to another table, for example"""
|
|
|
|
# Try to find first, insert after
|
|
params = (activity_name,)
|
|
query = self.db.execute('select id from activities where name = ?;', params).fetchone()
|
|
if query is not None:
|
|
logger.trace(f'Found record for {activity_name!r} id={query}')
|
|
return query[0]
|
|
|
|
with self.db:
|
|
query = self.db.execute('insert into activities (name) values (?) returning id;', params).fetchone()
|
|
|
|
logger.debug(f'Inserted record for {activity_name!r} {query}')
|
|
return query[0]
|
|
|
|
def saturate_users_cache(self, user_id: int, user_name: str):
|
|
"""Ensure given user is in cache or add a user to cache"""
|
|
|
|
logger.trace(f'Adding {user_name!r} to cache')
|
|
with self.db:
|
|
self.db.execute(
|
|
'insert or ignore into users_cache (user_id, nickname) VALUES (?, ?);',
|
|
(user_id, user_name)
|
|
)
|
|
|
|
def user_breakdown(self, user_id: int) -> dict[str, int]:
|
|
"""Return dict with activities names as keys and spent time in hours as values"""
|
|
|
|
res = self.db.execute(
|
|
'select a.name, round(sum(end_time - start_time) / 3600.0, 1) as total '
|
|
'from presence_journal '
|
|
'left join activities a on a.id = presence_journal.activity_name_id '
|
|
'where user_id = ? '
|
|
'group by activity_name_id '
|
|
'order by total desc '
|
|
'limit 20;',
|
|
(user_id,)
|
|
).fetchall()
|
|
|
|
return {row[0]: row[1] for row in res}
|
|
|
|
def top_users(self, last_days: int | None) -> dict[int, tuple[str | None, int]]:
|
|
"""Returns keys - ids, values - tuple(nickname?, hours spent)""" # TODO: implement last_daysa
|
|
|
|
res = self.db.execute(
|
|
"""select uc.user_id, uc.nickname, round(sum(end_time - presence_journal.start_time) / 3600.0, 1) as total
|
|
from presence_journal left join users_cache uc on presence_journal.user_id = uc.user_id
|
|
group by uc.user_id
|
|
order by total desc
|
|
limit 50;""").fetchall()
|
|
|
|
return {row[0]: row[1:3] for row in res}
|
|
|
|
def top_games(self) -> dict[str, int]:
|
|
"""Returns keys - names, values - spent hours"""
|
|
res = self.db.execute(
|
|
"""select a.name, round(sum(end_time - presence_journal.start_time) / 3600.0) as total
|
|
from presence_journal left join activities a on a.id = presence_journal.activity_name_id
|
|
group by activity_name_id
|
|
order by total desc
|
|
limit 50;""").fetchall()
|
|
|
|
return {row[0]: row[1] for row in res}
|
|
|
|
@staticmethod
|
|
def default_end_time(end_time: datetime | None) -> datetime:
|
|
return datetime.now() if end_time is None else end_time
|
|
|
|
def to_unix_default(self, end_time: datetime | None) -> int:
|
|
return to_unix(self.default_end_time(end_time))
|