From 3bc37d143ce7fb565a16e1483de9b710874914cf Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 17 May 2020 21:49:58 +0200
Subject: [PATCH] take into account pool features and supported options (fixes
 regressions). also automatically enable resume if it's supported

---
 .gitignore         |   2 +
 bin/zfs-autobackup | 126 ++++++++++++++++++++++++++++++++++++++-------
 run_tests          |   3 +-
 3 files changed, 110 insertions(+), 21 deletions(-)

diff --git a/.gitignore b/.gitignore
index c726c6c..32056fa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ zfs_autobackup.egg-info
 .eggs/
 __pycache__
 .coverage
+*.pyc
+
diff --git a/bin/zfs-autobackup b/bin/zfs-autobackup
index 714e936..1fba88f 100755
--- a/bin/zfs-autobackup
+++ b/bin/zfs-autobackup
@@ -461,6 +461,72 @@ class ExecuteNode(Logger):
 
         return(output_lines)
 
+class ZfsPool():
+    """a zfs pool"""
+
+    def __init__(self, zfs_node, name):
+        """name: name of the pool
+        """
+
+        self.zfs_node=zfs_node
+        self.name=name
+
+    def __repr__(self):
+        return("{}: {}".format(self.zfs_node, self.name))
+
+    def __str__(self):
+        return(self.name)
+
+    def __eq__(self, obj):
+        if not isinstance(obj, ZfsPool):
+            return(False)
+
+        return(self.name == obj.name)
+
+    def verbose(self,txt):
+        self.zfs_node.verbose("zpool {}: {}".format(self.name, txt))
+
+    def error(self,txt):
+        self.zfs_node.error("zpool {}: {}".format(self.name, txt))
+
+    def debug(self,txt):
+        self.zfs_node.debug("zpool {}: {}".format(self.name, txt))
+
+    @cached_property
+    def properties(self):
+        """all zpool properties"""
+
+        self.debug("Getting zpool properties")
+
+        cmd=[
+            "zpool", "get", "-H", "-o", "property,value", "-p", "all", self.name
+        ]
+
+        ret={}
+
+        for pair in self.zfs_node.run(tab_split=True, cmd=cmd, readonly=True, valid_exitcodes=[ 0 ]):
+            if len(pair)==2:
+                ret[pair[0]]=pair[1]
+
+        return(ret)
+
+    @property
+    def features(self):
+        """get list of enabled or active zpool features"""
+
+        ret=[]
+        for (key,value) in self.properties.items():
+            if key.startswith("feature@"):
+                feature=key.split("@")[1]
+                if value=='enabled' or value=='active':
+                    ret.append(feature)
+
+        return(ret)
+
+
 
@@ -842,10 +908,9 @@ class ZfsDataset():
 
         return(self.from_names(names[1:]))
 
-    def send_pipe(self, prev_snapshot=None, resume=True, resume_token=None, show_progress=False, raw=False):
+    def send_pipe(self, features, prev_snapshot=None, resume_token=None, show_progress=False, raw=False):
         """returns a pipe with zfs send output for this snapshot
 
-        resume: Use resuming (both sides need to support it)
         resume_token: resume sending from this token.
         (in that case we don't need to know snapshot names)
         """
 
@@ -854,16 +919,17 @@ class ZfsDataset():
 
         cmd.extend(["zfs", "send", ])
 
-        # #all kind of performance options:
-        # if "-L" in self.zfs_node.supported_send_options:
-        #     cmd.append("-L") # large block support
+        #all kinds of performance options:
+        if 'large_blocks' in features and "-L" in self.zfs_node.supported_send_options:
+            cmd.append("-L") # large block support (only if recordsize>128k, which is seldom used)
 
-        # if "-e" in self.zfs_node.supported_send_options:
-        #     cmd.append("-e") # WRITE_EMBEDDED, more compact stream
+        if 'embedded_data' in features and "-e" in self.zfs_node.supported_send_options:
+            cmd.append("-e") # WRITE_EMBEDDED, more compact stream
 
-        # if "-c" in self.zfs_node.supported_send_options:
-        #     cmd.append("-c") # use compressed WRITE records
+        if "-c" in self.zfs_node.supported_send_options:
+            cmd.append("-c") # use compressed WRITE records
 
+        #NOTE: performance is usually worse with this option, according to the manual
         # if not resume:
         #     if "-D" in self.zfs_node.supported_send_options:
         #         cmd.append("-D") # dedupped stream, sends less duplicate data
@@ -901,11 +967,11 @@ class ZfsDataset():
 
         return(self.zfs_node.run(cmd, pipe=True))
 
-    def recv_pipe(self, pipe, resume=True, filter_properties=[], set_properties=[], ignore_exit_code=False):
+    def recv_pipe(self, pipe, features, filter_properties=[], set_properties=[], ignore_exit_code=False):
         """starts a zfs recv for this snapshot and uses pipe as input
 
-        note: you can also call both a snapshot and filesystem object.
-        the resulting zfs command is the same, only our object cache is invalidated differently.
+        note: you can call it on either a snapshot or a filesystem object.
+        The resulting zfs command is the same, only our object cache is invalidated differently.
         """
 
         #### build target command
         cmd=[]
@@ -924,8 +990,9 @@ class ZfsDataset():
         #verbose output
         cmd.append("-v")
 
-        if resume:
+        if 'extensible_dataset' in features:
             #support resuming
+            self.debug("Enabled resume support")
             cmd.append("-s")
 
         cmd.append(self.filesystem_name)
@@ -954,7 +1021,7 @@ class ZfsDataset():
 
         # cmd.append("|mbuffer -m {}".format(args.buffer))
 
-    def transfer_snapshot(self, target_snapshot, prev_snapshot=None, resume=True, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None, raw=False):
+    def transfer_snapshot(self, target_snapshot, features, prev_snapshot=None, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, resume_token=None, raw=False):
         """transfer this snapshot to target_snapshot.
         specify prev_snapshot for incremental transfer
 
         connects a send_pipe() to recv_pipe()
         """
 
@@ -973,8 +1040,8 @@ class ZfsDataset():
             target_snapshot.verbose("receiving incremental".format(self.snapshot_name))
 
         #do it
-        pipe=self.send_pipe(resume=resume, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token, raw=raw)
-        target_snapshot.recv_pipe(pipe, resume=resume, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
+        pipe=self.send_pipe(features=features, show_progress=show_progress, prev_snapshot=prev_snapshot, resume_token=resume_token, raw=raw)
+        target_snapshot.recv_pipe(pipe, features=features, filter_properties=filter_properties, set_properties=set_properties, ignore_exit_code=ignore_recv_exit_code)
 
     def abort_resume(self):
         """abort current resume state"""
@@ -1108,7 +1175,7 @@ class ZfsDataset():
 
 
-    def sync_snapshots(self, target_dataset, show_progress=False, resume=True, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, source_holds=True, rollback=False, raw=False, other_snapshots=False, no_send=False, destroy_incompatible=False):
+    def sync_snapshots(self, target_dataset, features, show_progress=False, filter_properties=[], set_properties=[], ignore_recv_exit_code=False, source_holds=True, rollback=False, raw=False, other_snapshots=False, no_send=False, destroy_incompatible=False):
         """sync this dataset's snapshots to target_dataset, while also thinning out old snapshots along the way."""
 
         #determine common and start snapshot
@@ -1206,7 +1273,7 @@ class ZfsDataset():
                 #does target actually want it?
                 if target_snapshot not in target_obsoletes:
                     ( allowed_filter_properties, allowed_set_properties ) = self.get_allowed_properties(filter_properties, set_properties) #NOTE: should we let transfer_snapshot handle this?
-                    source_snapshot.transfer_snapshot(target_snapshot, prev_snapshot=prev_source_snapshot, show_progress=show_progress, resume=resume, filter_properties=allowed_filter_properties, set_properties=allowed_set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token, raw=raw)
+                    source_snapshot.transfer_snapshot(target_snapshot, features=features, prev_snapshot=prev_source_snapshot, show_progress=show_progress, filter_properties=allowed_filter_properties, set_properties=allowed_set_properties, ignore_recv_exit_code=ignore_recv_exit_code, resume_token=resume_token, raw=raw)
                     resume_token=None
 
                 #hold the new common snapshots and release the previous ones
@@ -1271,9 +1338,12 @@ class ZfsNode(ExecuteNode):
 
         self.thinner=thinner
 
+        #dict of ZfsPools, indexed by pool name
+        self.__pools={}
 
         ExecuteNode.__init__(self, ssh_config=ssh_config, ssh_to=ssh_to, readonly=readonly, debug_output=debug_output)
 
+    @cached_property
     def supported_send_options(self):
         """list of supported options, for optimizing sends"""
@@ -1297,6 +1367,13 @@ class ZfsNode(ExecuteNode):
 
         return True
 
+    #TODO: also create a get_zfs_dataset() function that stores all the objects in a dict. This should optimize caching a bit and is more consistent.
+    def get_zfs_pool(self, name):
+        """get a ZfsPool() object from the specified name.
+        stores objects internally to enable caching"""
+        return(self.__pools.setdefault(name, ZfsPool(self, name)))
+
+
 
     def reset_progress(self):
         """reset progress output counters"""
         self._progress_total_bytes=0
@@ -1474,7 +1551,7 @@ class ZfsAutobackup:
 
         #not sure if this ever was useful:
         # parser.add_argument('--ignore-new', action='store_true', help='Ignore filesystem if there are already newer snapshots for it on the target (use with caution)')
 
-        parser.add_argument('--resume', action='store_true', help='Support resuming of interrupted transfers by using the zfs extensible_dataset feature (both zpools should have it enabled) Disadvantage is that you need to use zfs recv -A if another snapshot is created on the target during a receive. Otherwise it will keep failing.')
+        parser.add_argument('--resume', action='store_true', help=argparse.SUPPRESS)
         parser.add_argument('--strip-path', default=0, type=int, help='Number of directory to strip from path (use 1 when cloning zones between 2 SmartOS machines)')
         # parser.add_argument('--buffer', default="", help='Use mbuffer with specified size to speedup zfs transfer. (e.g. --buffer 1G) Will also show nice progress output.')
 
@@ -1518,6 +1595,9 @@ class ZfsAutobackup:
 
         self.log=Log(show_debug=self.args.debug, show_verbose=self.args.verbose)
 
+        if args.resume:
+            self.verbose("NOTE: The --resume option isn't needed anymore (it's autodetected now)")
+
 
     def verbose(self,txt,titles=[]):
         self.log.verbose(txt)
@@ -1617,7 +1697,13 @@ class ZfsAutobackup:
                     if not self.args.no_send and not target_dataset.parent.exists:
                         target_dataset.parent.create_filesystem(parents=True)
 
-                    source_dataset.sync_snapshots(target_dataset, show_progress=self.args.progress, resume=self.args.resume, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=self.args.ignore_transfer_errors, source_holds= not self.args.no_holds, rollback=self.args.rollback, raw=self.args.raw, other_snapshots=self.args.other_snapshots, no_send=self.args.no_send, destroy_incompatible=self.args.destroy_incompatible)
+                    #determine common zpool features
+                    source_features=source_node.get_zfs_pool(source_dataset.split_path()[0]).features
+                    target_features=target_node.get_zfs_pool(target_dataset.split_path()[0]).features
+                    common_features=[ feature for feature in source_features if feature in target_features ]
+                    # source_dataset.debug("Common features: {}".format(common_features))
+
+                    source_dataset.sync_snapshots(target_dataset, show_progress=self.args.progress, features=common_features, filter_properties=filter_properties, set_properties=set_properties, ignore_recv_exit_code=self.args.ignore_transfer_errors, source_holds= not self.args.no_holds, rollback=self.args.rollback, raw=self.args.raw, other_snapshots=self.args.other_snapshots, no_send=self.args.no_send, destroy_incompatible=self.args.destroy_incompatible)
                 except Exception as e:
                     fail_count=fail_count+1
                     self.error("DATASET FAILED: "+str(e))
diff --git a/run_tests b/run_tests
index b5cbd9a..9e81522 100755
--- a/run_tests
+++ b/run_tests
@@ -1,12 +1,13 @@
 #!/bin/bash
+
 if [ "$USER" != "root" ]; then
     echo "Need root to do proper zfs testing"
     exit 1
 fi
 
 #reactivate python environment, if any (usefull in Travis)
-source $VIRTUAL_ENV/bin/activate || true
+[ "$VIRTUAL_ENV" ] && source $VIRTUAL_ENV/bin/activate
 
 # test needs ssh access to localhost for testing
 if ! [ -e /root/.ssh/id_rsa ]; then
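
Note on the feature detection above: ZfsPool.features parses "zpool get -H -o property,value -p all" and keeps every feature@... property whose value is "enabled" or "active"; the main loop then keeps only the features that source and target pools have in common, since a send flag is only safe when both ends support the matching feature. A minimal standalone sketch of the same idea, assuming a local zpool binary and sufficient privileges (the active_features helper and the pool names are illustrative, not part of the patch, which routes the command through ZfsNode.run()):

    import subprocess

    def active_features(pool):
        """return the zpool features that are 'enabled' or 'active' on the given pool"""
        out=subprocess.run(
            ["zpool", "get", "-H", "-o", "property,value", "-p", "all", pool],
            capture_output=True, text=True, check=True,
        ).stdout

        features=[]
        for line in out.splitlines():
            # -H produces tab-separated property/value pairs
            prop, _, value=line.partition("\t")
            if prop.startswith("feature@") and value in ("enabled", "active"):
                features.append(prop.split("@", 1)[1])
        return(features)

    # only features present on BOTH pools are safe to rely on:
    # common=[ f for f in active_features("sourcepool") if f in active_features("targetpool") ]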
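
The send and receive sides apply the same gate: a flag is only added when the matching pool feature is in the common set and, for send, when the local zfs binary also supports the flag. A condensed sketch of the decision implemented in send_pipe(), with an illustrative helper name:

    def send_flags(common_features, supported_send_options):
        """pick the zfs send performance flags that are safe for both pools"""
        flags=[]
        if 'large_blocks' in common_features and "-L" in supported_send_options:
            flags.append("-L") # large block support (recordsize>128k)
        if 'embedded_data' in common_features and "-e" in supported_send_options:
            flags.append("-e") # WRITE_EMBEDDED, more compact stream
        if "-c" in supported_send_options:
            flags.append("-c") # compressed WRITE records
        return(flags)

    # e.g. send_flags(['large_blocks'], ["-L", "-c"]) -> ["-L", "-c"]
    # on the receive side, recv_pipe() only passes "-s" (resume support) to
    # zfs recv when 'extensible_dataset' is in the common feature set.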
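
get_zfs_pool() caches one ZfsPool per pool name via dict.setdefault(), so the expensive "zpool get" runs at most once per pool per node. Hypothetical usage, with node construction elided:

    pool=node.get_zfs_pool("rpool")
    assert pool is node.get_zfs_pool("rpool") # same cached object, same cached properties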