AudioControl/ServerSideView.py
2022-11-17 20:22:52 +03:00

121 lines
4.2 KiB
Python

import queue
from queue import Queue
from threading import Thread
from loguru import logger
import Events
# from typing import TypedDict
from dataclasses import asdict
from TransportABC import TransportABC
from NetworkTransport import NetworkTransport
# class SessionState(TypedDict):
# pid: int
# volume: float
# is_muted: bool
# is_active: bool
# name: str
class ServerSideView(Thread):
"""
The `AudioController` gets called by callbacks, callbacks calls performs from
threads which we shouldn't block for long time, so it would be wisely to put result of a callback to a
queue which reads `ServerSideView` from its own thread.
The common concept:
`AudioController` put messages from callbacks to queue which reads `ServerSideView` which keep up with
`ClientSideView` (a client).
`ClientSideView` sends `Events` over `Transport` to `ServerSideView`. `Transport` calls `ServerSideView` callback
which put messages to queue which is reading by `AudioController` which performs action specified in messages.
`AudioController`'s work with queues performs in main thread.
Callback calls by pycaw performs in pycaw's internal threads.
`ServerSideView` executing in its own thread.
"""
daemon = True
running = True
def __init__(self, inbound_q: Queue, outbound_q: Queue):
"""
:param inbound_q: Queue from AudioController to ServerSideView
:param outbound_q: Queue from ServerSideView to AudioController
"""
super().__init__()
self.inbound_q = inbound_q
self.outbound_q = outbound_q
self.transport: TransportABC = NetworkTransport(self.rcv_callback)
self._state: dict[int, dict[str, int | str]] = dict() # Holds current state of sessions received from AudioController
# PID : SessionState
def rcv_callback(self, event: Events.ClientToServerEvent):
if isinstance(event, Events.NewClient):
self.inbound_q.put(event)
else:
self.outbound_q.put(event)
def run(self) -> None:
while self.running:
try:
msg: Events.Event = self.inbound_q.get_nowait()
except queue.Empty:
pass
else:
# logger.debug(msg)
if isinstance(msg, Events.ServerToClientEvent):
self._update_state(msg)
self.transport.send(msg)
elif isinstance(msg, Events.NewClient):
self._send_full_state()
else:
logger.warning(f'Unknown event {msg}')
self.transport.tick()
self.transport.shutdown()
def _update_state(self, event: Events.ServerToClientEvent) -> None:
if isinstance(event, Events.NewSession):
self._state[event.PID] = dict()
elif isinstance(event, Events.SessionClosed):
del self._state[event.PID]
else:
dicted = asdict(event)
del dicted['event']
self._state[event.PID].update(dicted)
# logger.trace(f'state: {self._state}')
def _send_full_state(self):
"""Send full state of sessions to clients"""
logger.trace(f'Sending full state')
subclasses = tuple(Events.enumerate_subclasses(Events.ServerToClientEvent))
for session in self._state.values():
for cls in subclasses:
if cls.__name__ == 'SessionClosed':
continue
try:
kwargs = dict()
for field in cls.__dict__['__dataclass_fields__'].keys():
if field != 'event':
# args.append(session[field])
kwargs[field] = session[field]
event: Events.ServerToClientEvent = cls(**kwargs) # Noqa
self.transport.send(event)
except KeyError: # We don't have appropriate field in state for this kind of events
# logger.debug(f'Passing {cls}')
pass