diff --git a/README.md b/README.md index 1fc14ce..084fbed 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,14 @@ TIP: Most likely the best way to control how the service should behave and what Run CLI command on a node. Linux/Windows/Mac/Docker-container or other. +Will run the command given, and return the stdout output of the command when the command is done. + +#### REQnCliCommandCont + +Run CLI command on a node. Linux/Windows/Mac/Docker-container or other. + +Will run the command given, and return the stdout output of the command continously while the command runs. + #### REQTailFile Tail log files on some node, and get the result sent back in a reply message. @@ -369,6 +377,33 @@ To start a process of a specified type on a node. }, ``` +and another example + +```json +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": [], + "method":"REQOpCommand", + "operation":{ + "opCmd":"startProc", + "opArg": { + "method": "REQnCliCommandCont", + "allowedNodes": ["central"] + } + }, + "replyMethod":"REQToFileAppend", + "ACKTimeout":3, + "retries":3, + "replyACKTimeout":3, + "replyRetries":3, + "MethodTimeout": 7 + } +] +``` + NB: Both the keys and the values used are case sensitive. #### Sending a command from one Node to Another Node diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 8661593..ef90da3 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -33,6 +33,7 @@ package steward import ( + "bufio" "context" "encoding/json" "fmt" @@ -83,6 +84,11 @@ const ( // The data field is a slice of strings where the first string // value should be the command, and the following the arguments. REQnCliCommand Method = "REQnCliCommand" + // REQnCliCommandCont same as normal Cli command, but can be used + // when running a command that will take longer time and you want + // to send the output of the command continually back as it is + // generated, and not just when the command is finished. + REQnCliCommandCont Method = "REQnCliCommandCont" // Send text to be logged to the console. // The data field is a slice of strings where the first string // value should be the command, and the following the arguments. @@ -143,6 +149,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQnCliCommand: methodREQnCliCommand{ commandOrEvent: CommandACK, }, + REQnCliCommandCont: methodREQnCliCommandCont{ + commandOrEvent: CommandACK, + }, REQToConsole: methodREQToConsole{ commandOrEvent: EventACK, }, @@ -967,3 +976,74 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +// --- + +// --- methodREQTailFile + +type methodREQnCliCommandCont struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQnCliCommandCont) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// handler to run a tailing of files with timeout context. The handler will +// return the output of the command run back to the calling publisher +// as a new message. +func (m methodREQnCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + + // Execute the CLI command in it's own go routine, so we are able + // to return immediately with an ack reply that the message was + // received, and we create a new message to send back to the calling + // node for the out put of the actual command. + go func() { + c := message.Data[0] + a := message.Data[1:] + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + + outCh := make(chan []byte) + + go func() { + cmd := exec.CommandContext(ctx, c, a...) + + // Using cmd.StdoutPipe here so we are continuosly + // able to read the out put of the command. + outReader, err := cmd.StdoutPipe() + if err != nil { + log.Printf("error: %v\n", err) + } + + if err := cmd.Start(); err != nil { + log.Printf("error: %v\n", err) + } + + scanner := bufio.NewScanner(outReader) + for scanner.Scan() { + outCh <- []byte(scanner.Text() + "\n") + } + + }() + + // Check if context timer or command output were received. + for { + select { + case <-ctx.Done(): + cancel() + er := fmt.Errorf("info: method timeout reached, canceling: %v", message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return + case out := <-outCh: + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, out) + } + } + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +}