cleaned up piping

This commit is contained in:
Edwin Eefting 2019-10-20 23:44:08 +02:00
parent e83c297f92
commit aed5d6f8a6

View File

@ -150,9 +150,12 @@ class ExecuteNode:
debug_txt=debug_txt+" |"
if self.readonly and not readonly:
self.debug("SKIP > "+debug_txt)
self.debug("SKIP > "+ debug_txt)
else:
self.debug("RUN > "+ debug_txt)
if pipe:
self.debug("PIPE > "+ debug_txt)
else:
self.debug("RUN > "+ debug_txt)
#determine stdin
if input==None:
@ -180,12 +183,12 @@ class ExecuteNode:
if pipe:
return(p)
#handle all outputs
if isinstance(input, subprocess.Popen):
selectors=[p.stdout, p.stderr, input.stderr ]
else:
selectors=[p.stdout, p.stderr ]
output_lines=[]
while True:
(read_ready, write_ready, ex_ready)=select.select(selectors, [], [])
@ -199,19 +202,24 @@ class ExecuteNode:
eof_count=eof_count+1
if p.stderr in read_ready:
line=p.stderr.readline()
if line!="" and not hide_errors:
self.error("STDERR > "+line.rstrip())
if line!="":
if hide_errors:
self.debug("STDERR > "+line.rstrip())
else:
self.error("STDERR > "+line.rstrip())
else:
eof_count=eof_count+1
if isinstance(input, subprocess.Popen) and (input.strerr in read_ready):
if isinstance(input, subprocess.Popen) and (input.stderr in read_ready):
line=input.stderr.readline()
if line!="" and not hide_errors:
self.error("STDERR|> "+line.rstrip())
if line!="":
if hide_errors:
self.debug("STDERR|> "+line.rstrip())
else:
self.error("STDERR|> "+line.rstrip())
else:
eof_count=eof_count+1
#stop if both processes are done and all filehandles are eof:
#stop if both processes are done and all filehandles are EOF:
if p.poll()!=None and ((not isinstance(input, subprocess.Popen)) or input.poll()!=None) and eof_count==len(selectors):
break
@ -229,6 +237,7 @@ class ExecuteNode:
# for line in pipe_outputs[1].splitlines():
# self.error("Pipe-error: "+line)
self.debug("EXIT |> {}".format(input.returncode))
if input.returncode not in valid_exitcodes:
raise(subprocess.CalledProcessError(input.returncode, "(pipe)"))
@ -237,6 +246,7 @@ class ExecuteNode:
# for line in errors.splitlines():
# self.error(line)
self.debug("EXIT > {}".format(p.returncode))
if p.returncode not in valid_exitcodes:
raise(subprocess.CalledProcessError(p.returncode, encoded_cmd))
@ -449,7 +459,7 @@ class ZfsDataset():
else:
target_dataset.verbose("receiving @{} {}".format(self.snapshot_name, resumed))
pipe=self.zfs_node.run(["lsX"], pipe=True, readonly=True)
pipe=self.zfs_node.run(["ls"], pipe=True, readonly=True)
target_dataset.zfs_node.run(["cat"], input=pipe, readonly=True)
#update cache