From 5547b68c5a32335861ecb7bebc7f0d13dff9aa85 Mon Sep 17 00:00:00 2001 From: norohind <60548839+norohind@users.noreply.github.com> Date: Sun, 6 Nov 2022 22:23:39 +0300 Subject: [PATCH] init --- AudioController.py | 230 ++++++++++++++++++++++++++++++++++++++ Events.py | 122 ++++++++++++++++++++ ProcessAudioController.py | 78 +++++++++++++ ServerSideView.py | 85 ++++++++++++++ Transport.py | 67 +++++++++++ get_app_name.py | 58 ++++++++++ main.py | 46 ++++++++ requirements.txt | 4 + 8 files changed, 690 insertions(+) create mode 100644 AudioController.py create mode 100644 Events.py create mode 100644 ProcessAudioController.py create mode 100644 ServerSideView.py create mode 100644 Transport.py create mode 100644 get_app_name.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/AudioController.py b/AudioController.py new file mode 100644 index 0000000..1c6891e --- /dev/null +++ b/AudioController.py @@ -0,0 +1,230 @@ +import queue +import time + +import comtypes +import psutil +from pycaw.utils import AudioSession +from pycaw.callbacks import AudioSessionEvents, AudioSessionNotification +from pycaw.pycaw import AudioUtilities + +import Events +from ServerSideView import ServerSideView +from queue import Queue, Empty + +from loguru import logger +from get_app_name import get_app_name + + +class PerSessionCallbacks(AudioSessionEvents): + """Passing callbacks calls to AudioController and includes pid to calls""" + + def __init__(self, pid: int, audio_controller: 'AudioController'): + self.pid = pid + self.audio_controller = audio_controller + + def on_simple_volume_changed(self, new_volume, new_mute, event_context): + self.audio_controller.on_volume_changed(self.pid, new_volume, new_mute, event_context) + + def on_state_changed(self, new_state, new_state_id): + self.audio_controller.on_state_changed(self.pid, new_state, new_state_id) + + def on_session_disconnected(self, disconnect_reason, disconnect_reason_id): + self.audio_controller.on_session_disconnected(self.pid, disconnect_reason, disconnect_reason_id) + + +class SessionCreateCallback(AudioSessionNotification): + def __init__(self, audio_controller: 'AudioController'): + self.audio_controller = audio_controller + + def on_session_created(self, new_session): + self.audio_controller.on_session_created(new_session) + + +class AudioController: + """ + Class aimed to keep current state of situation, handle callbacks from sessions, and do communication with clients + vie ServerSideView + """ + + def __init__(self): + self.per_session_callbacks_class = PerSessionCallbacks + self._sessions: dict[int, AudioSession] = dict() # Mapping pid to session + + self.outbound_q = Queue() # from AudioController to ServerSideView + self.inbound_q = Queue() # from ServerSideView to AudioController + + self._state_change_q = Queue() # A queue for handling state changes as it seems to + # work bad with all this logic in callback handler + + self.view = ServerSideView(self.outbound_q, self.inbound_q) + + def get_process(self, pid: int) -> psutil.Process: + return self._sessions[pid].Process + + def perform_discover(self): + logger.trace('Performing discovering') + for session in AudioUtilities.GetAllSessions(): + logger.trace(f'Checking session {session.Process} {session.ProcessId}') + if session.Process is not None: + if session.ProcessId not in self._sessions: + logger.debug(f'Discovered session {session.Process}') + self._sessions[session.ProcessId] = session + self.on_session_created(session) + + else: + logger.trace(f'Already have session {session.Process} in _sessions') + + def on_session_created(self, new_session: AudioSession): + if new_session.Process is not None: + logger.debug(f'New session {new_session.Process.name()}:{new_session.ProcessId}') + + if new_session.ProcessId in self._sessions: + logger.warning(f'Already have session {new_session}') + return + + self._sessions[new_session.ProcessId] = new_session + new_session.register_notification(self.per_session_callbacks_class(new_session.ProcessId, self)) + + # Notifying + self.outbound_q.put(Events.NewSession(new_session.ProcessId)) + self.outbound_q.put(Events.SetName(new_session.ProcessId, get_app_name(new_session.Process))) + # TODO: Send also volume, mute, state + + else: + logger.debug("None's process session", new_session, new_session.ProcessId) + + def on_volume_changed(self, pid: int, new_volume: float, new_mute: int, event_context: 'comtypes.LP_GUID'): + logger.debug(f'Volume changed {self.get_process(pid).name()} {pid} new value: {new_volume}') + + # Notifying + self.outbound_q.put(Events.VolumeChanged(pid, new_volume)) + + # Yes, this will produce useless MuteStateChanged + self.outbound_q.put(Events.MuteStateChanged(pid, bool(new_mute))) + + def on_state_changed(self, pid: int, new_state: str, new_state_id: int): + """ + There have been some problems with executing some amount of code on callbacks so took it out to main thread and + in callbacks it only put messages to queue + """ + + logger.debug(f'State changed {self.get_process(pid).name()} {pid} new state: {new_state} {new_state_id}') + self._state_change_q.put((pid, new_state_id)) + + def _state_change_tick(self): + try: + msg = self._state_change_q.get(timeout=3) + logger.trace(f'New state message {msg}') + + except Empty: + return + + else: + pid, new_state_id = msg + if new_state_id == 2: + self._generic_disconnect(pid) + logger.trace(f'_generic_disconnect call done for {pid}') + + else: + # Notifying + self.outbound_q.put(Events.StateChanged(pid, bool(new_state_id))) + + def on_session_disconnected(self, pid: int, disconnect_reason, disconnect_reason_id): + """ + Is fired, when the audio session disconnected "hard". + Mostly on_state_changed == "Expired" is what you are looking for. + see self.AudioSessionDisconnectReason for disconnect_reason + The use is similar to on_state_changed (ref: pycaw.callbacks.AudioSessionEvents) + NB: expired state id = 2 + """ + + logger.info(f'Session disconnected {self.get_process(pid).name()} {pid} {disconnect_reason} {disconnect_reason_id}') + self._generic_disconnect(pid) + + def _generic_disconnect(self, pid: int): + """ + Session can be disconnected by on_session_disconnected event and by state = expired, this method gets called + by both of them. + + :param pid: + :return: + """ + + process = self.get_process(pid) + if process.is_running(): + logger.warning(f'Process disconnected but still running {process}') + + # Notifying + self.outbound_q.put(Events.SessionClosed(pid)) + logger.debug(f'Generic disconnect done') + + self._remove_session_by_pid(pid) + logger.debug(f'Removing call done') + + def _remove_session_by_pid(self, pid: int): + session = self._sessions[pid] + session.unregister_notification() + logger.trace(f'Successfully unregistered notification for {session._process}') + del self._sessions[pid] + logger.trace(f'Removing {pid} done') + # print_stack() + # return + + def pre_shutdown(self): + """Unregister callbacks""" + logger.trace(f'Entering pre_shutdown') + for pid in tuple(self._sessions.keys()): + try: + self._remove_session_by_pid(pid) + + except Exception: + logger.opt(exception=True).warning(f'Failed to unregister_notification() for pid {pid}') + + def set_mute(self, pid: int, is_muted: bool): + logger.trace(f'Set mute for {pid} {is_muted=}') + self._sessions[pid].SimpleAudioVolume.SetMute(int(is_muted), None) + + def is_muted(self, pid: int) -> bool: + return self._sessions[pid].SimpleAudioVolume.GetMute() + + def toggle_mute(self, pid: int): + logger.trace(f'Toggle mute for {pid}') + is_muted = self.is_muted(pid) + self.set_mute(pid, not is_muted) + + def get_volume(self, pid: int) -> float: + logger.trace(f'Get volume for {pid}') + return self._sessions[pid].SimpleAudioVolume.GetMasterVolume() + + def set_volume(self, pid: int, volume: float): + # only set volume in the range 0.0 to 1.0 + volume = min(1.0, max(0.0, volume)) + self._sessions[pid].SimpleAudioVolume.SetMasterVolume(volume, None) + + def increment_volume(self, pid: int, increment: float): + logger.trace(f'Increment volume for {pid}, {increment=}') + volume = increment + self.get_volume(pid) + self.set_volume(pid, volume) + + def _inbound_q_tick(self): + try: + event: Events.ClientToServerEvent = self.inbound_q.get_nowait() + + except queue.Empty: + return + + else: + if isinstance(event, Events.VolumeIncrement): + self.increment_volume(event.PID, event.increment) + + elif isinstance(event, Events.MuteToggle): + self.toggle_mute(event.PID) + + def start_blocking(self): + # self.perform_discover() + logger.debug(f'Starting blocking') + self.view.start() + while True: + # time.sleep(1) + self._state_change_tick() + self._inbound_q_tick() diff --git a/Events.py b/Events.py new file mode 100644 index 0000000..7ba809c --- /dev/null +++ b/Events.py @@ -0,0 +1,122 @@ +from dataclasses import dataclass, field + +""" +Processes unique identifies by their PIDs. +From server to client events: +1. New session + PID + +2. Session closed + PID + +3. State changed + PID + is_active + +4. Volume changed + PID + New volume + +5. Mute state changed + PID + is_muted + +6. Set name + PID + Name + +From client to server: +1. Volume increment + PID + increment (for decrement use -increment value) + +2. Mute toggle + PID + +Cases: +1. New Session: + Send `New Session` event + Send `Name Changed` event + Send `Volume Changed` event + Send `Mute State Changed` event + Send `State changed` event + # Let's call this set of events a full view since it fully describes current information about a session + +2. Session closed + Send `Session closed` event +""" + + +@dataclass +class Event: + PID: int + event: str = field(init=False) + + def __post_init__(self): + self.event = self.__class__.__name__ + + +@dataclass +class ServerToClientEvent(Event): + ... + + +@dataclass +class ClientToServerEvent(Event): + ... + + +# Server to Client Events + +@dataclass +class NewSession(ServerToClientEvent): + ... + + +@dataclass +class SessionClosed(ServerToClientEvent): + ... + + +@dataclass +class StateChanged(ServerToClientEvent): + is_active: bool + + +@dataclass +class VolumeChanged(ServerToClientEvent): + new_volume: float + + +@dataclass +class MuteStateChanged(ServerToClientEvent): + is_muted: bool + + +@dataclass +class SetName(ServerToClientEvent): + name: str + + +# Client to Server Events + +@dataclass +class VolumeIncrement(ClientToServerEvent): + increment: float + + +@dataclass +class MuteToggle(ClientToServerEvent): + ... + + +def lookup_event(event_name: str) -> Event: + subclasses = dict() + to_handle = [Event] + while len(to_handle) > 0: + current_item = to_handle.pop() + for subclass in current_item.__subclasses__(): + subclasses[subclass.__name__] = subclass + to_handle.append(subclass) + + return subclasses[event_name] diff --git a/ProcessAudioController.py b/ProcessAudioController.py new file mode 100644 index 0000000..df185ba --- /dev/null +++ b/ProcessAudioController.py @@ -0,0 +1,78 @@ +from pycaw.pycaw import AudioUtilities +import pycaw.utils +from get_app_name import get_app_name + + +def get_process_session(pid: int) -> pycaw.utils.AudioSession | None: + sessions = AudioUtilities.GetAllSessions() + for session in sessions: + if session.Process and session.Process.pid == pid: + return session + + +class ProcessAudioController: + def __init__(self, *, pid: int = None, audio_session: pycaw.utils.AudioSession = None): + if pid is not None: + self._process_session = get_process_session(pid) + + if audio_session is not None: + self._process_session = audio_session + + self.process = self._process_session.Process + self.process_description = get_app_name(self.process) + + def mute(self): + self._process_session.SimpleAudioVolume.SetMute(1, None) + print(self.process.name(), "has been muted.") # debug + + def unmute(self): + self._process_session.SimpleAudioVolume.SetMute(0, None) + print(self.process.name(), "has been unmuted.") # debug + + def get_process_volume(self): + return self._process_session.SimpleAudioVolume.GetMasterVolume() + + @property + def volume(self): + return self.get_process_volume() + + def set_volume(self, decibels: float): + new_volume = min(1.0, max(0.0, decibels)) + self._process_session.SimpleAudioVolume.SetMasterVolume(new_volume, None) + print("Volume set to", new_volume) # debug + + def decrease_volume(self, decibels: float): + volume = max(0.0, self.volume - decibels) + self._process_session.SimpleAudioVolume.SetMasterVolume(volume, None) + print("Volume reduced to", volume) # debug + + def increase_volume(self, decibels: float): + # 1.0 is the max value, raise by decibels + new_volume = min(1.0, self.volume + decibels) + self._process_session.SimpleAudioVolume.SetMasterVolume(new_volume, None) + print("Volume raised to", new_volume) # debug + + +class AudioController: + processes: dict[int, ProcessAudioController] = dict() # PIDs as keys + _selected_process: Optional[ProcessAudioController] = None + + def __init__(self, view: ViewABC): + self.view = view + for session in AudioUtilities.GetAllSessions(): + if session.ProcessId != 0: + audio_process_controller = ProcessAudioController(audio_session=session) + self.processes[audio_process_controller.process_description] = audio_process_controller + + if len(self.processes) > 0: + self.selected_process = next(iter(self.processes)) + + @property + def selected_process(self) -> Optional[ProcessAudioController]: + return self._selected_process + + @selected_process.setter + def selected_process(self, pid_to_select: int): + self._selected_process = self.processes[pid_to_select] + self.view.select_process_callback(self.selected_process) + diff --git a/ServerSideView.py b/ServerSideView.py new file mode 100644 index 0000000..32ae96e --- /dev/null +++ b/ServerSideView.py @@ -0,0 +1,85 @@ +import queue +from queue import Queue +from threading import Thread +from loguru import logger +import Events +from typing import TypedDict +from dataclasses import asdict +from Transport import Transport + + +class SessionState(TypedDict): + pid: int + volume: float + is_muted: bool + is_active: bool + name: str + + +class ServerSideView(Thread): + """ + The `AudioController` gets called by callbacks, callbacks calls performs from + threads which we shouldn't block for long time, so it would be wisely to put result of a callback to a + queue which reads `ServerSideView` from its own thread. + The common concept: + `AudioController` put messages from callbacks to queue which reads `ServerSideView` which keep up with + `ClientSideView` (a client). + `ClientSideView` sends `Events` over `Transport` to `ServerSideView` which put messages to another queue + which reads `AudioController` which performs action specified in messages. + `AudioController`'s work with queues performs in main thread. + Callback calls by pycaw performs in pycaw's internal threads. + `ServerSideView` executing in its own thread. + """ + + daemon = True + + def __init__(self, inbound_q: Queue, outbound_q: Queue, transport: Transport = Transport()): + """ + :param inbound_q: Queue from AudioController to ServerSideView + :param outbound_q: Queue from ServerSideView to AudioController + """ + super().__init__() + self.inbound_q = inbound_q + self.outbound_q = outbound_q + + self.transport = transport + + self._state: dict[int, SessionState] = dict() # Holds current state of sessions received from AudioController + # PID : SessionState + + def run(self) -> None: + while True: + try: + msg: Events.ServerToClientEvent = self.inbound_q.get_nowait() + + except queue.Empty: + pass + + else: + logger.debug(msg) + self._update_state(msg) + self.transport.send(msg) + + self.transport.tick() + + try: + new_msg = self.transport.receive() + self.outbound_q.put(new_msg) + + except queue.Empty: + pass + + def _update_state(self, event: Events.ServerToClientEvent) -> None: + if isinstance(event, Events.NewSession): + self._state[event.PID] = dict() + + elif isinstance(event, Events.SessionClosed): + del self._state[event.PID] + + else: + dicted = asdict(event) + del dicted['event'] + + self._state[event.PID].update(dicted) + + logger.debug(f'state: {self._state}') diff --git a/Transport.py b/Transport.py new file mode 100644 index 0000000..749cbfb --- /dev/null +++ b/Transport.py @@ -0,0 +1,67 @@ +from loguru import logger +from dataclasses import asdict +import socket +import selectors +from queue import Queue +import Events +import json + + +class Transport: + def __init__(self): + self._selector = selectors.DefaultSelector() + self._from_net_q = Queue() + + self._sock = socket.socket() + self._sock.bind(('localhost', 54683)) + self._sock.listen(100) + self._sock.setblocking(False) + self._selector.register(self._sock, selectors.EVENT_READ, self._accept) + + self._connections: list[socket.socket] = list() + # self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # self.sock.bind(("127.0.0.1", 54683)) + ... + + def send(self, msg: Events.ServerToClientEvent): + logger.debug(f'Sending {asdict(msg)}') + msg = json.dumps(asdict(msg)).encode() + for conn in self._connections: + conn.sendall(msg) + + def receive(self) -> Events.ClientToServerEvent: + return self._from_net_q.get_nowait() + + def _accept(self, sock: socket.socket, mask: int): + conn, addr = sock.accept() # Should be ready + print('accepted', conn, 'from', addr) + conn.setblocking(False) + self._selector.register(conn, selectors.EVENT_READ, self._read) + self._connections.append(conn) + + def _read(self, conn: socket.socket, mask: int): + data = conn.recv(1000) + if not data: + logger.debug(f'Closing connection to {conn.getpeername()}') + self._selector.unregister(conn) + self._connections.remove(conn) + conn.close() + return + + try: + event_dict = json.loads(data) + event_name = event_dict['event'] + event_cls = Events.lookup_event(event_name) + del event_dict['event'] + logger.trace(f'Passing msg {event_dict} from client {conn.getpeername()}') + self._from_net_q.put(event_cls(**event_dict)) + + except Exception: + logger.opt(exception=True).warning(f"Couldn't parse message from client: {data}") + + def tick(self): + events = self._selector.select(timeout=0) + for key, mask in events: + callback = key.data + callback(key.fileobj, mask) diff --git a/get_app_name.py b/get_app_name.py new file mode 100644 index 0000000..40dcce2 --- /dev/null +++ b/get_app_name.py @@ -0,0 +1,58 @@ +import ctypes +from ctypes import wintypes as w +import array +from functools import lru_cache +from psutil import Process +import traceback + + +ver = ctypes.WinDLL('version') +ver.GetFileVersionInfoSizeW.argtypes = w.LPCWSTR, w.LPDWORD +ver.GetFileVersionInfoSizeW.restype = w.DWORD +ver.GetFileVersionInfoW.argtypes = w.LPCWSTR, w.DWORD, w.DWORD, w.LPVOID +ver.GetFileVersionInfoW.restype = w.BOOL +ver.VerQueryValueW.argtypes = w.LPCVOID, w.LPCWSTR, ctypes.POINTER(w.LPVOID), w.PUINT +ver.VerQueryValueW.restype = w.BOOL + + +def get_file_description(filepath: str) -> str: + # https://stackoverflow.com/questions/42604493/verqueryvaluew-issue-python-3 + size = ver.GetFileVersionInfoSizeW(filepath, None) + if not size: + raise RuntimeError('version info not found') + + res = ctypes.create_string_buffer(size) + if not ver.GetFileVersionInfoW(filepath, 0, size, res): + raise RuntimeError('GetFileVersionInfoW failed') + + buf = w.LPVOID() + length = w.UINT() + # Look for codepages + if not ver.VerQueryValueW(res, r'\VarFileInfo\Translation', ctypes.byref(buf), ctypes.byref(length)): + raise RuntimeError('VerQueryValueW failed to find translation') + + if length.value == 0: + raise RuntimeError('no code pages') + + codepages = array.array('H', ctypes.string_at(buf.value, length.value)) + codepage = codepages[:2] + # Extract information + if not ver.VerQueryValueW( + res, + rf'\StringFileInfo\{codepage[0]:04x}{codepage[1]:04x}\FileDescription', + ctypes.byref(buf), + ctypes.byref(length) + ): + raise RuntimeError('VerQueryValueW failed to find file version') + + return ctypes.wstring_at(buf.value, length.value - 1) + + +@lru_cache +def get_app_name(app_process: Process) -> str: + try: + return get_file_description(app_process.exe()) + + except Exception: + traceback.print_exc() + return app_process.name() diff --git a/main.py b/main.py new file mode 100644 index 0000000..8da4ac4 --- /dev/null +++ b/main.py @@ -0,0 +1,46 @@ +import sys; sys.coinit_flags = 0 # noqa +from loguru import logger +import logging + + +class InterceptHandler(logging.Handler): + def emit(self, record): + # Get corresponding Loguru level if it exists + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + +logging.basicConfig(handlers=[InterceptHandler()]) + +from pycaw.pycaw import AudioUtilities +import AudioController + + +mgr = AudioUtilities.GetAudioSessionManager() + +audio_controller = AudioController.AudioController() +callback = AudioController.SessionCreateCallback(audio_controller) + +mgr.RegisterSessionNotification(callback) +mgr.GetSessionEnumerator() + +try: + audio_controller.start_blocking() + +except KeyboardInterrupt: + pass + +finally: + mgr.UnregisterSessionNotification(callback) + audio_controller.pre_shutdown() + logger.debug(audio_controller._sessions) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..54ca63b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +comtypes~=1.1.11 +pycaw~=20220416 +psutil~=5.9.1 +loguru~=0.6.0 \ No newline at end of file