This commit is contained in:
Edwin Eefting 2017-07-30 00:52:31 +02:00
parent 441a323fb2
commit cae8ec3e70

View File

@ -23,13 +23,19 @@ def debug(txt):
print(txt)
class Node(Object):
"""an endpoint that contains zfs filesystems. can be local or remote"""
class ZfsNode:
"""an endpoint that contains zfs filesystems.
def __init__(self, ssh_to='local'):
self.backup_name=backup_name
contains lowlevel zfs wrappers for actual zfs commands on remote nodes via ssh (or local)
methods only accept and return simple dataset names, just like the real commands
"""
def __init__(self, ssh_to):
"""ssh_to: server you want to ssh to. specify 'local' to just use local commands without ssh"""
self.ssh_to=ssh_to
def run(cmd, input=None, tab_split=False, valid_exitcodes=[ 0 ], test=False):
"""run a command on the node"""
@ -87,7 +93,10 @@ class Node(Object):
def zfs_get_selected_filesystems():
"""determine filesystems that should be backupped by looking at the special autobackup-property"""
"""determine filesystems that should be backupped by looking at the special autobackup-property
return: list with dataset names
"""
#get all source filesystems that have the backup property
source_filesystems=self.run(tab_split=True, cmd=[
@ -171,225 +180,270 @@ class Node(Object):
#in testmode we dont actually make changes, so keep them in a list to simulate
if args.test:
if not ssh_to in test_snapshots:
test_snapshots[ssh_to]={}
if not filesystem in test_snapshots[ssh_to]:
test_snapshots[ssh_to][filesystem]=[]
test_snapshots[ssh_to][filesystem].append(snapshot)
if not filesystem in test_snapshots:
test_snapshots[filesystem]=[]
test_snapshots[filesystem].append(snapshot)
run(ssh_to=ssh_to, tab_split=False, cmd=cmd, test=args.test)
"""get names of all snapshots for specified filesystems belonging to backup_name
def zfs_get_snapshots(filesystems):
"""get names of all snapshots for specified filesystems belonging to backup_name
return[filesystem_name]=[ "snashot1", "snapshot2", ... ]
"""
def zfs_get_snapshots(ssh_to, filesystems, backup_name):
return[filesystem_name]=[ "snashot1", "snapshot2", ... ]
"""
ret={}
ret={}
if filesystems:
#TODO: get rid of ugly errors for non-existing target filesystems
cmd=[
"zfs", "list", "-d", "1", "-r", "-t" ,"snapshot", "-H", "-o", "name"
]
cmd.extend(filesystems)
if filesystems:
#TODO: get rid of ugly errors for non-existing target filesystems
cmd=[
"zfs", "list", "-d", "1", "-r", "-t" ,"snapshot", "-H", "-o", "name"
]
cmd.extend(filesystems)
snapshots=run(ssh_to=ssh_to, tab_split=False, cmd=cmd, valid_exitcodes=[ 0,1 ])
snapshots=run(ssh_to=ssh_to, tab_split=False, cmd=cmd, valid_exitcodes=[ 0,1 ])
for snapshot in snapshots:
(filesystem, snapshot_name)=snapshot.split("@")
if re.match("^"+backup_name+"-[0-9]*$", snapshot_name):
if not filesystem in ret:
ret[filesystem]=[]
ret[filesystem].append(snapshot_name)
for snapshot in snapshots:
(filesystem, snapshot_name)=snapshot.split("@")
if re.match("^"+backup_name+"-[0-9]*$", snapshot_name):
if not filesystem in ret:
ret[filesystem]=[]
ret[filesystem].append(snapshot_name)
#also add any test-snapshots that where created with --test mode
if args.test:
if ssh_to in test_snapshots:
for filesystem in filesystems:
if filesystem in test_snapshots[ssh_to]:
if not filesystem in ret:
ret[filesystem]=[]
ret[filesystem].extend(test_snapshots[ssh_to][filesystem])
#also add any test-snapshots that where created with --test mode
if args.test:
if ssh_to in test_snapshots:
for filesystem in filesystems:
if filesystem in test_snapshots[ssh_to]:
if not filesystem in ret:
ret[filesystem]=[]
ret[filesystem].extend(test_snapshots[ssh_to][filesystem])
return(ret)
return(ret)
"""transfer a zfs snapshot from source to target. both can be either local or via ssh.
"""transfer a zfs snapshot from source to target. both can be either local or via ssh.
TODO:
TODO:
buffering: specify buffer_size to use mbuffer (or alike) to apply buffering where neccesary
buffering: specify buffer_size to use mbuffer (or alike) to apply buffering where neccesary
local to local:
local send -> local buffer -> local receive
local to local:
local send -> local buffer -> local receive
local to remote and remote to local:
local send -> local buffer -> ssh -> remote buffer -> remote receive
remote send -> remote buffer -> ssh -> local buffer -> local receive
local to remote and remote to local:
local send -> local buffer -> ssh -> remote buffer -> remote receive
remote send -> remote buffer -> ssh -> local buffer -> local receive
remote to remote:
remote send -> remote buffer -> ssh -> local buffer -> ssh -> remote buffer -> remote receive
remote to remote:
remote send -> remote buffer -> ssh -> local buffer -> ssh -> remote buffer -> remote receive
TODO: can we string together all the zfs sends and recvs, so that we only need to use 1 ssh connection? should be faster if there are many small snaphots
TODO: can we string together all the zfs sends and recvs, so that we only need to use 1 ssh connection? should be faster if there are many small snaphots
"""
def zfs_transfer(ssh_source, source_filesystem, first_snapshot, second_snapshot,
ssh_target, target_filesystem, resume_token=None, buffer_size=None):
"""
def zfs_transfer(ssh_source, source_filesystem, first_snapshot, second_snapshot,
ssh_target, target_filesystem, resume_token=None, buffer_size=None):
#### build source command
source_cmd=[]
if ssh_source != "local":
source_cmd.extend([ "ssh", ssh_source ])
if args.ssh_cipher:
source_cmd.extend(["-c", args.ssh_cipher])
if args.compress:
source_cmd.append("-C")
source_cmd.extend(["zfs", "send", ])
#only verbose in debug mode, lots of output
if args.debug:
source_cmd.append("-v")
if not first_snapshot:
txt="Initial transfer of "+source_filesystem+" snapshot "+second_snapshot
else:
txt="Incremental transfer of "+source_filesystem+" between snapshots "+first_snapshot+"..."+second_snapshot
if resume_token:
source_cmd.extend([ "-t", resume_token ])
verbose("RESUMING "+txt)
else:
source_cmd.append("-p")
if first_snapshot:
source_cmd.extend([ "-i", first_snapshot ])
#### build source command
source_cmd=[]
if ssh_source != "local":
source_cmd.append("'" + source_filesystem + "@" + second_snapshot + "'")
source_cmd.extend([ "ssh", ssh_source ])
if args.ssh_cipher:
source_cmd.extend(["-c", args.ssh_cipher])
if args.compress:
source_cmd.append("-C")
source_cmd.extend(["zfs", "send", ])
#only verbose in debug mode, lots of output
if args.debug:
source_cmd.append("-v")
if not first_snapshot:
txt="Initial transfer of "+source_filesystem+" snapshot "+second_snapshot
else:
source_cmd.append(source_filesystem + "@" + second_snapshot)
txt="Incremental transfer of "+source_filesystem+" between snapshots "+first_snapshot+"..."+second_snapshot
verbose(txt)
if resume_token:
source_cmd.extend([ "-t", resume_token ])
verbose("RESUMING "+txt)
#### build target command
target_cmd=[]
else:
source_cmd.append("-p")
if ssh_target != "local":
target_cmd.extend([ "ssh", ssh_target ])
if args.ssh_cipher:
target_cmd.extend(["-c", args.ssh_cipher])
if args.compress:
target_cmd.append("-C")
if first_snapshot:
source_cmd.extend([ "-i", first_snapshot ])
target_cmd.extend(["zfs", "recv", "-u" ])
#also verbose in --verbose mode so we can see the transfer speed when its completed
if args.verbose or args.debug:
target_cmd.append("-v")
if args.resume:
target_cmd.append("-s")
if ssh_target!="local":
target_cmd.append("'" + target_filesystem + "'")
else:
target_cmd.append(target_filesystem)
#### make sure parent on target exists
parent_filesystem= "/".join(target_filesystem.split("/")[:-1])
run(ssh_to=ssh_target, cmd=[ "zfs", "create" ,"-p", parent_filesystem ], test=args.test)
### execute pipe
debug_txt="# "+source_cmd[0]+" '"+("' '".join(source_cmd[1:]))+"'" + " | " + target_cmd[0]+" '"+("' '".join(target_cmd[1:]))+"'"
if args.test:
debug("[TEST] "+debug_txt)
return
else:
debug(debug_txt)
source_proc=subprocess.Popen(source_cmd, env=os.environ, stdout=subprocess.PIPE)
target_proc=subprocess.Popen(target_cmd, env=os.environ, stdin=source_proc.stdout)
source_proc.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
target_proc.communicate()
if source_proc.returncode:
raise(subprocess.CalledProcessError(source_proc.returncode, source_cmd))
#zfs recv sometimes gives an exitcode 1 while the transfer was succesfull, therefore we ignore exit 1's and do an extra check to see if the snapshot is there.
if target_proc.returncode and target_proc.returncode!=1:
raise(subprocess.CalledProcessError(target_proc.returncode, target_cmd))
debug("Verifying if snapshot exists on target")
run(ssh_to=ssh_target, cmd=["zfs", "list", target_filesystem+"@"+second_snapshot ])
"""get filesystems that where already backupped to a target. """
def zfs_get_backupped_filesystems(ssh_to, backup_name, target_fs):
#get all target filesystems that have received or inherited the backup propert, under the target_fs tree
ret=run(ssh_to=ssh_to, tab_split=False, cmd=[
"zfs", "get", "-r", "-t", "volume,filesystem", "-o", "name", "-s", "received,inherited", "-H", "autobackup:"+backup_name, target_fs
])
return(ret)
"""get filesystems that where once backupped to target but are no longer selected on source
these are filesystems that are not in the list in target_filesystems.
this happens when filesystems are destroyed or unselected on the source.
"""
def get_stale_backupped_filesystems(ssh_to, backup_name, target_fs, target_filesystems):
backupped_filesystems=zfs_get_backupped_filesystems(ssh_to=ssh_to, backup_name=backup_name, target_fs=target_fs)
#determine backupped filesystems that are not in target_filesystems anymore
stale_backupped_filesystems=[]
for backupped_filesystem in backupped_filesystems:
if backupped_filesystem not in target_filesystems:
stale_backupped_filesystems.append(backupped_filesystem)
return(stale_backupped_filesystems)
now=time.time()
"""determine list of snapshot (in @format) to destroy, according to age"""
def determine_destroy_list(snapshots, days):
ret=[]
for filesystem in snapshots:
for snapshot in snapshots[filesystem]:
time_str=re.findall("^.*-([0-9]*)$", snapshot)[0]
if len(time_str)==14:
#new format:
time_secs=time.mktime(time.strptime(time_str,"%Y%m%d%H%M%S"))
if ssh_source != "local":
source_cmd.append("'" + source_filesystem + "@" + second_snapshot + "'")
else:
time_secs=int(time_str)
# verbose("time_secs"+time_str)
if (now-time_secs) > (24 * 3600 * days):
ret.append(filesystem+"@"+snapshot)
source_cmd.append(source_filesystem + "@" + second_snapshot)
return(ret)
verbose(txt)
#### build target command
target_cmd=[]
if ssh_target != "local":
target_cmd.extend([ "ssh", ssh_target ])
if args.ssh_cipher:
target_cmd.extend(["-c", args.ssh_cipher])
if args.compress:
target_cmd.append("-C")
target_cmd.extend(["zfs", "recv", "-u" ])
#also verbose in --verbose mode so we can see the transfer speed when its completed
if args.verbose or args.debug:
target_cmd.append("-v")
if args.resume:
target_cmd.append("-s")
def lstrip_path(path, count):
return("/".join(path.split("/")[count:]))
if ssh_target!="local":
target_cmd.append("'" + target_filesystem + "'")
else:
target_cmd.append(target_filesystem)
#### make sure parent on target exists
parent_filesystem= "/".join(target_filesystem.split("/")[:-1])
run(ssh_to=ssh_target, cmd=[ "zfs", "create" ,"-p", parent_filesystem ], test=args.test)
### execute pipe
debug_txt="# "+source_cmd[0]+" '"+("' '".join(source_cmd[1:]))+"'" + " | " + target_cmd[0]+" '"+("' '".join(target_cmd[1:]))+"'"
if args.test:
debug("[TEST] "+debug_txt)
return
else:
debug(debug_txt)
source_proc=subprocess.Popen(source_cmd, env=os.environ, stdout=subprocess.PIPE)
target_proc=subprocess.Popen(target_cmd, env=os.environ, stdin=source_proc.stdout)
source_proc.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
target_proc.communicate()
if source_proc.returncode:
raise(subprocess.CalledProcessError(source_proc.returncode, source_cmd))
#zfs recv sometimes gives an exitcode 1 while the transfer was succesfull, therefore we ignore exit 1's and do an extra check to see if the snapshot is there.
if target_proc.returncode and target_proc.returncode!=1:
raise(subprocess.CalledProcessError(target_proc.returncode, target_cmd))
debug("Verifying if snapshot exists on target")
run(ssh_to=ssh_target, cmd=["zfs", "list", target_filesystem+"@"+second_snapshot ])
"""get filesystems that where already backupped to a target. """
def zfs_get_backupped_filesystems(ssh_to, backup_name, target_fs):
#get all target filesystems that have received or inherited the backup propert, under the target_fs tree
ret=run(ssh_to=ssh_to, tab_split=False, cmd=[
"zfs", "get", "-r", "-t", "volume,filesystem", "-o", "name", "-s", "received,inherited", "-H", "autobackup:"+backup_name, target_fs
])
return(ret)
"""get filesystems that where once backupped to target but are no longer selected on source
these are filesystems that are not in the list in target_filesystems.
this happens when filesystems are destroyed or unselected on the source.
"""
def get_stale_backupped_filesystems(ssh_to, backup_name, target_fs, target_filesystems):
backupped_filesystems=zfs_get_backupped_filesystems(ssh_to=ssh_to, backup_name=backup_name, target_fs=target_fs)
#determine backupped filesystems that are not in target_filesystems anymore
stale_backupped_filesystems=[]
for backupped_filesystem in backupped_filesystems:
if backupped_filesystem not in target_filesystems:
stale_backupped_filesystems.append(backupped_filesystem)
return(stale_backupped_filesystems)
now=time.time()
"""determine list of snapshot (in @format) to destroy, according to age"""
def determine_destroy_list(snapshots, days):
ret=[]
for filesystem in snapshots:
for snapshot in snapshots[filesystem]:
time_str=re.findall("^.*-([0-9]*)$", snapshot)[0]
if len(time_str)==14:
#new format:
time_secs=time.mktime(time.strptime(time_str,"%Y%m%d%H%M%S"))
else:
time_secs=int(time_str)
# verbose("time_secs"+time_str)
if (now-time_secs) > (24 * 3600 * days):
ret.append(filesystem+"@"+snapshot)
return(ret)
def lstrip_path(path, count):
return("/".join(path.split("/")[count:]))
class ZfsDataset:
"""a generic zfs dataset"""
def __init__(name, parent, backup=false, created=false):
""" backup: should be backupped by zfs_autobackup
created: is created by zfs_autobackup (and may be destroyed by it as well)
parent: parent dataset this belongs to (none is "root")
"""
self.name=name
self.parent=parent
self.created=created
self.backup=backup
self.childs={}
class ZfsSnapshot(Dataset):
"""A zfs snapshot"""
def __init__(previous_snapshot=false, next_snapshot=fase, keep_time=false, timestamp=false, **kwargs, *args):
super.__init__(**kargs, *args)
self.timestamp
self.keep_time
self.previous_snapshot
self.next_snapshot
class ZfsBackupSource():
"""backup source.
contains high level backup source functions.
these work with ZfsDataset and ZfsSnapshot objects.
"""
def __init__(self):
self.node=ZfsNode(ssh_to=args.ssh_to)
self.datasets={}
self.snapshots={}
def refresh():
"""refresh all data by calling various zfs commands"""
selected_filesystems=self.node.zfs_get_selected_filesystems()