package cmdext import ( "bufio" "gogs.mikescher.com/BlackForestBytes/goext/syncext" "io" "sync" ) type pipeReader struct { lineBufferSize *int stdout io.ReadCloser stderr io.ReadCloser } // Read ready stdout and stdin until finished // also splits both pipes into lines and calld the listener func (pr *pipeReader) Read(listener []CommandListener) (string, string, string, error) { type combevt struct { line string stop bool } errch := make(chan error, 8) wg := sync.WaitGroup{} // [1] read raw stdout wg.Add(1) stdoutBufferReader, stdoutBufferWriter := io.Pipe() stdout := "" go func() { buf := make([]byte, 128) for true { n, out := pr.stdout.Read(buf) if n > 0 { txt := string(buf[:n]) stdout += txt _, _ = stdoutBufferWriter.Write(buf[:n]) for _, lstr := range listener { lstr.ReadRawStdout(buf[:n]) } } if out == io.EOF { break } if out != nil { errch <- out break } } _ = stdoutBufferWriter.Close() wg.Done() }() // [2] read raw stderr wg.Add(1) stderrBufferReader, stderrBufferWriter := io.Pipe() stderr := "" go func() { buf := make([]byte, 128) for true { n, err := pr.stderr.Read(buf) if n > 0 { txt := string(buf[:n]) stderr += txt _, _ = stderrBufferWriter.Write(buf[:n]) for _, lstr := range listener { lstr.ReadRawStderr(buf[:n]) } } if err == io.EOF { break } if err != nil { errch <- err break } } _ = stderrBufferWriter.Close() wg.Done() }() combch := make(chan combevt, 32) // [3] collect stdout line-by-line wg.Add(1) go func() { scanner := bufio.NewScanner(stdoutBufferReader) if pr.lineBufferSize != nil { scanner.Buffer([]byte{}, *pr.lineBufferSize) } for scanner.Scan() { txt := scanner.Text() for _, lstr := range listener { lstr.ReadStdoutLine(txt) } combch <- combevt{txt, false} } if err := scanner.Err(); err != nil { errch <- err } combch <- combevt{"", true} wg.Done() }() // [4] collect stderr line-by-line wg.Add(1) go func() { scanner := bufio.NewScanner(stderrBufferReader) if pr.lineBufferSize != nil { scanner.Buffer([]byte{}, *pr.lineBufferSize) } for scanner.Scan() { txt := scanner.Text() for _, lstr := range listener { lstr.ReadStderrLine(txt) } combch <- combevt{txt, false} } if err := scanner.Err(); err != nil { errch <- err } combch <- combevt{"", true} wg.Done() }() // [5] combine stdcombined wg.Add(1) stdcombined := "" go func() { stopctr := 0 for stopctr < 2 { vvv := <-combch if vvv.stop { stopctr++ } else { stdcombined += vvv.line + "\n" // this comes from bufio.Scanner and has no newlines... } } wg.Done() }() // wait for all (5) goroutines to finish wg.Wait() if err, ok := syncext.ReadNonBlocking(errch); ok { return "", "", "", err } return stdout, stderr, stdcombined, nil }