From e83c297f92fe50e6b2fea1cb3daeb80b1f2cfd07 Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 20 Oct 2019 23:21:45 +0200
Subject: [PATCH] rewriting output handling

---
 zfs_autobackup | 127 +++++++++++++++++++++++++++++++++++--------------
 1 file changed, 92 insertions(+), 35 deletions(-)

diff --git a/zfs_autobackup b/zfs_autobackup
index 447326d..b9031b5 100755
--- a/zfs_autobackup
+++ b/zfs_autobackup
@@ -11,6 +11,8 @@ import pprint
 import time
 import argparse
 from pprint import pprint as p
+import select
+
 import imp
 
 try:
@@ -60,7 +62,7 @@ class Log:
     def debug(self, txt):
         if self.show_debug:
             if use_color:
-                print(colorama.Fore.BLUE+ "# "+txt+colorama.Style.RESET_ALL)
+                print(colorama.Fore.GREEN+ "# "+txt+colorama.Style.RESET_ALL)
             else:
                 print("# "+txt)
 
@@ -118,10 +120,12 @@ class ExecuteNode:
 
         self.readonly=readonly
 
-    def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False):
+    def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False, pipe=False):
         """run a command on the node
 
         readonly: make this True if the command doesnt make any changes and is safe to execute in testmode
+        pipe: Instead of executing, return a pipe-handle to be used to input to another run() command. (just like a | in linux)
+        input: Can be None, a string or a pipe-handle you got from another run()
         """
 
         encoded_cmd=[]
@@ -142,38 +146,106 @@ class ExecuteNode:
 
         #debug and test stuff
         debug_txt=" ".join(encoded_cmd)
+        if pipe:
+            debug_txt=debug_txt+" |"
 
         if self.readonly and not readonly:
-            self.debug("[NOT RUNNING] "+debug_txt)
+            self.debug("SKIP > "+debug_txt)
         else:
-            self.debug("[RUN] "+ debug_txt)
+            self.debug("RUN > "+ debug_txt)
 
-        if input:
-            self.debug("INPUT:\n"+input.rstrip())
-            stdin=subprocess.PIPE
-        else:
+        #determine stdin
+        if input==None:
             stdin=None
+        elif isinstance(input,str):
+            self.debug("INPUT > \n"+input.rstrip())
+            stdin=subprocess.PIPE
+        elif isinstance(input, subprocess.Popen):
+            self.debug("Piping input")
+            stdin=input.stdout
+        else:
+            abort("Incompatible input")
 
         if self.readonly and not readonly:
+            #todo: what happens if input is piped?
             return
 
         #execute and parse/return results
         p=subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
-        (output, errors)=p.communicate(input=input)
-        if not hide_errors:
-            for line in errors.splitlines():
-                self.error(line)
+        #Note: make streaming?
+        if isinstance(input,str):
+            p.stdin.write(input)
+
+        if pipe:
+            return(p)
+
+        if isinstance(input, subprocess.Popen):
+            selectors=[p.stdout, p.stderr, input.stderr ]
+        else:
+            selectors=[p.stdout, p.stderr ]
+
+
+        output_lines=[]
+        while True:
+            (read_ready, write_ready, ex_ready)=select.select(selectors, [], [])
+            eof_count=0
+            if p.stdout in read_ready:
+                line=p.stdout.readline()
+                if line!="":
+                    output_lines.append(line.rstrip())
+                    self.debug("STDOUT > "+line.rstrip())
+                else:
+                    eof_count=eof_count+1
+            if p.stderr in read_ready:
+                line=p.stderr.readline()
+                if line!="" and not hide_errors:
+                    self.error("STDERR > "+line.rstrip())
+                else:
+                    eof_count=eof_count+1
+            if isinstance(input, subprocess.Popen) and (input.stderr in read_ready):
+                line=input.stderr.readline()
+                if line!="" and not hide_errors:
+                    self.error("STDERR|> "+line.rstrip())
+                else:
+                    eof_count=eof_count+1
+
+
+            #stop if both processes are done and all filehandles are eof:
+            if p.poll()!=None and ((not isinstance(input, subprocess.Popen)) or input.poll()!=None) and eof_count==len(selectors):
+                break
+
+
+        # if isinstance(input, subprocess.Popen):
+        #     (output, errors)=p.communicate()
+        # else:
+        #     (output, errors)=p.communicate(input=input)
+
+
+        #handle piped process error output and exit codes
+        if isinstance(input, subprocess.Popen):
+            # pipe_outputs=input.communicate()
+            # if not hide_errors:
+            #     for line in pipe_outputs[1].splitlines():
+            #         self.error("Pipe-error: "+line)
+
+            if input.returncode not in valid_exitcodes:
+                raise(subprocess.CalledProcessError(input.returncode, "(pipe)"))
+
+        #handle error output and exit codes
+        # if not hide_errors:
+        #     for line in errors.splitlines():
+        #         self.error(line)
         if p.returncode not in valid_exitcodes:
             raise(subprocess.CalledProcessError(p.returncode, encoded_cmd))
 
-        lines=output.splitlines()
+        # lines=output.splitlines()
         if not tab_split:
-            return(lines)
+            return(output_lines)
         else:
             ret=[]
-            for line in lines:
+            for line in output_lines:
                 ret.append(line.split("\t"))
             return(ret)
 
@@ -356,25 +428,6 @@ class ZfsDataset():
 
         return(self.from_names(names[1:]))
 
-    # def transfer_snapshots(self, source_dataset, source_start_snapshot, source_sends):
-    #     """transfer bunch snapshots to this target"""
-    #
-    #     receive_resume_token=getattr(source_dataset.properties, 'receive_resume_token', None)
-    #     last_snapshot=source_start_snapshot
-    #
-    #     for snapshot in source_sends:
-    #         if receive_resume_token:
-    #             resumed="[RESUMED]"
-    #         else:
-    #             resumed=""
-    #
-    #         if (last_snapshot):
-    #             source_dataset.verbose("incremental @{}...@{} {}".format(last_snapshot.snapshot_name, snapshot.snapshot_name, resumed))
-    #         else:
-    #             source_dataset.verbose("initial @{} {}".format(snapshot.snapshot_name, resumed))
-    #
-    #         last_snapshot=snapshot
-    #         receive_resume_token=None
 
     def transfer_snapshot(self, target_dataset, prev_snapshot=None):
         """transfer this snapshot to target_dataset. specify prev_snapshot for incremental transfer"""
@@ -396,6 +449,9 @@ class ZfsDataset():
         else:
             target_dataset.verbose("receiving @{} {}".format(self.snapshot_name, resumed))
 
+        pipe=self.zfs_node.run(["lsX"], pipe=True, readonly=True)
+        target_dataset.zfs_node.run(["cat"], input=pipe, readonly=True)
+
         #update cache
         target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
 
@@ -591,6 +647,7 @@ class ZfsAutobackup:
             description="[Target]"
             target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, description=description)
 
+        # target_node.run(["/root/outputtest"], readonly=True)
 
         self.set_title("Selecting")
         source_datasets=source_node.selected_datasets
@@ -617,7 +674,7 @@ class ZfsAutobackup:
                 target_dataset=ZfsDataset(target_node, target_name)
                 source_dataset.sync_snapshots(target_dataset)
             except Exception as e:
-                self.error(str(e))
+                source_dataset.error(str(e))
                 if self.args.debug:
                     raise
 
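
The net effect of the new pipe/input handling in ExecuteNode.run() is that two calls can be chained like a shell pipeline: the first call is started with pipe=True and returns its Popen handle instead of its output, and that handle is then passed as input= to a second call, which connects it to that call's stdin. The temporary lsX/cat calls added to transfer_snapshot() above exercise exactly this path. As a sketch only, an eventual send/receive pair could be chained the same way; the zfs command lists below are illustrative placeholders and are not part of this patch:

    pipe=self.zfs_node.run(["zfs", "send", "pool/fs@snapshot"], pipe=True)
    target_dataset.zfs_node.run(["zfs", "recv", "backup/fs"], input=pipe)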
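
For readers unfamiliar with the select() pattern used in the new run() loop, the following standalone sketch shows the same idea outside the ExecuteNode class, using plain subprocess and select on Python 3 under Unix. The echo/cat commands and the sender/receiver names are placeholders chosen only so the example is runnable; they do not appear in the patch.

    # Standalone illustration of the select()-based reading loop above.
    # "echo"/"cat" stand in for the sending and receiving side of a transfer.
    import select
    import subprocess

    # first process: its stdout is kept as a pipe handle (the pipe=True case)
    sender = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # second process: reads the first one's stdout, like "echo hello | cat" in a shell
    receiver = subprocess.Popen(["cat"], stdin=sender.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # close our copy of the handle so the sender sees a broken pipe if the receiver dies
    sender.stdout.close()

    # watch the receiver's stdout/stderr and the sender's stderr at the same time
    selectors = [receiver.stdout, receiver.stderr, sender.stderr]
    output_lines = []
    while True:
        read_ready, _, _ = select.select(selectors, [], [])
        eof_count = 0
        for handle in read_ready:
            line = handle.readline()
            if line == b"":                     # EOF on this handle
                eof_count += 1
            elif handle is receiver.stdout:     # normal output of the last command in the pipe
                output_lines.append(line.rstrip())
            else:                               # stderr of either process, shown as it arrives
                print("STDERR > " + line.rstrip().decode())
        # stop once both processes exited and every watched handle reported EOF in one pass
        if receiver.poll() is not None and sender.poll() is not None and eof_count == len(selectors):
            break

    print(output_lines)                         # [b'hello']

Reading line by line from whichever handle select() reports as ready is what lets run() show stderr from both sides of the pipe as it is produced, instead of only after communicate() returns.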