cogs/BotManagement: BotManagement as singleton in order to easily access it in teardown() callback (just because I can, ofc it can be solved in less complecated way)

But I noted it already implemented and being uncommitted for months so here we go
This commit is contained in:
norohind 2022-08-23 00:32:10 +03:00
parent 236e24af4a
commit 3dc3431801
Signed by: norohind
GPG Key ID: 01C3BECC26FB59E1

View File

@ -11,10 +11,27 @@ from typing import Optional, Coroutine
from loguru import logger
class BotManagement(commands.Cog):
class SingletonBase:
def __new__(cls, *args, **kwargs):
if getattr(cls, 'instance') is None:
cls.instance = super().__new__(cls, *args, **kwargs)
# logger.debug(id(getattr(cls, 'instance')))
return getattr(cls, 'instance')
class BotManagement(commands.Cog, SingletonBase):
instance: Optional['BotManagement'] = None
rfoo_server_thread: Optional[Thread] = None
rfoo_server: Optional[rfoo.InetServer] = None
# def __new__(cls, *args, **kwargs):
# if cls.instance is None:
# cls.instance = super().__new__(cls, *args, **kwargs)
#
# return cls.instance
def __init__(self, bot: commands.Bot):
self.bot = bot
@ -30,15 +47,31 @@ class BotManagement(commands.Cog):
text_table = format_table(((it.guild.name,) for it in self.bot.voice_clients))
await ctx.channel.send(text_table)
@commands.is_owner()
@commands.command('rfooStart')
async def start(self, ctx: Context):
def start_rfoo(self) -> bool:
# True if started, False if already started
if self.rfoo_server_thread is None:
self.rfoo_server = rfoo.InetServer(rconsole.ConsoleHandler, {'bot': self.bot, 'ct': self.ct})
self.rfoo_server_thread = Thread(target=lambda: self.rfoo_server.start(rfoo.LOOPBACK, 54321))
self.rfoo_server_thread.daemon = True
self.rfoo_server_thread.start()
logger.info('Rfoo thread started by msg')
return True
return False
def stop_rfoo(self) -> bool:
if self.rfoo_server_thread is not None:
self.rfoo_server.stop()
del self.rfoo_server_thread
logger.info('Rfoo thread stopped by msg')
return True
return False
@commands.is_owner()
@commands.command('rfooStart')
async def start(self, ctx: Context):
if self.start_rfoo():
await ctx.send('Rfoo thread started')
else:
@ -47,10 +80,7 @@ class BotManagement(commands.Cog):
@commands.is_owner()
@commands.command('rfooStop')
async def stop(self, ctx: Context):
if self.rfoo_server_thread is not None:
self.rfoo_server.stop()
del self.rfoo_server_thread
logger.info('Rfoo thread stopped by msg')
if self.stop_rfoo():
await ctx.send('Rfoo server stopped')
else:
@ -73,5 +103,10 @@ class BotManagement(commands.Cog):
return task.exception()
async def setup(bot):
async def setup(bot: commands.Bot):
await bot.add_cog(BotManagement(bot))
async def teardown(bot):
stop_res = BotManagement(bot).stop_rfoo()
logger.info(f'Unloaded rfoo with result {stop_res} during BotManagement unload')