zfs_autobackup/tests/test_cmdpipe.py

176 lines
6.4 KiB
Python

from basetest import *
from zfs_autobackup.CmdPipe import CmdPipe,CmdItem
class TestCmdPipe(unittest2.TestCase):
def test_single(self):
"""single process stdout and stderr"""
p=CmdPipe(readonly=False, inp=None)
err=[]
out=[]
p.add(CmdItem(["sh", "-c", "echo out1;echo err1 >&2; echo out2; echo err2 >&2"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(out, ["out1", "out2"])
self.assertEqual(err, ["err1","err2"])
self.assertIsNone(executed)
def test_input(self):
"""test stdinput"""
p=CmdPipe(readonly=False, inp="test")
err=[]
out=[]
p.add(CmdItem(["cat"], stderr_handler=lambda line: err.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line) ))
executed=p.execute()
self.assertEqual(err, [])
self.assertEqual(out, ["test"])
self.assertIsNone(executed)
def test_pipe(self):
"""test piped"""
p=CmdPipe(readonly=False)
err1=[]
err2=[]
err3=[]
out=[]
p.add(CmdItem(["echo", "test"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
p.add(CmdItem(["tr", "e", "E"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0)))
p.add(CmdItem(["tr", "t", "T"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,0), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(err3, [])
self.assertEqual(out, ["TEsT"])
self.assertIsNone(executed)
#test str representation as well
self.assertEqual(str(p), "(echo test) | (tr e E) | (tr t T)")
def test_pipeerrors(self):
"""test piped stderrs """
p=CmdPipe(readonly=False)
err1=[]
err2=[]
err3=[]
out=[]
p.add(CmdItem(["sh", "-c", "echo err1 >&2"], stderr_handler=lambda line: err1.append(line), ))
p.add(CmdItem(["sh", "-c", "echo err2 >&2"], stderr_handler=lambda line: err2.append(line), ))
p.add(CmdItem(["sh", "-c", "echo err3 >&2"], stderr_handler=lambda line: err3.append(line), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, ["err1"])
self.assertEqual(err2, ["err2"])
self.assertEqual(err3, ["err3"])
self.assertEqual(out, [])
self.assertTrue(executed)
def test_exitcode(self):
"""test piped exitcodes """
p=CmdPipe(readonly=False)
err1=[]
err2=[]
err3=[]
out=[]
p.add(CmdItem(["sh", "-c", "exit 1"], stderr_handler=lambda line: err1.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,1)))
p.add(CmdItem(["sh", "-c", "exit 2"], stderr_handler=lambda line: err2.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,2)))
p.add(CmdItem(["sh", "-c", "exit 3"], stderr_handler=lambda line: err3.append(line), exit_handler=lambda exit_code: self.assertEqual(exit_code,3), stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(err3, [])
self.assertEqual(out, [])
self.assertIsNone(executed)
def test_readonly_execute(self):
"""everything readonly, just should execute"""
p=CmdPipe(readonly=True)
err1=[]
err2=[]
out=[]
def true_exit(exit_code):
return True
p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), exit_handler=true_exit, readonly=True))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), exit_handler=true_exit, readonly=True, stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(out, ["test2"])
self.assertTrue(executed)
def test_readonly_skip(self):
"""one command not readonly, skip"""
p=CmdPipe(readonly=True)
err1=[]
err2=[]
out=[]
p.add(CmdItem(["echo", "test1"], stderr_handler=lambda line: err1.append(line), readonly=False))
p.add(CmdItem(["echo", "test2"], stderr_handler=lambda line: err2.append(line), readonly=True, stdout_handler=lambda line: out.append(line)))
executed=p.execute()
self.assertEqual(err1, [])
self.assertEqual(err2, [])
self.assertEqual(out, [])
self.assertTrue(executed)
def test_no_handlers(self):
with self.assertRaises(Exception):
p=CmdPipe()
p.add(CmdItem([ "echo" ]))
p.execute()
#NOTE: this will give some resource warnings
def test_manual_pipes(self):
# manual piping means: a command in the pipe has a stdout_handler, which is responsible for sending the data into the next item of the pipe.
result=[]
def stdout_handler(line):
item2.process.stdin.write(line.encode('utf8'))
# item2.process.stdin.close()
item1=CmdItem(["echo", "test"], stdout_handler=stdout_handler)
item2=CmdItem(["tr", "e", "E"], stdout_handler=lambda line: result.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.execute()
self.assertEqual(result, ["tEst"])
def test_multiprocess(self):
#dont do any piping at all, just run multiple processes and handle outputs
result1=[]
result2=[]
result3=[]
item1=CmdItem(["echo", "test1"], stdout_handler=lambda line: result1.append(line))
item2=CmdItem(["echo", "test2"], stdout_handler=lambda line: result2.append(line))
item3=CmdItem(["echo", "test3"], stdout_handler=lambda line: result3.append(line))
p=CmdPipe()
p.add(item1)
p.add(item2)
p.add(item3)
p.execute()
self.assertEqual(result1, ["test1"])
self.assertEqual(result2, ["test2"])
self.assertEqual(result3, ["test3"])