2022-01-28 23:59:50 +01:00

215 lines
7.4 KiB
Python

# This is the low level process executing stuff.
# It makes piping and parallel process handling easier.
# You can specify a handler for each line of stderr output for each item in the pipe.
# Every item also has its own exitcode handler.
# Normally you add a stdout_handler to the last item in the pipe.
# However: you can also add a stdout_handler to other items in a pipe. This will turn that item into a manual pipe: your
# handler is responsible for sending data into the next item of the pipe (available in item.next).
# You can also use manual pipe mode to just execute multiple commands in parallel and handle their output in parallel,
# without doing any actual pipe stuff (because you don't HAVE to send data into the next item).
import subprocess
import os
import select
try:
from shlex import quote as cmd_quote
except ImportError:
from pipes import quote as cmd_quote
class CmdItem:
    """A single command, meant to be added to a CmdPipe."""

    def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, stdout_handler=None, shell=False):
        """Remember the command and its handlers; nothing is started here.

        The caller has to make sure cmd is properly escaped when using shell.
        If stdout_handler is None, the stdout is connected to the stdin of the
        next item in the pipe, like an actual system pipe (no python overhead).

        :type cmd: list of str
        """
        self.cmd = cmd
        self.shell = shell
        self.readonly = readonly

        # callbacks
        self.stdout_handler = stdout_handler
        self.stderr_handler = stderr_handler
        self.exit_handler = exit_handler

        # filled in later: process by create(), next by CmdPipe
        self.process = None
        self.next = None

    def __str__(self):
        """Return a copy-pastable version of the command."""
        # A shell command is already copy-pastable as it is. Otherwise every
        # argument is quoted: sometimes a mess of quotes, but correct.
        words = self.cmd if self.shell else [cmd_quote(arg) for arg in self.cmd]
        return " ".join(words)

    def create(self, stdin):
        """Actually start the subprocess (called by CmdPipe).

        :param stdin: passed straight through to subprocess.Popen as stdin
        """
        # Hand all arguments to the command as utf-8 bytes. This is necessary
        # if LC_ALL=en_US.utf8 is not set in the environment.
        utf8_args = [arg.encode('utf-8') for arg in self.cmd]

        self.process = subprocess.Popen(
            utf8_args,
            stdin=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=self.shell,
            env=os.environ,
        )
class CmdPipe:
    """A pipe of one or more commands.

    Also takes care of utf-8 encoding/decoding and line based parsing.
    """

    def __init__(self, readonly=False, inp=None):
        """
        :param inp: input string for stdin
        :param readonly: Only execute if entire pipe consist of readonly commands
        """
        # list of commands + error handlers to execute
        self.items = []
        self.inp = inp
        self.readonly = readonly
        self._should_execute = True

    def add(self, cmd_item):
        """Add a CmdItem to the pipe.

        In a readonly pipe, adding an item that is not readonly disables
        execution of the whole pipe.

        :type cmd_item: CmdItem
        """
        self.items.append(cmd_item)

        if not cmd_item.readonly and self.readonly:
            self._should_execute = False

    def __str__(self):
        """Transform the whole pipe into a oneliner for debugging and testing.

        This should generate a copy-pastable string for use in a console.
        """
        # formatting an item does the proper escaping to make it copy-pastable
        return " | ".join("({})".format(item) for item in self.items)

    def should_execute(self):
        """Return False if execute() would skip running the pipe."""
        return self._should_execute

    def execute(self):
        """Run the pipe.

        Returns True if the pipe was not executed at all (see add()), or if all
        exit handlers returned true. Otherwise it is False/None, depending on
        what the exit handlers returned.

        :raises Exception: if no item has a stdout or stderr handler.
        """
        if not self._should_execute:
            return True

        # Check this before creating any process, so that a misconfigured pipe
        # does not leave running processes and open pipes behind.
        if not any(item.stderr_handler or item.stdout_handler is not None for item in self.items):
            raise Exception("Cant use cmdpipe without any output handlers.")

        selectors = self.__create()
        self.__process_outputs(selectors)

        # close filehandles
        for item in self.items:
            item.process.stderr.close()
            item.process.stdout.close()

        # call exit handlers
        success = True
        for item in self.items:
            if item.exit_handler is not None:
                success = item.exit_handler(item.process.returncode) and success

        return success

    @staticmethod
    def _read_line(stream):
        """Read one line from stream.

        Returns None at end of file, otherwise the utf-8 decoded line with
        trailing whitespace removed (which is "" for a blank line).
        """
        raw = stream.readline()
        # Only a truly empty read means EOF; a blank line still contains its newline.
        if raw == b"":
            return None
        return raw.decode('utf-8').rstrip()

    def __process_outputs(self, selectors):
        """Watch all output selectors and call handlers.

        Blank lines are not passed to the handlers, and they are not treated as
        end of file either.
        """
        while True:
            # wait for output on one of the stderrs or handled stdouts
            read_ready, _, _ = select.select(selectors, [], [])

            eof_count = 0
            done_count = 0

            # read line and call appropriate handlers
            for item in self.items:
                if item.process.stdout in read_ready:
                    line = self._read_line(item.process.stdout)
                    if line is None:
                        eof_count = eof_count + 1
                        # manual pipe: nothing more will be sent into the next item
                        if item.next:
                            item.next.process.stdin.close()
                    elif line != "":
                        item.stdout_handler(line)

                if item.process.stderr in read_ready:
                    line = self._read_line(item.process.stderr)
                    if line is None:
                        eof_count = eof_count + 1
                    elif line != "":
                        item.stderr_handler(line)

                if item.process.poll() is not None:
                    done_count = done_count + 1

            # all filehandles are eof and all processes are done (poll() is not None)
            if eof_count == len(selectors) and done_count == len(self.items):
                break

    def __create(self):
        """Create the actual processes, do the piping and return the selectors.

        The selectors are the stderr of every item with a stderr_handler and
        the stdout of every item with a stdout_handler.
        """
        selectors = []
        next_stdin = subprocess.PIPE  # means we write input via python instead of an actual system pipe
        first = True
        prev_item = None

        for item in self.items:
            # creates the actual subprocess via subprocess.Popen
            item.create(next_stdin)

            # we piped previous process? dont forget to close its stdout
            if next_stdin is not subprocess.PIPE:
                next_stdin.close()

            if item.stderr_handler:
                selectors.append(item.process.stderr)

            # we're the first process in the pipe
            if first:
                if self.inp is not None:
                    # write the input we have
                    item.process.stdin.write(self.inp.encode('utf-8'))
                item.process.stdin.close()
                first = False

            # manual stdout handling or pipe it to the next process?
            if item.stdout_handler is None:
                # no manual stdout handling, pipe it to the next process via system pipe
                next_stdin = item.process.stdout
            else:
                # manual stdout handling via python
                selectors.append(item.process.stdout)
                # next process will get input from python:
                next_stdin = subprocess.PIPE

            if prev_item is not None:
                prev_item.next = item

            prev_item = item

        return selectors