mirror of
https://github.com/norohind/AudioControl.git
synced 2025-04-12 05:00:01 +03:00
init
This commit is contained in:
commit
5547b68c5a
230
AudioController.py
Normal file
230
AudioController.py
Normal file
@ -0,0 +1,230 @@
|
||||
import queue
|
||||
import time
|
||||
|
||||
import comtypes
|
||||
import psutil
|
||||
from pycaw.utils import AudioSession
|
||||
from pycaw.callbacks import AudioSessionEvents, AudioSessionNotification
|
||||
from pycaw.pycaw import AudioUtilities
|
||||
|
||||
import Events
|
||||
from ServerSideView import ServerSideView
|
||||
from queue import Queue, Empty
|
||||
|
||||
from loguru import logger
|
||||
from get_app_name import get_app_name
|
||||
|
||||
|
||||
class PerSessionCallbacks(AudioSessionEvents):
|
||||
"""Passing callbacks calls to AudioController and includes pid to calls"""
|
||||
|
||||
def __init__(self, pid: int, audio_controller: 'AudioController'):
|
||||
self.pid = pid
|
||||
self.audio_controller = audio_controller
|
||||
|
||||
def on_simple_volume_changed(self, new_volume, new_mute, event_context):
|
||||
self.audio_controller.on_volume_changed(self.pid, new_volume, new_mute, event_context)
|
||||
|
||||
def on_state_changed(self, new_state, new_state_id):
|
||||
self.audio_controller.on_state_changed(self.pid, new_state, new_state_id)
|
||||
|
||||
def on_session_disconnected(self, disconnect_reason, disconnect_reason_id):
|
||||
self.audio_controller.on_session_disconnected(self.pid, disconnect_reason, disconnect_reason_id)
|
||||
|
||||
|
||||
class SessionCreateCallback(AudioSessionNotification):
|
||||
def __init__(self, audio_controller: 'AudioController'):
|
||||
self.audio_controller = audio_controller
|
||||
|
||||
def on_session_created(self, new_session):
|
||||
self.audio_controller.on_session_created(new_session)
|
||||
|
||||
|
||||
class AudioController:
|
||||
"""
|
||||
Class aimed to keep current state of situation, handle callbacks from sessions, and do communication with clients
|
||||
vie ServerSideView
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.per_session_callbacks_class = PerSessionCallbacks
|
||||
self._sessions: dict[int, AudioSession] = dict() # Mapping pid to session
|
||||
|
||||
self.outbound_q = Queue() # from AudioController to ServerSideView
|
||||
self.inbound_q = Queue() # from ServerSideView to AudioController
|
||||
|
||||
self._state_change_q = Queue() # A queue for handling state changes as it seems to
|
||||
# work bad with all this logic in callback handler
|
||||
|
||||
self.view = ServerSideView(self.outbound_q, self.inbound_q)
|
||||
|
||||
def get_process(self, pid: int) -> psutil.Process:
|
||||
return self._sessions[pid].Process
|
||||
|
||||
def perform_discover(self):
|
||||
logger.trace('Performing discovering')
|
||||
for session in AudioUtilities.GetAllSessions():
|
||||
logger.trace(f'Checking session {session.Process} {session.ProcessId}')
|
||||
if session.Process is not None:
|
||||
if session.ProcessId not in self._sessions:
|
||||
logger.debug(f'Discovered session {session.Process}')
|
||||
self._sessions[session.ProcessId] = session
|
||||
self.on_session_created(session)
|
||||
|
||||
else:
|
||||
logger.trace(f'Already have session {session.Process} in _sessions')
|
||||
|
||||
def on_session_created(self, new_session: AudioSession):
|
||||
if new_session.Process is not None:
|
||||
logger.debug(f'New session {new_session.Process.name()}:{new_session.ProcessId}')
|
||||
|
||||
if new_session.ProcessId in self._sessions:
|
||||
logger.warning(f'Already have session {new_session}')
|
||||
return
|
||||
|
||||
self._sessions[new_session.ProcessId] = new_session
|
||||
new_session.register_notification(self.per_session_callbacks_class(new_session.ProcessId, self))
|
||||
|
||||
# Notifying
|
||||
self.outbound_q.put(Events.NewSession(new_session.ProcessId))
|
||||
self.outbound_q.put(Events.SetName(new_session.ProcessId, get_app_name(new_session.Process)))
|
||||
# TODO: Send also volume, mute, state
|
||||
|
||||
else:
|
||||
logger.debug("None's process session", new_session, new_session.ProcessId)
|
||||
|
||||
def on_volume_changed(self, pid: int, new_volume: float, new_mute: int, event_context: 'comtypes.LP_GUID'):
|
||||
logger.debug(f'Volume changed {self.get_process(pid).name()} {pid} new value: {new_volume}')
|
||||
|
||||
# Notifying
|
||||
self.outbound_q.put(Events.VolumeChanged(pid, new_volume))
|
||||
|
||||
# Yes, this will produce useless MuteStateChanged
|
||||
self.outbound_q.put(Events.MuteStateChanged(pid, bool(new_mute)))
|
||||
|
||||
def on_state_changed(self, pid: int, new_state: str, new_state_id: int):
|
||||
"""
|
||||
There have been some problems with executing some amount of code on callbacks so took it out to main thread and
|
||||
in callbacks it only put messages to queue
|
||||
"""
|
||||
|
||||
logger.debug(f'State changed {self.get_process(pid).name()} {pid} new state: {new_state} {new_state_id}')
|
||||
self._state_change_q.put((pid, new_state_id))
|
||||
|
||||
def _state_change_tick(self):
|
||||
try:
|
||||
msg = self._state_change_q.get(timeout=3)
|
||||
logger.trace(f'New state message {msg}')
|
||||
|
||||
except Empty:
|
||||
return
|
||||
|
||||
else:
|
||||
pid, new_state_id = msg
|
||||
if new_state_id == 2:
|
||||
self._generic_disconnect(pid)
|
||||
logger.trace(f'_generic_disconnect call done for {pid}')
|
||||
|
||||
else:
|
||||
# Notifying
|
||||
self.outbound_q.put(Events.StateChanged(pid, bool(new_state_id)))
|
||||
|
||||
def on_session_disconnected(self, pid: int, disconnect_reason, disconnect_reason_id):
|
||||
"""
|
||||
Is fired, when the audio session disconnected "hard".
|
||||
Mostly on_state_changed == "Expired" is what you are looking for.
|
||||
see self.AudioSessionDisconnectReason for disconnect_reason
|
||||
The use is similar to on_state_changed (ref: pycaw.callbacks.AudioSessionEvents)
|
||||
NB: expired state id = 2
|
||||
"""
|
||||
|
||||
logger.info(f'Session disconnected {self.get_process(pid).name()} {pid} {disconnect_reason} {disconnect_reason_id}')
|
||||
self._generic_disconnect(pid)
|
||||
|
||||
def _generic_disconnect(self, pid: int):
|
||||
"""
|
||||
Session can be disconnected by on_session_disconnected event and by state = expired, this method gets called
|
||||
by both of them.
|
||||
|
||||
:param pid:
|
||||
:return:
|
||||
"""
|
||||
|
||||
process = self.get_process(pid)
|
||||
if process.is_running():
|
||||
logger.warning(f'Process disconnected but still running {process}')
|
||||
|
||||
# Notifying
|
||||
self.outbound_q.put(Events.SessionClosed(pid))
|
||||
logger.debug(f'Generic disconnect done')
|
||||
|
||||
self._remove_session_by_pid(pid)
|
||||
logger.debug(f'Removing call done')
|
||||
|
||||
def _remove_session_by_pid(self, pid: int):
|
||||
session = self._sessions[pid]
|
||||
session.unregister_notification()
|
||||
logger.trace(f'Successfully unregistered notification for {session._process}')
|
||||
del self._sessions[pid]
|
||||
logger.trace(f'Removing {pid} done')
|
||||
# print_stack()
|
||||
# return
|
||||
|
||||
def pre_shutdown(self):
|
||||
"""Unregister callbacks"""
|
||||
logger.trace(f'Entering pre_shutdown')
|
||||
for pid in tuple(self._sessions.keys()):
|
||||
try:
|
||||
self._remove_session_by_pid(pid)
|
||||
|
||||
except Exception:
|
||||
logger.opt(exception=True).warning(f'Failed to unregister_notification() for pid {pid}')
|
||||
|
||||
def set_mute(self, pid: int, is_muted: bool):
|
||||
logger.trace(f'Set mute for {pid} {is_muted=}')
|
||||
self._sessions[pid].SimpleAudioVolume.SetMute(int(is_muted), None)
|
||||
|
||||
def is_muted(self, pid: int) -> bool:
|
||||
return self._sessions[pid].SimpleAudioVolume.GetMute()
|
||||
|
||||
def toggle_mute(self, pid: int):
|
||||
logger.trace(f'Toggle mute for {pid}')
|
||||
is_muted = self.is_muted(pid)
|
||||
self.set_mute(pid, not is_muted)
|
||||
|
||||
def get_volume(self, pid: int) -> float:
|
||||
logger.trace(f'Get volume for {pid}')
|
||||
return self._sessions[pid].SimpleAudioVolume.GetMasterVolume()
|
||||
|
||||
def set_volume(self, pid: int, volume: float):
|
||||
# only set volume in the range 0.0 to 1.0
|
||||
volume = min(1.0, max(0.0, volume))
|
||||
self._sessions[pid].SimpleAudioVolume.SetMasterVolume(volume, None)
|
||||
|
||||
def increment_volume(self, pid: int, increment: float):
|
||||
logger.trace(f'Increment volume for {pid}, {increment=}')
|
||||
volume = increment + self.get_volume(pid)
|
||||
self.set_volume(pid, volume)
|
||||
|
||||
def _inbound_q_tick(self):
|
||||
try:
|
||||
event: Events.ClientToServerEvent = self.inbound_q.get_nowait()
|
||||
|
||||
except queue.Empty:
|
||||
return
|
||||
|
||||
else:
|
||||
if isinstance(event, Events.VolumeIncrement):
|
||||
self.increment_volume(event.PID, event.increment)
|
||||
|
||||
elif isinstance(event, Events.MuteToggle):
|
||||
self.toggle_mute(event.PID)
|
||||
|
||||
def start_blocking(self):
|
||||
# self.perform_discover()
|
||||
logger.debug(f'Starting blocking')
|
||||
self.view.start()
|
||||
while True:
|
||||
# time.sleep(1)
|
||||
self._state_change_tick()
|
||||
self._inbound_q_tick()
|
122
Events.py
Normal file
122
Events.py
Normal file
@ -0,0 +1,122 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
"""
|
||||
Processes unique identifies by their PIDs.
|
||||
From server to client events:
|
||||
1. New session
|
||||
PID
|
||||
|
||||
2. Session closed
|
||||
PID
|
||||
|
||||
3. State changed
|
||||
PID
|
||||
is_active
|
||||
|
||||
4. Volume changed
|
||||
PID
|
||||
New volume
|
||||
|
||||
5. Mute state changed
|
||||
PID
|
||||
is_muted
|
||||
|
||||
6. Set name
|
||||
PID
|
||||
Name
|
||||
|
||||
From client to server:
|
||||
1. Volume increment
|
||||
PID
|
||||
increment (for decrement use -increment value)
|
||||
|
||||
2. Mute toggle
|
||||
PID
|
||||
|
||||
Cases:
|
||||
1. New Session:
|
||||
Send `New Session` event
|
||||
Send `Name Changed` event
|
||||
Send `Volume Changed` event
|
||||
Send `Mute State Changed` event
|
||||
Send `State changed` event
|
||||
# Let's call this set of events a full view since it fully describes current information about a session
|
||||
|
||||
2. Session closed
|
||||
Send `Session closed` event
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class Event:
|
||||
PID: int
|
||||
event: str = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.event = self.__class__.__name__
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServerToClientEvent(Event):
|
||||
...
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClientToServerEvent(Event):
|
||||
...
|
||||
|
||||
|
||||
# Server to Client Events
|
||||
|
||||
@dataclass
|
||||
class NewSession(ServerToClientEvent):
|
||||
...
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionClosed(ServerToClientEvent):
|
||||
...
|
||||
|
||||
|
||||
@dataclass
|
||||
class StateChanged(ServerToClientEvent):
|
||||
is_active: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class VolumeChanged(ServerToClientEvent):
|
||||
new_volume: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class MuteStateChanged(ServerToClientEvent):
|
||||
is_muted: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class SetName(ServerToClientEvent):
|
||||
name: str
|
||||
|
||||
|
||||
# Client to Server Events
|
||||
|
||||
@dataclass
|
||||
class VolumeIncrement(ClientToServerEvent):
|
||||
increment: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class MuteToggle(ClientToServerEvent):
|
||||
...
|
||||
|
||||
|
||||
def lookup_event(event_name: str) -> Event:
|
||||
subclasses = dict()
|
||||
to_handle = [Event]
|
||||
while len(to_handle) > 0:
|
||||
current_item = to_handle.pop()
|
||||
for subclass in current_item.__subclasses__():
|
||||
subclasses[subclass.__name__] = subclass
|
||||
to_handle.append(subclass)
|
||||
|
||||
return subclasses[event_name]
|
78
ProcessAudioController.py
Normal file
78
ProcessAudioController.py
Normal file
@ -0,0 +1,78 @@
|
||||
from pycaw.pycaw import AudioUtilities
|
||||
import pycaw.utils
|
||||
from get_app_name import get_app_name
|
||||
|
||||
|
||||
def get_process_session(pid: int) -> pycaw.utils.AudioSession | None:
|
||||
sessions = AudioUtilities.GetAllSessions()
|
||||
for session in sessions:
|
||||
if session.Process and session.Process.pid == pid:
|
||||
return session
|
||||
|
||||
|
||||
class ProcessAudioController:
|
||||
def __init__(self, *, pid: int = None, audio_session: pycaw.utils.AudioSession = None):
|
||||
if pid is not None:
|
||||
self._process_session = get_process_session(pid)
|
||||
|
||||
if audio_session is not None:
|
||||
self._process_session = audio_session
|
||||
|
||||
self.process = self._process_session.Process
|
||||
self.process_description = get_app_name(self.process)
|
||||
|
||||
def mute(self):
|
||||
self._process_session.SimpleAudioVolume.SetMute(1, None)
|
||||
print(self.process.name(), "has been muted.") # debug
|
||||
|
||||
def unmute(self):
|
||||
self._process_session.SimpleAudioVolume.SetMute(0, None)
|
||||
print(self.process.name(), "has been unmuted.") # debug
|
||||
|
||||
def get_process_volume(self):
|
||||
return self._process_session.SimpleAudioVolume.GetMasterVolume()
|
||||
|
||||
@property
|
||||
def volume(self):
|
||||
return self.get_process_volume()
|
||||
|
||||
def set_volume(self, decibels: float):
|
||||
new_volume = min(1.0, max(0.0, decibels))
|
||||
self._process_session.SimpleAudioVolume.SetMasterVolume(new_volume, None)
|
||||
print("Volume set to", new_volume) # debug
|
||||
|
||||
def decrease_volume(self, decibels: float):
|
||||
volume = max(0.0, self.volume - decibels)
|
||||
self._process_session.SimpleAudioVolume.SetMasterVolume(volume, None)
|
||||
print("Volume reduced to", volume) # debug
|
||||
|
||||
def increase_volume(self, decibels: float):
|
||||
# 1.0 is the max value, raise by decibels
|
||||
new_volume = min(1.0, self.volume + decibels)
|
||||
self._process_session.SimpleAudioVolume.SetMasterVolume(new_volume, None)
|
||||
print("Volume raised to", new_volume) # debug
|
||||
|
||||
|
||||
class AudioController:
|
||||
processes: dict[int, ProcessAudioController] = dict() # PIDs as keys
|
||||
_selected_process: Optional[ProcessAudioController] = None
|
||||
|
||||
def __init__(self, view: ViewABC):
|
||||
self.view = view
|
||||
for session in AudioUtilities.GetAllSessions():
|
||||
if session.ProcessId != 0:
|
||||
audio_process_controller = ProcessAudioController(audio_session=session)
|
||||
self.processes[audio_process_controller.process_description] = audio_process_controller
|
||||
|
||||
if len(self.processes) > 0:
|
||||
self.selected_process = next(iter(self.processes))
|
||||
|
||||
@property
|
||||
def selected_process(self) -> Optional[ProcessAudioController]:
|
||||
return self._selected_process
|
||||
|
||||
@selected_process.setter
|
||||
def selected_process(self, pid_to_select: int):
|
||||
self._selected_process = self.processes[pid_to_select]
|
||||
self.view.select_process_callback(self.selected_process)
|
||||
|
85
ServerSideView.py
Normal file
85
ServerSideView.py
Normal file
@ -0,0 +1,85 @@
|
||||
import queue
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
from loguru import logger
|
||||
import Events
|
||||
from typing import TypedDict
|
||||
from dataclasses import asdict
|
||||
from Transport import Transport
|
||||
|
||||
|
||||
class SessionState(TypedDict):
|
||||
pid: int
|
||||
volume: float
|
||||
is_muted: bool
|
||||
is_active: bool
|
||||
name: str
|
||||
|
||||
|
||||
class ServerSideView(Thread):
|
||||
"""
|
||||
The `AudioController` gets called by callbacks, callbacks calls performs from
|
||||
threads which we shouldn't block for long time, so it would be wisely to put result of a callback to a
|
||||
queue which reads `ServerSideView` from its own thread.
|
||||
The common concept:
|
||||
`AudioController` put messages from callbacks to queue which reads `ServerSideView` which keep up with
|
||||
`ClientSideView` (a client).
|
||||
`ClientSideView` sends `Events` over `Transport` to `ServerSideView` which put messages to another queue
|
||||
which reads `AudioController` which performs action specified in messages.
|
||||
`AudioController`'s work with queues performs in main thread.
|
||||
Callback calls by pycaw performs in pycaw's internal threads.
|
||||
`ServerSideView` executing in its own thread.
|
||||
"""
|
||||
|
||||
daemon = True
|
||||
|
||||
def __init__(self, inbound_q: Queue, outbound_q: Queue, transport: Transport = Transport()):
|
||||
"""
|
||||
:param inbound_q: Queue from AudioController to ServerSideView
|
||||
:param outbound_q: Queue from ServerSideView to AudioController
|
||||
"""
|
||||
super().__init__()
|
||||
self.inbound_q = inbound_q
|
||||
self.outbound_q = outbound_q
|
||||
|
||||
self.transport = transport
|
||||
|
||||
self._state: dict[int, SessionState] = dict() # Holds current state of sessions received from AudioController
|
||||
# PID : SessionState
|
||||
|
||||
def run(self) -> None:
|
||||
while True:
|
||||
try:
|
||||
msg: Events.ServerToClientEvent = self.inbound_q.get_nowait()
|
||||
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
else:
|
||||
logger.debug(msg)
|
||||
self._update_state(msg)
|
||||
self.transport.send(msg)
|
||||
|
||||
self.transport.tick()
|
||||
|
||||
try:
|
||||
new_msg = self.transport.receive()
|
||||
self.outbound_q.put(new_msg)
|
||||
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def _update_state(self, event: Events.ServerToClientEvent) -> None:
|
||||
if isinstance(event, Events.NewSession):
|
||||
self._state[event.PID] = dict()
|
||||
|
||||
elif isinstance(event, Events.SessionClosed):
|
||||
del self._state[event.PID]
|
||||
|
||||
else:
|
||||
dicted = asdict(event)
|
||||
del dicted['event']
|
||||
|
||||
self._state[event.PID].update(dicted)
|
||||
|
||||
logger.debug(f'state: {self._state}')
|
67
Transport.py
Normal file
67
Transport.py
Normal file
@ -0,0 +1,67 @@
|
||||
from loguru import logger
|
||||
from dataclasses import asdict
|
||||
import socket
|
||||
import selectors
|
||||
from queue import Queue
|
||||
import Events
|
||||
import json
|
||||
|
||||
|
||||
class Transport:
|
||||
def __init__(self):
|
||||
self._selector = selectors.DefaultSelector()
|
||||
self._from_net_q = Queue()
|
||||
|
||||
self._sock = socket.socket()
|
||||
self._sock.bind(('localhost', 54683))
|
||||
self._sock.listen(100)
|
||||
self._sock.setblocking(False)
|
||||
self._selector.register(self._sock, selectors.EVENT_READ, self._accept)
|
||||
|
||||
self._connections: list[socket.socket] = list()
|
||||
# self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
# self.sock.bind(("127.0.0.1", 54683))
|
||||
...
|
||||
|
||||
def send(self, msg: Events.ServerToClientEvent):
|
||||
logger.debug(f'Sending {asdict(msg)}')
|
||||
msg = json.dumps(asdict(msg)).encode()
|
||||
for conn in self._connections:
|
||||
conn.sendall(msg)
|
||||
|
||||
def receive(self) -> Events.ClientToServerEvent:
|
||||
return self._from_net_q.get_nowait()
|
||||
|
||||
def _accept(self, sock: socket.socket, mask: int):
|
||||
conn, addr = sock.accept() # Should be ready
|
||||
print('accepted', conn, 'from', addr)
|
||||
conn.setblocking(False)
|
||||
self._selector.register(conn, selectors.EVENT_READ, self._read)
|
||||
self._connections.append(conn)
|
||||
|
||||
def _read(self, conn: socket.socket, mask: int):
|
||||
data = conn.recv(1000)
|
||||
if not data:
|
||||
logger.debug(f'Closing connection to {conn.getpeername()}')
|
||||
self._selector.unregister(conn)
|
||||
self._connections.remove(conn)
|
||||
conn.close()
|
||||
return
|
||||
|
||||
try:
|
||||
event_dict = json.loads(data)
|
||||
event_name = event_dict['event']
|
||||
event_cls = Events.lookup_event(event_name)
|
||||
del event_dict['event']
|
||||
logger.trace(f'Passing msg {event_dict} from client {conn.getpeername()}')
|
||||
self._from_net_q.put(event_cls(**event_dict))
|
||||
|
||||
except Exception:
|
||||
logger.opt(exception=True).warning(f"Couldn't parse message from client: {data}")
|
||||
|
||||
def tick(self):
|
||||
events = self._selector.select(timeout=0)
|
||||
for key, mask in events:
|
||||
callback = key.data
|
||||
callback(key.fileobj, mask)
|
58
get_app_name.py
Normal file
58
get_app_name.py
Normal file
@ -0,0 +1,58 @@
|
||||
import ctypes
|
||||
from ctypes import wintypes as w
|
||||
import array
|
||||
from functools import lru_cache
|
||||
from psutil import Process
|
||||
import traceback
|
||||
|
||||
|
||||
ver = ctypes.WinDLL('version')
|
||||
ver.GetFileVersionInfoSizeW.argtypes = w.LPCWSTR, w.LPDWORD
|
||||
ver.GetFileVersionInfoSizeW.restype = w.DWORD
|
||||
ver.GetFileVersionInfoW.argtypes = w.LPCWSTR, w.DWORD, w.DWORD, w.LPVOID
|
||||
ver.GetFileVersionInfoW.restype = w.BOOL
|
||||
ver.VerQueryValueW.argtypes = w.LPCVOID, w.LPCWSTR, ctypes.POINTER(w.LPVOID), w.PUINT
|
||||
ver.VerQueryValueW.restype = w.BOOL
|
||||
|
||||
|
||||
def get_file_description(filepath: str) -> str:
|
||||
# https://stackoverflow.com/questions/42604493/verqueryvaluew-issue-python-3
|
||||
size = ver.GetFileVersionInfoSizeW(filepath, None)
|
||||
if not size:
|
||||
raise RuntimeError('version info not found')
|
||||
|
||||
res = ctypes.create_string_buffer(size)
|
||||
if not ver.GetFileVersionInfoW(filepath, 0, size, res):
|
||||
raise RuntimeError('GetFileVersionInfoW failed')
|
||||
|
||||
buf = w.LPVOID()
|
||||
length = w.UINT()
|
||||
# Look for codepages
|
||||
if not ver.VerQueryValueW(res, r'\VarFileInfo\Translation', ctypes.byref(buf), ctypes.byref(length)):
|
||||
raise RuntimeError('VerQueryValueW failed to find translation')
|
||||
|
||||
if length.value == 0:
|
||||
raise RuntimeError('no code pages')
|
||||
|
||||
codepages = array.array('H', ctypes.string_at(buf.value, length.value))
|
||||
codepage = codepages[:2]
|
||||
# Extract information
|
||||
if not ver.VerQueryValueW(
|
||||
res,
|
||||
rf'\StringFileInfo\{codepage[0]:04x}{codepage[1]:04x}\FileDescription',
|
||||
ctypes.byref(buf),
|
||||
ctypes.byref(length)
|
||||
):
|
||||
raise RuntimeError('VerQueryValueW failed to find file version')
|
||||
|
||||
return ctypes.wstring_at(buf.value, length.value - 1)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_app_name(app_process: Process) -> str:
|
||||
try:
|
||||
return get_file_description(app_process.exe())
|
||||
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
return app_process.name()
|
46
main.py
Normal file
46
main.py
Normal file
@ -0,0 +1,46 @@
|
||||
import sys; sys.coinit_flags = 0 # noqa
|
||||
from loguru import logger
|
||||
import logging
|
||||
|
||||
|
||||
class InterceptHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
# Get corresponding Loguru level if it exists
|
||||
try:
|
||||
level = logger.level(record.levelname).name
|
||||
except ValueError:
|
||||
level = record.levelno
|
||||
|
||||
# Find caller from where originated the logged message
|
||||
frame, depth = logging.currentframe(), 2
|
||||
while frame.f_code.co_filename == logging.__file__:
|
||||
frame = frame.f_back
|
||||
depth += 1
|
||||
|
||||
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
|
||||
|
||||
|
||||
logging.basicConfig(handlers=[InterceptHandler()])
|
||||
|
||||
from pycaw.pycaw import AudioUtilities
|
||||
import AudioController
|
||||
|
||||
|
||||
mgr = AudioUtilities.GetAudioSessionManager()
|
||||
|
||||
audio_controller = AudioController.AudioController()
|
||||
callback = AudioController.SessionCreateCallback(audio_controller)
|
||||
|
||||
mgr.RegisterSessionNotification(callback)
|
||||
mgr.GetSessionEnumerator()
|
||||
|
||||
try:
|
||||
audio_controller.start_blocking()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
finally:
|
||||
mgr.UnregisterSessionNotification(callback)
|
||||
audio_controller.pre_shutdown()
|
||||
logger.debug(audio_controller._sessions)
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@ -0,0 +1,4 @@
|
||||
comtypes~=1.1.11
|
||||
pycaw~=20220416
|
||||
psutil~=5.9.1
|
||||
loguru~=0.6.0
|
Loading…
x
Reference in New Issue
Block a user