diff --git a/cmdext/cmdrunner.go b/cmdext/cmdrunner.go index 9681a54..1a3550f 100644 --- a/cmdext/cmdrunner.go +++ b/cmdext/cmdrunner.go @@ -1,8 +1,8 @@ package cmdext import ( - "bufio" - "io" + "gogs.mikescher.com/BlackForestBytes/goext/mathext" + "gogs.mikescher.com/BlackForestBytes/goext/syncext" "os/exec" "time" ) @@ -30,119 +30,41 @@ func run(opt CommandRunner) (CommandResult, error) { return CommandResult{}, err } + preader := pipeReader{ + stdout: stdoutPipe, + stderr: stderrPipe, + } + err = cmd.Start() if err != nil { return CommandResult{}, err } - errch := make(chan error, 3) - go func() { errch <- cmd.Wait() }() + type resultObj struct { + stdout string + stderr string + stdcombined string + err error + } - // [1] read raw stdout - - stdoutBufferReader, stdoutBufferWriter := io.Pipe() - stdout := "" + outputChan := make(chan resultObj) go func() { - buf := make([]byte, 128) - for true { - n, out := stdoutPipe.Read(buf) + // we need to first fully read the pipes and then call Wait + // see https://pkg.go.dev/os/exec#Cmd.StdoutPipe - if n > 0 { - txt := string(buf[:n]) - stdout += txt - _, _ = stdoutBufferWriter.Write(buf[:n]) - for _, lstr := range opt.listener { - lstr.ReadRawStdout(buf[:n]) - } - } - if out == io.EOF { - break - } - if out != nil { - errch <- out - _ = cmd.Process.Kill() - break - } + stdout, stderr, stdcombined, err := preader.Read(opt.listener) + if err != nil { + outputChan <- resultObj{stdout, stderr, stdcombined, err} } - _ = stdoutBufferWriter.Close() - }() - // [2] read raw stderr - - stderrBufferReader, stderrBufferWriter := io.Pipe() - stderr := "" - go func() { - buf := make([]byte, 128) - for true { - n, err := stderrPipe.Read(buf) - - if n > 0 { - txt := string(buf[:n]) - stderr += txt - _, _ = stderrBufferWriter.Write(buf[:n]) - for _, lstr := range opt.listener { - lstr.ReadRawStderr(buf[:n]) - } - } - if err == io.EOF { - break - } - if err != nil { - errch <- err - _ = cmd.Process.Kill() - break - } + err = cmd.Wait() + if err != nil { + outputChan <- resultObj{stdout, stderr, stdcombined, err} } - _ = stderrBufferWriter.Close() + + outputChan <- resultObj{stdout, stderr, stdcombined, nil} }() - combch := make(chan string, 32) - stopCombch := make(chan bool) - - // [3] collect stdout line-by-line - - go func() { - scanner := bufio.NewScanner(stdoutBufferReader) - for scanner.Scan() { - txt := scanner.Text() - for _, lstr := range opt.listener { - lstr.ReadStdoutLine(txt) - } - combch <- txt - } - }() - - // [4] collect stderr line-by-line - - go func() { - scanner := bufio.NewScanner(stderrBufferReader) - for scanner.Scan() { - txt := scanner.Text() - for _, lstr := range opt.listener { - lstr.ReadStderrLine(txt) - } - combch <- txt - } - }() - - defer func() { stopCombch <- true }() - - // [5] combine stdcombined - - stdcombined := "" - go func() { - for { - select { - case txt := <-combch: - stdcombined += txt + "\n" // this comes from bufio.Scanner and has no newlines... - case <-stopCombch: - return - } - } - }() - - // [6] run - var timeoutChan <-chan time.Time = make(chan time.Time, 1) if opt.timeout != nil { timeoutChan = time.After(*opt.timeout) @@ -155,24 +77,37 @@ func run(opt CommandRunner) (CommandResult, error) { for _, lstr := range opt.listener { lstr.Timeout() } - return CommandResult{ - StdOut: stdout, - StdErr: stderr, - StdCombined: stdcombined, - ExitCode: -1, - CommandTimedOut: true, - }, nil - case err := <-errch: - if exiterr, ok := err.(*exec.ExitError); ok { + if fallback, ok := syncext.ReadChannelWithTimeout(outputChan, mathext.Min(32*time.Millisecond, *opt.timeout)); ok { + // most of the time the cmd.Process.Kill() should also ahve finished the pipereader + // and we can at least return the already collected stdout, stderr, etc + return CommandResult{ + StdOut: fallback.stdout, + StdErr: fallback.stderr, + StdCombined: fallback.stdcombined, + ExitCode: -1, + CommandTimedOut: true, + }, nil + } else { + return CommandResult{ + StdOut: "", + StdErr: "", + StdCombined: "", + ExitCode: -1, + CommandTimedOut: true, + }, nil + } + + case outobj := <-outputChan: + if exiterr, ok := outobj.err.(*exec.ExitError); ok { excode := exiterr.ExitCode() for _, lstr := range opt.listener { lstr.Finished(excode) } return CommandResult{ - StdOut: stdout, - StdErr: stderr, - StdCombined: stdcombined, + StdOut: outobj.stdout, + StdErr: outobj.stderr, + StdCombined: outobj.stdcombined, ExitCode: excode, CommandTimedOut: false, }, nil @@ -183,9 +118,9 @@ func run(opt CommandRunner) (CommandResult, error) { lstr.Finished(0) } return CommandResult{ - StdOut: stdout, - StdErr: stderr, - StdCombined: stdcombined, + StdOut: outobj.stdout, + StdErr: outobj.stderr, + StdCombined: outobj.stdcombined, ExitCode: 0, CommandTimedOut: false, }, nil diff --git a/cmdext/cmdrunner_test.go b/cmdext/cmdrunner_test.go index ca4548d..e4f2327 100644 --- a/cmdext/cmdrunner_test.go +++ b/cmdext/cmdrunner_test.go @@ -1,6 +1,9 @@ package cmdext -import "testing" +import ( + "testing" + "time" +) func TestStdout(t *testing.T) { @@ -57,3 +60,27 @@ func TestStdcombined(t *testing.T) { } } + +func TestPartialRead(t *testing.T) { + res1, err := Runner("python"). + Arg("-c"). + Arg("import sys; import time; print(\"first message\", flush=True); time.sleep(5); print(\"cant see me\", flush=True);"). + Timeout(100 * time.Millisecond). + Run() + if err != nil { + t.Errorf("%v", err) + } + if !res1.CommandTimedOut { + t.Errorf("!CommandTimedOut") + } + if res1.StdErr != "" { + t.Errorf("res1.StdErr == '%v'", res1.StdErr) + } + if res1.StdOut != "first message\n" { + t.Errorf("res1.StdOut == '%v'", res1.StdOut) + } + if res1.StdCombined != "first message\n" { + t.Errorf("res1.StdCombined == '%v'", res1.StdCombined) + } + +} diff --git a/cmdext/pipereader.go b/cmdext/pipereader.go new file mode 100644 index 0000000..7a74624 --- /dev/null +++ b/cmdext/pipereader.go @@ -0,0 +1,146 @@ +package cmdext + +import ( + "bufio" + "gogs.mikescher.com/BlackForestBytes/goext/syncext" + "io" + "sync" +) + +type pipeReader struct { + stdout io.ReadCloser + stderr io.ReadCloser +} + +// Read ready stdout and stdin until finished +// also splits both pipes into lines and calld the listener +func (pr *pipeReader) Read(listener []CommandListener) (string, string, string, error) { + type combevt struct { + line string + stop bool + } + + errch := make(chan error, 8) + + wg := sync.WaitGroup{} + + // [1] read raw stdout + + wg.Add(1) + stdoutBufferReader, stdoutBufferWriter := io.Pipe() + stdout := "" + go func() { + buf := make([]byte, 128) + for true { + n, out := pr.stdout.Read(buf) + + if n > 0 { + txt := string(buf[:n]) + stdout += txt + _, _ = stdoutBufferWriter.Write(buf[:n]) + for _, lstr := range listener { + lstr.ReadRawStdout(buf[:n]) + } + } + if out == io.EOF { + break + } + if out != nil { + errch <- out + break + } + } + _ = stdoutBufferWriter.Close() + wg.Done() + }() + + // [2] read raw stderr + + wg.Add(1) + stderrBufferReader, stderrBufferWriter := io.Pipe() + stderr := "" + go func() { + buf := make([]byte, 128) + for true { + n, err := pr.stderr.Read(buf) + + if n > 0 { + txt := string(buf[:n]) + stderr += txt + _, _ = stderrBufferWriter.Write(buf[:n]) + for _, lstr := range listener { + lstr.ReadRawStderr(buf[:n]) + } + } + if err == io.EOF { + break + } + if err != nil { + errch <- err + break + } + } + _ = stderrBufferWriter.Close() + wg.Done() + }() + + combch := make(chan combevt, 32) + + // [3] collect stdout line-by-line + + wg.Add(1) + go func() { + scanner := bufio.NewScanner(stdoutBufferReader) + for scanner.Scan() { + txt := scanner.Text() + for _, lstr := range listener { + lstr.ReadStdoutLine(txt) + } + combch <- combevt{txt, false} + } + combch <- combevt{"", true} + wg.Done() + }() + + // [4] collect stderr line-by-line + + wg.Add(1) + go func() { + scanner := bufio.NewScanner(stderrBufferReader) + for scanner.Scan() { + txt := scanner.Text() + for _, lstr := range listener { + lstr.ReadStderrLine(txt) + } + combch <- combevt{txt, false} + } + combch <- combevt{"", true} + wg.Done() + }() + + // [5] combine stdcombined + + wg.Add(1) + stdcombined := "" + go func() { + stopctr := 0 + for stopctr < 2 { + vvv := <-combch + if vvv.stop { + stopctr++ + } else { + stdcombined += vvv.line + "\n" // this comes from bufio.Scanner and has no newlines... + } + } + wg.Done() + }() + + // wait for all (5) goroutines to finish + wg.Wait() + + if err, ok := syncext.ReadNonBlocking(errch); ok { + return "", "", "", err + } + + return stdout, stderr, stdcombined, nil +}