This commit is contained in:
Mike Schwörer 2023-01-31 22:41:12 +01:00
parent 1990e5d32d
commit fdc590c8c3
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF
3 changed files with 226 additions and 118 deletions

View File

@ -1,8 +1,8 @@
package cmdext package cmdext
import ( import (
"bufio" "gogs.mikescher.com/BlackForestBytes/goext/mathext"
"io" "gogs.mikescher.com/BlackForestBytes/goext/syncext"
"os/exec" "os/exec"
"time" "time"
) )
@ -30,119 +30,41 @@ func run(opt CommandRunner) (CommandResult, error) {
return CommandResult{}, err return CommandResult{}, err
} }
preader := pipeReader{
stdout: stdoutPipe,
stderr: stderrPipe,
}
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return CommandResult{}, err return CommandResult{}, err
} }
errch := make(chan error, 3) type resultObj struct {
go func() { errch <- cmd.Wait() }() stdout string
stderr string
stdcombined string
err error
}
// [1] read raw stdout outputChan := make(chan resultObj)
stdoutBufferReader, stdoutBufferWriter := io.Pipe()
stdout := ""
go func() { go func() {
buf := make([]byte, 128) // we need to first fully read the pipes and then call Wait
for true { // see https://pkg.go.dev/os/exec#Cmd.StdoutPipe
n, out := stdoutPipe.Read(buf)
if n > 0 { stdout, stderr, stdcombined, err := preader.Read(opt.listener)
txt := string(buf[:n])
stdout += txt
_, _ = stdoutBufferWriter.Write(buf[:n])
for _, lstr := range opt.listener {
lstr.ReadRawStdout(buf[:n])
}
}
if out == io.EOF {
break
}
if out != nil {
errch <- out
_ = cmd.Process.Kill()
break
}
}
_ = stdoutBufferWriter.Close()
}()
// [2] read raw stderr
stderrBufferReader, stderrBufferWriter := io.Pipe()
stderr := ""
go func() {
buf := make([]byte, 128)
for true {
n, err := stderrPipe.Read(buf)
if n > 0 {
txt := string(buf[:n])
stderr += txt
_, _ = stderrBufferWriter.Write(buf[:n])
for _, lstr := range opt.listener {
lstr.ReadRawStderr(buf[:n])
}
}
if err == io.EOF {
break
}
if err != nil { if err != nil {
errch <- err outputChan <- resultObj{stdout, stderr, stdcombined, err}
_ = cmd.Process.Kill()
break
} }
err = cmd.Wait()
if err != nil {
outputChan <- resultObj{stdout, stderr, stdcombined, err}
} }
_ = stderrBufferWriter.Close()
outputChan <- resultObj{stdout, stderr, stdcombined, nil}
}() }()
combch := make(chan string, 32)
stopCombch := make(chan bool)
// [3] collect stdout line-by-line
go func() {
scanner := bufio.NewScanner(stdoutBufferReader)
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range opt.listener {
lstr.ReadStdoutLine(txt)
}
combch <- txt
}
}()
// [4] collect stderr line-by-line
go func() {
scanner := bufio.NewScanner(stderrBufferReader)
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range opt.listener {
lstr.ReadStderrLine(txt)
}
combch <- txt
}
}()
defer func() { stopCombch <- true }()
// [5] combine stdcombined
stdcombined := ""
go func() {
for {
select {
case txt := <-combch:
stdcombined += txt + "\n" // this comes from bufio.Scanner and has no newlines...
case <-stopCombch:
return
}
}
}()
// [6] run
var timeoutChan <-chan time.Time = make(chan time.Time, 1) var timeoutChan <-chan time.Time = make(chan time.Time, 1)
if opt.timeout != nil { if opt.timeout != nil {
timeoutChan = time.After(*opt.timeout) timeoutChan = time.After(*opt.timeout)
@ -155,24 +77,37 @@ func run(opt CommandRunner) (CommandResult, error) {
for _, lstr := range opt.listener { for _, lstr := range opt.listener {
lstr.Timeout() lstr.Timeout()
} }
if fallback, ok := syncext.ReadChannelWithTimeout(outputChan, mathext.Min(32*time.Millisecond, *opt.timeout)); ok {
// most of the time the cmd.Process.Kill() should also ahve finished the pipereader
// and we can at least return the already collected stdout, stderr, etc
return CommandResult{ return CommandResult{
StdOut: stdout, StdOut: fallback.stdout,
StdErr: stderr, StdErr: fallback.stderr,
StdCombined: stdcombined, StdCombined: fallback.stdcombined,
ExitCode: -1, ExitCode: -1,
CommandTimedOut: true, CommandTimedOut: true,
}, nil }, nil
} else {
return CommandResult{
StdOut: "",
StdErr: "",
StdCombined: "",
ExitCode: -1,
CommandTimedOut: true,
}, nil
}
case err := <-errch: case outobj := <-outputChan:
if exiterr, ok := err.(*exec.ExitError); ok { if exiterr, ok := outobj.err.(*exec.ExitError); ok {
excode := exiterr.ExitCode() excode := exiterr.ExitCode()
for _, lstr := range opt.listener { for _, lstr := range opt.listener {
lstr.Finished(excode) lstr.Finished(excode)
} }
return CommandResult{ return CommandResult{
StdOut: stdout, StdOut: outobj.stdout,
StdErr: stderr, StdErr: outobj.stderr,
StdCombined: stdcombined, StdCombined: outobj.stdcombined,
ExitCode: excode, ExitCode: excode,
CommandTimedOut: false, CommandTimedOut: false,
}, nil }, nil
@ -183,9 +118,9 @@ func run(opt CommandRunner) (CommandResult, error) {
lstr.Finished(0) lstr.Finished(0)
} }
return CommandResult{ return CommandResult{
StdOut: stdout, StdOut: outobj.stdout,
StdErr: stderr, StdErr: outobj.stderr,
StdCombined: stdcombined, StdCombined: outobj.stdcombined,
ExitCode: 0, ExitCode: 0,
CommandTimedOut: false, CommandTimedOut: false,
}, nil }, nil

View File

@ -1,6 +1,9 @@
package cmdext package cmdext
import "testing" import (
"testing"
"time"
)
func TestStdout(t *testing.T) { func TestStdout(t *testing.T) {
@ -57,3 +60,27 @@ func TestStdcombined(t *testing.T) {
} }
} }
func TestPartialRead(t *testing.T) {
res1, err := Runner("python").
Arg("-c").
Arg("import sys; import time; print(\"first message\", flush=True); time.sleep(5); print(\"cant see me\", flush=True);").
Timeout(100 * time.Millisecond).
Run()
if err != nil {
t.Errorf("%v", err)
}
if !res1.CommandTimedOut {
t.Errorf("!CommandTimedOut")
}
if res1.StdErr != "" {
t.Errorf("res1.StdErr == '%v'", res1.StdErr)
}
if res1.StdOut != "first message\n" {
t.Errorf("res1.StdOut == '%v'", res1.StdOut)
}
if res1.StdCombined != "first message\n" {
t.Errorf("res1.StdCombined == '%v'", res1.StdCombined)
}
}

146
cmdext/pipereader.go Normal file
View File

@ -0,0 +1,146 @@
package cmdext
import (
"bufio"
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
"io"
"sync"
)
type pipeReader struct {
stdout io.ReadCloser
stderr io.ReadCloser
}
// Read ready stdout and stdin until finished
// also splits both pipes into lines and calld the listener
func (pr *pipeReader) Read(listener []CommandListener) (string, string, string, error) {
type combevt struct {
line string
stop bool
}
errch := make(chan error, 8)
wg := sync.WaitGroup{}
// [1] read raw stdout
wg.Add(1)
stdoutBufferReader, stdoutBufferWriter := io.Pipe()
stdout := ""
go func() {
buf := make([]byte, 128)
for true {
n, out := pr.stdout.Read(buf)
if n > 0 {
txt := string(buf[:n])
stdout += txt
_, _ = stdoutBufferWriter.Write(buf[:n])
for _, lstr := range listener {
lstr.ReadRawStdout(buf[:n])
}
}
if out == io.EOF {
break
}
if out != nil {
errch <- out
break
}
}
_ = stdoutBufferWriter.Close()
wg.Done()
}()
// [2] read raw stderr
wg.Add(1)
stderrBufferReader, stderrBufferWriter := io.Pipe()
stderr := ""
go func() {
buf := make([]byte, 128)
for true {
n, err := pr.stderr.Read(buf)
if n > 0 {
txt := string(buf[:n])
stderr += txt
_, _ = stderrBufferWriter.Write(buf[:n])
for _, lstr := range listener {
lstr.ReadRawStderr(buf[:n])
}
}
if err == io.EOF {
break
}
if err != nil {
errch <- err
break
}
}
_ = stderrBufferWriter.Close()
wg.Done()
}()
combch := make(chan combevt, 32)
// [3] collect stdout line-by-line
wg.Add(1)
go func() {
scanner := bufio.NewScanner(stdoutBufferReader)
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range listener {
lstr.ReadStdoutLine(txt)
}
combch <- combevt{txt, false}
}
combch <- combevt{"", true}
wg.Done()
}()
// [4] collect stderr line-by-line
wg.Add(1)
go func() {
scanner := bufio.NewScanner(stderrBufferReader)
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range listener {
lstr.ReadStderrLine(txt)
}
combch <- combevt{txt, false}
}
combch <- combevt{"", true}
wg.Done()
}()
// [5] combine stdcombined
wg.Add(1)
stdcombined := ""
go func() {
stopctr := 0
for stopctr < 2 {
vvv := <-combch
if vvv.stop {
stopctr++
} else {
stdcombined += vvv.line + "\n" // this comes from bufio.Scanner and has no newlines...
}
}
wg.Done()
}()
// wait for all (5) goroutines to finish
wg.Wait()
if err, ok := syncext.ReadNonBlocking(errch); ok {
return "", "", "", err
}
return stdout, stderr, stdcombined, nil
}