Mike Schwörer
c63cf442f8
All checks were successful
Build Docker and Deploy / Run goext test-suite (push) Successful in 39s
159 lines
2.9 KiB
Go
159 lines
2.9 KiB
Go
package cmdext
|
|
|
|
import (
|
|
"bufio"
|
|
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
type pipeReader struct {
|
|
lineBufferSize *int
|
|
stdout io.ReadCloser
|
|
stderr io.ReadCloser
|
|
}
|
|
|
|
// Read ready stdout and stdin until finished
|
|
// also splits both pipes into lines and calld the listener
|
|
func (pr *pipeReader) Read(listener []CommandListener) (string, string, string, error) {
|
|
type combevt struct {
|
|
line string
|
|
stop bool
|
|
}
|
|
|
|
errch := make(chan error, 8)
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
// [1] read raw stdout
|
|
|
|
wg.Add(1)
|
|
stdoutBufferReader, stdoutBufferWriter := io.Pipe()
|
|
stdout := ""
|
|
go func() {
|
|
buf := make([]byte, 128)
|
|
for {
|
|
n, err := pr.stdout.Read(buf)
|
|
if n > 0 {
|
|
txt := string(buf[:n])
|
|
stdout += txt
|
|
_, _ = stdoutBufferWriter.Write(buf[:n])
|
|
for _, lstr := range listener {
|
|
lstr.ReadRawStdout(buf[:n])
|
|
}
|
|
}
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
errch <- err
|
|
break
|
|
}
|
|
}
|
|
_ = stdoutBufferWriter.Close()
|
|
wg.Done()
|
|
}()
|
|
|
|
// [2] read raw stderr
|
|
|
|
wg.Add(1)
|
|
stderrBufferReader, stderrBufferWriter := io.Pipe()
|
|
stderr := ""
|
|
go func() {
|
|
buf := make([]byte, 128)
|
|
for {
|
|
n, err := pr.stderr.Read(buf)
|
|
|
|
if n > 0 {
|
|
txt := string(buf[:n])
|
|
stderr += txt
|
|
_, _ = stderrBufferWriter.Write(buf[:n])
|
|
for _, lstr := range listener {
|
|
lstr.ReadRawStderr(buf[:n])
|
|
}
|
|
}
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
errch <- err
|
|
break
|
|
}
|
|
}
|
|
_ = stderrBufferWriter.Close()
|
|
wg.Done()
|
|
}()
|
|
|
|
combch := make(chan combevt, 32)
|
|
|
|
// [3] collect stdout line-by-line
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
scanner := bufio.NewScanner(stdoutBufferReader)
|
|
if pr.lineBufferSize != nil {
|
|
scanner.Buffer([]byte{}, *pr.lineBufferSize)
|
|
}
|
|
for scanner.Scan() {
|
|
txt := scanner.Text()
|
|
for _, lstr := range listener {
|
|
lstr.ReadStdoutLine(txt)
|
|
}
|
|
combch <- combevt{txt, false}
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
errch <- err
|
|
}
|
|
combch <- combevt{"", true}
|
|
wg.Done()
|
|
}()
|
|
|
|
// [4] collect stderr line-by-line
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
scanner := bufio.NewScanner(stderrBufferReader)
|
|
if pr.lineBufferSize != nil {
|
|
scanner.Buffer([]byte{}, *pr.lineBufferSize)
|
|
}
|
|
for scanner.Scan() {
|
|
txt := scanner.Text()
|
|
for _, lstr := range listener {
|
|
lstr.ReadStderrLine(txt)
|
|
}
|
|
combch <- combevt{txt, false}
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
errch <- err
|
|
}
|
|
combch <- combevt{"", true}
|
|
wg.Done()
|
|
}()
|
|
|
|
// [5] combine stdcombined
|
|
|
|
wg.Add(1)
|
|
stdcombined := ""
|
|
go func() {
|
|
stopctr := 0
|
|
for stopctr < 2 {
|
|
vvv := <-combch
|
|
if vvv.stop {
|
|
stopctr++
|
|
} else {
|
|
stdcombined += vvv.line + "\n" // this comes from bufio.Scanner and has no newlines...
|
|
}
|
|
}
|
|
wg.Done()
|
|
}()
|
|
|
|
// wait for all (5) goroutines to finish
|
|
wg.Wait()
|
|
|
|
if err, ok := syncext.ReadNonBlocking(errch); ok {
|
|
return "", "", "", err
|
|
}
|
|
|
|
return stdout, stderr, stdcombined, nil
|
|
}
|