AudioControl/AudioController.py

264 lines
10 KiB
Python

import queue
import comtypes
import psutil
from pycaw.utils import AudioSession
from pycaw.callbacks import AudioSessionEvents, AudioSessionNotification
from pycaw.pycaw import AudioUtilities
import Events
from ServerSideView import ServerSideView
from queue import Queue, Empty
from loguru import logger
from get_app_name import get_app_name
class PerSessionCallbacks(AudioSessionEvents):
"""Passing callbacks calls to AudioController and includes pid to calls"""
def __init__(self, pid: int, audio_controller: 'AudioController'):
self.pid = pid
self.audio_controller = audio_controller
self._is_muted: bool | None = None
self._volume: int | None = None
def on_simple_volume_changed(self, new_volume, new_mute, event_context):
new_mute = bool(new_mute)
new_volume = int(new_volume * 100)
if new_mute != self._is_muted:
self._is_muted = new_mute
self.audio_controller.on_mute_changed(self.pid, self._is_muted)
if new_volume != self._volume:
self._volume = new_volume
self.audio_controller.on_volume_changed(self.pid, self._volume, event_context)
def on_state_changed(self, new_state, new_state_id):
self.audio_controller.on_state_changed(self.pid, new_state, new_state_id)
def on_session_disconnected(self, disconnect_reason, disconnect_reason_id):
self.audio_controller.on_session_disconnected(self.pid, disconnect_reason, disconnect_reason_id)
class SessionCreateCallback(AudioSessionNotification):
def __init__(self, audio_controller: 'AudioController'):
self.audio_controller = audio_controller
def on_session_created(self, new_session):
self.audio_controller.on_session_created(new_session)
class AudioController:
"""
Class aimed to keep current state of situation, handle callbacks from sessions, and do communication with clients
vie ServerSideView
"""
def __init__(self):
self.running = True
self.per_session_callbacks_class = PerSessionCallbacks
self._sessions: dict[int, AudioSession] = dict() # Mapping pid to session
self.outbound_q = Queue() # from AudioController to ServerSideView
self.inbound_q = Queue() # from ServerSideView to AudioController
self._state_change_q = Queue() # A queue for handling state changes as it seems to
# work bad with all this logic in callback handler
self.view = ServerSideView(self.outbound_q, self.inbound_q)
def shutdown_callback(self, sig, frame):
"""Gets called by signal module as handler"""
logger.info(f'Shutting down by signal {sig}')
self.running = False
def get_process(self, pid: int) -> psutil.Process:
return self._sessions[pid].Process
def perform_discover(self):
logger.trace('Performing discovering')
for session in AudioUtilities.GetAllSessions():
logger.trace(f'Checking session {session.Process}')
if session.Process is not None:
# if session.ProcessId not in self._sessions:
logger.debug(f'Discovered session {session.Process}')
self.on_session_created(session)
# else:
# logger.trace(f'Already have session {session.Process} in _sessions')
def on_session_created(self, new_session: AudioSession):
if new_session.Process is not None:
logger.debug(f'New session {new_session.Process}')
if new_session.ProcessId in self._sessions:
logger.warning(f'Already have session {new_session.Process}, removing it first')
self._remove_session_by_pid(new_session.ProcessId)
self._sessions[new_session.ProcessId] = new_session
new_session.register_notification(self.per_session_callbacks_class(new_session.ProcessId, self))
# Notifying
pid = new_session.ProcessId
self.outbound_q.put(Events.NewSession(pid))
self.outbound_q.put(Events.VolumeChanged(pid, self.get_volume(pid)))
self.outbound_q.put(Events.SetName(pid, get_app_name(new_session.Process)))
self.outbound_q.put(Events.MuteStateChanged(pid, self.is_muted(pid)))
self.outbound_q.put(Events.StateChanged(pid, bool(self._sessions[pid].State)))
else:
logger.debug("None's process session", new_session, new_session.ProcessId)
def on_volume_changed(self, pid: int, new_volume: int, event_context: 'comtypes.LP_GUID'):
logger.debug(f'Volume changed {self.get_process(pid)}: new value: {new_volume}')
self.outbound_q.put(Events.VolumeChanged(pid, new_volume))
def on_mute_changed(self, pid, new_mute: bool):
logger.debug(f'Mute changed {self.get_process(pid)}: new value: {new_mute}')
self.outbound_q.put(Events.MuteStateChanged(pid, new_mute))
def on_state_changed(self, pid: int, new_state: str, new_state_id: int):
"""
There have been some problems with executing some amount of code on callbacks so took it out to main thread and
in callbacks it only put messages to queue
"""
logger.debug(f'State changed {self.get_process(pid).name()} {pid} new state: {new_state} {new_state_id}')
self._state_change_q.put((pid, new_state_id))
def _state_change_tick(self):
try:
msg = self._state_change_q.get_nowait()
logger.trace(f'New state message {msg}')
except Empty:
return
else:
pid, new_state_id = msg
if new_state_id == 2:
self._generic_disconnect(pid)
logger.trace(f'_generic_disconnect call done for {pid}')
else:
# Notifying
self.outbound_q.put(Events.StateChanged(pid, bool(new_state_id)))
def on_session_disconnected(self, pid: int, disconnect_reason, disconnect_reason_id):
"""
Is fired, when the audio session disconnected "hard".
Mostly on_state_changed == "Expired" is what you are looking for.
see self.AudioSessionDisconnectReason for disconnect_reason
The use is similar to on_state_changed (ref: pycaw.callbacks.AudioSessionEvents)
NB: expired state id = 2
"""
logger.info(f'Session disconnected {self.get_process(pid).name()} {pid} {disconnect_reason} {disconnect_reason_id}')
self._generic_disconnect(pid)
def _generic_disconnect(self, pid: int):
"""
Session can be disconnected by on_session_disconnected event and by state = expired, this method gets called
by both of them.
:param pid:
:return:
"""
process = self.get_process(pid)
if process.is_running():
logger.warning(f'Process disconnected but still running {process}')
# Notifying
self.outbound_q.put(Events.SessionClosed(pid))
logger.debug(f'Generic disconnect done')
self._remove_session_by_pid(pid)
logger.debug(f'Removing call done')
def _remove_session_by_pid(self, pid: int):
session = self._sessions[pid]
session.unregister_notification()
logger.trace(f'Successfully unregistered notification for {session._process}')
del self._sessions[pid]
logger.trace(f'Removing {pid} done')
# print_stack()
# return
def pre_shutdown(self):
"""Unregister callbacks"""
logger.trace(f'Entering pre_shutdown')
for pid in tuple(self._sessions.keys()):
try:
self._remove_session_by_pid(pid)
except Exception:
logger.opt(exception=True).warning(f'Failed to unregister_notification() for pid {pid}')
# Notify ServerSideView to stop
self.view.running = False
self.view.join(1)
logger.trace(f'pre_shutdown completed')
def set_mute(self, pid: int, is_muted: bool):
logger.trace(f'Set mute for {pid} {is_muted=}')
self._sessions[pid].SimpleAudioVolume.SetMute(int(is_muted), None)
def is_muted(self, pid: int) -> bool:
return bool(self._sessions[pid].SimpleAudioVolume.GetMute())
def toggle_mute(self, pid: int):
logger.trace(f'Toggle mute for {pid}')
is_muted = self.is_muted(pid)
self.set_mute(pid, not is_muted)
def get_volume(self, pid: int) -> int:
logger.trace(f'Get volume for {pid}')
return int(self._sessions[pid].SimpleAudioVolume.GetMasterVolume() * 100)
def set_volume(self, pid: int, volume: int):
# only set volume in the range 0 to 100
volume = min(100, max(0, volume))
volume = float(volume) / 100
self._sessions[pid].SimpleAudioVolume.SetMasterVolume(volume, None)
def increment_volume(self, pid: int, increment: int):
logger.trace(f'Increment volume for {pid}, {increment=}')
volume = increment + self.get_volume(pid)
self.set_volume(pid, volume)
def _inbound_q_tick(self):
try:
event: Events.ClientToServerEvent = self.inbound_q.get(timeout=0.1)
except queue.Empty:
return
else:
try:
self.get_process(event.PID)
except KeyError:
logger.warning(f'Event for unknown process {event}')
return
if isinstance(event, Events.VolumeIncrement):
self.increment_volume(event.PID, event.increment)
elif isinstance(event, Events.MuteToggle):
self.toggle_mute(event.PID)
elif isinstance(event, Events.SetVolume):
self.set_volume(event.PID, event.volume)
def start_blocking(self):
# self.perform_discover()
logger.debug(f'Starting blocking')
self.view.start()
while self.running:
# time.sleep(1)
self._state_change_tick()
self._inbound_q_tick()