1
0
mirror of https://github.com/EDCD/EDMarketConnector.git synced 2025-04-15 16:50:34 +03:00

Final type annotation pass

This commit is contained in:
A_D 2021-03-08 17:43:22 +02:00 committed by Athanasius
parent adba94efa2
commit ceb2f74855

View File

@ -1,5 +1,4 @@
# edmc: protocol handler for cAPI authorisation """protocol handler for cAPI authorisation."""
import sys import sys
import threading import threading
@ -30,20 +29,20 @@ if sys.platform == 'win32':
class GenericProtocolHandler: class GenericProtocolHandler:
"""Base Protocol Handler.""" """Base Protocol Handler."""
def __init__(self): def __init__(self) -> None:
self.redirect = protocolhandler_redirect # Base redirection URL self.redirect = protocolhandler_redirect # Base redirection URL
self.master = None self.master: 'tkinter.Tk' = None # type: ignore
self.lastpayload = None self.lastpayload = None
def start(self, master: 'tkinter.Tk'): def start(self, master: 'tkinter.Tk') -> None:
"""Start Protocol Handler.""" """Start Protocol Handler."""
self.master = master self.master = master
def close(self): def close(self) -> None:
"""Stop / Close Protocol Handler.""" """Stop / Close Protocol Handler."""
pass pass
def event(self, url): def event(self, url) -> None:
"""Generate an auth event.""" """Generate an auth event."""
self.lastpayload = url self.lastpayload = url
@ -69,13 +68,13 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i
POLL = 100 # ms POLL = 100 # ms
def start(self, master): def start(self, master: 'tkinter.Tk') -> None:
"""Start Protocol Handler.""" """Start Protocol Handler."""
GenericProtocolHandler.start(self, master) GenericProtocolHandler.start(self, master)
self.lasturl: Optional[str] = None self.lasturl: Optional[str] = None
self.eventhandler = EventHandler.alloc().init() self.eventhandler = EventHandler.alloc().init()
def poll(self): def poll(self) -> None:
"""Poll event until URL is updated.""" """Poll event until URL is updated."""
# No way of signalling to Tkinter from within the callback handler block that doesn't cause Python to crash, # No way of signalling to Tkinter from within the callback handler block that doesn't cause Python to crash,
# so poll. TODO: Resolved? # so poll. TODO: Resolved?
@ -86,7 +85,7 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i
class EventHandler(NSObject): class EventHandler(NSObject):
"""Handle NSAppleEventManager IPC stuff.""" """Handle NSAppleEventManager IPC stuff."""
def init(self): def init(self) -> None:
""" """
Init method for handler. Init method for handler.
@ -101,7 +100,7 @@ if sys.platform == 'darwin' and getattr(sys, 'frozen', False): # noqa: C901 # i
) )
return self return self
def handleEvent_withReplyEvent_(self, event, replyEvent): # noqa: N802 N803 # Required to override def handleEvent_withReplyEvent_(self, event, replyEvent) -> None: # noqa: N802 N803 # Required to override
"""Actual event handling from NSAppleEventManager.""" """Actual event handling from NSAppleEventManager."""
protocolhandler.lasturl = urllib.parse.unquote( # type: ignore # Its going to be a DPH in this code protocolhandler.lasturl = urllib.parse.unquote( # type: ignore # Its going to be a DPH in this code
event.paramDescriptorForKeyword_(keyDirectObject).stringValue() event.paramDescriptorForKeyword_(keyDirectObject).stringValue()
@ -229,18 +228,18 @@ elif sys.platform == 'win32' and getattr(sys, 'frozen', False) and not is_wine a
https://en.wikipedia.org/wiki/Dynamic_Data_Exchange https://en.wikipedia.org/wiki/Dynamic_Data_Exchange
""" """
def __init__(self): def __init__(self) -> None:
super().__init__() super().__init__()
self.thread: Optional[threading.Thread] = None self.thread: Optional[threading.Thread] = None
def start(self, master): def start(self, master: 'tkinter.Tk') -> None:
"""Start the DDE thread.""" """Start the DDE thread."""
super().start(master) super().start(master)
self.thread = threading.Thread(target=self.worker, name='DDE worker') self.thread = threading.Thread(target=self.worker, name='DDE worker')
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
def close(self): def close(self) -> None:
"""Stop the DDE thread.""" """Stop the DDE thread."""
thread = self.thread thread = self.thread
if thread: if thread:
@ -248,7 +247,7 @@ elif sys.platform == 'win32' and getattr(sys, 'frozen', False) and not is_wine a
PostThreadMessageW(thread.ident, WM_QUIT, 0, 0) PostThreadMessageW(thread.ident, WM_QUIT, 0, 0)
thread.join() # Wait for it to quit thread.join() # Wait for it to quit
def worker(self): def worker(self) -> None:
"""Start a DDE server.""" """Start a DDE server."""
wndclass = WNDCLASS() wndclass = WNDCLASS()
wndclass.style = 0 wndclass.style = 0
@ -320,21 +319,21 @@ else: # Linux / Run from source
This implementation uses a localhost HTTP server This implementation uses a localhost HTTP server
""" """
def __init__(self): def __init__(self) -> None:
GenericProtocolHandler.__init__(self) super().__init__()
self.httpd = HTTPServer(('localhost', 0), HTTPRequestHandler) self.httpd = HTTPServer(('localhost', 0), HTTPRequestHandler)
self.redirect = f'http://localhost:{self.httpd.server_port}/auth' self.redirect = f'http://localhost:{self.httpd.server_port}/auth'
logger.trace(f'Web server listening on {self.redirect}') logger.trace(f'Web server listening on {self.redirect}')
self.thread = None self.thread: Optional[threading.Thread] = None
def start(self, master): def start(self, master) -> None:
"""Start the HTTP server thread.""" """Start the HTTP server thread."""
GenericProtocolHandler.start(self, master) GenericProtocolHandler.start(self, master)
self.thread = threading.Thread(target=self.worker, name='OAuth worker') self.thread = threading.Thread(target=self.worker, name='OAuth worker')
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
def close(self): def close(self) -> None:
"""Shutdown the HTTP server thread.""" """Shutdown the HTTP server thread."""
thread = self.thread thread = self.thread
if thread: if thread:
@ -353,7 +352,7 @@ else: # Linux / Run from source
logger.debug('Done.') logger.debug('Done.')
def worker(self): def worker(self) -> None:
"""HTTP Worker.""" """HTTP Worker."""
# TODO: This should probably be more ephemeral, and only handle one request, as its all we're expecting # TODO: This should probably be more ephemeral, and only handle one request, as its all we're expecting
self.httpd.serve_forever() self.httpd.serve_forever()
@ -361,7 +360,7 @@ else: # Linux / Run from source
class HTTPRequestHandler(BaseHTTPRequestHandler): class HTTPRequestHandler(BaseHTTPRequestHandler):
"""Simple HTTP server to handle IPC from protocol handler.""" """Simple HTTP server to handle IPC from protocol handler."""
def parse(self): def parse(self) -> bool:
"""Parse a request.""" """Parse a request."""
url = urllib.parse.unquote(self.path) url = urllib.parse.unquote(self.path)
if url.startswith('/auth'): if url.startswith('/auth'):
@ -373,12 +372,12 @@ else: # Linux / Run from source
self.send_response(404) # Not found self.send_response(404) # Not found
return False return False
def do_HEAD(self): # noqa: N802 # Required to override def do_HEAD(self) -> None: # noqa: N802 # Required to override
"""Handle HEAD Request.""" """Handle HEAD Request."""
self.parse() self.parse()
self.end_headers() self.end_headers()
def do_GET(self): # noqa: N802 # Required to override def do_GET(self) -> None: # noqa: N802 # Required to override
"""Handle GET Request.""" """Handle GET Request."""
if self.parse(): if self.parse():
self.send_header('Content-Type', 'text/html') self.send_header('Content-Type', 'text/html')