# Mirror of https://github.com/norohind/AudioControl.git
# Synced 2025-04-12 05:00:01 +03:00 (264 lines, 10 KiB, Python)
import queue
|
|
|
|
import comtypes
|
|
import psutil
|
|
from pycaw.utils import AudioSession
|
|
from pycaw.callbacks import AudioSessionEvents, AudioSessionNotification
|
|
from pycaw.pycaw import AudioUtilities
|
|
|
|
import Events
|
|
from ServerSideView import ServerSideView
|
|
from queue import Queue, Empty
|
|
|
|
from loguru import logger
|
|
from get_app_name import get_app_name
|
|
|
|
|
|
class PerSessionCallbacks(AudioSessionEvents):
    """Forward per-session callbacks to AudioController, adding the session's pid to every call.

    Mute and volume notifications are de-duplicated: the controller is only told about a value
    when it differs from the last value this object forwarded.
    """

    def __init__(self, pid: int, audio_controller: 'AudioController'):
        """
        :param pid: process id that is passed along with every forwarded callback
        :param audio_controller: object whose on_* methods receive the forwarded callbacks
        """
        self.pid = pid
        self.audio_controller = audio_controller
        # Last values forwarded to the controller; None until the first notification arrives,
        # so the very first notification is always forwarded.
        self._is_muted: bool | None = None
        self._volume: int | None = None

    def on_simple_volume_changed(self, new_volume, new_mute, event_context):
        """Forward mute and/or volume changes, each only if it differs from the last forwarded value.

        :param new_volume: volume scalar; multiplied by 100 to get a percentage
                           (presumably in the range 0.0..1.0 - confirm against pycaw)
        :param new_mute: mute flag, coerced to bool
        :param event_context: passed through unchanged to AudioController.on_volume_changed
        """
        new_mute = bool(new_mute)
        # round(), not int(): binary floats are rarely exact, e.g. 0.29 * 100 == 28.999999999999996,
        # which int() would truncate to 28 and report a value one percent too low.
        new_volume = round(new_volume * 100)

        if new_mute != self._is_muted:
            self._is_muted = new_mute
            self.audio_controller.on_mute_changed(self.pid, self._is_muted)

        if new_volume != self._volume:
            self._volume = new_volume
            self.audio_controller.on_volume_changed(self.pid, self._volume, event_context)

    def on_state_changed(self, new_state, new_state_id):
        """Forward a session state change to the controller together with the pid."""
        self.audio_controller.on_state_changed(self.pid, new_state, new_state_id)

    def on_session_disconnected(self, disconnect_reason, disconnect_reason_id):
        """Forward a session disconnect to the controller together with the pid."""
        self.audio_controller.on_session_disconnected(self.pid, disconnect_reason, disconnect_reason_id)
|
|
|
|
|
|
class SessionCreateCallback(AudioSessionNotification):
    """Relay "session created" notifications to an AudioController."""

    def __init__(self, audio_controller: 'AudioController'):
        """
        :param audio_controller: object whose on_session_created() receives every new session
        """
        self.audio_controller = audio_controller

    def on_session_created(self, new_session):
        """Hand the freshly created session over to the controller unchanged."""
        controller = self.audio_controller
        controller.on_session_created(new_session)
|
|
|
|
|
|
class AudioController:
    """
    Class aimed to keep current state of situation, handle callbacks from sessions, and do communication with clients
    via ServerSideView
    """

    # State id that marks a session as expired (see the on_session_disconnected docstring)
    _EXPIRED_STATE_ID = 2

    def __init__(self):
        self.running = True
        self.per_session_callbacks_class = PerSessionCallbacks
        self._sessions: dict[int, AudioSession] = {}  # Mapping pid to session

        self.outbound_q = Queue()  # from AudioController to ServerSideView
        self.inbound_q = Queue()  # from ServerSideView to AudioController

        self._state_change_q = Queue()  # A queue for handling state changes as it seems to
        # work bad with all this logic in callback handler

        self.view = ServerSideView(self.outbound_q, self.inbound_q)

    def shutdown_callback(self, sig, frame):
        """Gets called by signal module as handler"""
        logger.info(f'Shutting down by signal {sig}')
        self.running = False

    def get_process(self, pid: int) -> psutil.Process:
        """
        Return the process object of the tracked session with the given pid.

        :raises KeyError: if there is no tracked session for pid
        """
        return self._sessions[pid].Process

    def _find_process(self, pid: int) -> 'psutil.Process | None':
        """Like get_process, but returns None instead of raising when pid is not tracked."""
        session = self._sessions.get(pid)
        return None if session is None else session.Process

    @staticmethod
    def _name_for_log(process: 'psutil.Process') -> str:
        """Return process name for log messages; never raises if psutil can't query the process."""
        try:
            return process.name()

        except psutil.Error:
            return '<unavailable>'

    def perform_discover(self):
        """Register every session returned by AudioUtilities.GetAllSessions() that has a process."""
        logger.trace('Performing discovering')
        for session in AudioUtilities.GetAllSessions():
            logger.trace(f'Checking session {session.Process}')
            if session.Process is None:
                continue

            # Sessions we already track are handled (replaced) by on_session_created itself
            logger.debug(f'Discovered session {session.Process}')
            self.on_session_created(session)

    def on_session_created(self, new_session: AudioSession):
        """
        Start tracking new_session: store it, register per-session callbacks and push its initial state
        (NewSession, VolumeChanged, SetName, MuteStateChanged, StateChanged) to outbound_q.

        A session without a process is only logged. If a session with the same pid is already tracked,
        the old one is removed first.
        """
        process = new_session.Process
        if process is None:
            # The values are embedded in the message: loguru drops positional arguments
            # when the message has no {} placeholders for them.
            logger.debug(f"None's process session {new_session} {new_session.ProcessId}")
            return

        pid = new_session.ProcessId
        logger.debug(f'New session {process}')

        if pid in self._sessions:
            logger.warning(f'Already have session {process}, removing it first')
            self._remove_session_by_pid(pid)

        self._sessions[pid] = new_session
        new_session.register_notification(self.per_session_callbacks_class(pid, self))

        # Notifying
        self.outbound_q.put(Events.NewSession(pid))
        self.outbound_q.put(Events.VolumeChanged(pid, self.get_volume(pid)))
        self.outbound_q.put(Events.SetName(pid, get_app_name(process)))
        self.outbound_q.put(Events.MuteStateChanged(pid, self.is_muted(pid)))
        self.outbound_q.put(Events.StateChanged(pid, bool(new_session.State)))

    def on_volume_changed(self, pid: int, new_volume: int, event_context: 'comtypes.LP_GUID'):
        """Push a VolumeChanged event to outbound_q; ignored if pid is not tracked (anymore)."""
        process = self._find_process(pid)
        if process is None:
            logger.debug(f'Ignoring volume change for unknown session {pid}')
            return

        logger.debug(f'Volume changed {process}: new value: {new_volume}')
        self.outbound_q.put(Events.VolumeChanged(pid, new_volume))

    def on_mute_changed(self, pid, new_mute: bool):
        """Push a MuteStateChanged event to outbound_q; ignored if pid is not tracked (anymore)."""
        process = self._find_process(pid)
        if process is None:
            logger.debug(f'Ignoring mute change for unknown session {pid}')
            return

        logger.debug(f'Mute changed {process}: new value: {new_mute}')
        self.outbound_q.put(Events.MuteStateChanged(pid, new_mute))

    def on_state_changed(self, pid: int, new_state: str, new_state_id: int):
        """
        There have been some problems with executing some amount of code on callbacks so took it out to main thread and
        in callbacks it only put messages to queue

        State changes for a pid that is not tracked (anymore) are ignored.
        """

        process = self._find_process(pid)
        if process is None:
            logger.debug(f'Ignoring state change for unknown session {pid}')
            return

        logger.debug(f'State changed {self._name_for_log(process)} {pid} new state: {new_state} {new_state_id}')
        self._state_change_q.put((pid, new_state_id))

    def _state_change_tick(self):
        """
        Handle at most one queued state change without blocking: the expired state disconnects the session,
        any other state is pushed to outbound_q as StateChanged(pid, bool(state id)).
        """
        try:
            msg = self._state_change_q.get_nowait()

        except Empty:
            return

        logger.trace(f'New state message {msg}')
        pid, new_state_id = msg
        if new_state_id == self._EXPIRED_STATE_ID:
            self._generic_disconnect(pid)
            logger.trace(f'_generic_disconnect call done for {pid}')

        else:
            # Notifying
            self.outbound_q.put(Events.StateChanged(pid, bool(new_state_id)))

    def on_session_disconnected(self, pid: int, disconnect_reason, disconnect_reason_id):
        """
        Is fired, when the audio session disconnected "hard".
        Mostly on_state_changed == "Expired" is what you are looking for.
        see self.AudioSessionDisconnectReason for disconnect_reason
        The use is similar to on_state_changed (ref: pycaw.callbacks.AudioSessionEvents)
        NB: expired state id = 2
        """

        process = self._find_process(pid)
        name = '<unknown>' if process is None else self._name_for_log(process)
        logger.info(f'Session disconnected {name} {pid} {disconnect_reason} {disconnect_reason_id}')
        self._generic_disconnect(pid)

    def _generic_disconnect(self, pid: int):
        """
        Session can be disconnected by on_session_disconnected event and by state = expired, this method gets called
        by both of them.

        Both can happen for the same session, so a pid which is already removed is silently skipped instead of
        raising KeyError (which would otherwise propagate out of the main loop via _state_change_tick).

        :param pid: pid of the session to stop tracking
        :return: None
        """

        session = self._sessions.get(pid)
        if session is None:
            logger.debug(f'Session {pid} is already removed, nothing to disconnect')
            return

        process = session.Process
        if process is not None and process.is_running():
            logger.warning(f'Process disconnected but still running {process}')

        # Notifying
        self.outbound_q.put(Events.SessionClosed(pid))
        logger.debug('Generic disconnect done')

        self._remove_session_by_pid(pid)
        logger.debug('Removing call done')

    def _remove_session_by_pid(self, pid: int):
        """
        Unregister session's callbacks and stop tracking it.

        :raises KeyError: if there is no tracked session for pid
        """
        session = self._sessions[pid]
        session.unregister_notification()
        logger.trace(f'Successfully unregistered notification for {session.Process}')
        del self._sessions[pid]
        logger.trace(f'Removing {pid} done')

    def pre_shutdown(self):
        """Unregister callbacks"""
        logger.trace('Entering pre_shutdown')
        # Iterate over a snapshot: _remove_session_by_pid mutates _sessions
        for pid in tuple(self._sessions):
            try:
                self._remove_session_by_pid(pid)

            except Exception:
                # Best effort: one failing session must not prevent cleanup of the others
                logger.opt(exception=True).warning(f'Failed to unregister_notification() for pid {pid}')

        # Notify ServerSideView to stop
        self.view.running = False
        self.view.join(1)
        logger.trace('pre_shutdown completed')

    def set_mute(self, pid: int, is_muted: bool):
        """Set mute state of the session; raises KeyError for an untracked pid."""
        logger.trace(f'Set mute for {pid} {is_muted=}')
        self._sessions[pid].SimpleAudioVolume.SetMute(int(is_muted), None)

    def is_muted(self, pid: int) -> bool:
        """Return current mute state of the session; raises KeyError for an untracked pid."""
        return bool(self._sessions[pid].SimpleAudioVolume.GetMute())

    def toggle_mute(self, pid: int):
        """Invert mute state of the session."""
        logger.trace(f'Toggle mute for {pid}')
        self.set_mute(pid, not self.is_muted(pid))

    def get_volume(self, pid: int) -> int:
        """Return session's master volume as an integer percentage; raises KeyError for an untracked pid."""
        logger.trace(f'Get volume for {pid}')
        # round(), not int(): the scalar written by set_volume(pid, 29) reads back as 28.99..., int() would
        # report 28 and increment_volume(pid, 1) would never get past that value.
        return round(self._sessions[pid].SimpleAudioVolume.GetMasterVolume() * 100)

    def set_volume(self, pid: int, volume: int):
        """Set session's master volume from a percentage; values outside 0..100 are clamped."""
        # only set volume in the range 0 to 100
        volume = min(100, max(0, volume))
        self._sessions[pid].SimpleAudioVolume.SetMasterVolume(float(volume) / 100, None)

    def increment_volume(self, pid: int, increment: int):
        """Change session's volume by increment percent (may be negative); result is clamped by set_volume."""
        logger.trace(f'Increment volume for {pid}, {increment=}')
        self.set_volume(pid, self.get_volume(pid) + increment)

    def _inbound_q_tick(self):
        """
        Wait up to 0.1 s for one client event and apply it (VolumeIncrement, MuteToggle or SetVolume).
        Events for a pid without a tracked session are logged and dropped.
        """
        try:
            event: Events.ClientToServerEvent = self.inbound_q.get(timeout=0.1)

        except Empty:
            return

        if event.PID not in self._sessions:
            logger.warning(f'Event for unknown process {event}')
            return

        if isinstance(event, Events.VolumeIncrement):
            self.increment_volume(event.PID, event.increment)

        elif isinstance(event, Events.MuteToggle):
            self.toggle_mute(event.PID)

        elif isinstance(event, Events.SetVolume):
            self.set_volume(event.PID, event.volume)

    def start_blocking(self):
        """Start the view and process state changes and client events until self.running becomes False."""
        logger.debug('Starting blocking')
        self.view.start()
        while self.running:
            # _inbound_q_tick blocks for up to 0.1 s, which also paces this loop
            self._state_change_tick()
            self._inbound_q_tick()
|