From f114114993b1b536ed5758a5f4ac55a863862d00 Mon Sep 17 00:00:00 2001 From: Edwin Eefting Date: Mon, 28 Oct 2019 18:50:43 +0100 Subject: [PATCH] wip --- zfs_autobackup | 104 ++++++++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 40 deletions(-) diff --git a/zfs_autobackup b/zfs_autobackup index 98d715a..ab851dc 100755 --- a/zfs_autobackup +++ b/zfs_autobackup @@ -885,7 +885,7 @@ class ZfsDataset(): # cmd.append("|mbuffer -m {}".format(args.buffer)) - def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False): + def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None): """transfer this snapshot to target_snapshot. specify prev_snapshot for incremental transfer connects a send_pipe() to recv_pipe() @@ -893,8 +893,10 @@ class ZfsDataset(): self.debug("Transfer snapshot to {}".format(target_snapshot.filesystem_name)) + if resume_token: + target_snapshot.verbose("resuming") - #initial or resume + #initial or increment if not prev_snapshot: target_snapshot.verbose("receiving full".format(self.snapshot_name)) else: @@ -902,38 +904,46 @@ class ZfsDataset(): target_snapshot.verbose("receiving incremental".format(self.snapshot_name)) #do it - pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot) + pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token) target_snapshot.recv_pipe(pipe, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code) + def abort_resume(self): + """abort current resume state""" + self.zfs_node.run(["zfs", "recv", "-A", self.name]) + def get_resume_snapshot(self, resume_token): """returns snapshot that will be resumed by this resume token (run this on source with target-token)""" #use zfs send -n option to determine this - lines=self.zfs_node.run([ "zfs", "send", "-t", resume_token, "-n", "-v", ], valid_exitcodes=[ 0 ]) + lines=self.zfs_node.run([ "zfs", "send", "-t", resume_token, "-n", "-v" ], valid_exitcodes=[ 0, 255 ], readonly=True) for line in lines: matches=re.findall("toname = .*@(.*)", line) if matches: snapshot_name=matches[0] - return(ZfsDataset(self.zfs_node, self.filesystem_name+"@"+snapshot_name)) + snapshot=ZfsDataset(self.zfs_node, self.filesystem_name+"@"+snapshot_name) + snapshot.debug("resume token belongs to this snapshot") + return(snapshot) + + return(None) - def resume_transfer(self, target_dataset, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False): - """resume an interrupted transfer, if there is one""" - - #resume is a kind of special case since we dont know which snapshot we are transferring. (its encoded in the resume token) - if 'receive_resume_token' in target_dataset.properties: - target_dataset.verbose("resuming") - snapshot=self.get_resume_snapshot(target_dataset.properties['receive_resume_token']) - p(snapshot) - sys.exit(1) - - #just send and recv on dataset instead of snapshot object. - pipe=self.send_pipe(show_progress=show_progress, resume_token=target_dataset.properties['receive_resume_token']) - target_dataset.recv_pipe(pipe,resume=True, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code) - return(True) - - return(False) + # def resume_transfer(self, target_dataset, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False): + # """resume an interrupted transfer, if there is one""" + # + # #resume is a kind of special case since we dont know which snapshot we are transferring. (its encoded in the resume token) + # if 'receive_resume_token' in target_dataset.properties: + # target_dataset.verbose("resuming") + # snapshot=self.get_resume_snapshot(target_dataset.properties['receive_resume_token']) + # p(snapshot) + # sys.exit(1) + # + # #just send and recv on dataset instead of snapshot object. + # pipe=self.send_pipe(show_progress=show_progress, resume_token=target_dataset.properties['receive_resume_token']) + # target_dataset.recv_pipe(pipe,resume=True, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code) + # return(True) + # + # return(False) def thin(self, keeps=[]): @@ -967,13 +977,12 @@ class ZfsDataset(): """sync our snapshots to target_dataset""" - #resume something first? - resumed=self.resume_transfer(target_dataset, show_progress=show_progress, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code) - if resumed: - #running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step) - if self.zfs_node.readonly and not target_dataset.our_snapshots: - target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name + "@" + self.our_snapshots[0].snapshot_name)) + # resumed=self.resume_transfer(target_dataset, show_progress=show_progress, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code) + # if resumed: + # #running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step) + # if self.zfs_node.readonly and not target_dataset.our_snapshots: + # target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name + "@" + self.our_snapshots[0].snapshot_name)) #determine start snapshot (the first snapshot after the common snapshot) target_dataset.debug("Determining start snapshot") @@ -984,20 +993,34 @@ class ZfsDataset(): else: start_snapshot=self.find_our_next_snapshot(common_snapshot) - #if something is resumed, fix the holds at this point - if resumed: - #hold the current commons, relase the previous ones - if common_snapshot: - common_snapshot.hold() - target_dataset.find_snapshot(common_snapshot).hold() + #resume? + resume_token=None + if 'receive_resume_token' in target_dataset.properties: + resume_token=target_dataset.properties['receive_resume_token'] + #not valid anymore? + resume_snapshot=self.get_resume_snapshot(resume_token) + if not resume_snapshot or start_snapshot.snapshot_name!=resume_snapshot.snapshot_name: + target_dataset.verbose("Cant resume, resume token no longer valid.") + target_dataset.abort_resume() + resume_token=None - prev_target_snapshot=target_dataset.find_our_prev_snapshot(common_snapshot) - if prev_target_snapshot: - prev_target_snapshot.release() - prev_source_snapshot=self.find_snapshot(prev_target_snapshot) - if prev_source_snapshot: - prev_source_snapshot.release() + + + # #if something is resumed, fix the holds at this point + # if resumed: + # #hold the current commons, relase the previous ones + # if common_snapshot: + # common_snapshot.hold() + # target_dataset.find_snapshot(common_snapshot).hold() + # + # prev_target_snapshot=target_dataset.find_our_prev_snapshot(common_snapshot) + # if prev_target_snapshot: + # prev_target_snapshot.release() + # + # prev_source_snapshot=self.find_snapshot(prev_target_snapshot) + # if prev_source_snapshot: + # prev_source_snapshot.release() #create virtual target snapshots @@ -1038,7 +1061,8 @@ class ZfsDataset(): #does target actually want it? if target_snapshot in target_keeps: - source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code) + source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token) + resume_token=None #hold the new common snapshots and release the previous ones target_snapshot.hold()