From 4910b1dfb57e269b6105bf4ee1627dcc655d9c2d Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Mon, 5 Apr 2021 22:18:14 +0200
Subject: [PATCH] separated piping

---
 tests/test_executenode.py       |  16 ++--
 tests/test_scaling.py           |   6 +-
 zfs_autobackup/ExecuteNode.py   | 146 +++++++++++++++++++++++---------
 zfs_autobackup/ZfsAutobackup.py |  19 +++--
 zfs_autobackup/ZfsDataset.py    |  42 ++++++---
 5 files changed, 163 insertions(+), 66 deletions(-)

diff --git a/tests/test_executenode.py b/tests/test_executenode.py
index f81cece..4ccd3dc 100644
--- a/tests/test_executenode.py
+++ b/tests/test_executenode.py
@@ -73,39 +73,39 @@ class TestExecuteNode(unittest2.TestCase):
 
     def pipe(self, nodea, nodeb):
 
         with self.subTest("pipe data"):
-            output=nodea.run(["dd", "if=/dev/zero", "count=1000"], pipe=True)
+            output=nodea.get_pipe(["dd", "if=/dev/zero", "count=1000"])
             self.assertEqual(nodeb.run(["md5sum"], inp=output), ["816df6f64deba63b029ca19d880ee10a -"])
 
         with self.subTest("exit code both ends of pipe ok"):
-            output=nodea.run(["true"], pipe=True)
+            output=nodea.get_pipe(["true"])
             nodeb.run(["true"], inp=output)
 
         with self.subTest("error on pipe input side"):
             with self.assertRaises(subprocess.CalledProcessError):
-                output=nodea.run(["false"], pipe=True)
+                output=nodea.get_pipe(["false"])
                 nodeb.run(["true"], inp=output)
 
         with self.subTest("error on pipe output side "):
             with self.assertRaises(subprocess.CalledProcessError):
-                output=nodea.run(["true"], pipe=True)
+                output=nodea.get_pipe(["true"])
                 nodeb.run(["false"], inp=output)
 
         with self.subTest("error on both sides of pipe"):
             with self.assertRaises(subprocess.CalledProcessError):
-                output=nodea.run(["false"], pipe=True)
+                output=nodea.get_pipe(["false"])
                 nodeb.run(["false"], inp=output)
 
         with self.subTest("check stderr on pipe output side"):
-            output=nodea.run(["true"], pipe=True)
+            output=nodea.get_pipe(["true"])
             (stdout, stderr)=nodeb.run(["ls", "nonexistingfile"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
             self.assertEqual(stdout,[])
             self.assertRegex(stderr[0], "nonexistingfile" )
 
         with self.subTest("check stderr on pipe input side (should be only printed)"):
-            output=nodea.run(["ls", "nonexistingfile"], pipe=True)
+            output=nodea.get_pipe(["ls", "nonexistingfile"])
             (stdout, stderr)=nodeb.run(["true"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
             self.assertEqual(stdout,[])
-            self.assertEqual(stderr,[] )
+            self.assertEqual(stderr,[])
diff --git a/tests/test_scaling.py b/tests/test_scaling.py
index 487cd2c..167521e 100644
--- a/tests/test_scaling.py
+++ b/tests/test_scaling.py
@@ -38,7 +38,7 @@ class TestZfsScaling(unittest2.TestCase):
 
         #this triggers if you make a change with an impact of more than O(snapshot_count/2)
-        expected_runs=343
+        expected_runs=240
         print("ACTUAL RUNS: {}".format(run_counter))
         self.assertLess(abs(run_counter-expected_runs), snapshot_count/2)
 
@@ -77,7 +77,7 @@ class TestZfsScaling(unittest2.TestCase):
 
         #this triggers if you make a change with an impact of more than O(snapshot_count/2)
-        expected_runs=743
+        expected_runs=640
         print("ACTUAL RUNS: {}".format(run_counter))
         self.assertLess(abs(run_counter-expected_runs), dataset_count/2)
@@ -90,6 +90,6 @@ class TestZfsScaling(unittest2.TestCase):
 
         #this triggers if you make a change with a performance impact of more than O(snapshot_count/2)
-        expected_runs=947
+        expected_runs=844
         print("ACTUAL RUNS: {}".format(run_counter))
         self.assertLess(abs(run_counter-expected_runs), dataset_count/2)
diff --git a/zfs_autobackup/ExecuteNode.py b/zfs_autobackup/ExecuteNode.py
index 9c61d98..eedbca8 100644
--- a/zfs_autobackup/ExecuteNode.py
+++ b/zfs_autobackup/ExecuteNode.py
@@ -48,7 +48,7 @@ class ExecuteNode(LogStub):
 
     def _encode_cmd(self, cmd):
         """returns cmd in encoded and escaped form that can be used with popen."""
 
-        encoded_cmd=[]
+        encoded_cmd = []
 
         # make sure the command gets all the data in utf8 format:
         # (this is necessary if LC_ALL=en_US.utf8 is not set in the environment)
@@ -75,27 +75,17 @@ class ExecuteNode(LogStub):
 
         return encoded_cmd
 
-    def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False, pipe=False,
-            return_stderr=False):
-        """run a command on the node.
+    def is_local(self):
+        return self.ssh_to is None
 
-        :param cmd: the actual command, should be a list, where the first item is the command
-        and the rest are parameters.
-        :param inp: Can be None, a string or a pipe-handle you got from another run()
-        :param tab_split: split tabbed files in output into a list
-        :param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
-        Use [] to accept all exit codes.
-        :param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
-        :param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
-        :param pipe: Instead of executing, return a pipe-handle to be used to
-        input to another run() command. (just like a | in linux)
-        :param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
+    def get_pipe(self, cmd, inp=None, readonly=False):
+        """return a pipe handle to a process. The caller should pass this handle to run() via inp=
+        at some point to actually execute it.
+
+        Returns None if we're in test mode (but still prints important debug output).
         """
 
-        if valid_exitcodes is None:
-            valid_exitcodes = [0]
-
         encoded_cmd = self._encode_cmd(cmd)
 
         # debug and test stuff
@@ -103,35 +93,26 @@ class ExecuteNode(LogStub):
         for c in encoded_cmd:
             debug_txt = debug_txt + " " + c.decode()
 
-        if pipe:
-            debug_txt = debug_txt + " |"
-
-        if self.readonly and not readonly:
-            self.debug("SKIP > " + debug_txt)
-        else:
-            if pipe:
-                self.debug("PIPE > " + debug_txt)
-            else:
-                self.debug("RUN > " + debug_txt)
-
         # determine stdin
         if inp is None:
             # NOTE: Not None, otherwise it reads stdin from terminal!
             stdin = subprocess.PIPE
         elif isinstance(inp, str) or type(inp) == 'unicode':
-            self.debug("INPUT > \n" + inp.rstrip())
+            self.debug("STDIN: \n" + inp.rstrip())
             stdin = subprocess.PIPE
         elif isinstance(inp, subprocess.Popen):
-            self.debug("Piping input")
+            self.debug("PIPE to:")
             stdin = inp.stdout
         else:
             raise (Exception("Program error: Incompatible input"))
 
         if self.readonly and not readonly:
-            # todo: what happens if input is piped?
-            return
+            self.debug("CMDSKIP> " + debug_txt)
+            return None
+        else:
+            self.debug("CMD > " + debug_txt)
 
-        # execute and parse/return results
+        # create pipe
         p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
 
         # Note: make streaming?
@@ -141,9 +125,32 @@ class ExecuteNode(LogStub):
         if p.stdin:
             p.stdin.close()
 
-        # return pipe
-        if pipe:
-            return p
+        return p
+
+    def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False,
+            return_stderr=False):
+        """run a command on the node: check the exit code, parse/handle the output and return it
+
+        :param cmd: the actual command, should be a list, where the first item is the command
+        and the rest are parameters.
+        :param inp: Can be None, a string or a pipe-handle you got from get_pipe()
+        :param tab_split: split tabbed files in output into a list
+        :param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
+        Use [] to accept all exit codes.
+        :param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
+        :param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
+        :param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
+
+        """
+
+        if valid_exitcodes is None:
+            valid_exitcodes = [0]
+
+        p = self.get_pipe(cmd, inp=inp, readonly=readonly)
+
+        if p is None:
+            return None
 
         # handle all outputs
         if isinstance(inp, subprocess.Popen):
@@ -206,9 +261,24 @@
             raise (subprocess.CalledProcessError(inp.returncode, "(pipe)"))
 
         if valid_exitcodes and p.returncode not in valid_exitcodes:
-            raise (subprocess.CalledProcessError(p.returncode, encoded_cmd))
+            raise (subprocess.CalledProcessError(p.returncode, self._encode_cmd(cmd)))
 
         if return_stderr:
             return output_lines, error_lines
         else:
             return output_lines
+
+    # TODO: sketch for a future helper that runs an array of commands piped together
+    # (like cmd1 | cmd2 | cmd3 in a shell):
+    #
+    # def run_pipe(self, cmds, *args, **kwargs):
+    #     """run an array of commands piped together"""
+    #
+    #     # local, so do our own piping via get_pipe()
+    #     if self.is_local():
+    #         output_pipe = self.get_pipe(cmds[0])
+    #         for pipe_cmd in cmds[1:]:
+    #             output_pipe = self.get_pipe(pipe_cmd, inp=output_pipe)
+    #         return output_pipe
+    #     # remote, so join with an actual | and let the remote shell handle it
+    #     else:
+    #         cmd = []
+    #         for pipe_cmd in cmds:
+    #             if cmd:
+    #                 cmd.append("|")
+    #             cmd.extend(pipe_cmd)
+    #         return self.get_pipe(cmd)
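
The pipe() test above shows the intended call pattern for this split: get_pipe() only builds the
sending process, and run() consumes the handle via inp=, starting both ends and checking the exit
codes on both sides. A minimal sketch (nodea/nodeb are ExecuteNode instances as in the test
fixtures; dd/md5sum are simply the commands the tests happen to use):

    # build the sending side of the pipe; nothing executes yet (returns None in test mode)
    output = nodea.get_pipe(["dd", "if=/dev/zero", "count=1000"])

    # run() attaches to the handle via inp=, executes both processes and raises
    # subprocess.CalledProcessError if either side exits with an invalid code
    checksum = nodeb.run(["md5sum"], inp=output)
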
""" + # + # #i + # if self.zfs_node.is_local(): + # output_pipe = self.zfs_node.run(cmd, pipe=True) + # for pipe_cmd in output_pipes: + # output_pipe=self.zfs_node.run(pipe_cmd, inp=output_pipe, pipe=True, ) + # return output_pipe + # #remote, so add with actual | and let remote shell handle it + # else: + # for pipe_cmd in output_pipes: + # cmd.append("|") + # cmd.extend(pipe_cmd) diff --git a/zfs_autobackup/ZfsAutobackup.py b/zfs_autobackup/ZfsAutobackup.py index 8d5cbb1..b384af3 100644 --- a/zfs_autobackup/ZfsAutobackup.py +++ b/zfs_autobackup/ZfsAutobackup.py @@ -104,11 +104,11 @@ class ZfsAutobackup: help='show zfs progress output. Enabled automaticly on ttys. (use --no-progress to disable)') parser.add_argument('--no-progress', action='store_true', help=argparse.SUPPRESS) # needed to workaround a zfs recv -v bug - parser.add_argument('--output-pipe', metavar="COMMAND", default=[], action='append', - help='add zfs send output pipe command') + parser.add_argument('--send-pipe', metavar="COMMAND", default=[], action='append', + help='pipe zfs send output through COMMAND') - parser.add_argument('--input-pipe', metavar="COMMAND", default=[], action='append', - help='add zfs recv input pipe command') + parser.add_argument('--recv-pipe', metavar="COMMAND", default=[], action='append', + help='pipe zfs recv input through COMMAND') # note args is the only global variable we use, since its a global readonly setting anyway args = parser.parse_args(argv) @@ -222,7 +222,11 @@ class ZfsAutobackup: # NOTE: this method also uses self.args. args that need extra processing are passed as function parameters: def sync_datasets(self, source_node, source_datasets, target_node): - """Sync datasets, or thin-only on both sides""" + """Sync datasets, or thin-only on both sides + :type target_node: ZfsNode + :type source_datasets: list of ZfsDataset + :type source_node: ZfsNode + """ fail_count = 0 target_datasets = [] @@ -255,7 +259,7 @@ class ZfsAutobackup: raw=self.args.raw, also_other_snapshots=self.args.other_snapshots, no_send=self.args.no_send, destroy_incompatible=self.args.destroy_incompatible, - no_thinning=self.args.no_thinning) + no_thinning=self.args.no_thinning, output_pipes=self.args.send_pipe, input_pipes=self.args.recv_pipe) except Exception as e: fail_count = fail_count + 1 source_dataset.error("FAILED: " + str(e)) @@ -383,6 +387,7 @@ class ZfsAutobackup: source_datasets=source_datasets, target_node=target_node) + #no target specified, run in snapshot-only mode else: if not self.args.no_thinning: self.thin_source(source_datasets) @@ -390,7 +395,7 @@ class ZfsAutobackup: if not fail_count: if self.args.test: - self.set_title("All tests successfull.") + self.set_title("All tests successful.") else: self.set_title("All operations completed successfully") if not self.args.target_path: diff --git a/zfs_autobackup/ZfsDataset.py b/zfs_autobackup/ZfsDataset.py index 1d987c2..61f442b 100644 --- a/zfs_autobackup/ZfsDataset.py +++ b/zfs_autobackup/ZfsDataset.py @@ -393,6 +393,7 @@ class ZfsDataset: ZfsDataset ) Args: + :rtype: ZfsDataset :type snapshot: str or ZfsDataset """ @@ -492,13 +493,14 @@ class ZfsDataset: return self.from_names(names[1:]) - def send_pipe(self, features, prev_snapshot=None, resume_token=None, show_progress=False, raw=False): + def send_pipe(self, features, prev_snapshot, resume_token, show_progress, raw, output_pipes): """returns a pipe with zfs send output for this snapshot resume_token: resume sending from this token. 
        (in that case we don't need to know snapshot names)
 
         Args:
+            :type output_pipes: list of str
             :type features: list of str
             :type prev_snapshot: ZfsDataset
             :type resume_token: str
@@ -549,8 +551,22 @@ class ZfsDataset:
 
         cmd.append(self.name)
 
-        # NOTE: this doesn't start the send yet, it only returns a subprocess.Pipe
-        return self.zfs_node.run(cmd, pipe=True)
+        # TODO: add custom output pipes? (output_pipes is accepted here but not applied yet)
+        # if output_pipes:
+        #     # local, so do our own piping
+        #     if self.zfs_node.is_local():
+        #         output_pipe = self.zfs_node.get_pipe(cmd)
+        #         for pipe_cmd in output_pipes:
+        #             output_pipe = self.zfs_node.get_pipe(pipe_cmd, inp=output_pipe)
+        #         return output_pipe
+        #     # remote, so join with an actual | and let the remote shell handle it
+        #     else:
+        #         for pipe_cmd in output_pipes:
+        #             cmd.append("|")
+        #             cmd.extend(pipe_cmd)
+
+        # NOTE: this doesn't start the send yet, it only returns a pipe handle
+        return self.zfs_node.get_pipe(cmd)
 
     def recv_pipe(self, pipe, features, filter_properties=None, set_properties=None, ignore_exit_code=False):
         """starts a zfs recv for this snapshot and uses pipe as input
@@ -618,15 +634,17 @@ class ZfsDataset:
             self.error("error during transfer")
             raise (Exception("Target doesn't exist after transfer, something went wrong."))
 
-    def transfer_snapshot(self, target_snapshot, features, prev_snapshot=None, show_progress=False,
-                          filter_properties=None, set_properties=None, ignore_recv_exit_code=False, resume_token=None,
-                          raw=False):
+    def transfer_snapshot(self, target_snapshot, features, prev_snapshot, show_progress,
+                          filter_properties, set_properties, ignore_recv_exit_code, resume_token,
+                          raw, output_pipes, input_pipes):
         """transfer this snapshot to target_snapshot. specify prev_snapshot for
         incremental transfer
 
         connects a send_pipe() to recv_pipe()
 
         Args:
+            :type output_pipes: list of str
+            :type input_pipes: list of str
             :type target_snapshot: ZfsDataset
             :type features: list of str
             :type prev_snapshot: ZfsDataset
@@ -657,7 +675,7 @@ class ZfsDataset:
 
         # do it
         pipe = self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot,
-                              resume_token=resume_token, raw=raw)
+                              resume_token=resume_token, raw=raw, output_pipes=output_pipes)
         target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties,
                                   set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
 
@@ -898,6 +916,7 @@ class ZfsDataset:
         """plan where to start syncing and what to sync and what to keep
 
         Args:
+            :rtype: ( ZfsDataset, ZfsDataset, list of ZfsDataset, list of ZfsDataset, list of ZfsDataset, list of ZfsDataset )
             :type target_dataset: ZfsDataset
             :type also_other_snapshots: bool
         """
@@ -945,11 +964,13 @@ class ZfsDataset:
 
     def sync_snapshots(self, target_dataset, features, show_progress, filter_properties, set_properties,
                        ignore_recv_exit_code, holds, rollback, raw, also_other_snapshots,
-                       no_send, destroy_incompatible, no_thinning):
+                       no_send, destroy_incompatible, no_thinning, output_pipes, input_pipes):
         """sync this dataset's snapshots to target_dataset, while also thinning
         out old snapshots along the way.
 
         Args:
+            :type output_pipes: list of str
+            :type input_pipes: list of str
             :type target_dataset: ZfsDataset
             :type features: list of str
             :type show_progress: bool
@@ -1000,13 +1021,14 @@ class ZfsDataset:
 
                 if target_snapshot not in target_obsoletes:
                     # NOTE: should we let transfer_snapshot handle this?
                    (allowed_filter_properties, allowed_set_properties) = self.get_allowed_properties(filter_properties,
-                                                                                                      set_properties)
+                                                                                                      set_properties)
                     source_snapshot.transfer_snapshot(target_snapshot, features=features,
                                                       prev_snapshot=prev_source_snapshot,
                                                       show_progress=show_progress,
                                                       filter_properties=allowed_filter_properties,
                                                       set_properties=allowed_set_properties,
                                                       ignore_recv_exit_code=ignore_recv_exit_code,
-                                                      resume_token=resume_token, raw=raw)
+                                                      resume_token=resume_token, raw=raw, output_pipes=output_pipes, input_pipes=input_pipes)
+
                 resume_token = None
 
                 # hold the new common snapshots and release the previous ones
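
For context, the new options are meant to be used roughly like this (an illustrative invocation,
not taken from the patch; note that --send-pipe/--recv-pipe are threaded down to
send_pipe()/recv_pipe() as output_pipes/input_pipes here, while actually applying them to the
stream is still sketched in comments, and zstd is just an example compressor):

    zfs-autobackup --send-pipe "zstd" --recv-pipe "zstd -d" backup1 tank/backups

Since both options use action='append', each can be given multiple times to chain several
commands into the send or receive stream.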