#!/usr/bin/env python
"""List the ``autobackup:*`` property of every dataset under a ZFS pool.

For each dataset returned by ``zfs list -r <pool>`` the script prints one
tab-separated row.  Datasets carrying an ``autobackup:*`` property show that
property's name, value and source; datasets without one are labelled ``no``
and get a short heuristic verdict from :func:`no_backups_statement`.
"""
import subprocess
import sys
from dataclasses import dataclass
from argparse import ArgumentParser

# Upper bound (exclusive) on the ``referenced`` value for a dataset to be
# treated as an empty parent by the heuristic in no_backups_statement().
_PARENT_REFERENCED_LIMIT = 175_000


@dataclass(frozen=True)
class ZfsProp:
    """One row of ``zfs get -o property,value,source`` output."""

    prop: str
    value: str
    source: str

    @classmethod
    def from_line(cls, line: str) -> "ZfsProp":
        """Parse one tab-separated ``property<TAB>value<TAB>source`` line.

        The first field is the property name and the last field is the
        source; everything in between is the value, so a value that itself
        contains tab characters is kept intact.

        Raises:
            ValueError: if *line* has fewer than three tab-separated fields.
        """
        prop, first_sep, rest = line.partition("\t")
        value, last_sep, source = rest.rpartition("\t")
        if not first_sep or not last_sep:
            raise ValueError(f"malformed zfs property line: {line!r}")
        return cls(prop, value, source)


D_ZFS_PROPS = dict[str, ZfsProp]


def zfs_get_all(name: str) -> D_ZFS_PROPS:
    """Return every property of dataset *name*, keyed by property name.

    Runs ``zfs get -H -p -o property,value,source all <name>``.

    Raises:
        subprocess.CalledProcessError: if ``zfs get`` exits non-zero.
        ValueError: if an output line cannot be parsed.
    """
    # An argument list (no shell) keeps dataset names containing spaces or
    # shell metacharacters from being split or interpreted.
    completed = subprocess.run(
        ["zfs", "get", "-H", "-p", "-o", "property,value,source", "all", name],
        check=True,
        capture_output=True,
    )
    res: D_ZFS_PROPS = {}
    for line in completed.stdout.decode().splitlines(keepends=False):
        if not line:
            continue
        parsed = ZfsProp.from_line(line)
        res[parsed.prop] = parsed
    return res


def no_backups_statement(props: D_ZFS_PROPS) -> str:
    """Return a heuristic verdict for a dataset with no autobackup property.

    The dataset is reported as an empty parent ("OK; ...") only when it has
    no snapshot usage, is not mounted, has ``mountpoint`` set to ``none`` and
    references less than ``_PARENT_REFERENCED_LIMIT``.  Anything else yields
    "CHECK; Not really sure".

    Raises:
        KeyError: if a required property is missing from *props*.
        ValueError: if ``referenced`` or ``usedbysnapshots`` is not an integer.
    """
    referenced = int(props["referenced"].value)
    # Compare the property's *value*; comparing the ZfsProp object itself to
    # a string is always False and would disable this check entirely.
    mounted = props["mounted"].value == "yes"
    mountpoint = props["mountpoint"].value
    usedbysnapshots = int(props["usedbysnapshots"].value)
    if (
        usedbysnapshots == 0
        and not mounted
        and referenced < _PARENT_REFERENCED_LIMIT
        and mountpoint == "none"
    ):
        return "OK; A parent dataset, no data is actually stored"
    return "CHECK; Not really sure"


def main(pool_name: str, only_no_backups: bool) -> int:
    """Print the autobackup report for *pool_name* and return an exit code.

    Args:
        pool_name: pool (or dataset) passed to ``zfs list -r``.
        only_no_backups: when true, rows for datasets that do have an
            ``autobackup:*`` property are suppressed.

    Returns:
        0 on success, 1 if the pool was not found, 2 if ``zfs list`` failed
        for another reason (including the ``zfs`` executable being absent).

    Raises:
        RuntimeError: if a dataset carries more than one ``autobackup:*``
            property.
    """
    try:
        listing = subprocess.run(
            ["zfs", "list", "-H", "-o", "name", "-r", pool_name],
            capture_output=True,
        )
    except FileNotFoundError:
        # Without a shell a missing binary raises instead of returning 127.
        print("zfs command not found")
        return 2
    if listing.returncode == 1 and listing.stdout == b"":
        print("Pool not found")
        return 1
    if listing.returncode != 0:
        print("return code of zfs list was not 0")
        return 2

    print("DATASET\tBACKUP_LABEL\tVALUE\tSOURCE\tHEURISTIC")
    for ds_name in listing.stdout.decode().splitlines(keepends=False):
        props = zfs_get_all(ds_name)
        autobackup_props: D_ZFS_PROPS = {
            key: prop for key, prop in props.items() if key.startswith("autobackup:")
        }
        if len(autobackup_props) > 1:
            raise RuntimeError(
                f">1 autobackup: properties on {ds_name=!r}: {autobackup_props!r}"
            )
        if not autobackup_props:
            msg = no_backups_statement(props)
            print(f"{ds_name}\tno\t\t\t{msg}")
        elif not only_no_backups:
            autobackup_prop: ZfsProp = next(iter(autobackup_props.values()))
            print(
                f"{ds_name}\t{autobackup_prop.prop}\t{autobackup_prop.value}\t{autobackup_prop.source}"
            )
    return 0


if __name__ == "__main__":
    try:
        parser = ArgumentParser(
            "zfs_autobackup_explorer",
            # Shown below the argument list; passing this text as ``usage=``
            # would replace the real usage synopsis in --help and in errors.
            epilog="It's wise to pipe output to column tool to get "
            f"formatted output: {sys.argv[0]} | column -t -s $'\\t' | less -S",
        )
        parser.add_argument("pool_name")
        parser.add_argument("--only-no-backups", action="store_true", default=False)
        args = parser.parse_args()
        sys.exit(main(args.pool_name, args.only_no_backups))
    except KeyboardInterrupt:
        print("exiting by CTRL+C")
        sys.exit(130)