diff --git a/tests/test_cmdpipe.py b/tests/test_cmdpipe.py index 1f5de75..ce3e09a 100644 --- a/tests/test_cmdpipe.py +++ b/tests/test_cmdpipe.py @@ -9,8 +9,8 @@ class TestCmdPipe(unittest2.TestCase): p=CmdPipe(readonly=False, inp=None) err=[] out=[] - p.add(CmdItem(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2))) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2), stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err, ["ls: cannot access '/nonexistent': No such file or directory"]) self.assertEqual(out, ["/","/"]) @@ -21,8 +21,8 @@ class TestCmdPipe(unittest2.TestCase): p=CmdPipe(readonly=False, inp="test") err=[] out=[] - p.add(CmdItem(["cat"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0))) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["cat"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line) )) + executed=p.execute() self.assertEqual(err, []) self.assertEqual(out, ["test"]) @@ -37,8 +37,8 @@ class TestCmdPipe(unittest2.TestCase): out=[] p.add(CmdItem(["echo", "test"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0))) p.add(CmdItem(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0))) - p.add(CmdItem(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0))) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err1, []) self.assertEqual(err2, []) @@ -58,8 +58,8 @@ class TestCmdPipe(unittest2.TestCase): out=[] p.add(CmdItem(["ls", "/nonexistent1"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2))) p.add(CmdItem(["ls", "/nonexistent2"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2))) - p.add(CmdItem(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2))) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2), stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err1, ["ls: cannot access '/nonexistent1': No such file or directory"]) self.assertEqual(err2, ["ls: cannot access '/nonexistent2': No such file or directory"]) @@ -76,8 +76,8 @@ class TestCmdPipe(unittest2.TestCase): out=[] p.add(CmdItem(["bash", "-c", "exit 1"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,1))) p.add(CmdItem(["bash", "-c", "exit 2"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2))) - p.add(CmdItem(["bash", "-c", "exit 3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,3))) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["bash", "-c", "exit 3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,3), stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err1, []) self.assertEqual(err2, []) @@ -97,8 +97,8 @@ class TestCmdPipe(unittest2.TestCase): return True p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), exit_handler=true_exit, readonly=True)) - p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), exit_handler=true_exit, readonly=True)) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), exit_handler=true_exit, readonly=True, stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err1, []) self.assertEqual(err2, []) @@ -113,8 +113,8 @@ class TestCmdPipe(unittest2.TestCase): err2=[] out=[] p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=False)) - p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True)) - executed=p.execute(stdout_handler=lambda line: out.append(line)) + p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True, stdout_handler=lambda line: out.append(line))) + executed=p.execute() self.assertEqual(err1, []) self.assertEqual(err2, []) diff --git a/tests/test_executenode.py b/tests/test_executenode.py index 270ab95..2b9d933 100644 --- a/tests/test_executenode.py +++ b/tests/test_executenode.py @@ -156,16 +156,16 @@ class TestExecuteNode(unittest2.TestCase): self.assertEqual(nodeb.run(cmd=["pwd", ExecuteNode.PIPE, "cat"], cwd="/tmp/space test"), ["/tmp/space test"]) self.assertEqual(nodeb.run(cmd=["cat", ExecuteNode.PIPE, "pwd"], cwd="/tmp/space test"), ["/tmp/space test"]) - - def test_script(self): - - def stdout_handler(line): - print("handle: " + line) - - nodea=ExecuteNode(debug_output=True, ssh_to="localhost") - - cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"]) - cmd_pipe.execute(stdout_handler) + # # + # def test_script(self): + # + # def stdout_handler(line): + # print("handle: " + line) + # + # nodea=ExecuteNode(debug_output=True, ssh_to="localhost") + # + # cmd_pipe=nodea.script(lines=["echo line1", "echo line 2"]) + # cmd_pipe.execute(stdout_handler) diff --git a/zfs_autobackup/CmdPipe.py b/zfs_autobackup/CmdPipe.py index 04680d0..72de4a6 100644 --- a/zfs_autobackup/CmdPipe.py +++ b/zfs_autobackup/CmdPipe.py @@ -17,17 +17,23 @@ except ImportError: class CmdItem: """one command item, to be added to a CmdPipe""" - def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, shell=False): + def __init__(self, cmd, readonly=False, stderr_handler=None, exit_handler=None, stdout_handler=None, shell=False): """create item. caller has to make sure cmd is properly escaped when using shell. + + If stdout_handler is None, it will connect the stdout to the stdin of the next item in the pipe, like + and actual system pipe. (no python overhead) + :type cmd: list of str """ self.cmd = cmd self.readonly = readonly self.stderr_handler = stderr_handler + self.stdout_handler = stdout_handler self.exit_handler = exit_handler self.shell = shell self.process = None + self.next_item = None #next item in pipe, set by CmdPipe def __str__(self): """return copy-pastable version of command.""" @@ -90,38 +96,52 @@ class CmdPipe: def should_execute(self): return self._should_execute - def execute(self, stdout_handler): + def execute(self): """run the pipe. returns True all exit handlers returned true""" if not self._should_execute: return True - # first process should have actual user input as stdin: selectors = [] # create processes last_stdout = None - stdin = subprocess.PIPE + next_stdin = subprocess.PIPE # means we write input via python instead of an actual system pipe + first=True + prev_item=None for item in self.items: - item.create(stdin) + #creates the actual subprocess via subprocess.popen + item.create(next_stdin) + + #we piped previous process? dont forget to close its stdout + if next_stdin != subprocess.PIPE: + next_stdin.close() + selectors.append(item.process.stderr) - if last_stdout is None: - # we're the first process in the pipe, do we have some input? + # we're the first process in the pipe + if first: if self.inp is not None: - # TODO: make streaming to support big inputs? + #write the input we have item.process.stdin.write(self.inp.encode('utf-8')) item.process.stdin.close() + first=False + + #manual stdout handling or pipe it to the next process? + if item.stdout_handler is None: + # no manual stdout handling, pipe it to the next process via sytem pipe + next_stdin=item.process.stdout else: - # last stdout was piped to this stdin already, so close it because we dont need it anymore - last_stdout.close() + # manual stdout handling via python + selectors.append(item.process.stdout) + # next process will get input from python: + next_stdin= subprocess.PIPE - last_stdout = item.process.stdout - stdin = last_stdout + if prev_item is not None: + prev_item.next=item - # monitor last stdout as well - selectors.append(last_stdout) + prev_item=item while True: # wait for output on one of the stderrs or last_stdout @@ -130,12 +150,6 @@ class CmdPipe: done_count = 0 # read line and call appropriate handlers - if last_stdout in read_ready: - line = last_stdout.readline().decode('utf-8').rstrip() - if line != "": - stdout_handler(line) - else: - eof_count = eof_count + 1 for item in self.items: if item.process.stderr in read_ready: @@ -145,6 +159,13 @@ class CmdPipe: else: eof_count = eof_count + 1 + if item.process.stdout in read_ready: + line = item.process.stdout.readline().decode('utf-8').rstrip() + if line != "": + item.stdout_handler(line) + else: + eof_count = eof_count + 1 + if item.process.poll() is not None: done_count = done_count + 1 @@ -153,9 +174,10 @@ class CmdPipe: break # close filehandles - last_stdout.close() for item in self.items: item.process.stderr.close() + item.process.stdout.close() + # item.process.stdin.close() # call exit handlers success = True diff --git a/zfs_autobackup/ExecuteNode.py b/zfs_autobackup/ExecuteNode.py index a487692..99917f6 100644 --- a/zfs_autobackup/ExecuteNode.py +++ b/zfs_autobackup/ExecuteNode.py @@ -144,23 +144,28 @@ class ExecuteNode(LogStub): return True - # add shell command and handlers to pipe - cmd_item=CmdItem(cmd=self._shell_cmd(cmd, cwd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local()) - cmd_pipe.add(cmd_item) - - # return pipe instead of executing? - if pipe: - return cmd_pipe - # stdout parser output_lines = [] - def stdout_handler(line): - if tab_split: - output_lines.append(line.rstrip().split('\t')) - else: - output_lines.append(line.rstrip()) - self._parse_stdout(line) + if pipe: + # dont specify output handler, so it will get piped to next process + stdout_handler=None + else: + # handle output manually, dont pipe it + def stdout_handler(line): + if tab_split: + output_lines.append(line.rstrip().split('\t')) + else: + output_lines.append(line.rstrip()) + self._parse_stdout(line) + + # add shell command and handlers to pipe + cmd_item=CmdItem(cmd=self._shell_cmd(cmd, cwd), readonly=readonly, stderr_handler=stderr_handler, exit_handler=exit_handler, shell=self.is_local(), stdout_handler=stdout_handler) + cmd_pipe.add(cmd_item) + + # return CmdPipe instead of executing? + if pipe: + return cmd_pipe if cmd_pipe.should_execute(): self.debug("CMD > {}".format(cmd_pipe)) @@ -168,7 +173,7 @@ class ExecuteNode(LogStub): self.debug("CMDSKIP> {}".format(cmd_pipe)) # execute and calls handlers in CmdPipe - if not cmd_pipe.execute(stdout_handler=stdout_handler): + if not cmd_pipe.execute(): raise(ExecuteError("Last command returned error")) if return_all: