This commit is contained in:
Edwin Eefting 2019-10-28 18:50:43 +01:00
parent d367d9aa98
commit f114114993

View File

@ -885,7 +885,7 @@ class ZfsDataset():
# cmd.append("|mbuffer -m {}".format(args.buffer))
def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False):
def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None):
"""transfer this snapshot to target_snapshot. specify prev_snapshot for incremental transfer
connects a send_pipe() to recv_pipe()
@ -893,8 +893,10 @@ class ZfsDataset():
self.debug("Transfer snapshot to {}".format(target_snapshot.filesystem_name))
if resume_token:
target_snapshot.verbose("resuming")
#initial or resume
#initial or increment
if not prev_snapshot:
target_snapshot.verbose("receiving full".format(self.snapshot_name))
else:
@ -902,38 +904,46 @@ class ZfsDataset():
target_snapshot.verbose("receiving incremental".format(self.snapshot_name))
#do it
pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot)
pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token)
target_snapshot.recv_pipe(pipe, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
def abort_resume(self):
"""abort current resume state"""
self.zfs_node.run(["zfs", "recv", "-A", self.name])
def get_resume_snapshot(self, resume_token):
"""returns snapshot that will be resumed by this resume token (run this on source with target-token)"""
#use zfs send -n option to determine this
lines=self.zfs_node.run([ "zfs", "send", "-t", resume_token, "-n", "-v", ], valid_exitcodes=[ 0 ])
lines=self.zfs_node.run([ "zfs", "send", "-t", resume_token, "-n", "-v" ], valid_exitcodes=[ 0, 255 ], readonly=True)
for line in lines:
matches=re.findall("toname = .*@(.*)", line)
if matches:
snapshot_name=matches[0]
return(ZfsDataset(self.zfs_node, self.filesystem_name+"@"+snapshot_name))
snapshot=ZfsDataset(self.zfs_node, self.filesystem_name+"@"+snapshot_name)
snapshot.debug("resume token belongs to this snapshot")
return(snapshot)
return(None)
def resume_transfer(self, target_dataset, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False):
"""resume an interrupted transfer, if there is one"""
#resume is a kind of special case since we dont know which snapshot we are transferring. (its encoded in the resume token)
if 'receive_resume_token' in target_dataset.properties:
target_dataset.verbose("resuming")
snapshot=self.get_resume_snapshot(target_dataset.properties['receive_resume_token'])
p(snapshot)
sys.exit(1)
#just send and recv on dataset instead of snapshot object.
pipe=self.send_pipe(show_progress=show_progress, resume_token=target_dataset.properties['receive_resume_token'])
target_dataset.recv_pipe(pipe,resume=True, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
return(True)
return(False)
# def resume_transfer(self, target_dataset, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False):
# """resume an interrupted transfer, if there is one"""
#
# #resume is a kind of special case since we dont know which snapshot we are transferring. (its encoded in the resume token)
# if 'receive_resume_token' in target_dataset.properties:
# target_dataset.verbose("resuming")
# snapshot=self.get_resume_snapshot(target_dataset.properties['receive_resume_token'])
# p(snapshot)
# sys.exit(1)
#
# #just send and recv on dataset instead of snapshot object.
# pipe=self.send_pipe(show_progress=show_progress, resume_token=target_dataset.properties['receive_resume_token'])
# target_dataset.recv_pipe(pipe,resume=True, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
# return(True)
#
# return(False)
def thin(self, keeps=[]):
@ -967,13 +977,12 @@ class ZfsDataset():
"""sync our snapshots to target_dataset"""
#resume something first?
resumed=self.resume_transfer(target_dataset, show_progress=show_progress, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code)
if resumed:
#running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step)
if self.zfs_node.readonly and not target_dataset.our_snapshots:
target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name + "@" + self.our_snapshots[0].snapshot_name))
# resumed=self.resume_transfer(target_dataset, show_progress=show_progress, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code)
# if resumed:
# #running in readonly mode and no snapshots yet? assume initial snapshot (otherwise we cant find common snapshot in next step)
# if self.zfs_node.readonly and not target_dataset.our_snapshots:
# target_dataset.snapshots.append(ZfsDataset(target_dataset.zfs_node, target_dataset.name + "@" + self.our_snapshots[0].snapshot_name))
#determine start snapshot (the first snapshot after the common snapshot)
target_dataset.debug("Determining start snapshot")
@ -984,20 +993,34 @@ class ZfsDataset():
else:
start_snapshot=self.find_our_next_snapshot(common_snapshot)
#if something is resumed, fix the holds at this point
if resumed:
#hold the current commons, relase the previous ones
if common_snapshot:
common_snapshot.hold()
target_dataset.find_snapshot(common_snapshot).hold()
#resume?
resume_token=None
if 'receive_resume_token' in target_dataset.properties:
resume_token=target_dataset.properties['receive_resume_token']
#not valid anymore?
resume_snapshot=self.get_resume_snapshot(resume_token)
if not resume_snapshot or start_snapshot.snapshot_name!=resume_snapshot.snapshot_name:
target_dataset.verbose("Cant resume, resume token no longer valid.")
target_dataset.abort_resume()
resume_token=None
prev_target_snapshot=target_dataset.find_our_prev_snapshot(common_snapshot)
if prev_target_snapshot:
prev_target_snapshot.release()
prev_source_snapshot=self.find_snapshot(prev_target_snapshot)
if prev_source_snapshot:
prev_source_snapshot.release()
# #if something is resumed, fix the holds at this point
# if resumed:
# #hold the current commons, relase the previous ones
# if common_snapshot:
# common_snapshot.hold()
# target_dataset.find_snapshot(common_snapshot).hold()
#
# prev_target_snapshot=target_dataset.find_our_prev_snapshot(common_snapshot)
# if prev_target_snapshot:
# prev_target_snapshot.release()
#
# prev_source_snapshot=self.find_snapshot(prev_target_snapshot)
# if prev_source_snapshot:
# prev_source_snapshot.release()
#create virtual target snapshots
@ -1038,7 +1061,8 @@ class ZfsDataset():
#does target actually want it?
if target_snapshot in target_keeps:
source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code)
source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token)
resume_token=None
#hold the new common snapshots and release the previous ones
target_snapshot.hold()