From 058a189aa5f84c7c8261c6e72d189ce5558ed2b6 Mon Sep 17 00:00:00 2001 From: Edwin Eefting Date: Mon, 21 Oct 2019 13:08:50 +0200 Subject: [PATCH] wip --- zfs_autobackup | 108 +++++++++++++++++++++++-------------------------- 1 file changed, 50 insertions(+), 58 deletions(-) diff --git a/zfs_autobackup b/zfs_autobackup index 27ae4a9..1af432a 100755 --- a/zfs_autobackup +++ b/zfs_autobackup @@ -116,13 +116,15 @@ class ExecuteNode: """an endpoint to execute local or remote commands via ssh""" - def __init__(self, ssh_to=None, readonly=False): + def __init__(self, ssh_to=None, readonly=False, debug_output=False): """ssh_to: server you want to ssh to. none means local readonly: only execute commands that dont make any changes (usefull for testing-runs) + debug_output: show output and exit codes of commands in debugging output. """ self.ssh_to=ssh_to self.readonly=readonly + self.debug_output=debug_output def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False, pipe=False): @@ -202,7 +204,8 @@ class ExecuteNode: line=p.stdout.readline() if line!="": output_lines.append(line.rstrip()) - self.debug("STDOUT > "+line.rstrip()) + if self.debug_output: + self.debug("STDOUT > "+line.rstrip()) else: eof_count=eof_count+1 if p.stderr in read_ready: @@ -228,34 +231,20 @@ class ExecuteNode: if p.poll()!=None and ((not isinstance(input, subprocess.Popen)) or input.poll()!=None) and eof_count==len(selectors): break - - # if isinstance(input, subprocess.Popen): - # (output, errors)=p.communicate() - # else: - # (output, errors)=p.communicate(input=input) - - #handle piped process error output and exit codes if isinstance(input, subprocess.Popen): - # pipe_outputs=input.communicate() - # if not hide_errors: - # for line in pipe_outputs[1].splitlines(): - # self.error("Pipe-error: "+line) - self.debug("EXIT |> {}".format(input.returncode)) + if self.debug_output: + self.debug("EXIT |> {}".format(input.returncode)) if input.returncode not in valid_exitcodes: raise(subprocess.CalledProcessError(input.returncode, "(pipe)")) - #handle error output and exit codes - # if not hide_errors: - # for line in errors.splitlines(): - # self.error(line) - self.debug("EXIT > {}".format(p.returncode)) + if self.debug_output: + self.debug("EXIT > {}".format(p.returncode)) if p.returncode not in valid_exitcodes: raise(subprocess.CalledProcessError(p.returncode, encoded_cmd)) - # lines=output.splitlines() if not tab_split: return(output_lines) else: @@ -375,7 +364,7 @@ class ZfsDataset(): names=self.zfs_node.run(cmd=cmd, readonly=True) return(self.from_names(names)) - @cached_property + @property def our_snapshots(self): """get list of snapshots creates by us of this dataset""" self.debug("Getting our snapshots") @@ -527,50 +516,47 @@ class ZfsDataset(): connects a send_pipe() to recv_pipe() """ - self.debug("Transfer snapshot") - - #initial or resume - if not prev_snapshot or resume_token: - if resume_token: - target_dataset.verbose("receiving resumed") #we dont know which one XXX: fix, we need to know for testmode - else: - target_dataset.verbose("receiving @{}".format(self.snapshot_name)) - - pipe=self.send_pipe(resume=resume, resume_token=resume_token, show_progress=show_progress) - - else: - #incemental - target_dataset.verbose("receiving @{}...@{}".format(prev_snapshot.snapshot_name, self.snapshot_name)) - pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot) - - #do it - target_dataset.recv_pipe(pipe) - + self.debug("Transfer snapshot to {}".format(target_dataset)) if resume_token: - #we dont know which snapshot it was, so invalidate cache - target_dataset.invalidate() + target_dataset.verbose("resuming") + + #initial or resume + if not prev_snapshot: + target_dataset.verbose("receiving @{} (new)".format(self.snapshot_name)) else: - #update cache - target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name)) + #incemental + target_dataset.verbose("receiving @{}".format(self.snapshot_name)) + + #do it + pipe=self.send_pipe(resume=resume, show_progress=show_progress, resume_token=resume_token, prev_snapshot=prev_snapshot) + target_dataset.recv_pipe(pipe) + + #update cache + target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name)) def sync_snapshots(self, target_dataset, show_progress=False): """sync our snapshots to target_dataset""" - self.debug("Sync snapshots") - # inital transfer + resume_token=None if not target_dataset.exists: - self.debug("Initial transfer") - self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=True) + self.debug("Sync snapshots: Initial transfer") + self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=show_progress) else: - #on resuming we dont need to know anything, the token is enough + #filesystem exists, need to resume something? if 'receive_resume_token' in target_dataset.properties: - self.debug("Resume transfer") - self.transfer_snapshot(target_dataset, resume_token=target_dataset.properties['receive_resume_token'], show_progress=show_progress) + self.debug("Sync snapshots: Found resume token") + resume_token=target_dataset.properties['receive_resume_token'] - self.debug("Incremental transfer") + #resume initial transfer? + if len(target_dataset.our_snapshots)==0: + self.debug("Sync snapshots: Resuming inital transfer") + self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=show_progress, resume_token=resume_token) + resume_token=None + + self.debug("Sync snapshots: Incremental transfer") latest_common_snapshot=None for source_snapshot in self.our_snapshots: target_snapshot=target_dataset.find_snapshot(source_snapshot.snapshot_name) @@ -580,7 +566,8 @@ class ZfsDataset(): else: if latest_common_snapshot: #transfer it - source_snapshot.transfer_snapshot(target_dataset, latest_common_snapshot, show_progress=True) + source_snapshot.transfer_snapshot(target_dataset, latest_common_snapshot, show_progress=True, resume_token=resume_token) + resume_token=None latest_common_snapshot=source_snapshot if not latest_common_snapshot: @@ -591,7 +578,7 @@ class ZfsDataset(): class ZfsNode(ExecuteNode): """a node that contains zfs datasets. implements global lowlevel zfs commands""" - def __init__(self, backup_name, zfs_autobackup, ssh_to=None, readonly=False, description=""): + def __init__(self, backup_name, zfs_autobackup, ssh_to=None, readonly=False, description="", debug_output=False): self.backup_name=backup_name if not description: self.description=ssh_to @@ -600,7 +587,7 @@ class ZfsNode(ExecuteNode): self.zfs_autobackup=zfs_autobackup #for logging - ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly) + ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output) def verbose(self,txt): self.zfs_autobackup.verbose("{} {}".format(self.description, txt)) @@ -726,13 +713,18 @@ class ZfsAutobackup: parser.add_argument('--test', action='store_true', help='dont change anything, just show what would be done (still does all read-only operations)') parser.add_argument('--verbose', action='store_true', help='verbose output') - parser.add_argument('--debug', action='store_true', help='debug output (shows commands that are executed)') + parser.add_argument('--debug', action='store_true', help='Show zfs commands that are executed.') + parser.add_argument('--debug-output', action='store_true', help='Show zfs commands and their output/exit codes. (noisy)') parser.add_argument('--progress', action='store_true', help='show zfs progress output (to stderr)') #note args is the only global variable we use, since its a global readonly setting anyway args = parser.parse_args() self.args=args + + if args.debug_output: + args.debug=True + self.log=Log(show_debug=self.args.debug, show_verbose=self.args.verbose) @@ -751,10 +743,10 @@ class ZfsAutobackup: def run(self): description="[Source]" - source_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_source, readonly=self.args.test, description=description) + source_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_source, readonly=self.args.test, debug_output=self.args.debug_output, description=description) description="[Target]" - target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, description=description) + target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, debug_output=self.args.debug_output, description=description) # target_node.run(["/root/outputtest"], readonly=True) self.set_title("Selecting")