diff --git a/tests/test_cmdpipe.py b/tests/test_cmdpipe.py
index ce3e09a..a65ed57 100644
--- a/tests/test_cmdpipe.py
+++ b/tests/test_cmdpipe.py
@@ -121,3 +121,55 @@ class TestCmdPipe(unittest2.TestCase):
         self.assertEqual(out, [])
         self.assertTrue(executed)
 
+    def test_no_handlers(self):
+        with self.assertRaises(Exception):
+            p=CmdPipe()
+            p.add(CmdItem([ "echo" ]))
+            p.execute()
+
+        #NOTE: this will give some resource warnings
+
+    def test_manual_pipes(self):
+
+        # manual piping means: a command in the pipe has a stdout_handler, which is responsible for sending the data
+        # into the next item of the pipe.
+
+        result=[]
+
+        def stdout_handler(line):
+            item2.process.stdin.write(line.encode('utf8'))
+            # item2.process.stdin.close()
+
+        item1=CmdItem(["echo", "test"], stdout_handler=stdout_handler)
+        item2=CmdItem(["tr", "e", "E"], stdout_handler=lambda line: result.append(line))
+
+        p=CmdPipe()
+        p.add(item1)
+        p.add(item2)
+        p.execute()
+
+        self.assertEqual(result, ["tEst"])
+
+    def test_multiprocess(self):
+
+        # don't do any piping at all, just run multiple processes and handle their outputs
+
+        result1=[]
+        result2=[]
+        result3=[]
+
+        item1=CmdItem(["echo", "test1"], stdout_handler=lambda line: result1.append(line))
+        item2=CmdItem(["echo", "test2"], stdout_handler=lambda line: result2.append(line))
+        item3=CmdItem(["echo", "test3"], stdout_handler=lambda line: result3.append(line))
+
+        p=CmdPipe()
+        p.add(item1)
+        p.add(item2)
+        p.add(item3)
+        p.execute()
+
+        self.assertEqual(result1, ["test1"])
+        self.assertEqual(result2, ["test2"])
+        self.assertEqual(result3, ["test3"])
+
diff --git a/tests/test_executenode.py b/tests/test_executenode.py
index 2b9d933..ea8b7bd 100644
--- a/tests/test_executenode.py
+++ b/tests/test_executenode.py
@@ -164,8 +164,8 @@ class TestExecuteNode(unittest2.TestCase):
     #
     #     nodea=ExecuteNode(debug_output=True, ssh_to="localhost")
     #
-    #     cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"])
-    #     cmd_pipe.execute(stdout_handler)
+    #     cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"], stdout_handler=stdout_handler)
+    #     cmd_pipe.execute()
diff --git a/zfs_autobackup/CmdPipe.py b/zfs_autobackup/CmdPipe.py
index 72de4a6..54f5461 100644
--- a/zfs_autobackup/CmdPipe.py
+++ b/zfs_autobackup/CmdPipe.py
@@ -1,8 +1,16 @@
 # This is the low level process executing stuff.
-# It makes piping multiple process easier.
+# It makes piping and parallel process handling easier.
+
 # You can specify a handler for each line of stderr output for each item in the pipe.
 # Every item also has its own exitcode handler.
-# There is one stdout handler for the last item in the pipe. This is also called for each line.
+
+# Normally you add a stdout_handler to the last item in the pipe.
+# However: you can also add a stdout_handler to other items in a pipe. This turns that item into a manual pipe: your
+# handler is responsible for sending data into the next item of the pipe. (available in item.next)
+
+# You can also use manual pipe mode to just execute multiple commands in parallel and handle their output in parallel,
+# without doing any actual pipe stuff. (because you don't HAVE to send data into the next item.)
+
 
 import subprocess
 import os
@@ -33,7 +41,7 @@ class CmdItem:
         self.exit_handler = exit_handler
         self.shell = shell
         self.process = None
-        self.next_item = None #next item in pipe, set by CmdPipe
+        self.next = None  # next item in pipe, set by CmdPipe
 
     def __str__(self):
         """return copy-pastable version of command."""
@@ -97,55 +105,38 @@ class CmdPipe:
         return self._should_execute
 
     def execute(self):
-        """run the pipe. returns True all exit handlers returned true"""
+        """run the pipe. returns True if all exit handlers returned True (otherwise False/None, depending on the exit handlers' return codes)."""
 
         if not self._should_execute:
             return True
 
-        selectors = []
+        selectors = self.__create()
 
-        # create processes
-        last_stdout = None
-        next_stdin = subprocess.PIPE  # means we write input via python instead of an actual system pipe
-        first=True
-        prev_item=None
+        if not selectors:
+            raise Exception("Can't use CmdPipe without any output handlers.")
+
+        self.__process_outputs(selectors)
+
+        # close filehandles
         for item in self.items:
+            item.process.stderr.close()
+            item.process.stdout.close()
 
-            #creates the actual subprocess via subprocess.popen
-            item.create(next_stdin)
+        # call exit handlers
+        success = True
+        for item in self.items:
+            if item.exit_handler is not None:
+                success = item.exit_handler(item.process.returncode) and success
 
-            #we piped previous process? dont forget to close its stdout
-            if next_stdin != subprocess.PIPE:
-                next_stdin.close()
+        return success
 
-            selectors.append(item.process.stderr)
-
-            # we're the first process in the pipe
-            if first:
-                if self.inp is not None:
-                    #write the input we have
-                    item.process.stdin.write(self.inp.encode('utf-8'))
-                item.process.stdin.close()
-                first=False
-
-            #manual stdout handling or pipe it to the next process?
-            if item.stdout_handler is None:
-                # no manual stdout handling, pipe it to the next process via sytem pipe
-                next_stdin=item.process.stdout
-            else:
-                # manual stdout handling via python
-                selectors.append(item.process.stdout)
-                # next process will get input from python:
-                next_stdin= subprocess.PIPE
-
-            if prev_item is not None:
-                prev_item.next=item
-
-            prev_item=item
+    def __process_outputs(self, selectors):
+        """watch all output selectors and call handlers"""
 
         while True:
             # wait for output on one of the stderrs or last_stdout
            (read_ready, write_ready, ex_ready) = select.select(selectors, [], [])
+
            eof_count = 0
            done_count = 0
@@ -165,6 +156,8 @@
                        item.stdout_handler(line)
                    else:
                        eof_count = eof_count + 1
+                        if item.next:
+                            item.next.process.stdin.close()
 
                if item.process.poll() is not None:
                    done_count = done_count + 1
@@ -173,16 +166,48 @@
            if eof_count == len(selectors) and done_count == len(self.items):
                break
 
-        # close filehandles
-        for item in self.items:
-            item.process.stderr.close()
-            item.process.stdout.close()
-            # item.process.stdin.close()
-
-        # call exit handlers
-        success = True
-        for item in self.items:
-            if item.exit_handler is not None:
-                success=item.exit_handler(item.process.returncode) and success
-
-        return success
+    def __create(self):
+        """create the actual processes, do the piping and return the selectors to watch."""
+
+        selectors = []
+        next_stdin = subprocess.PIPE  # means we write input via python instead of an actual system pipe
+        first = True
+        prev_item = None
+
+        for item in self.items:
+
+            # creates the actual subprocess via subprocess.Popen
+            item.create(next_stdin)
+
+            # we piped the previous process? don't forget to close its stdout
+            if next_stdin != subprocess.PIPE:
+                next_stdin.close()
+
+            if item.stderr_handler:
+                selectors.append(item.process.stderr)
+
+            # we're the first process in the pipe
+            if first:
+                if self.inp is not None:
+                    # write the input we have
+                    item.process.stdin.write(self.inp.encode('utf-8'))
+                item.process.stdin.close()
+                first = False
+
+            # manual stdout handling or pipe it to the next process?
+            if item.stdout_handler is None:
+                # no manual stdout handling, pipe it to the next process via a system pipe
+                next_stdin = item.process.stdout
+            else:
+                # manual stdout handling via python
+                selectors.append(item.process.stdout)
+                # next process will get its input from python:
+                next_stdin = subprocess.PIPE
+
+            if prev_item is not None:
+                prev_item.next = item
+
+            prev_item = item
+
+        return selectors
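For reference, a minimal usage sketch of the manual pipe mode described in the CmdPipe.py comments above, modeled on test_manual_pipes. It assumes CmdPipe and CmdItem are importable from zfs_autobackup.CmdPipe, and the uppercasing step is purely illustrative: the first item's stdout_handler pushes each transformed line into item.next.process.stdin, and CmdPipe closes that stdin once the first item's stdout reaches EOF, so the handler does not have to.

    # Manual pipe sketch (import path is an assumption; adjust to your layout):
    from zfs_autobackup.CmdPipe import CmdPipe, CmdItem

    result = []

    def forward_handler(line):
        # manual pipe: transform the line in python, then push it into the next item ourselves.
        # (CmdPipe closes item1.next.process.stdin for us when item1's stdout reaches EOF.)
        item1.next.process.stdin.write(line.upper().encode('utf-8'))

    item1 = CmdItem(["echo", "test"], stdout_handler=forward_handler)
    item2 = CmdItem(["tr", "E", "3"],
                    stdout_handler=lambda line: result.append(line),
                    exit_handler=lambda returncode: returncode == 0)

    p = CmdPipe()
    p.add(item1)
    p.add(item2)
    ok = p.execute()  # True only if every exit handler returned True

    # result is now ["T3ST"], and ok is True when tr exited with 0.

Leaving out the write to item.next gives the parallel-execution variant exercised by test_multiprocess: each item just collects its own output, with no data flowing between them.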