run() now uses CmdPipe for better pipe handling and cleaner code

This commit is contained in:
Edwin Eefting 2021-04-12 18:16:42 +02:00
parent ed7cd41ad7
commit 4b97f789df
7 changed files with 125 additions and 230 deletions

View File

@ -10,11 +10,12 @@ class TestCmdPipe(unittest2.TestCase):
err=[]
out=[]
p.add(["ls", "-d", "/", "/", "/nonexistent"], stderr_handler=lambda line: err.append(line))
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err, ["ls: cannot access '/nonexistent': No such file or directory"])
self.assertEqual(out, ["/","/"])
self.assertEqual(exits, [2])
self.assertTrue(executed)
self.assertEqual(p.items[0]['process'].returncode,2)
def test_input(self):
"""test stdinput"""
@ -22,11 +23,12 @@ class TestCmdPipe(unittest2.TestCase):
err=[]
out=[]
p.add(["echo", "test"], stderr_handler=lambda line: err.append(line))
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err, [])
self.assertEqual(out, ["test"])
self.assertEqual(exits, [0])
self.assertTrue(executed)
self.assertEqual(p.items[0]['process'].returncode,0)
def test_pipe(self):
"""test piped"""
@ -38,13 +40,16 @@ class TestCmdPipe(unittest2.TestCase):
p.add(["echo", "test"], stderr_handler=lambda line: err1.append(line))
p.add(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line))
p.add(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line))
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(err3, [])
self.assertEqual(out, ["TEsT"])
self.assertEqual(exits, [0,0,0])
self.assertTrue(executed)
self.assertEqual(p.items[0]['process'].returncode,0)
self.assertEqual(p.items[1]['process'].returncode,0)
self.assertEqual(p.items[2]['process'].returncode,0)
#test str representation as well
self.assertEqual(str(p), "(echo test) | (tr e E) | (tr t T)")
@ -59,13 +64,16 @@ class TestCmdPipe(unittest2.TestCase):
p.add(["ls", "/nonexistent1"], stderr_handler=lambda line: err1.append(line))
p.add(["ls", "/nonexistent2"], stderr_handler=lambda line: err2.append(line))
p.add(["ls", "/nonexistent3"], stderr_handler=lambda line: err3.append(line))
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err1, ["ls: cannot access '/nonexistent1': No such file or directory"])
self.assertEqual(err2, ["ls: cannot access '/nonexistent2': No such file or directory"])
self.assertEqual(err3, ["ls: cannot access '/nonexistent3': No such file or directory"])
self.assertEqual(out, [])
self.assertEqual(exits, [2,2,2])
self.assertTrue(executed)
self.assertEqual(p.items[0]['process'].returncode,2)
self.assertEqual(p.items[1]['process'].returncode,2)
self.assertEqual(p.items[2]['process'].returncode,2)
def test_readonly_execute(self):
"""everything readonly, just should execute"""
@ -76,12 +84,14 @@ class TestCmdPipe(unittest2.TestCase):
out=[]
p.add(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=True)
p.add(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True)
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(out, ["test2"])
self.assertEqual(exits, [0,0])
self.assertTrue(executed)
self.assertEqual(p.items[0]['process'].returncode,0)
self.assertEqual(p.items[1]['process'].returncode,0)
def test_readonly_skip(self):
"""one command not readonly, skip"""
@ -92,10 +102,10 @@ class TestCmdPipe(unittest2.TestCase):
out=[]
p.add(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=False)
p.add(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True)
exits=p.execute(stdout_handler=lambda line: out.append(line))
executed=p.execute(stdout_handler=lambda line: out.append(line))
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(out, [])
self.assertEqual(exits, None)
self.assertFalse(executed)

View File

@ -64,7 +64,7 @@ class TestExecuteNode(unittest2.TestCase):
def test_readonly(self):
node=ExecuteNode(debug_output=True, readonly=True)
self.assertEqual(node.run(["echo","test"], readonly=False), None)
self.assertEqual(node.run(["echo","test"], readonly=False), [])
self.assertEqual(node.run(["echo","test"], readonly=True), ["test"])
@ -73,36 +73,36 @@ class TestExecuteNode(unittest2.TestCase):
def pipe(self, nodea, nodeb):
with self.subTest("pipe data"):
output=nodea.get_pipe(["dd", "if=/dev/zero", "count=1000"])
output=nodea.run(["dd", "if=/dev/zero", "count=1000"],pipe=True)
self.assertEqual(nodeb.run(["md5sum"], inp=output), ["816df6f64deba63b029ca19d880ee10a -"])
with self.subTest("exit code both ends of pipe ok"):
output=nodea.get_pipe(["true"])
output=nodea.run(["true"], pipe=True)
nodeb.run(["true"], inp=output)
with self.subTest("error on pipe input side"):
with self.assertRaises(subprocess.CalledProcessError):
output=nodea.get_pipe(["false"])
output=nodea.run(["false"], pipe=True)
nodeb.run(["true"], inp=output)
with self.subTest("error on pipe output side "):
with self.assertRaises(subprocess.CalledProcessError):
output=nodea.get_pipe(["true"])
output=nodea.run(["true"], pipe=True)
nodeb.run(["false"], inp=output)
with self.subTest("error on both sides of pipe"):
with self.assertRaises(subprocess.CalledProcessError):
output=nodea.get_pipe(["false"])
output=nodea.run(["false"], pipe=True)
nodeb.run(["false"], inp=output)
with self.subTest("check stderr on pipe output side"):
output=nodea.get_pipe(["true"])
output=nodea.run(["true"], pipe=True)
(stdout, stderr)=nodeb.run(["ls", "nonexistingfile"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
self.assertEqual(stdout,[])
self.assertRegex(stderr[0], "nonexistingfile" )
with self.subTest("check stderr on pipe input side (should be only printed)"):
output=nodea.get_pipe(["ls", "nonexistingfile"])
output=nodea.run(["ls", "nonexistingfile"], pipe=True)
(stdout, stderr)=nodeb.run(["true"], inp=output, return_stderr=True, valid_exitcodes=[0,2])
self.assertEqual(stdout,[])
self.assertEqual(stderr,[])

View File

@ -38,7 +38,7 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with an impact of more than O(snapshot_count/2)
expected_runs=240
expected_runs=343
print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), snapshot_count/2)
@ -69,6 +69,7 @@ class TestZfsScaling(unittest2.TestCase):
global run_counter
#first run
run_counter=0
with patch.object(ExecuteNode,'run', run_count) as p:
@ -77,11 +78,12 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with an impact of more than O(dataset_count/2)
expected_runs=640
expected_runs=743
print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2)
#second run, should have higher number of expected_runs
run_counter=0
with patch.object(ExecuteNode,'run', run_count) as p:
@ -90,6 +92,6 @@ class TestZfsScaling(unittest2.TestCase):
#this triggers if you make a change with a performance impact of more than O(dataset_count/2)
expected_runs=844
expected_runs=947
print("ACTUAL RUNS: {}".format(run_counter))
self.assertLess(abs(run_counter-expected_runs), dataset_count/2)

View File

@ -3,7 +3,7 @@ import os
import select
class CmdPipe:
"""a pipe of one or more commands """
"""a pipe of one or more commands. also takes care of utf-8 encoding/decoding and line based parsing"""
def __init__(self, readonly=False, inp=None):
"""
@ -30,6 +30,12 @@ class CmdPipe:
def __str__(self):
"""transform into oneliner for debugging and testing """
#just one command?
if len(self.items)==1:
return " ".join(self.items[0]['cmd'])
#an actual pipe
ret = ""
for item in self.items:
if ret:
@ -38,11 +44,14 @@ class CmdPipe:
return ret
def should_execute(self):
return(self._should_execute)
def execute(self, stdout_handler):
"""run the pipe"""
"""run the pipe. returns True if it executed, and false if it skipped due to readonly conditions"""
if not self._should_execute:
return None
return False
# first process should have actual user input as stdin:
selectors = []
@ -51,7 +60,14 @@ class CmdPipe:
last_stdout = None
stdin = subprocess.PIPE
for item in self.items:
item['process'] = subprocess.Popen(item['cmd'], env=os.environ, stdout=subprocess.PIPE, stdin=stdin,
# make sure the command gets all the data in utf8 format:
# (this is necessary if LC_ALL=en_US.utf8 is not set in the environment)
encoded_cmd = []
for arg in item['cmd']:
encoded_cmd.append(arg.encode('utf-8'))
item['process'] = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin,
stderr=subprocess.PIPE)
selectors.append(item['process'].stderr)
@ -101,11 +117,10 @@ class CmdPipe:
if eof_count == len(selectors) and done_count == len(self.items):
break
# close all filehandles and get all exit codes
ret = []
# ret = []
last_stdout.close()
for item in self.items:
item['process'].stderr.close()
ret.append(item['process'].returncode)
# ret.append(item['process'].returncode)
return ret
return True

View File

@ -2,6 +2,7 @@ import os
import select
import subprocess
from zfs_autobackup.CmdPipe import CmdPipe
from zfs_autobackup.LogStub import LogStub
@ -38,247 +39,114 @@ class ExecuteNode(LogStub):
else:
self.error("STDERR > " + line.rstrip())
def _parse_stderr_pipe(self, line, hide_errors):
"""parse stderr from pipe input process. can be overridden in subclass"""
if hide_errors:
self.debug("STDERR|> " + line.rstrip())
else:
self.error("STDERR|> " + line.rstrip())
# def _parse_stderr_pipe(self, line, hide_errors):
# """parse stderr from pipe input process. can be overridden in subclass"""
# if hide_errors:
# self.debug("STDERR|> " + line.rstrip())
# else:
# self.error("STDERR|> " + line.rstrip())
def _encode_cmd(self, cmd):
"""returns cmd in encoded and escaped form that can be used with popen."""
encoded_cmd = []
# make sure the command gets all the data in utf8 format:
# (this is necessary if LC_ALL=en_US.utf8 is not set in the environment)
def _remote_cmd(self, cmd):
"""transforms cmd in correct form for remote over ssh, if needed"""
# use ssh?
if self.ssh_to is not None:
encoded_cmd.append("ssh".encode('utf-8'))
encoded_cmd = []
encoded_cmd.append("ssh")
if self.ssh_config is not None:
encoded_cmd.extend(["-F".encode('utf-8'), self.ssh_config.encode('utf-8')])
encoded_cmd.extend(["-F", self.ssh_config])
encoded_cmd.append(self.ssh_to.encode('utf-8'))
encoded_cmd.append(self.ssh_to)
for arg in cmd:
# add single quotes for remote commands to support spaces and other weird stuff (remote commands are
# executed in a shell) and escape existing single quotes (bash needs ' to end the quoted string,
# then a \' for the actual quote and then another ' to start a new quoted string) (and then python
# needs the double \ to get a single \)
encoded_cmd.append(("'" + arg.replace("'", "'\\''") + "'").encode('utf-8'))
encoded_cmd.append(("'" + arg.replace("'", "'\\''") + "'"))
return encoded_cmd
else:
for arg in cmd:
encoded_cmd.append(arg.encode('utf-8'))
return(cmd)
return encoded_cmd
def is_local(self):
return self.ssh_to is None
def get_pipe(self, cmd, inp=None, readonly=False):
"""return a pipehandle to a process.
The caller should pass this handle as inp= to run() via inp= at some point to actually execute it.
Returns None if we're in test-mode. (but still prints important debug output)
"""
encoded_cmd = self._encode_cmd(cmd)
# debug and test stuff
debug_txt = ""
for c in encoded_cmd:
debug_txt = debug_txt + " " + c.decode()
# determine stdin
if inp is None:
# NOTE: Not None, otherwise it reads stdin from terminal!
stdin = subprocess.PIPE
elif isinstance(inp, str) or type(inp) == 'unicode':
self.debug("STDIN: \n" + inp.rstrip())
stdin = subprocess.PIPE
elif isinstance(inp, subprocess.Popen):
self.debug("PIPE to:")
stdin = inp.stdout
else:
raise (Exception("Program error: Incompatible input"))
if self.readonly and not readonly:
self.debug("CMDSKIP> " + debug_txt)
return None
else:
self.debug("CMD > " + debug_txt)
if self.readonly and not readonly:
return None
# create pipe
p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
# Note: make streaming?
if isinstance(inp, str) or type(inp) == 'unicode':
p.stdin.write(inp.encode('utf-8'))
if p.stdin:
p.stdin.close()
return p
def run(self, cmd, inp=None, tab_split=False, valid_exitcodes=None, readonly=False, hide_errors=False,
return_stderr=False):
return_stderr=False, pipe=False):
"""run a command on the node , checks output and parses/handle output and returns it
:param cmd: the actual command, should be a list, where the first item is the command
and the rest are parameters.
:param inp: Can be None, a string or a pipe-handle you got from get_pipe()
:param pipe: return CmdPipe instead of executing it.
:param inp: Can be None, a string or a CmdPipe that was previously returned.
:param tab_split: split tabbed files in output into a list
:param valid_exitcodes: list of valid exit codes for this command (checks exit code of both sides of a pipe)
Use [] to accept all exit codes.
Use [] to accept all exit codes. Default [0]
:param readonly: make this True if the command doesn't make any changes and is safe to execute in testmode
:param hide_errors: don't show stderr output as error, instead show it as debugging output (use to hide expected errors)
:param return_stderr: return both stdout and stderr as a tuple. (normally only returns stdout)
"""
if valid_exitcodes is None:
valid_exitcodes = [0]
# encoded_cmd = self._encode_cmd(cmd)
#
# # debug and test stuff
# debug_txt = ""
# for c in encoded_cmd:
# debug_txt = debug_txt + " " + c.decode()
#
# if pipe:
# debug_txt = debug_txt + " |"
#
# if self.readonly and not readonly:
# self.debug("SKIP > " + debug_txt)
# else:
# if pipe:
# self.debug("PIPE > " + debug_txt)
# else:
# self.debug("RUN > " + debug_txt)
#
# # determine stdin
# if inp is None:
# # NOTE: Not None, otherwise it reads stdin from terminal!
# stdin = subprocess.PIPE
# elif isinstance(inp, str) or type(inp) == 'unicode':
# self.debug("INPUT > \n" + inp.rstrip())
# stdin = subprocess.PIPE
# elif isinstance(inp, subprocess.Popen):
# self.debug("Piping input")
# stdin = inp.stdout
# else:
# raise (Exception("Program error: Incompatible input"))
# if self.readonly and not readonly:
# # todo: what happens if input is piped?
# return
#
# # execute and parse/return results
# p = subprocess.Popen(encoded_cmd, env=os.environ, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE)
# # Note: make streaming?
# if isinstance(inp, str) or type(inp) == 'unicode':
# p.stdin.write(inp.encode('utf-8'))
#
# if p.stdin:
# p.stdin.close()
#
# # return pipe
# if pipe:
# return p
p = self.get_pipe(cmd, inp=inp, readonly=readonly)
if p is None:
return None
# handle all outputs
if isinstance(inp, subprocess.Popen):
selectors = [p.stdout, p.stderr, inp.stderr]
inp.stdout.close() # otherwise inputprocess wont exit when ours does
# create new pipe?
if not isinstance(inp, CmdPipe):
p = CmdPipe(self.readonly, inp)
else:
selectors = [p.stdout, p.stderr]
# add stuff to existing pipe
p = inp
output_lines = []
# stderr parser
error_lines = []
while True:
(read_ready, write_ready, ex_ready) = select.select(selectors, [], [])
eof_count = 0
if p.stdout in read_ready:
line = p.stdout.readline().decode('utf-8')
if line != "":
if tab_split:
output_lines.append(line.rstrip().split('\t'))
else:
output_lines.append(line.rstrip())
self._parse_stdout(line)
else:
eof_count = eof_count + 1
if p.stderr in read_ready:
line = p.stderr.readline().decode('utf-8')
if line != "":
if tab_split:
error_lines.append(line.rstrip().split('\t'))
else:
error_lines.append(line.rstrip())
self._parse_stderr(line, hide_errors)
else:
eof_count = eof_count + 1
if isinstance(inp, subprocess.Popen) and (inp.stderr in read_ready):
line = inp.stderr.readline().decode('utf-8')
if line != "":
self._parse_stderr_pipe(line, hide_errors)
else:
eof_count = eof_count + 1
def stderr_handler(line):
if tab_split:
error_lines.append(line.rstrip().split('\t'))
else:
error_lines.append(line.rstrip())
self._parse_stderr(line, hide_errors)
# stop if both processes are done and all filehandles are EOF:
if (p.poll() is not None) and (
(not isinstance(inp, subprocess.Popen)) or inp.poll() is not None) and eof_count == len(selectors):
break
# add command to pipe
encoded_cmd = self._remote_cmd(cmd)
p.add(cmd=encoded_cmd, readonly=readonly, stderr_handler=stderr_handler)
p.stderr.close()
p.stdout.close()
# return pipe instead of executing?
if pipe:
return p
if self.debug_output:
self.debug("EXIT > {}".format(p.returncode))
# stdout parser
output_lines = []
def stdout_handler(line):
if tab_split:
output_lines.append(line.rstrip().split('\t'))
else:
output_lines.append(line.rstrip())
self._parse_stdout(line)
# handle piped process error output and exit codes
if isinstance(inp, subprocess.Popen):
inp.stderr.close()
inp.stdout.close()
if p.should_execute():
self.debug("CMD > {}".format(p))
else:
self.debug("CMDSKIP> {}".format(p))
if self.debug_output:
self.debug("EXIT |> {}".format(inp.returncode))
if valid_exitcodes and inp.returncode not in valid_exitcodes:
raise (subprocess.CalledProcessError(inp.returncode, "(pipe)"))
# execute and verify exit codes
if p.execute(stdout_handler=stdout_handler) and valid_exitcodes is not []:
if valid_exitcodes is None:
valid_exitcodes = [0]
if valid_exitcodes and p.returncode not in valid_exitcodes:
raise (subprocess.CalledProcessError(p.returncode, self._encode_cmd(cmd)))
item_nr=1
for item in p.items:
exit_code=item['process'].returncode
if self.debug_output:
self.debug("EXIT{} > {}".format(item_nr, exit_code))
if exit_code not in valid_exitcodes:
raise (subprocess.CalledProcessError(exit_code, " ".join(item['cmd'])))
item_nr=item_nr+1
if return_stderr:
return output_lines, error_lines
else:
return output_lines
# def run_pipe(self, cmds, *args, **kwargs):
# """run a array of commands piped together. """
#
# #i
# if self.zfs_node.is_local():
# output_pipe = self.zfs_node.run(cmd, pipe=True)
# for pipe_cmd in output_pipes:
# output_pipe=self.zfs_node.run(pipe_cmd, inp=output_pipe, pipe=True, )
# return output_pipe
# #remote, so add with actual | and let remote shell handle it
# else:
# for pipe_cmd in output_pipes:
# cmd.append("|")
# cmd.extend(pipe_cmd)

View File

@ -565,7 +565,7 @@ class ZfsDataset:
# cmd.append("|")
# cmd.extend(pipe_cmd)
return self.zfs_node.get_pipe(cmd)
return self.zfs_node.run(cmd, pipe=True, readonly=True)
def recv_pipe(self, pipe, features, filter_properties=None, set_properties=None, ignore_exit_code=False):

View File

@ -135,8 +135,8 @@ class ZfsNode(ExecuteNode):
else:
self.error(prefix + line.rstrip())
def _parse_stderr_pipe(self, line, hide_errors):
self.parse_zfs_progress(line, hide_errors, "STDERR|> ")
# def _parse_stderr_pipe(self, line, hide_errors):
# self.parse_zfs_progress(line, hide_errors, "STDERR|> ")
def _parse_stderr(self, line, hide_errors):
self.parse_zfs_progress(line, hide_errors, "STDERR > ")