264 lines
9.5 KiB
Python
264 lines
9.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
import subprocess
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import Union, Optional
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from discord.ext.commands import Context
|
|
from loguru import logger
|
|
|
|
import DB
|
|
import Observ
|
|
import formatting
|
|
from FFmpegPCMAudioModified import FFmpegPCMAudio
|
|
from SpeakersSettingsAdapterDiscord import SpeakersSettingsAdapterDiscord
|
|
from TTSServer import TTSServerCached
|
|
from TTSServer.Speakers import ModelDescription
|
|
from cogErrorHandlers import cogErrorHandlers
|
|
|
|
|
|
class TTSCore(commands.Cog, Observ.Observer):
    """
    Cog that voices ordinary (non-command) text messages in the author's voice channel
    and provides commands to manage the playback queue and speaker settings
    """

    def __init__(self, bot: Union[commands.Bot, Observ.Subject]):
        """
        Register the cog as an observer of the bot and create TTS helpers

        :param bot: bot instance; it must also provide `subscribe` (Observ.Subject)
        """
        self.bot = bot
        self.cog_command_error = cogErrorHandlers.missing_argument_handler
        self.bot.subscribe(self)  # subscribe for messages that aren't commands
        self.tts = TTSServerCached()
        # guild id -> prepared audio sources waiting until current playback ends
        self.tts_queues: dict[int, list[discord.AudioSource]] = defaultdict(list)
        self.speakers_adapter: SpeakersSettingsAdapterDiscord = SpeakersSettingsAdapterDiscord()

    @commands.command('drop')
    async def drop_queue(self, ctx: Context):
        """
        Drop tts queue for current server

        :param ctx: invocation context, its guild identifies the queue
        :return: None
        """
        try:
            del self.tts_queues[ctx.guild.id]

        except KeyError:
            # No queue entry exists for this guild
            await ctx.send('Failed on dropping queue')

        else:
            await ctx.send('Queue dropped')

    @commands.command('exit')
    async def leave_voice(self, ctx: Context):
        """
        Disconnect bot from the voice channel of the current server

        :param ctx: invocation context
        :return: None
        """
        voice_client = ctx.guild.voice_client
        if voice_client is None:
            await ctx.channel.send("I'm not in any voice channel")
            return

        await voice_client.disconnect(force=False)
        await ctx.channel.send("Left voice channel")

    async def update(self, message: discord.Message):
        """
        Like on_message but only for messages which aren't commands

        Synthesizes the message text with the author's speaker and plays it in the
        author's voice channel, or enqueues it when something is already playing

        :param message: message to say
        :return: None
        """
        # Ignore ourselves, other bots and anything outside guild text channels
        if message.author == self.bot.user:
            return

        if message.author.bot:
            return

        if not isinstance(message.channel, discord.TextChannel):
            return

        logger.info(f'Message to say: {message.content}')
        user_voice_state = message.author.voice
        if user_voice_state is None:
            await message.channel.send("You're not in a voice channel")
            return

        # noinspection PyTypeChecker
        voice_client: discord.VoiceClient = message.guild.voice_client
        if voice_client is None:
            # Bot isn't connected in this guild yet, join the author's channel
            voice_client = await user_voice_state.channel.connect()

        if user_voice_state.channel.id != voice_client.channel.id:
            await message.channel.send('We are in different voice channels')
            return

        speaker_str: str = self.speakers_adapter.get_speaker(message.guild.id, message.author.id)
        speaker: ModelDescription | None = await self.tts.speaker_to_description(speaker_str)
        if speaker is None:
            speaker = self.tts.default_speaker()

        # check if message will fail on synthesis
        known_failures = DB.SynthesisErrors.select() \
            .where(DB.SynthesisErrors.speaker == speaker_str) \
            .where(DB.SynthesisErrors.text == message.content) \
            .count()
        # `>= 1` and not `== 1`: duplicated records must not re-enable synthesis
        if known_failures >= 1:
            # Then we will not try to synthesis it
            await message.channel.send("I will not synthesis this message due to TTS engine limitations")
            return

        try:
            wav_file_like_object: bytes = await self.tts.synthesize_text(message.content, speaker=speaker)
            sound_source = FFmpegPCMAudio(wav_file_like_object, pipe=True, stderr=subprocess.PIPE)

        except Exception as synth_exception:
            logger.opt(exception=True).warning(f'Exception on synthesize {message.content!r}: {synth_exception}')
            await message.channel.send('Synthesize error')
            # Store the same speaker string the check above filters by,
            # otherwise the stored record can never be matched
            DB.SynthesisErrors.create(speaker=speaker_str, text=message.content)
            return

        try:
            if voice_client.is_playing():
                # Then we need to enqueue prepared sound for playing through self.tts_queues mechanism
                guild_queue = self.tts_queues[message.guild.id]
                guild_queue.append(sound_source)
                await message.channel.send(
                    f"Enqueued for play, queue size: {len(guild_queue)}")
                return

            # Once this sound ends, queue_player drains whatever was enqueued meanwhile
            voice_client.play(sound_source, after=lambda e: self.queue_player(message))

        except Exception as play_exception:
            logger.opt(exception=True).warning(
                f'Exception on playing for: {message.guild.name}[#{message.channel.name}]: {message.author.display_name} / {play_exception}')
            await message.channel.send('Playing error')
            return

    def queue_player(self, message: discord.Message):
        """
        Play every enqueued sound of the message's guild one by one, then remove the queue

        Used as `after` callback of `voice_client.play` in `update`

        :param message: message whose guild identifies the queue and the voice client
        :return: None
        """
        guild_id = message.guild.id
        for sound_source in self.tts_queues[guild_id]:
            # `drop` command deletes the dict entry, so a fresh lookup gives
            # an empty list while we still iterate the old one: stop playing
            if len(self.tts_queues[guild_id]) == 0:
                return

            voice_client: Optional[discord.VoiceClient] = message.guild.voice_client
            if voice_client is None:
                # don't play anything and clear queue for whole guild
                break

            try:
                voice_client.play(sound_source)

            except discord.errors.ClientException:  # Here we expect Not connected to voice
                break

            # Blocking wait for the end of the current sound
            # NOTE(review): assumes this callback runs outside of the event loop thread - confirm
            while voice_client.is_playing():
                time.sleep(0.1)

        # Queue is finished or abandoned, forget it (it may already be dropped)
        self.tts_queues.pop(guild_id, None)

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState,
                                    after: discord.VoiceState):
        """
        Leave the voice channel when the bot is the only member left after someone disconnected

        :param member: member whose voice state changed
        :param before: voice state before the change
        :param after: voice state after the change
        :return: None
        """
        if after.channel is not None:
            return

        if before.channel is None:
            # Nothing was left, so there is no channel to inspect
            return

        members = before.channel.members
        if len(members) != 1 or members[0].id != self.bot.user.id:
            return

        voice_client = before.channel.guild.voice_client
        if voice_client is not None:
            await voice_client.disconnect(force=False)

        # TODO: leave voice channel after being moved there alone

    # Ex TTSSettings.py
    @commands.command('getAllSpeakers')
    async def get_speakers(self, ctx: Context):
        """
        Enumerate all available to set speakers

        :param ctx: invocation context
        :return: None
        """
        models = await self.tts.discovery()
        rows = tuple((model.engine + '_' + model.name, model.description) for model in models)
        await ctx.send(formatting.format_table(rows, ('model', 'description')))

    @commands.command('setPersonalSpeaker')
    async def set_user_speaker(self, ctx: Context, speaker: str):
        """
        Set personal speaker on this server

        :param ctx: invocation context
        :param speaker: speaker name, as listed by `getAllSpeakers`
        :return: sent error message for an invalid speaker, otherwise None
        """
        if await self.tts.speaker_to_description(speaker) is None:
            return await ctx.send(
                "Provided speaker is invalid, provided speaker must be from `getAllSpeakers` command")

        self.speakers_adapter.set_speaker_user(ctx.guild.id, ctx.author.id, speaker)
        await ctx.reply(f'Successfully set **your personal** speaker to `{speaker}`')

    @commands.command('setServerSpeaker')
    async def set_server_speaker(self, ctx: Context, speaker: str):
        """
        Set global server speaker

        :param ctx: invocation context
        :param speaker: speaker name, as listed by `getAllSpeakers`
        :return: sent error message for an invalid speaker, otherwise None
        """
        if await self.tts.speaker_to_description(speaker) is None:
            return await ctx.send(
                "Provided speaker is invalid, provided speaker must be from `getAllSpeakers` command")

        # NOTE(review): author id is passed here while get_speaker_global takes
        # only guild id - confirm against SpeakersSettingsAdapterDiscord
        self.speakers_adapter.set_speaker_global(ctx.guild.id, ctx.author.id, speaker)
        await ctx.reply(f'Successfully set **server** speaker to `{speaker}`')

    @commands.command('getSpeaker')
    async def get_speaker(self, ctx: Context):
        """
        Tell first appropriate speaker for a user, it can be user specified, server specified or server default

        :param ctx: invocation context
        :return: None
        """
        speaker = self.speakers_adapter.get_speaker(ctx.guild.id, ctx.author.id)

        await ctx.reply(f'Your current speaker is `{speaker}`')

    @commands.command('getPersonalSpeaker')
    async def get_personal_speaker(self, ctx: Context):
        """
        Tell user his personal speaker on this server, if user doesn't have personal speaker, tells server one

        :param ctx: invocation context
        :return: None
        """
        speaker = self.speakers_adapter.get_speaker_user(ctx.guild.id, ctx.author.id)
        if speaker is not None:
            await ctx.reply(f"Your personal speaker is `{speaker}`")
            return

        server_speaker = self.speakers_adapter.get_speaker_global(ctx.guild.id)
        await ctx.send(f"You currently don't have a personal speaker, current server speaker is `{server_speaker}`")

    @commands.command('getServerSpeaker')
    async def get_server_speaker(self, ctx: Context):
        """
        Tell server global speaker

        :param ctx: invocation context
        :return: None
        """
        speaker = self.speakers_adapter.get_speaker_global(ctx.guild.id)
        await ctx.send(f"Current server speaker is `{speaker}`")
|
|
|
|
|
|
async def setup(bot):
    """
    Extension entry point: create TTSCore cog and add it to the bot

    :param bot: bot to attach the cog to
    :return: None
    """
    tts_cog = TTSCore(bot)
    await bot.add_cog(tts_cog)
|