From 4b97f789dfaacd963380aacadb5ce15807d506e8 Mon Sep 17 00:00:00 2001 From: Edwin Eefting Date: Mon, 12 Apr 2021 18:16:42 +0200 Subject: [PATCH] run() now uses CmdPipe for better pipe handling and cleaner code --- tests/test_cmdpipe.py | 34 +++-- tests/test_executenode.py | 16 +-- tests/test_scaling.py | 8 +- zfs_autobackup/CmdPipe.py | 31 ++-- zfs_autobackup/ExecuteNode.py | 260 +++++++++------------------------- zfs_autobackup/ZfsDataset.py | 2 +- zfs_autobackup/ZfsNode.py | 4 +- 7 files changed, 125 insertions(+), 230 deletions(-) diff --git a/tests/test_cmdpipe.py b/tests/test_cmdpipe.py index 3f60adf..edfebc8 100644 --- a/tests/test_cmdpipe.py +++ b/tests/test_cmdpipe.py @@ -10,11 +10,12 @@ class TestCmdPipe(unittest2.TestCase): err=[] out=[] p.add(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line)) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err, ["ls: cannot access '/nonexistent': No such file or directory"]) self.assertEqual(out, ["/","/"]) - self.assertEqual(exits, [2]) + self.assertTrue(executed) + self.assertEqual(p.items[0]['process'].returncode,2) def test_input(self): """test stdinput""" @@ -22,11 +23,12 @@ class TestCmdPipe(unittest2.TestCase): err=[] out=[] p.add(["echo", "test"], stderr_handler=lambda line: err.append(line)) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err, []) self.assertEqual(out, ["test"]) - self.assertEqual(exits, [0]) + self.assertTrue(executed) + self.assertEqual(p.items[0]['process'].returncode,0) def test_pipe(self): """test piped""" @@ -38,13 +40,16 @@ class TestCmdPipe(unittest2.TestCase): p.add(["echo", "test"], stderr_handler=lambda line: err1.append(line)) p.add(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line)) p.add(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line)) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err1, []) self.assertEqual(err2, []) self.assertEqual(err3, []) self.assertEqual(out, ["TEsT"]) - self.assertEqual(exits, [0,0,0]) + self.assertTrue(executed) + self.assertEqual(p.items[0]['process'].returncode,0) + self.assertEqual(p.items[1]['process'].returncode,0) + self.assertEqual(p.items[2]['process'].returncode,0) #test str representation as well self.assertEqual(str(p), "(echo test) | (tr e E) | (tr t T)") @@ -59,13 +64,16 @@ class TestCmdPipe(unittest2.TestCase): p.add(["ls", "/nonexistent1"], stderr_handler=lambda line: err1.append(line)) p.add(["ls", "/nonexistent2"], stderr_handler=lambda line: err2.append(line)) p.add(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line)) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err1, ["ls: cannot access '/nonexistent1': No such file or directory"]) self.assertEqual(err2, ["ls: cannot access '/nonexistent2': No such file or directory"]) self.assertEqual(err3, ["ls: cannot access '/nonexistent3': No such file or directory"]) self.assertEqual(out, []) - self.assertEqual(exits, [2,2,2]) + self.assertTrue(executed) + self.assertEqual(p.items[0]['process'].returncode,2) + self.assertEqual(p.items[1]['process'].returncode,2) + self.assertEqual(p.items[2]['process'].returncode,2) def test_readonly_execute(self): """everything readonly, just should execute""" @@ -76,12 +84,14 @@ class TestCmdPipe(unittest2.TestCase): out=[] p.add(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=True) p.add(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err1, []) self.assertEqual(err2, []) self.assertEqual(out, ["test2"]) - self.assertEqual(exits, [0,0]) + self.assertTrue(executed) + self.assertEqual(p.items[0]['process'].returncode,0) + self.assertEqual(p.items[1]['process'].returncode,0) def test_readonly_skip(self): """one command not readonly, skip""" @@ -92,10 +102,10 @@ class TestCmdPipe(unittest2.TestCase): out=[] p.add(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=False) p.add(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True) - exits=p.execute(stdout_handler=lambda line: out.append(line)) + executed=p.execute(stdout_handler=lambda line: out.append(line)) self.assertEqual(err1, []) self.assertEqual(err2, []) self.assertEqual(out, []) - self.assertEqual(exits, None) + self.assertFalse(executed) diff --git a/tests/test_executenode.py b/tests/test_executenode.py index 4ccd3dc..7e5d37f 100644 --- a/tests/test_executenode.py +++ b/tests/test_executenode.py @@ -64,7 +64,7 @@ class TestExecuteNode(unittest2.TestCase): def test_readonly(self): node=ExecuteNode(debug_output=True, readonly=True) - self.assertEqual(node.run(["echo","test"], readonly=False), None) + self.assertEqual(node.run(["echo","test"], readonly=False), []) self.assertEqual(node.run(["echo","test"], readonly=True), ["test"]) @@ -73,36 +73,36 @@ class TestExecuteNode(unittest2.TestCase): def pipe(self, nodea, nodeb): with self.subTest("pipe data"): - output=nodea.get_pipe(["dd", "if=/dev/zero", "count=1000"]) + output=nodea.run(["dd", "if=/dev/zero", "count=1000"],pipe=True) self.assertEqual(nodeb.run(["md5sum"], inp=output), ["816df6f64deba63b029ca19d880ee10a -"]) with self.subTest("exit code both ends of pipe ok"): - output=nodea.get_pipe(["true"]) + output=nodea.run(["true"], pipe=True) nodeb.run(["true"], inp=output) with self.subTest("error on pipe input side"): with self.assertRaises(subprocess.CalledProcessError): - output=nodea.get_pipe(["false"]) + output=nodea.run(["false"], pipe=True) nodeb.run(["true"], inp=output) with self.subTest("error on pipe output side "): with self.assertRaises(subprocess.CalledProcessError): - output=nodea.get_pipe(["true"]) + output=nodea.run(["true"], pipe=True) nodeb.run(["false"], inp=output) with self.subTest("error on both sides of pipe"): with self.assertRaises(subprocess.CalledProcessError): - output=nodea.get_pipe(["false"]) + output=nodea.run(["false"], pipe=True) nodeb.run(["false"], inp=output) with self.subTest("check stderr on pipe output side"): - output=nodea.get_pipe(["true"]) + output=nodea.run(["true"], pipe=True) (stdout, stderr)=nodeb.run(["ls", "nonexistingfile"], inp=output, return_stderr=True, valid_exitcodes=[0,2]) self.assertEqual(stdout,[]) self.assertRegex(stderr[0], "nonexistingfile" ) with self.subTest("check stderr on pipe input side (should be only printed)"): - output=nodea.get_pipe(["ls", "nonexistingfile"]) + output=nodea.run(["ls", "nonexistingfile"], pipe=True) (stdout, stderr)=nodeb.run(["true"], inp=output, return_stderr=True, valid_exitcodes=[0,2]) self.assertEqual(stdout,[]) self.assertEqual(stderr,[]) diff --git a/tests/test_scaling.py b/tests/test_scaling.py index 167521e..49568f6 100644 --- a/tests/test_scaling.py +++ b/tests/test_scaling.py @@ -38,7 +38,7 @@ class TestZfsScaling(unittest2.TestCase): #this triggers if you make a change with an impact of more than O(snapshot_count/2) - expected_runs=240 + expected_runs=343 print("ACTUAL RUNS: {}".format(run_counter)) self.assertLess(abs(run_counter-expected_runs), snapshot_count/2) @@ -69,6 +69,7 @@ class TestZfsScaling(unittest2.TestCase): global run_counter + #first run run_counter=0 with patch.object(ExecuteNode,'run', run_count) as p: @@ -77,11 +78,12 @@ class TestZfsScaling(unittest2.TestCase): #this triggers if you make a change with an impact of more than O(snapshot_count/2) - expected_runs=640 + expected_runs=743 print("ACTUAL RUNS: {}".format(run_counter)) self.assertLess(abs(run_counter-expected_runs), dataset_count/2) + #second run, should have higher number of expected_runs run_counter=0 with patch.object(ExecuteNode,'run', run_count) as p: @@ -90,6 +92,6 @@ class TestZfsScaling(unittest2.TestCase): #this triggers if you make a change with a performance impact of more than O(snapshot_count/2) - expected_runs=844 + expected_runs=947 print("ACTUAL RUNS: {}".format(run_counter)) self.assertLess(abs(run_counter-expected_runs), dataset_count/2) diff --git a/zfs_autobackup/CmdPipe.py b/zfs_autobackup/CmdPipe.py index b07c4cc..ae0e0a7 100644 --- a/zfs_autobackup/CmdPipe.py +++ b/zfs_autobackup/CmdPipe.py @@ -3,7 +3,7 @@ import os import select class CmdPipe: - """a pipe of one or more commands """ + """a pipe of one or more commands. also takes care of utf-8 encoding/decoding and line based parsing""" def __init__(self, readonly=False, inp=None): """ @@ -30,6 +30,12 @@ class CmdPipe: def __str__(self): """transform into oneliner for debugging and testing """ + + #just one command? + if len(self.items)==1: + return " ".join(self.items[0]['cmd']) + + #an actual pipe ret = "" for item in self.items: if ret: @@ -38,11 +44,14 @@ class CmdPipe: return ret + def should_execute(self): + return(self._should_execute) + def execute(self, stdout_handler): - """run the pipe""" + """run the pipe. returns True if it executed, and false if it skipped due to readonly conditions""" if not self._should_execute: - return None + return False # first process should have actual user input as stdin: selectors = [] @@ -51,7 +60,14 @@ class CmdPipe: last_stdout = None stdin = subprocess.PIPE for item in self.items: - item['process'] = subprocess.Popen(item['cmd'], env=os.environ, stdout=subprocess.PIPE, stdin=stdin, + + # make sure the command gets all the data in utf8 format: + # (this is necessary if LC_ALL=en_US.utf8 is not set in the environment) + encoded_cmd = [] + for arg in item['cmd']: + encoded_cmd.append(arg.encode('utf-8')) + + item['process'] = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE) selectors.append(item['process'].stderr) @@ -101,11 +117,10 @@ class CmdPipe: if eof_count == len(selectors) and done_count == len(self.items): break - # close all filehandles and get all exit codes - ret = [] + # ret = [] last_stdout.close() for item in self.items: item['process'].stderr.close() - ret.append(item['process'].returncode) + # ret.append(item['process'].returncode) - return ret + return True diff --git a/zfs_autobackup/ExecuteNode.py b/zfs_autobackup/ExecuteNode.py index eedbca8..7afc905 100644 --- a/zfs_autobackup/ExecuteNode.py +++ b/zfs_autobackup/ExecuteNode.py @@ -2,6 +2,7 @@ import os import select import subprocess +from zfs_autobackup.CmdPipe import CmdPipe from zfs_autobackup.LogStub import LogStub @@ -38,247 +39,114 @@ class ExecuteNode(LogStub): else: self.error("STDERR > " + line.rstrip()) - def _parse_stderr_pipe(self, line, hide_errors): - """parse stderr from pipe input process. can be overridden in subclass""" - if hide_errors: - self.debug("STDERR|> " + line.rstrip()) - else: - self.error("STDERR|> " + line.rstrip()) + # def _parse_stderr_pipe(self, line, hide_errors): + # """parse stderr from pipe input process. can be overridden in subclass""" + # if hide_errors: + # self.debug("STDERR|> " + line.rstrip()) + # else: + # self.error("STDERR|> " + line.rstrip()) - def _encode_cmd(self, cmd): - """returns cmd in encoded and escaped form that can be used with popen.""" - - encoded_cmd = [] - - # make sure the command gets all the data in utf8 format: - # (this is necessary if LC_ALL=en_US.utf8 is not set in the environment) + def _remote_cmd(self, cmd): + """transforms cmd in correct form for remote over ssh, if needed""" # use ssh? if self.ssh_to is not None: - encoded_cmd.append("ssh".encode('utf-8')) + encoded_cmd = [] + encoded_cmd.append("ssh") if self.ssh_config is not None: - encoded_cmd.extend(["-F".encode('utf-8'), self.ssh_config.encode('utf-8')]) + encoded_cmd.extend(["-F", self.ssh_config]) - encoded_cmd.append(self.ssh_to.encode('utf-8')) + encoded_cmd.append(self.ssh_to) for arg in cmd: # add single quotes for remote commands to support spaces and other weird stuff (remote commands are # executed in a shell) and escape existing single quotes (bash needs ' to end the quoted string, # then a \' for the actual quote and then another ' to start a new quoted string) (and then python # needs the double \ to get a single \) - encoded_cmd.append(("'" + arg.replace("'", "'\\''") + "'").encode('utf-8')) + encoded_cmd.append(("'" + arg.replace("'", "'\\''") + "'")) + return encoded_cmd else: - for arg in cmd: - encoded_cmd.append(arg.encode('utf-8')) + return(cmd) - return encoded_cmd def is_local(self): return self.ssh_to is None - def get_pipe(self, cmd, inp=None, readonly=False): - """return a pipehandle to a process. - - The caller should pass this handle as inp= to run() via inp= at some point to actually execute it. - - Returns None if we're in test-mode. (but still prints important debug output) - """ - - encoded_cmd = self._encode_cmd(cmd) - - # debug and test stuff - debug_txt = "" - for c in encoded_cmd: - debug_txt = debug_txt + " " + c.decode() - - # determine stdin - if inp is None: - # NOTE: Not None, otherwise it reads stdin from terminal! - stdin = subprocess.PIPE - elif isinstance(inp, str) or type(inp) == 'unicode': - self.debug("STDIN: \n" + inp.rstrip()) - stdin = subprocess.PIPE - elif isinstance(inp, subprocess.Popen): - self.debug("PIPE to:") - stdin = inp.stdout - else: - raise (Exception("Program error: Incompatible input")) - - if self.readonly and not readonly: - self.debug("CMDSKIP> " + debug_txt) - return None - else: - self.debug("CMD > " + debug_txt) - - if self.readonly and not readonly: - return None - - # create pipe - p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE) - - # Note: make streaming? - if isinstance(inp, str) or type(inp) == 'unicode': - p.stdin.write(inp.encode('utf-8')) - - if p.stdin: - p.stdin.close() - - return p def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False, - return_stderr=False): + return_stderr=False, pipe=False): """run a command on the node , checks output and parses/handle output and returns it :param cmd: the actual command, should be a list, where the first item is the command and the rest are parameters. - :param inp: Can be None, a string or a pipe-handle you got from get_pipe() + :param pipe: return CmdPipe instead of executing it. + :param inp: Can be None, a string or a CmdPipe that was previously returned. :param tab_split: split tabbed files in output into a list :param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe) - Use [] to accept all exit codes. + Use [] to accept all exit codes. Default [0] :param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode :param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors) :param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout) """ - if valid_exitcodes is None: - valid_exitcodes = [0] - - # encoded_cmd = self._encode_cmd(cmd) - # - # # debug and test stuff - # debug_txt = "" - # for c in encoded_cmd: - # debug_txt = debug_txt + " " + c.decode() - # - # if pipe: - # debug_txt = debug_txt + " |" - # - # if self.readonly and not readonly: - # self.debug("SKIP > " + debug_txt) - # else: - # if pipe: - # self.debug("PIPE > " + debug_txt) - # else: - # self.debug("RUN > " + debug_txt) - # - # # determine stdin - # if inp is None: - # # NOTE: Not None, otherwise it reads stdin from terminal! - # stdin = subprocess.PIPE - # elif isinstance(inp, str) or type(inp) == 'unicode': - # self.debug("INPUT > \n" + inp.rstrip()) - # stdin = subprocess.PIPE - # elif isinstance(inp, subprocess.Popen): - # self.debug("Piping input") - # stdin = inp.stdout - # else: - # raise (Exception("Program error: Incompatible input")) - - # if self.readonly and not readonly: - # # todo: what happens if input is piped? - # return - # - # # execute and parse/return results - # p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE) - - # # Note: make streaming? - # if isinstance(inp, str) or type(inp) == 'unicode': - # p.stdin.write(inp.encode('utf-8')) - # - # if p.stdin: - # p.stdin.close() - # - # # return pipe - # if pipe: - # return p - - p = self.get_pipe(cmd, inp=inp, readonly=readonly) - - if p is None: - return None - - # handle all outputs - if isinstance(inp, subprocess.Popen): - selectors = [p.stdout, p.stderr, inp.stderr] - inp.stdout.close() # otherwise inputprocess wont exit when ours does + # create new pipe? + if not isinstance(inp, CmdPipe): + p = CmdPipe(self.readonly, inp) else: - selectors = [p.stdout, p.stderr] + # add stuff to existing pipe + p = inp - output_lines = [] + # stderr parser error_lines = [] - while True: - (read_ready, write_ready, ex_ready) = select.select(selectors, [], []) - eof_count = 0 - if p.stdout in read_ready: - line = p.stdout.readline().decode('utf-8') - if line != "": - if tab_split: - output_lines.append(line.rstrip().split('\t')) - else: - output_lines.append(line.rstrip()) - self._parse_stdout(line) - else: - eof_count = eof_count + 1 - if p.stderr in read_ready: - line = p.stderr.readline().decode('utf-8') - if line != "": - if tab_split: - error_lines.append(line.rstrip().split('\t')) - else: - error_lines.append(line.rstrip()) - self._parse_stderr(line, hide_errors) - else: - eof_count = eof_count + 1 - if isinstance(inp, subprocess.Popen) and (inp.stderr in read_ready): - line = inp.stderr.readline().decode('utf-8') - if line != "": - self._parse_stderr_pipe(line, hide_errors) - else: - eof_count = eof_count + 1 + def stderr_handler(line): + if tab_split: + error_lines.append(line.rstrip().split('\t')) + else: + error_lines.append(line.rstrip()) + self._parse_stderr(line, hide_errors) - # stop if both processes are done and all filehandles are EOF: - if (p.poll() is not None) and ( - (not isinstance(inp, subprocess.Popen)) or inp.poll() is not None) and eof_count == len(selectors): - break + # add command to pipe + encoded_cmd = self._remote_cmd(cmd) + p.add(cmd=encoded_cmd, readonly=readonly, stderr_handler=stderr_handler) - p.stderr.close() - p.stdout.close() + # return pipe instead of executing? + if pipe: + return p - if self.debug_output: - self.debug("EXIT > {}".format(p.returncode)) + # stdout parser + output_lines = [] + def stdout_handler(line): + if tab_split: + output_lines.append(line.rstrip().split('\t')) + else: + output_lines.append(line.rstrip()) + self._parse_stdout(line) - # handle piped process error output and exit codes - if isinstance(inp, subprocess.Popen): - inp.stderr.close() - inp.stdout.close() + if p.should_execute(): + self.debug("CMD > {}".format(p)) + else: + self.debug("CMDSKIP> {}".format(p)) - if self.debug_output: - self.debug("EXIT |> {}".format(inp.returncode)) - if valid_exitcodes and inp.returncode not in valid_exitcodes: - raise (subprocess.CalledProcessError(inp.returncode, "(pipe)")) + # execute and verify exit codes + if p.execute(stdout_handler=stdout_handler) and valid_exitcodes is not []: + if valid_exitcodes is None: + valid_exitcodes = [0] - if valid_exitcodes and p.returncode not in valid_exitcodes: - raise (subprocess.CalledProcessError(p.returncode, self._encode_cmd(cmd))) + item_nr=1 + for item in p.items: + exit_code=item['process'].returncode + + if self.debug_output: + self.debug("EXIT{} > {}".format(item_nr, exit_code)) + + if exit_code not in valid_exitcodes: + raise (subprocess.CalledProcessError(exit_code, " ".join(item['cmd']))) + item_nr=item_nr+1 if return_stderr: return output_lines, error_lines else: return output_lines - - # def run_pipe(self, cmds, *args, **kwargs): - # """run a array of commands piped together. """ - # - # #i - # if self.zfs_node.is_local(): - # output_pipe = self.zfs_node.run(cmd, pipe=True) - # for pipe_cmd in output_pipes: - # output_pipe=self.zfs_node.run(pipe_cmd, inp=output_pipe, pipe=True, ) - # return output_pipe - # #remote, so add with actual | and let remote shell handle it - # else: - # for pipe_cmd in output_pipes: - # cmd.append("|") - # cmd.extend(pipe_cmd) diff --git a/zfs_autobackup/ZfsDataset.py b/zfs_autobackup/ZfsDataset.py index 61f442b..f78ec3a 100644 --- a/zfs_autobackup/ZfsDataset.py +++ b/zfs_autobackup/ZfsDataset.py @@ -565,7 +565,7 @@ class ZfsDataset: # cmd.append("|") # cmd.extend(pipe_cmd) - return self.zfs_node.get_pipe(cmd) + return self.zfs_node.run(cmd, pipe=True, readonly=True) def recv_pipe(self, pipe, features, filter_properties=None, set_properties=None, ignore_exit_code=False): diff --git a/zfs_autobackup/ZfsNode.py b/zfs_autobackup/ZfsNode.py index 6c16339..1d27b25 100644 --- a/zfs_autobackup/ZfsNode.py +++ b/zfs_autobackup/ZfsNode.py @@ -135,8 +135,8 @@ class ZfsNode(ExecuteNode): else: self.error(prefix + line.rstrip()) - def _parse_stderr_pipe(self, line, hide_errors): - self.parse_zfs_progress(line, hide_errors, "STDERR|> ") + # def _parse_stderr_pipe(self, line, hide_errors): + # self.parse_zfs_progress(line, hide_errors, "STDERR|> ") def _parse_stderr(self, line, hide_errors): self.parse_zfs_progress(line, hide_errors, "STDERR > ")