1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

working concept for method timeouts

This commit is contained in:
postmannen 2021-03-26 16:04:01 +01:00
parent 7f64048f8a
commit 3fdf7e40b9
3 changed files with 48 additions and 28 deletions

View file

@ -6,7 +6,7 @@ DefaultMessageTimeout = 5
NodeName = "central" NodeName = "central"
ProfilingPort = "" ProfilingPort = ""
PromHostAndPort = "" PromHostAndPort = ""
StartPubSayhello = 0 StartPubSayHello = 0
SubscribersDataFolder = "./data" SubscribersDataFolder = "./data"
[StartSubCLICommand] [StartSubCLICommand]

View file

@ -2,10 +2,10 @@
{ {
"toNode": "ship1", "toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"], "data": ["bash","-c","sleep 5 & echo 'apekatt'"],
"method":"CLICommandRequest", "method":"CLICommandRequest",
"timeout":20, "timeout":10,
"retries":3, "retries":3,
"MethodTimeout": 10 "MethodTimeout": 7
} }
] ]

View file

@ -33,6 +33,7 @@
package steward package steward
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -350,32 +351,51 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str
c := message.Data[0] c := message.Data[0]
a := message.Data[1:] a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
time.Sleep(time.Second * time.Duration(message.MethodTimeout)) go func() {
// Create a new message for the reply, and put it on the ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
// ringbuffer to be published. defer cancel()
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg) outCh := make(chan []byte)
if err != nil {
// In theory the system should drop the message before it reaches here. go func() {
log.Printf("error: methodCLICommandRequest: %v\n", err) cmd := exec.CommandContext(ctx, c, a...)
} out, err := cmd.Output()
proc.newMessagesCh <- []subjectAndMessage{nSAM} if err != nil {
log.Printf("error: %v\n", err)
}
outCh <- out
}()
select {
case <-ctx.Done():
fmt.Printf(" ** Before\n")
er := fmt.Errorf("error: method timed out %v", proc)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er)
fmt.Printf(" ** After\n")
case out := <-outCh:
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{nSAM}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -445,7 +465,7 @@ func (m methodCLICommandReply) getKind() CommandOrEvent {
} }
func (m methodCLICommandReply) handler(proc process, message Message, node string) ([]byte, error) { func (m methodCLICommandReply) handler(proc process, message Message, node string) ([]byte, error) {
fmt.Printf("### %v\n", message.Data) fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil