refactored ZfsCheck.py for better sigpipe handling

Edwin Eefting 2022-03-08 17:22:08 +01:00
parent 5d7d6f6a6c
commit 75161c1bd2
6 changed files with 274 additions and 197 deletions

View File

@ -18,6 +18,7 @@ if ! [ -e /root/.ssh/id_rsa ]; then
ssh -oStrictHostKeyChecking=no localhost true || exit 1
fi
umount /tmp/ZfsCheck*
coverage run --branch --source zfs_autobackup -m unittest discover -vvvvf $SCRIPTDIR $@ 2>&1
EXIT=$?

View File

@ -181,7 +181,7 @@ whole_whole2_partial 0 309ffffba2e1977d12f3b7469971f30d28b94bd8
#breaks pipe when grep exits:
#important to use --debug, since that generates extra output which would be problematic if we didn't do correct SIGPIPE handling
shelltest("python -m zfs_autobackup.ZfsCheck test_source1@test --debug | grep -m1 'Hashing tree'")
time.sleep(1)
# time.sleep(5)
#should NOT be mounted anymore if cleanup went ok:
self.assertNotRegex(shelltest("mount"), "test_source1@test")
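The grep -m1 above exits after its first match, which closes the read end of the pipe; the next write from the Python side then raises SIGPIPE, and the assertion checks that cleanup (the unmount) still happened. A stand-alone sketch of that failure mode, using yes(1) as a stand-in for the chatty --debug output:

    import subprocess

    # Writer that prints forever, like ZfsCheck --debug printing hashes.
    writer = subprocess.Popen(["yes", "Hashing tree"], stdout=subprocess.PIPE)
    # Reader that exits after the first match, closing its end of the pipe.
    reader = subprocess.Popen(["grep", "-m1", "Hashing tree"],
                              stdin=writer.stdout, stdout=subprocess.DEVNULL)
    writer.stdout.close()      # only grep holds the read end now
    reader.wait()
    writer.wait()
    print("writer exit status:", writer.returncode)   # -13: killed by SIGPIPE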
@ -194,11 +194,11 @@ whole_whole2_partial 0 309ffffba2e1977d12f3b7469971f30d28b94bd8
#breaks pipe when grep exits:
#important to use --debug, since that generates extra output which would be problematic if we didn't do correct SIGPIPE handling
shelltest("python -m zfs_autobackup.ZfsCheck test_source1/vol@test --debug | grep -m1 'Hashing dev'")
time.sleep(1)
shelltest("python -m zfs_autobackup.ZfsCheck test_source1/vol@test --debug | grep -m1 'Hashing file'")
# time.sleep(1)
r = shelltest("zfs list -H -o name -r -t all " + TEST_POOLS)
self.assertMultiLineEqual(r, """
self.assertMultiLineEqual("""
test_source1
test_source1/fs1
test_source1/fs1/sub
@ -210,7 +210,7 @@ test_source2/fs2/sub
test_source2/fs3
test_source2/fs3/sub
test_target1
""")
""",r )

View File

@ -70,11 +70,11 @@ class BlockHasher():
with open(fname, "rb") as fh:
fsize = fh.seek(0, os.SEEK_END)
fh.seek(0, os.SEEK_END)
fsize=fh.tell()
fh.seek(0)
while fh.tell()<fsize:
chunk_nr=self._seek_next_chunk(fh, fsize)
if chunk_nr is False:
return
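The hunk above replaces fsize = fh.seek(0, os.SEEK_END) with a seek() followed by fh.tell(). A plausible motivation (the diff does not state it) is Python 2 compatibility: file.seek() returns the new offset only on Python 3, while tell() works on both. The portable pattern, as a small sketch:

    import os

    def file_size(fh):
        pos = fh.tell()            # remember the current position
        fh.seek(0, os.SEEK_END)    # jump to the end; ignore the return value
        size = fh.tell()           # reliable on Python 2 and 3
        fh.seek(pos)               # restore the position
        return size

    with open("somefile.bin", "rb") as fh:    # hypothetical file
        print(file_size(fh))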

View File

@ -3,6 +3,7 @@ from __future__ import print_function
import time
from signal import signal, SIGPIPE
from . import util
from .TreeHasher import TreeHasher
from .BlockHasher import BlockHasher
from .ZfsNode import ZfsNode
@ -63,38 +64,23 @@ class ZfsCheck(CliBase):
return args
def generate_zfs_filesystem(self, snapshot, input_generator):
""" recursively hash all files in this snapshot, using block_hash_tree()
def prepare_zfs_filesystem(self, snapshot):
:type snapshot: ZfsDataset.ZfsDataset
"""
mnt = "/tmp/" + tmp_name()
self.debug("Create temporary mount point {}".format(mnt))
self.node.run(["mkdir", mnt])
snapshot.mount(mnt)
return mnt
try:
self.debug("Create temporary mount point {}".format(mnt))
self.node.run(["mkdir", mnt])
snapshot.mount(mnt)
tree_hasher=TreeHasher(self.block_hasher)
self.debug("Hashing tree: {}".format(mnt))
if not self.args.test:
if input_generator:
for i in tree_hasher.compare(mnt, input_generator):
yield i
else:
for i in tree_hasher.generate(mnt):
yield i
finally:
snapshot.unmount()
self.debug("Cleaning up temporary mount point")
self.node.run(["rmdir", mnt], hide_errors=True, valid_exitcodes=[])
def cleanup_zfs_filesystem(self, snapshot):
mnt = "/tmp/" + tmp_name()
snapshot.unmount()
self.debug("Cleaning up temporary mount point")
self.node.run(["rmdir", mnt], hide_errors=True, valid_exitcodes=[])
# NOTE: https://www.google.com/search?q=Mount+Path+Limit+freebsd
# Freebsd has limitations regarding path length, so we have to clone it so the path stays short
def activate_volume_snapshot(self, snapshot):
def prepare_zfs_volume(self, snapshot):
"""clone volume, waits and tries to findout /dev path to the volume, in a compatible way. (linux/freebsd/smartos)"""
clone_name = get_tmp_clone_name(snapshot)
@ -122,92 +108,45 @@ class ZfsCheck(CliBase):
raise (Exception("Timeout while waiting for /dev entry to appear. (looking in: {})".format(locations)))
def deacitvate_volume_snapshot(self, snapshot):
def cleanup_zfs_volume(self, snapshot):
"""destroys temporary volume snapshot"""
clone_name = get_tmp_clone_name(snapshot)
clone = snapshot.zfs_node.get_dataset(clone_name)
clone.destroy(deferred=True, verbose=False)
def generate_zfs_volume(self, snapshot, input_generator):
try:
dev=self.activate_volume_snapshot(snapshot)
self.debug("Hashing dev: {}".format(dev))
if not self.args.test:
if input_generator:
for i in self.block_hasher.compare(dev, input_generator):
yield i
else:
for i in self.block_hasher.generate(dev):
yield i
finally:
self.deacitvate_volume_snapshot(snapshot)
def generate_zfs_target(self, input_generator):
"""specified arget is a ZFS snapshot"""
snapshot = self.node.get_dataset(self.args.target)
if not snapshot.exists:
raise Exception("Snapshot {} not found".format(snapshot))
if not snapshot.is_snapshot:
raise Exception("Dataset {} should be a snapshot".format(snapshot))
dataset_type = snapshot.parent.properties['type']
if dataset_type == 'volume':
return self.generate_zfs_volume(snapshot, input_generator)
elif dataset_type == 'filesystem':
return self.generate_zfs_filesystem(snapshot, input_generator)
else:
raise Exception("huh?")
def generate_path(self, input_generator=None):
def generate_tree_hashes(self, prepared_target):
tree_hasher = TreeHasher(self.block_hasher)
self.debug("Hashing tree: {}".format(prepared_target))
for i in tree_hasher.generate(prepared_target):
yield i
self.debug("Hashing tree: {}".format(self.args.target))
if input_generator:
for i in tree_hasher.compare(self.args.target, input_generator):
yield i
else:
for i in tree_hasher.generate(self.args.target):
yield i
def generate_tree_compare(self, prepared_target, input_generator=None):
def generate_file(self, input_generator=None):
tree_hasher = TreeHasher(self.block_hasher)
self.debug("Comparing tree: {}".format(prepared_target))
for i in tree_hasher.compare(prepared_target, input_generator):
yield i
def generate_file_hashes(self, prepared_target):
self.debug("Hashing file: {}".format(self.args.target))
if input_generator:
for i in self.block_hasher.compare(self.args.target, input_generator):
yield i
else:
for i in self.block_hasher.generate(self.args.target):
yield i
self.debug("Hashing file: {}".format(prepared_target))
for i in self.block_hasher.generate(prepared_target):
yield i
def generate(self, input_generator=None):
"""generate checksums or compare (and generate error messages)"""
def generate_file_compare(self, prepared_target, input_generator=None):
if '@' in self.args.target:
self.verbose("Target is a ZFS snapshot.".format(self.args.target))
return self.generate_zfs_target(input_generator)
elif os.path.isdir(self.args.target):
self.verbose("Target is a directory, checking recursively.".format(self.args.target))
return self.generate_path(input_generator)
elif os.path.exists(self.args.target):
self.verbose("Target is single file or blockdevice.".format(self.args.target))
return self.generate_file(input_generator)
else:
raise Exception("Cant open {} ".format(self.args.target))
self.debug("Comparing file: {}".format(prepared_target))
for i in self.block_hasher.compare(prepared_target, input_generator):
yield i
def input_parser(self, file_name):
"""parse input lines and generate items to use in compare functions"""
def generate_input(self):
"""parse input lines and yield items to use in compare functions"""
if self.args.check is True:
input_fh=sys.stdin
else:
input_fh=open(file_name, 'r')
input_fh=open(self.args.check, 'r')
last_progress_time = time.time()
progress_checked = 0
@ -236,52 +175,101 @@ class ZfsCheck(CliBase):
self.verbose("Checked {} hashes (skipped {})".format(progress_checked, progress_skipped))
def print_hashes(self, hash_generator):
"""prints hashes that are yielded by the specified hash_generator"""
last_progress_time = time.time()
progress_count = 0
for i in hash_generator:
if len(i) == 3:
print("{}\t{}\t{}".format(*i))
else:
print("{}\t{}".format(*i))
progress_count = progress_count + 1
if self.args.progress and time.time() - last_progress_time > 1:
last_progress_time = time.time()
self.progress("Generated {} hashes.".format(progress_count))
sys.stdout.flush()
self.verbose("Generated {} hashes.".format(progress_count))
self.clear_progress()
return 0
def print_errors(self, compare_generator):
"""prints errors that are yielded by the specified compare_generator"""
errors = 0
for i in compare_generator:
errors = errors + 1
if len(i) == 4:
(file_name, chunk_nr, compare_hexdigest, actual_hexdigest) = i
print("{}: Chunk {} failed: {} {}".format(file_name, chunk_nr, compare_hexdigest, actual_hexdigest))
else:
(chunk_nr, compare_hexdigest, actual_hexdigest) = i
print("Chunk {} failed: {} {}".format(chunk_nr, compare_hexdigest, actual_hexdigest))
sys.stdout.flush()
self.verbose("Total errors: {}".format(errors))
self.clear_progress()
return errors
def prepare_target(self):
if "@" in self.args.target:
# zfs snapshot
snapshot=self.node.get_dataset(self.args.target)
dataset_type = snapshot.parent.properties['type']
if dataset_type == 'volume':
return self.prepare_zfs_volume(snapshot)
elif dataset_type == 'filesystem':
return self.prepare_zfs_filesystem(snapshot)
else:
raise Exception("Unknown dataset type")
return self.args.target
def cleanup_target(self):
if "@" in self.args.target:
# zfs snapshot
snapshot=self.node.get_dataset(self.args.target)
dataset_type = snapshot.parent.properties['type']
if dataset_type == 'volume':
self.cleanup_zfs_volume(snapshot)
elif dataset_type == 'filesystem':
self.cleanup_zfs_filesystem(snapshot)
def run(self):
compare_generator=None
hash_generator=None
try:
last_progress_time = time.time()
progress_count = 0
#run as generator
if self.args.check==None:
for i in self.generate(input_generator=None):
if len(i)==3:
print("{}\t{}\t{}".format(*i))
else:
print("{}\t{}".format(*i))
progress_count=progress_count+1
if self.args.progress and time.time()-last_progress_time>1:
last_progress_time=time.time()
self.progress("Generated {} hashes.".format(progress_count))
sys.stdout.flush()
self.verbose("Generated {} hashes.".format(progress_count))
self.clear_progress()
return 0
prepared_target=self.prepare_target()
is_dir=os.path.isdir(prepared_target)
#run as compare
if self.args.check is not None:
input_generator=self.generate_input()
if is_dir:
compare_generator = self.generate_tree_compare(prepared_target, input_generator)
else:
compare_generator=self.generate_file_compare(prepared_target, input_generator)
errors=self.print_errors(compare_generator)
#run as generator
else:
errors=0
input_generator=self.input_parser(self.args.check)
for i in self.generate(input_generator):
errors=errors+1
if is_dir:
hash_generator = self.generate_tree_hashes(prepared_target)
else:
hash_generator=self.generate_file_hashes(prepared_target)
if len(i)==4:
(file_name, chunk_nr, compare_hexdigest, actual_hexdigest)=i
print("{}: Chunk {} failed: {} {}".format(file_name, chunk_nr, compare_hexdigest, actual_hexdigest))
else:
(chunk_nr, compare_hexdigest, actual_hexdigest) = i
print("Chunk {} failed: {} {}".format(chunk_nr, compare_hexdigest, actual_hexdigest))
sys.stdout.flush()
self.verbose("Total errors: {}".format(errors))
self.clear_progress()
return min(255,errors)
errors=self.print_hashes(hash_generator)
except Exception as e:
self.error("Exception: " + str(e))
@ -292,12 +280,26 @@ class ZfsCheck(CliBase):
self.error("Aborted")
return 255
finally:
#important to call check_output so that cleanup still functions in case of a broken pipe:
# util.check_output()
#close generators, to make sure files are not in use anymore when cleaning up
if hash_generator is not None:
hash_generator.close()
if compare_generator is not None:
compare_generator.close()
self.cleanup_target()
return errors
def cli():
import sys
signal(SIGPIPE, sigpipe_handler)
sys.exit(ZfsCheck(sys.argv[1:], False).run())
if __name__ == "__main__":
if __name__ == "__main__":
cli()
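The core of this refactor: the old generate_zfs_* generators unmounted or destroyed their snapshots in a try/finally inside the generator, but a generator's finally only runs once the generator is closed or garbage-collected, so after a broken pipe the cleanup could run too late or not at all while open file handles kept the temporary mount busy. run() now prepares the target up front, explicitly close()s the generators in its own finally, and only then calls cleanup_target(). A self-contained sketch of that ordering, with stand-in names and a tempdir instead of a real mount:

    import shutil
    import tempfile

    def prepare():
        return tempfile.mkdtemp()          # stand-in for mkdir + snapshot.mount()

    def cleanup(path):
        shutil.rmtree(path)                # stand-in for unmount + rmdir

    def hashes(target):
        try:
            for chunk_nr in range(3):
                yield (chunk_nr, "digest-%d" % chunk_nr)
        finally:
            pass                           # runs on close(), not when iteration stops

    def run():
        gen = None
        target = prepare()
        try:
            gen = hashes(target)
            for item in gen:
                print("%s\t%s" % item)     # this is where a broken pipe would hit
        finally:
            if gen is not None:
                gen.close()                # release whatever the generator holds
            cleanup(target)                # unmount can no longer fail with "busy"

    run()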

View File

@ -1,70 +1,129 @@
import os.path
import os
import subprocess
import sys
import time
from random import random
from signal import signal, SIGPIPE
with open('test.py', 'rb') as fh:
import util
# fsize = fh.seek(10000, os.SEEK_END)
# print(fsize)
start=time.time()
for i in range(0,1000000):
# fh.seek(0, 0)
fsize=fh.seek(0, os.SEEK_END)
# fsize=fh.tell()
# os.path.getsize('test.py')
print(time.time()-start)
signal(SIGPIPE, util.sigpipe_handler)
print(fh.tell())
try:
print ("before first")
raise Exception("first")
except Exception as e:
print ("before second")
raise Exception("second")
finally:
print ("YO")
sys.exit(0)
def generator():
try:
util.deb('in generator')
print ("TRIGGER SIGPIPE")
sys.stdout.flush()
util.deb('after trigger')
# if False:
yield ("bla")
# yield ("bla")
except GeneratorExit as e:
util.deb('GENEXIT '+str(e))
raise
except Exception as e:
util.deb('EXCEPT '+str(e))
finally:
util.deb('FINALLY')
print("nog iets")
sys.stdout.flush()
util.deb('after print in finally WOOP!')
util.deb('START')
g=generator()
util.deb('after generator')
for bla in g:
# print ("heb wat ontvangen")
util.deb('ontvangen van gen')
break
# raise Exception("moi")
checked=1
skipped=1
coverage=0.1
pass
raise Exception("moi")
max_skip=0
util.deb('after for')
skipinarow=0
while True:
total=checked+skipped
pass
skip=coverage<random()
if skip:
skipped = skipped + 1
print("S {:.2f}%".format(checked * 100 / total))
skipinarow = skipinarow+1
if skipinarow>max_skip:
max_skip=skipinarow
else:
skipinarow=0
checked=checked+1
print("C {:.2f}%".format(checked * 100 / total))
print(max_skip)
skip=0
while True:
total=checked+skipped
if skip>0:
skip=skip-1
skipped = skipped + 1
print("S {:.2f}%".format(checked * 100 / total))
else:
checked=checked+1
print("C {:.2f}%".format(checked * 100 / total))
#calc new skip
skip=skip+((1/coverage)-1)*(random()*2)
# print(skip)
if skip> max_skip:
max_skip=skip
print(max_skip)
#
# with open('test.py', 'rb') as fh:
#
# # fsize = fh.seek(10000, os.SEEK_END)
# # print(fsize)
#
# start=time.time()
# for i in range(0,1000000):
# # fh.seek(0, 0)
# fsize=fh.seek(0, os.SEEK_END)
# # fsize=fh.tell()
# # os.path.getsize('test.py')
# print(time.time()-start)
#
#
# print(fh.tell())
#
# sys.exit(0)
#
#
#
# checked=1
# skipped=1
# coverage=0.1
#
# max_skip=0
#
#
# skipinarow=0
# while True:
# total=checked+skipped
#
# skip=coverage<random()
# if skip:
# skipped = skipped + 1
# print("S {:.2f}%".format(checked * 100 / total))
#
# skipinarow = skipinarow+1
# if skipinarow>max_skip:
# max_skip=skipinarow
# else:
# skipinarow=0
# checked=checked+1
# print("C {:.2f}%".format(checked * 100 / total))
#
# print(max_skip)
#
# skip=0
# while True:
#
# total=checked+skipped
# if skip>0:
# skip=skip-1
# skipped = skipped + 1
# print("S {:.2f}%".format(checked * 100 / total))
# else:
# checked=checked+1
# print("C {:.2f}%".format(checked * 100 / total))
#
# #calc new skip
# skip=skip+((1/coverage)-1)*(random()*2)
# # print(skip)
# if skip> max_skip:
# max_skip=skip
#
# print(max_skip)
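The scratch loops in this file look like a prototype for sampled checking (verifying only a fraction of chunks); assuming that intent, the math holds: skip += ((1/coverage) - 1) * random() * 2 adds on average (1/coverage - 1) skips per checked chunk, since E[2*random()] == 1, so the checked fraction converges to coverage. A quick simulation:

    from random import random

    coverage = 0.1          # target: check ~10% of chunks
    checked = skipped = 0
    skip = 0.0
    for _ in range(100000):
        if skip > 0:
            skip -= 1
            skipped += 1
        else:
            checked += 1
            skip += ((1 / coverage) - 1) * random() * 2
    print("checked fraction: %.3f (target %.3f)"
          % (checked / float(checked + skipped), coverage))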

View File

@ -19,7 +19,7 @@ import sys
def tmp_name(suffix=""):
"""create temporary name unique to this process and node"""
"""create temporary name unique to this process and node. always retruns the same result during the same execution"""
#we could use uuids but those are ugly and confusing
name="{}-{}-{}".format(
@ -48,3 +48,18 @@ def output_redir():
def sigpipe_handler(sig, stack):
#redirect output so we don't get more SIGPIPEs during cleanup (which may try to write to stdout)
output_redir()
deb('redir')
# def check_output():
# """make sure stdout still functions. if its broken, this will trigger a SIGPIPE which will be handled by the sigpipe_handler."""
# try:
# print(" ")
# sys.stdout.flush()
# except Exception as e:
# pass
# def deb(txt):
# with open('/tmp/debug.log', 'a') as fh:
# fh.write("DEB: "+txt+"\n")