From 86ea5e49f4654db4b3783c1130e47ed4697cbf95 Mon Sep 17 00:00:00 2001 From: Edwin Eefting Date: Wed, 7 Apr 2021 23:58:41 +0200 Subject: [PATCH] working on better piping system --- tests/test_cmdpipe.py | 47 ++++++++++++++++ zfs_autobackup/CmdPipe.py | 111 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 tests/test_cmdpipe.py create mode 100644 zfs_autobackup/CmdPipe.py diff --git a/tests/test_cmdpipe.py b/tests/test_cmdpipe.py new file mode 100644 index 0000000..920860a --- /dev/null +++ b/tests/test_cmdpipe.py @@ -0,0 +1,47 @@ +from basetest import * +from zfs_autobackup.CmdPipe import CmdPipe + + +class TestCmdPipe(unittest2.TestCase): + + def test_single(self): + """single process stdout and stderr""" + p=CmdPipe(readonly=False, inp=None) + err=[] + out=[] + p.add(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line)) + exits=p.execute(stdout_handler=lambda line: out.append(line)) + + self.assertEqual(err, ["ls: cannot access '/nonexistent': No such file or directory"]) + self.assertEqual(out, ["/","/"]) + self.assertEqual(exits, [2]) + + def test_input(self): + """test stdinput""" + p=CmdPipe(readonly=False, inp="test") + err=[] + out=[] + p.add(["echo", "test"], stderr_handler=lambda line: err.append(line)) + exits=p.execute(stdout_handler=lambda line: out.append(line)) + + self.assertEqual(err, []) + self.assertEqual(out, ["test"]) + self.assertEqual(exits, [0]) + + def test_pipe(self): + """test piped""" + p=CmdPipe(readonly=False) + err1=[] + err2=[] + err3=[] + out=[] + p.add(["echo", "test"], stderr_handler=lambda line: err1.append(line)) + p.add(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line)) + p.add(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line)) + exits=p.execute(stdout_handler=lambda line: out.append(line)) + + self.assertEqual(err1, []) + self.assertEqual(err2, []) + self.assertEqual(err3, []) + self.assertEqual(out, ["TEsT"]) + self.assertEqual(exits, [0,0,0]) diff --git a/zfs_autobackup/CmdPipe.py b/zfs_autobackup/CmdPipe.py new file mode 100644 index 0000000..a3d47a3 --- /dev/null +++ b/zfs_autobackup/CmdPipe.py @@ -0,0 +1,111 @@ +import subprocess +import os +import select + +class CmdPipe: + """a pipe of one or more commands """ + + def __init__(self, readonly=False, inp=None): + """ + :param inp: input string for stdin + :param readonly: Only execute if entire pipe consist of readonly commands + """ + # list of commands + error handlers to execute + self.items = [] + + self.inp = inp + self.readonly = readonly + self._should_execute = True + + def add(self, cmd, readonly=False, stderr_handler=None): + """adds a command to pipe""" + + self.items.append({ + 'cmd': cmd, + 'stderr_handler': stderr_handler + }) + + if not readonly and self.readonly: + self._should_execute = False + + def __str__(self): + """transform into oneliner for debugging and testing """ + ret = "" + for item in self.items: + if ret: + ret = ret + " | " + ret = ret + "(" + " ".join(item['cmd']) + ")" + + return ret + + def execute(self, stdout_handler): + """run the pipe""" + + if not self._should_execute: + return False + + # first process should have actual user input as stdin: + selectors = [] + + # create processes + last_stdout = None + stdin = subprocess.PIPE + for item in self.items: + item['process'] = subprocess.Popen(item['cmd'], env=os.environ, stdout=subprocess.PIPE, stdin=stdin, + stderr=subprocess.PIPE) + + selectors.append(item['process'].stderr) + + if last_stdout is None: + # we're the first process in the pipe, do we have some input? + if self.inp is not None: + # TODO: make streaming to support big inputs? + item['process'].stdin.write(self.inp.encode('utf-8')) + item['process'].stdin.close() + else: + #last stdout was piped to this stdin already, so close it because we dont need it anymore + last_stdout.close() + + last_stdout = item['process'].stdout + stdin=last_stdout + + # monitor last stdout as well + selectors.append(last_stdout) + + while True: + # wait for output on one of the stderrs or last_stdout + (read_ready, write_ready, ex_ready) = select.select(selectors, [], []) + eof_count = 0 + done_count = 0 + + # read line and call appropriate handlers + if last_stdout in read_ready: + line = last_stdout.readline().decode('utf-8').rstrip() + if line != "": + stdout_handler(line) + else: + eof_count = eof_count + 1 + + for item in self.items: + if item['process'].stderr in read_ready: + line = item['process'].stderr.readline().decode('utf-8').rstrip() + if line != "": + item['stderr_handler'](line) + else: + eof_count = eof_count + 1 + + if item['process'].poll() is not None: + done_count = done_count + 1 + + # all filehandles are eof and all processes are done (poll() is not None) + if eof_count == len(selectors) and done_count == len(self.items): + break + + # close all filehandles and get all exit codes + ret = [] + last_stdout.close() + for item in self.items: + item['process'].stderr.close() + ret.append(item['process'].returncode) + + return ret