discord-presence-tracker/PresenceTracker.py

137 lines
5.2 KiB
Python

import sqlite3
from datetime import datetime
from loguru import logger
from datetime_utils import to_unix
def get_db(db_path) -> sqlite3.Connection:
SCHEMA = """
create table if not exists activities (
id integer primary key autoincrement,
name text not null unique
);
create table if not exists users_cache (
user_id integer primary key,
nickname text not null
);
create table if not exists presence_journal (
user_id integer not null,
start_time integer not null, -- In unix timestamp
end_time integer not null,
activity_name_id integer not null,
primary key (user_id, start_time),
foreign key (activity_name_id) references activities(id)
);"""
db = sqlite3.connect(db_path)
db.executescript(SCHEMA)
return db
class PresenceTracker:
def __init__(self, db: sqlite3.Connection):
self.db = db
def log_activity(self, user_id: int, activity_name: str, start_time: datetime, end_time: datetime | None = None):
"""Create record or update uncommited end_time"""
# end_time behaves like "last seen with this activity"
logger.info(f'Logging {user_id=}, {activity_name=}, {start_time.isoformat()=}, {end_time=}')
activity_id = self.get_activity_id(activity_name)
_end_time = self.to_unix_default(end_time) # Defaulted end_time
unix_start_time = to_unix(start_time)
if unix_start_time > _end_time:
logger.warning(f'Got {unix_start_time=} > {_end_time}, {_end_time - unix_start_time=}, {user_id=}, {activity_name=}')
with self.db:
self.db.execute(
"""insert into presence_journal (user_id, start_time, activity_name_id, end_time)
values (:user_id, :start_time, :activity_name_id, :end_time)
on conflict do update set end_time = :end_time;""",
{
'user_id': user_id,
'start_time': to_unix(start_time),
'activity_name_id': activity_id,
'end_time': _end_time
}
)
def get_activity_id(self, activity_name: str) -> int:
"""Get id for given activity, for insert to another table, for example"""
# Try to find first, insert after
params = (activity_name,)
query = self.db.execute('select id from activities where name = ?;', params).fetchone()
if query is not None:
logger.trace(f'Found record for {activity_name!r} id={query}')
return query[0]
with self.db:
query = self.db.execute('insert into activities (name) values (?) returning id;', params).fetchone()
logger.debug(f'Inserted record for {activity_name!r} {query}')
return query[0]
def saturate_users_cache(self, user_id: int, user_name: str):
"""Ensure given user is in cache or add a user to cache"""
logger.trace(f'Adding {user_name!r} to cache')
with self.db:
self.db.execute(
'insert or ignore into users_cache (user_id, nickname) VALUES (?, ?);',
(user_id, user_name)
)
def user_breakdown(self, user_id: int) -> dict[str, int]:
"""Return dict with activities names as keys and spent time in hours as values"""
res = self.db.execute(
'select a.name, round(sum(end_time - start_time) / 3600.0, 1) as total '
'from presence_journal '
'left join activities a on a.id = presence_journal.activity_name_id '
'where user_id = ? '
'group by activity_name_id '
'order by total desc '
'limit 20;',
(user_id,)
).fetchall()
return {row[0]: row[1] for row in res}
def top_users(self, last_days: int | None) -> dict[int, tuple[str | None, int]]:
"""Returns keys - ids, values - tuple(nickname?, hours spent)""" # TODO: implement last_daysa
res = self.db.execute(
"""select uc.user_id, uc.nickname, round(sum(end_time - presence_journal.start_time) / 3600.0, 1) as total
from presence_journal left join users_cache uc on presence_journal.user_id = uc.user_id
group by uc.user_id
order by total desc
limit 50;""").fetchall()
return {row[0]: row[1:3] for row in res}
def top_games(self) -> dict[str, int]:
"""Returns keys - names, values - spent hours"""
res = self.db.execute(
"""select a.name, round(sum(end_time - presence_journal.start_time) / 3600.0) as total
from presence_journal left join activities a on a.id = presence_journal.activity_name_id
group by activity_name_id
order by total desc
limit 50;""").fetchall()
return {row[0]: row[1] for row in res}
@staticmethod
def default_end_time(end_time: datetime | None) -> datetime:
return datetime.now() if end_time is None else end_time
def to_unix_default(self, end_time: datetime | None) -> int:
return to_unix(self.default_end_time(end_time))