diff --git a/etc/config.toml b/etc/config.toml index 4d0ff37..66f6be6 100644 --- a/etc/config.toml +++ b/etc/config.toml @@ -6,7 +6,7 @@ DefaultMessageTimeout = 5 NodeName = "central" ProfilingPort = "" PromHostAndPort = "" -StartPubSayhello = 0 +StartPubSayHello = 0 SubscribersDataFolder = "./data" [StartSubCLICommand] diff --git a/example/toShip1-CLICommandRequest1.json b/example/toShip1-CLICommandRequest1.json index e02a523..73b7fd6 100644 --- a/example/toShip1-CLICommandRequest1.json +++ b/example/toShip1-CLICommandRequest1.json @@ -2,10 +2,10 @@ { "toNode": "ship1", - "data": ["bash","-c","netstat -an|grep -i listen"], + "data": ["bash","-c","sleep 5 & echo 'apekatt'"], "method":"CLICommandRequest", - "timeout":20, + "timeout":10, "retries":3, - "MethodTimeout": 10 + "MethodTimeout": 7 } ] \ No newline at end of file diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 2b5be8a..0b46543 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -33,6 +33,7 @@ package steward import ( + "context" "fmt" "log" "os" @@ -350,32 +351,51 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str c := message.Data[0] a := message.Data[1:] - cmd := exec.Command(c, a...) - //cmd.Stdout = os.Stdout - out, err := cmd.CombinedOutput() - if err != nil { - log.Printf("error: execution of command failed: %v\n", err) - } - time.Sleep(time.Second * time.Duration(message.MethodTimeout)) + go func() { - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(out)}, - Method: CLICommandReply, - Timeout: 3, - Retries: 3, - } - fmt.Printf("** %#v\n", newMsg) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + defer cancel() - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: methodCLICommandRequest: %v\n", err) - } - proc.newMessagesCh <- []subjectAndMessage{nSAM} + outCh := make(chan []byte) + + go func() { + cmd := exec.CommandContext(ctx, c, a...) + out, err := cmd.Output() + if err != nil { + log.Printf("error: %v\n", err) + } + outCh <- out + }() + + select { + case <-ctx.Done(): + fmt.Printf(" ** Before\n") + er := fmt.Errorf("error: method timed out %v", proc) + sendErrorLogMessage(proc.newMessagesCh, proc.node, er) + fmt.Printf(" ** After\n") + case out := <-outCh: + + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{string(out)}, + Method: CLICommandReply, + Timeout: 3, + Retries: 3, + } + fmt.Printf("** %#v\n", newMsg) + + nSAM, err := newSAM(newMsg) + if err != nil { + // In theory the system should drop the message before it reaches here. + log.Printf("error: methodCLICommandRequest: %v\n", err) + } + proc.newMessagesCh <- []subjectAndMessage{nSAM} + } + + }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil @@ -445,7 +465,7 @@ func (m methodCLICommandReply) getKind() CommandOrEvent { } func (m methodCLICommandReply) handler(proc process, message Message, node string) ([]byte, error) { - fmt.Printf("### %v\n", message.Data) + fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil