mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
rewrote REQCliCommandCont pipe reading
This commit is contained in:
parent
f77de0d17d
commit
b7c12d27d7
2 changed files with 30 additions and 27 deletions
|
@ -354,6 +354,13 @@ Will run the command given, and return the stdout output of the command continou
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**NB**: A github issue is filed on not killing all child processes when using pipes <https://github.com/golang/go/issues/23019>. This is relevant for this request type.
|
||||||
|
|
||||||
|
TODO: Check in later if there are any progress on the issue.
|
||||||
|
When testing the problem seems to appear when using sudo, or tcpdump without
|
||||||
|
the -l option. So for now, don't use sudo, and remember to use -l with tcpdump
|
||||||
|
which makes stdout line buffered.
|
||||||
|
|
||||||
#### REQTailFile
|
#### REQTailFile
|
||||||
|
|
||||||
Tail log files on some node, and get the result for each new line read sent back in a reply message. Uses the methodTimeout to define for how long the command will run.
|
Tail log files on some node, and get the result for each new line read sent back in a reply message. Uses the methodTimeout to define for how long the command will run.
|
||||||
|
|
46
requests.go
46
requests.go
|
@ -1427,6 +1427,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||||
|
|
||||||
outCh := make(chan []byte)
|
outCh := make(chan []byte)
|
||||||
|
errCh := make(chan string)
|
||||||
|
|
||||||
proc.processes.wg.Add(1)
|
proc.processes.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -1440,9 +1441,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("error: %v\n", er)
|
||||||
|
|
||||||
log.Printf("error: %v\n", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ErrorReader, err := cmd.StderrPipe()
|
ErrorReader, err := cmd.StderrPipe()
|
||||||
|
@ -1450,8 +1449,6 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
log.Printf("error: %v\n", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
|
@ -1461,34 +1458,33 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also send error messages that might happen during the time
|
|
||||||
// cmd.Start runs.
|
|
||||||
// Putting the scanner.Text value on a channel so we can make
|
|
||||||
// the scanner non-blocking, and also check context cancelation.
|
|
||||||
go func() {
|
go func() {
|
||||||
errCh := make(chan string, 1)
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(ErrorReader)
|
scanner := bufio.NewScanner(ErrorReader)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
select {
|
errCh <- scanner.Text()
|
||||||
case errCh <- scanner.Text():
|
|
||||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, <-errCh)
|
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
|
||||||
log.Printf("%v\n", er)
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
scanner := bufio.NewScanner(outReader)
|
scanner := bufio.NewScanner(outReader)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
select {
|
outCh <- []byte(scanner.Text() + "\n")
|
||||||
case outCh <- []byte(scanner.Text() + "\n"):
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// NB: sending cancel to command context, so processes are killed.
|
||||||
|
// A github issue is filed on not killing all child processes when using pipes:
|
||||||
|
// https://github.com/golang/go/issues/23019
|
||||||
|
// TODO: Check in later if there are any progress on the issue.
|
||||||
|
// When testing the problem seems to appear when using sudo, or tcpdump without
|
||||||
|
// the -l option. So for now, don't use sudo, and remember to use -l with tcpdump
|
||||||
|
// which makes stdout line buffered.
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
log.Printf(" --------------- * error: REQCliCommandCont: cmd.Wait: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}()
|
}()
|
||||||
|
@ -1502,9 +1498,9 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return
|
return
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
// Prepare and queue for sending a new message with the output
|
|
||||||
// of the action executed.
|
|
||||||
newReplyMessage(proc, message, out)
|
newReplyMessage(proc, message, out)
|
||||||
|
case out := <-errCh:
|
||||||
|
newReplyMessage(proc, message, []byte(out))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in a new issue