# -*- coding: utf-8 -*- import asyncio import time try: import rfoo from rfoo.utils import rconsole except ImportError as e: print("not importing rfoo", e) from threading import Thread from typing import Optional, Coroutine from discord.ext import commands from discord.ext.commands import Context from loguru import logger from formatting import format_table class SingletonBase: def __new__(cls, *args, **kwargs): if getattr(cls, 'instance') is None: cls.instance = super().__new__(cls, *args, **kwargs) # logger.debug(id(getattr(cls, 'instance'))) return getattr(cls, 'instance') class BotManagement(commands.Cog, SingletonBase): instance: Optional['BotManagement'] = None rfoo_server_thread: Optional[Thread] = None rfoo_server: Optional['rfoo.InetServer'] = None # def __new__(cls, *args, **kwargs): # if cls.instance is None: # cls.instance = super().__new__(cls, *args, **kwargs) # # return cls.instance def __init__(self, bot: commands.Bot): self.bot = bot @commands.is_owner() @commands.command('listServers') async def list_servers(self, ctx: Context): text_table = format_table(((server.name, str(server.id)) for server in self.bot.guilds), ('name', 'id')) await ctx.channel.send(text_table) @commands.is_owner() @commands.command('listVoice') async def list_voice_connections(self, ctx: Context): text_table = format_table(((it.guild.name,) for it in self.bot.voice_clients)) await ctx.channel.send(text_table) def start_rfoo(self) -> bool: # True if started, False if already started if self.rfoo_server_thread is None: self.rfoo_server = rfoo.InetServer(rconsole.ConsoleHandler, {'bot': self.bot, 'ct': self.ct}) self.rfoo_server_thread = Thread(target=lambda: self.rfoo_server.start(rfoo.LOOPBACK, 54321)) self.rfoo_server_thread.daemon = True self.rfoo_server_thread.start() logger.info('Rfoo thread started by msg') return True return False def stop_rfoo(self) -> bool: if self.rfoo_server_thread is not None: self.rfoo_server.stop() del self.rfoo_server_thread logger.info('Rfoo thread stopped by msg') return True return False @commands.is_owner() @commands.command('rfooStart') async def start(self, ctx: Context): if self.start_rfoo(): await ctx.send('Rfoo thread started') else: await ctx.send('Rfoo thread already started') @commands.is_owner() @commands.command('rfooStop') async def stop(self, ctx: Context): if self.stop_rfoo(): await ctx.send('Rfoo server stopped') else: await ctx.send('Rfoo server already stopped') def ct(self, coro: Coroutine): """ ct - short from create_task execute coroutine and get result """ task = self.bot.loop.create_task(coro) while not task.done(): time.sleep(0.1) try: return task.result() except asyncio.exceptions.InvalidStateError: return task.exception() @commands.command('shutdown') async def shutdown(self, ctx: Context) -> None: """Shutdown bot with hope docker daemon will restart it, `@a31` and `@furiz__` has rights for it""" if ctx.author.id in (420130693696323585, # @a31 444819880781545472): # @furiz__ log_msg = f"Got shutdown command by {ctx.author}" logger.info(log_msg) await ctx.reply('Shutting down') try: await (await self.bot.fetch_user(420130693696323585)).send(log_msg) except Exception as e: logger.opt(exception=e).warning(f'Failed to send shutdown message', ) self.bot.loop.create_task(self.bot.close()) else: await ctx.reply('No rights for you') async def setup(bot: commands.Bot): await bot.add_cog(BotManagement(bot)) async def teardown(bot): stop_res = BotManagement(bot).stop_rfoo() logger.info(f'Unloaded rfoo with result {stop_res} during BotManagement unload')