1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-12 15:27:14 +03:00

[#610] Enable game_running on Linux

This commit is contained in:
David Sangrey 2024-06-18 12:20:08 -04:00
parent 81a3b4fd5f
commit f1df1c7da7
No known key found for this signature in database
GPG Key ID: 3AEADBB0186884BC

View File

@ -15,10 +15,11 @@ import sys
import threading
from calendar import timegm
from collections import defaultdict
from os import SEEK_END, SEEK_SET, listdir, environ
from os import SEEK_END, SEEK_SET, listdir
from os.path import basename, expanduser, getctime, isdir, join
from time import gmtime, localtime, mktime, sleep, strftime, strptime, time
from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping
import psutil
import semantic_version
import util_ships
from config import config
@ -35,12 +36,6 @@ MAX_NAVROUTE_DISCREPANCY = 5 # Timestamp difference in seconds
MAX_FCMATERIALS_DISCREPANCY = 5 # Timestamp difference in seconds
if sys.platform == 'win32':
import win32process
import win32con
import win32security
import win32gui
import win32api
import pywintypes
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver
@ -60,7 +55,8 @@ class EDLogs(FileSystemEventHandler):
"""Monitoring of Journal files."""
# Magic with FileSystemEventHandler can confuse type checkers when they do not have access to every import
_POLL = 1 # Polling is cheap, so do it often
_POLL = 1 # Polling while running is cheap, so do it often
_INACTIVE_POLL = 10 # Polling while not running isn't as cheap, so do it less often
_RE_CANONICALISE = re.compile(r'\$(.+)_name;')
_RE_CATEGORY = re.compile(r'\$MICRORESOURCE_CATEGORY_(.+);')
_RE_LOGFILE = re.compile(r'^Journal(Alpha|Beta)?\.[0-9]{2,4}(-)?[0-9]{2}(-)?[0-9]{2}(T)?[0-9]{2}[0-9]{2}[0-9]{2}'
@ -90,6 +86,7 @@ class EDLogs(FileSystemEventHandler):
self.catching_up = False
self.game_was_running = False # For generation of the "ShutDown" event
self.running_process = None
# Context for journal handling
self.version: str | None = None
@ -111,13 +108,6 @@ class EDLogs(FileSystemEventHandler):
# be >= for Live, and < for Legacy.
self.live_galaxy_base_version = semantic_version.Version('4.0.0')
if sys.platform == 'win32':
# Get the SID of the user we're running as for later use in
# `game_running()`
self.user_sid, self.user_domain, self.user_type = win32security.LookupAccountName(
None, environ['USERNAME']
)
self.__init_state()
def __init_state(self) -> None:
@ -469,7 +459,10 @@ class EDLogs(FileSystemEventHandler):
loghandle = open(logfile, 'rb', 0) # unbuffered
log_pos = 0
sleep(self._POLL)
if self.game_was_running:
sleep(self._POLL)
else:
sleep(self._INACTIVE_POLL)
# Check whether we're still supposed to be running
if threading.current_thread() != self.thread:
@ -2117,70 +2110,34 @@ class EDLogs(FileSystemEventHandler):
return entry
def game_running(self) -> bool: # noqa: CCR001
def game_running(self) -> bool:
"""
Determine if the game is currently running.
TODO: Implement on Linux
:return: bool - True if the game is running.
"""
if sys.platform == 'win32':
def WindowTitle(h): # noqa: N802
if h:
return win32gui.GetWindowText(h)
return None
def callback(hwnd, hwnds): # noqa: N803
name = WindowTitle(hwnd)
if name and name.startswith('Elite - Dangerous'):
# We've found a window that *looks* like an ED game process, but now we need to check
# if it's owned by the current user.
# Get the process_id of the window we found
# <https://mhammond.github.io/pywin32/win32process__GetWindowThreadProcessId_meth.html>
thread_id, process_id = win32process.GetWindowThreadProcessId(hwnd)
# Use that to get a process handle
# <https://mhammond.github.io/pywin32/win32api__OpenProcess_meth.html>
# The first arg can't simply be `0`, and `win32con.PROCESS_TERMINATE` works
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, False, process_id)
if handle:
# We got the handle OK, now we need a token for it
process_token = win32security.OpenProcessToken(handle, win32security.TOKEN_QUERY)
# So we can use that to get information about the User
token_information, i = win32security.GetTokenInformation(
process_token, win32security.TokenUser
)
# And lastly check if token_information, which should be a PySID object, matches
# that of the current user we looked up in `__init__()`.
if token_information == self.user_sid:
# This can be used to convert the token to username, domain name, and account type
# user, domain, name_use = win32security.LookupAccountSid(None, token_information)
hwnds.append(hwnd)
return False # Indicate window found, so stop iterating
return True
# Ref: <http://timgolden.me.uk/python/win32_how_do_i/find-the-window-for-my-subprocess.html>
ed_windows: list[int] = []
if self.running_process:
p = self.running_process
try:
win32gui.EnumWindows(callback, ed_windows)
except pywintypes.error as e:
# Ref: <https://lists.archive.carbon60.com/python/python/135503>
# Because False is returned in the callback to indicate "found the window, stop
# processing", this causes EnumWindows() to return `0`, which is generically
# treated as an error, so exception is raised.
# So, check the exception's .winerror, and ignore if `0`.
if e.winerror != 0:
logger.exception("EnumWindows exception:")
bacon = bool(ed_windows)
print(bacon)
return bacon
return False
with p.oneshot():
if p.status() not in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING]:
raise psutil.NoSuchProcess
except psutil.NoSuchProcess:
# Process likely expired
self.running_process = None
if not self.running_process:
edmc_process = psutil.Process()
edmc_user = edmc_process.username()
try:
for pid in psutil.pids():
proc = psutil.Process(pid)
if 'EliteDangerous' in proc.name() and proc.username() == edmc_user:
self.running_process = proc
return True
except psutil.NoSuchProcess:
pass
return False
return bool(self.running_process)
def ship(self, timestamped=True) -> MutableMapping[str, Any] | None:
"""