125 lines
4.3 KiB
Python
125 lines
4.3 KiB
Python
import asyncio
|
|
import os
|
|
import signal
|
|
import logging
|
|
import sys
|
|
from typing import Any
|
|
from itertools import chain
|
|
|
|
from loguru import logger
|
|
from PresenceTracker import PresenceTracker, get_db
|
|
import discord
|
|
from discord.ext.commands import Bot as BotBase
|
|
from pathlib import Path
|
|
|
|
"""
|
|
Cases should be considered:
|
|
Case 1 None -> Playing
|
|
Case 2 Playing -> None
|
|
Case 3 Playing -> bot shutdown -> Already stopped playing (just commit last end_time?)
|
|
Case 4 None -> bot shutdown -> Already playing (catch on stop playing, so Case 2)
|
|
Case 5 Playing -> Gone invisible -> Gone visible with the same activity (handles by Case 1 and Case 2)
|
|
Case 6: Playing -> Still playing (for end_time updating)
|
|
|
|
TODO: Handle negative diff
|
|
"""
|
|
|
|
|
|
class InterceptHandler(logging.Handler):
|
|
def emit(self, record):
|
|
# Get corresponding Loguru level if it exists
|
|
try:
|
|
level = logger.level(record.levelname).name
|
|
except ValueError:
|
|
level = record.levelno
|
|
|
|
# Find caller from where originated the logged message
|
|
frame, depth = logging.currentframe(), 2
|
|
while frame.f_code.co_filename == logging.__file__:
|
|
frame = frame.f_back
|
|
depth += 1
|
|
|
|
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
|
|
|
|
logging.basicConfig(handlers=[InterceptHandler()], level=logging.INFO)
|
|
|
|
|
|
class Bot(BotBase):
|
|
def __init__(self, *, intents: discord.Intents, activity_tracker: PresenceTracker, **options: Any):
|
|
super().__init__(intents=intents, **options)
|
|
self.activity_tracker = activity_tracker
|
|
|
|
async def setup_hook(self) -> None:
|
|
logger.info(f'Invite URL: {"No self.user" if self.user is None else discord.utils.oauth_url(self.user.id)}')
|
|
|
|
for filepath in Path('extensions').glob('*.py'):
|
|
filename = filepath.name
|
|
logger.info(f'Loading extension {filename}')
|
|
await self.load_extension(f"extensions.{filename[:-3]}")
|
|
|
|
# self.tree.clear_commands(guild=discord.Object(648268554386407432))
|
|
# await self.tree.sync(guild=discord.Object(648268554386407432))
|
|
#
|
|
# self.tree.copy_global_to(guild=discord.Object(648268554386407432))
|
|
# await self.tree.sync(guild=discord.Object(648268554386407432))
|
|
|
|
# await self.tree.sync()
|
|
|
|
|
|
def signal_handler(self, signame, _):
|
|
logger.info(f'Got {signame} signal, shutting down')
|
|
self.loop.create_task(self.close())
|
|
|
|
async def on_ready(self):
|
|
logger.info(f'Ready {self.user}')
|
|
|
|
async def on_presence_update(self, before: discord.Member, after: discord.Member):
|
|
if before.bot:
|
|
return
|
|
|
|
for activity in chain(before.activities, after.activities):
|
|
if activity.type == discord.ActivityType.playing:
|
|
if isinstance(activity, (discord.Game, discord.Activity)):
|
|
if activity.name is not None and activity.start is not None:
|
|
self.activity_tracker.log_activity(after.id, activity.name, activity.start, activity.end)
|
|
self.activity_tracker.saturate_users_cache(after.id, after.name + '#' + after.discriminator)
|
|
|
|
else:
|
|
logger.warning(f'Got activity with missing name or start: {activity.to_dict()!r}')
|
|
|
|
else:
|
|
logger.warning(
|
|
f'Got discord.ActivityType.playing with unusual type: {type(activity)}; {activity.to_dict()}')
|
|
|
|
else:
|
|
logger.trace(f'Got not playing activity: {activity.to_dict()}')
|
|
|
|
|
|
async def async_main():
|
|
# logger.disable('discord')
|
|
logger.remove()
|
|
logger.add(sink=sys.stderr) # Use env var LOGURU_LEVEL to set desire level
|
|
|
|
intents = discord.Intents.default()
|
|
intents.presences = True
|
|
intents.members = True
|
|
|
|
db = get_db(os.environ['DB_PATH'])
|
|
|
|
activity_tracker = PresenceTracker(db)
|
|
bot = Bot(intents=intents, activity_tracker=activity_tracker, command_prefix='')
|
|
|
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
|
signal.signal(sig, bot.signal_handler)
|
|
|
|
await bot.start(os.environ['TOKEN'])
|
|
|
|
|
|
def main():
|
|
loop = asyncio.new_event_loop()
|
|
loop.run_until_complete(async_main())
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|