"""Tail a game log file, parse kill records and pass them to subscribers."""

import datetime
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, Self

from loguru import logger

# Token that introduces a kill record; kill fields are located relative to it.
_KILL_TOKEN = "CActor::Kill:"
# Position of ``_KILL_TOKEN`` implied by the historical absolute indices
# (5, 9, 12, 15, 21); used only when the token cannot be found in the line.
_LEGACY_KILL_TOKEN_INDEX = 4
# Marker of the log line that carries the local character name.
# NOTE(review): taken from the wording of the duplicate warning below; the
# previous check was ``"" in line``, which is true for every string. Confirm
# the exact marker against a real log file.
_CHARACTER_MARKER = "AccountLoginCharacterStatus_Character"
# Seconds to wait before polling the file again once its end was reached.
_EOF_POLL_INTERVAL_S = 0.15


def parse_timestamp(line: str) -> datetime.datetime | None:
    """Parse a bracketed ISO-8601 timestamp.

    Example input: ``<2025-05-23T16:48:53.672Z>``. Surrounding ``<``/``>``
    characters are stripped and ``Z`` is replaced by ``+00:00`` before the
    value is handed to ``datetime.fromisoformat``.

    Args:
        line: The timestamp token.

    Returns:
        The parsed datetime, or ``None`` (after logging a warning) when the
        token is not a valid ISO-8601 timestamp.
    """
    text = line.strip("<>").replace("Z", "+00:00")
    try:
        return datetime.datetime.fromisoformat(text)
    except ValueError:
        logger.warning(f"Failed to parse timestamp from {text!r}")
        return None


# Decoration appended to a rich-text kill message, keyed by damage type.
damage_type_to_emoji = {
    "ElectricArc": ":cloud_lightning:",
    "Explosion": ":boom:",
    "VehicleDestruction": ":boom: :airplane:",
    "TakeDown": ":martial_arts_uniform: :face_with_peeking_eye:",
}


@dataclass(frozen=True)
class Line:
    """One log line plus whether it was read after the end of file was hit."""

    line: str
    is_realtime: bool


@dataclass(frozen=True)
class Event:
    """Base class for parsed log events."""

    # ``None`` when the timestamp token could not be parsed.
    timestamp: datetime.datetime | None

    def to_str(self, include_ts: bool = False, rich_text: bool = False) -> str:
        """Return a textual form of the event; the base class ignores both flags."""
        return str(self)


@dataclass(frozen=True)
class Kill(Event):
    """A kill record parsed from a ``CActor::Kill:`` log line."""

    killer: str
    victim: str
    zone: str
    killed_by: str
    damage_type: str

    @classmethod
    def from_line(cls, line: str) -> Self:
        """Build a ``Kill`` from a raw log line.

        Example line:
        <2025-05-23T16:48:53.672Z> [Notice] CActor::Kill: 'Neon0ne' [1895570109956] in zone 'ANVL_Ballista_3700492224326' killed by 'Aleksey31' [1978962253870] using 'MRCK_S07_ANVL_Ballista_Duel_3700492224372' [Class unknown] with damage type 'VehicleDestruction' from direction x: 0.000000, y: 0.000000, z: 0.000000 [Team_ActorTech][Actor]

        The line is split on whitespace and single quotes are stripped from
        each token. Fields are addressed relative to the ``CActor::Kill:``
        token so that extra tokens between ``[Notice]`` and that token do not
        shift the result; the previous absolute indices did not match the
        example above. When the token is absent the historical absolute
        layout is used.

        Args:
            line: The raw log line.

        Returns:
            The parsed kill record. ``timestamp`` is ``None`` when the first
            token is not a valid timestamp.

        Raises:
            IndexError: The line has fewer tokens than the layout requires.
        """
        tokens = [token.strip("'") for token in line.split()]
        try:
            anchor = tokens.index(_KILL_TOKEN)
        except ValueError:
            anchor = _LEGACY_KILL_TOKEN_INDEX
        return cls(
            timestamp=parse_timestamp(tokens[0]),
            victim=tokens[anchor + 1],
            zone=tokens[anchor + 5],
            killer=tokens[anchor + 8],
            killed_by=tokens[anchor + 11],
            damage_type=tokens[anchor + 17],
        )

    def to_str(self, include_ts: bool = False, rich_text: bool = True) -> str:
        """Format the kill as a one-line message.

        Args:
            include_ts: Prefix the message with ``YYYY-MM-DD HH:MM:SS: ``.
                The prefix is omitted when the timestamp is ``None``.
            rich_text: Wrap names in ``**`` and add emoji decorations.

        Returns:
            The formatted message. A record whose victim equals its killer
            gets a dedicated short form.
        """
        prefix = ""
        # Guard against a missing timestamp: the record may have been built
        # from a line whose timestamp failed to parse.
        if include_ts and self.timestamp is not None:
            prefix = self.timestamp.strftime("%Y-%m-%d %H:%M:%S") + ": "
        bold = "**" if rich_text else ""

        if self.victim == self.killer:
            facepalm = " 🤦" if rich_text else ""
            return f"{prefix}{bold}{self.victim}{bold} РКН{facepalm}"

        gun = "🔫 " if rich_text else ""
        # The separating space is emitted even for unknown damage types, as
        # before, so existing output stays byte-identical.
        suffix = ""
        if rich_text:
            suffix = " " + damage_type_to_emoji.get(self.damage_type, "")
        return (
            f"{prefix}{bold}{self.victim}{bold} killed by "
            f"{gun}{bold}{self.killer}{bold}{suffix}"
        )


@dataclass
class Filter:
    """Per-subscriber rules deciding which events are let through."""

    only_personal_feed: bool
    include_npc: bool
    nickname: str | None = None

    def filter_event(self, event: Event) -> bool:
        """Return ``True`` when ``event`` passes the filter.

        Kill records are rejected when NPC names are excluded and either
        participant's name contains ``_NPC_``, or when the personal feed is
        requested, a nickname is set and it is neither killer nor victim.
        Events of any other type are let through with a warning.
        """
        if not isinstance(event, Kill):
            logger.warning(
                f"Don't know how to filter event {type(event)}, letting through"
            )
            return True

        participants = (event.killer, event.victim)
        # Check each name on its own: concatenating them could produce a
        # spurious "_NPC_" across the boundary of the two names.
        if not self.include_npc and any("_NPC_" in name for name in participants):
            return False
        if (
            self.only_personal_feed
            and self.nickname is not None
            and self.nickname not in participants
        ):
            return False
        return True


class SubscriberABC(ABC):
    """Interface for event consumers; each one carries its own ``Filter``."""

    def __init__(self, event_filter: Filter):
        self.filter = event_filter

    @abstractmethod
    def consume_event(self, event: Event) -> None:
        """Handle one event delivered by the monitor."""
        raise NotImplementedError


@dataclass
class Monitor:
    """Follow a log file and dispatch parsed events to subscribers."""

    log_file_path: Path
    subscribers: set[SubscriberABC]
    # When true, kill lines read before the end of file is reached are
    # dispatched as well.
    do_backread: bool = False
    nickname: str | None = None
    # Set by ``process_lines``; clearing it ends the reading loop.
    running: bool = False

    def try_set_self_name(self, line: str) -> None:
        """Extract the local character name from a character status line.

        Example line:
        <2025-05-23T16:19:10.244Z> [Notice] Character: createdAt 1747220301029 - updatedAt 1747220301703 - geid 1978962253870 - accountId 2261831 - name Aleksey31 - state STATE_CURRENT [Team_GameServices][Login]

        The nickname is the token following the ``name`` token, which keeps
        the lookup independent of how many tokens precede it. Failures are
        logged and leave ``nickname`` untouched; if a nickname is already set
        only a warning is logged.
        """
        if self.nickname is not None:
            logger.warning(
                f"Warning, AccountLoginCharacterStatus_Character duplicate: {line!r}"
            )
            return

        tokens = line.split()
        try:
            self.nickname = tokens[tokens.index("name") + 1]
        except (ValueError, IndexError):
            logger.warning(f"Failed to extract nickname from line: {line!r}")
            return
        logger.info(f"Extracted {self.nickname=}")

    def call_subscribers(self, event: Event) -> None:
        """Pass ``event`` to every subscriber's ``consume_event``."""
        logger.info(f"Passing event: {event}")
        if not self.subscribers:
            logger.warning("set of subscribers is empty")
        for subscriber in self.subscribers:
            subscriber.consume_event(event)

    def process_lines(self) -> None:
        """Consume lines from the log file and dispatch kill events.

        Sets ``running`` to ``True`` and then iterates ``monitor_log_file``
        until that generator finishes. Character status lines update the
        nickname regardless of whether they are realtime. Kill lines are
        dispatched when they are realtime or ``do_backread`` is set; a kill
        line that cannot be parsed is logged and skipped.
        """
        logger.info("Beginning lines consuming")
        self.running = True
        for line in self.monitor_log_file():
            text = line.line
            if _CHARACTER_MARKER in text:
                self.try_set_self_name(text)
                continue
            if _KILL_TOKEN not in text:
                continue
            if not (self.do_backread or line.is_realtime):
                continue
            try:
                kill = Kill.from_line(text)
            except IndexError:
                # A truncated record must not stop the monitoring loop.
                logger.warning(f"Failed to parse kill line {text!r}")
                continue
            self.call_subscribers(kill)

    def monitor_log_file(self) -> Generator[Line, None, None]:
        """Yield lines of the log file, then keep following it.

        Each yielded ``Line`` has its trailing newline removed. Lines read
        before the end of file is first reached carry ``is_realtime=False``,
        all later ones ``True``. At end of file the generator sleeps and
        polls again instead of yielding. The loop ends once ``running`` is
        falsy, which is checked once per iteration.
        """
        is_realtime = False
        with self.log_file_path.open("r", encoding="utf-8") as file:
            while self.running:
                current_position = file.tell()
                raw = file.readline()
                if not raw:
                    if not is_realtime:
                        logger.info("Reached edge of the file")
                        is_realtime = True
                    time.sleep(_EOF_POLL_INTERVAL_S)
                    file.seek(current_position)
                    # Nothing was read: poll again rather than yielding an
                    # empty line on every wake-up.
                    continue
                yield Line(raw.removesuffix("\n"), is_realtime)
        logger.info("Exiting reading loop")