This commit is contained in:
Edwin Eefting 2019-10-21 13:08:50 +02:00
parent 04cc860db3
commit 058a189aa5

View File

@ -116,13 +116,15 @@ class ExecuteNode:
"""an endpoint to execute local or remote commands via ssh"""
def __init__(self, ssh_to=None, readonly=False):
def __init__(self, ssh_to=None, readonly=False, debug_output=False):
"""ssh_to: server you want to ssh to. none means local
readonly: only execute commands that dont make any changes (usefull for testing-runs)
debug_output: show output and exit codes of commands in debugging output.
"""
self.ssh_to=ssh_to
self.readonly=readonly
self.debug_output=debug_output
def run(self, cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], readonly=False, hide_errors=False, pipe=False):
@ -202,7 +204,8 @@ class ExecuteNode:
line=p.stdout.readline()
if line!="":
output_lines.append(line.rstrip())
self.debug("STDOUT > "+line.rstrip())
if self.debug_output:
self.debug("STDOUT > "+line.rstrip())
else:
eof_count=eof_count+1
if p.stderr in read_ready:
@ -228,34 +231,20 @@ class ExecuteNode:
if p.poll()!=None and ((not isinstance(input, subprocess.Popen)) or input.poll()!=None) and eof_count==len(selectors):
break
# if isinstance(input, subprocess.Popen):
# (output, errors)=p.communicate()
# else:
# (output, errors)=p.communicate(input=input)
#handle piped process error output and exit codes
if isinstance(input, subprocess.Popen):
# pipe_outputs=input.communicate()
# if not hide_errors:
# for line in pipe_outputs[1].splitlines():
# self.error("Pipe-error: "+line)
self.debug("EXIT |> {}".format(input.returncode))
if self.debug_output:
self.debug("EXIT |> {}".format(input.returncode))
if input.returncode not in valid_exitcodes:
raise(subprocess.CalledProcessError(input.returncode, "(pipe)"))
#handle error output and exit codes
# if not hide_errors:
# for line in errors.splitlines():
# self.error(line)
self.debug("EXIT > {}".format(p.returncode))
if self.debug_output:
self.debug("EXIT > {}".format(p.returncode))
if p.returncode not in valid_exitcodes:
raise(subprocess.CalledProcessError(p.returncode, encoded_cmd))
# lines=output.splitlines()
if not tab_split:
return(output_lines)
else:
@ -375,7 +364,7 @@ class ZfsDataset():
names=self.zfs_node.run(cmd=cmd, readonly=True)
return(self.from_names(names))
@cached_property
@property
def our_snapshots(self):
"""get list of snapshots creates by us of this dataset"""
self.debug("Getting our snapshots")
@ -527,50 +516,47 @@ class ZfsDataset():
connects a send_pipe() to recv_pipe()
"""
self.debug("Transfer snapshot")
#initial or resume
if not prev_snapshot or resume_token:
if resume_token:
target_dataset.verbose("receiving resumed") #we dont know which one XXX: fix, we need to know for testmode
else:
target_dataset.verbose("receiving @{}".format(self.snapshot_name))
pipe=self.send_pipe(resume=resume, resume_token=resume_token, show_progress=show_progress)
else:
#incemental
target_dataset.verbose("receiving @{}...@{}".format(prev_snapshot.snapshot_name, self.snapshot_name))
pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot)
#do it
target_dataset.recv_pipe(pipe)
self.debug("Transfer snapshot to {}".format(target_dataset))
if resume_token:
#we dont know which snapshot it was, so invalidate cache
target_dataset.invalidate()
target_dataset.verbose("resuming")
#initial or resume
if not prev_snapshot:
target_dataset.verbose("receiving @{} (new)".format(self.snapshot_name))
else:
#update cache
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
#incemental
target_dataset.verbose("receiving @{}".format(self.snapshot_name))
#do it
pipe=self.send_pipe(resume=resume, show_progress=show_progress, resume_token=resume_token, prev_snapshot=prev_snapshot)
target_dataset.recv_pipe(pipe)
#update cache
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name+"@"+self.snapshot_name))
def sync_snapshots(self, target_dataset, show_progress=False):
"""sync our snapshots to target_dataset"""
self.debug("Sync snapshots")
# inital transfer
resume_token=None
if not target_dataset.exists:
self.debug("Initial transfer")
self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=True)
self.debug("Sync snapshots: Initial transfer")
self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=show_progress)
else:
#on resuming we dont need to know anything, the token is enough
#filesystem exists, need to resume something?
if 'receive_resume_token' in target_dataset.properties:
self.debug("Resume transfer")
self.transfer_snapshot(target_dataset, resume_token=target_dataset.properties['receive_resume_token'], show_progress=show_progress)
self.debug("Sync snapshots: Found resume token")
resume_token=target_dataset.properties['receive_resume_token']
self.debug("Incremental transfer")
#resume initial transfer?
if len(target_dataset.our_snapshots)==0:
self.debug("Sync snapshots: Resuming inital transfer")
self.our_snapshots[0].transfer_snapshot(target_dataset, show_progress=show_progress, resume_token=resume_token)
resume_token=None
self.debug("Sync snapshots: Incremental transfer")
latest_common_snapshot=None
for source_snapshot in self.our_snapshots:
target_snapshot=target_dataset.find_snapshot(source_snapshot.snapshot_name)
@ -580,7 +566,8 @@ class ZfsDataset():
else:
if latest_common_snapshot:
#transfer it
source_snapshot.transfer_snapshot(target_dataset, latest_common_snapshot, show_progress=True)
source_snapshot.transfer_snapshot(target_dataset, latest_common_snapshot, show_progress=True, resume_token=resume_token)
resume_token=None
latest_common_snapshot=source_snapshot
if not latest_common_snapshot:
@ -591,7 +578,7 @@ class ZfsDataset():
class ZfsNode(ExecuteNode):
"""a node that contains zfs datasets. implements global lowlevel zfs commands"""
def __init__(self, backup_name, zfs_autobackup, ssh_to=None, readonly=False, description=""):
def __init__(self, backup_name, zfs_autobackup, ssh_to=None, readonly=False, description="", debug_output=False):
self.backup_name=backup_name
if not description:
self.description=ssh_to
@ -600,7 +587,7 @@ class ZfsNode(ExecuteNode):
self.zfs_autobackup=zfs_autobackup #for logging
ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly)
ExecuteNode.__init__(self, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
def verbose(self,txt):
self.zfs_autobackup.verbose("{} {}".format(self.description, txt))
@ -726,13 +713,18 @@ class ZfsAutobackup:
parser.add_argument('--test', action='store_true', help='dont change anything, just show what would be done (still does all read-only operations)')
parser.add_argument('--verbose', action='store_true', help='verbose output')
parser.add_argument('--debug', action='store_true', help='debug output (shows commands that are executed)')
parser.add_argument('--debug', action='store_true', help='Show zfs commands that are executed.')
parser.add_argument('--debug-output', action='store_true', help='Show zfs commands and their output/exit codes. (noisy)')
parser.add_argument('--progress', action='store_true', help='show zfs progress output (to stderr)')
#note args is the only global variable we use, since its a global readonly setting anyway
args = parser.parse_args()
self.args=args
if args.debug_output:
args.debug=True
self.log=Log(show_debug=self.args.debug, show_verbose=self.args.verbose)
@ -751,10 +743,10 @@ class ZfsAutobackup:
def run(self):
description="[Source]"
source_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_source, readonly=self.args.test, description=description)
source_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_source, readonly=self.args.test, debug_output=self.args.debug_output, description=description)
description="[Target]"
target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, description=description)
target_node=ZfsNode(self.args.backup_name, self, ssh_to=self.args.ssh_target, readonly=self.args.test, debug_output=self.args.debug_output, description=description)
# target_node.run(["/root/outputtest"], readonly=True)
self.set_title("Selecting")