mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 04:49:17 +00:00
Added REQnCliCommandCont Req type
This commit is contained in:
parent
73929bb8d0
commit
271b9d5258
2 changed files with 115 additions and 0 deletions
35
README.md
35
README.md
|
@ -153,6 +153,14 @@ TIP: Most likely the best way to control how the service should behave and what
|
||||||
|
|
||||||
Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
||||||
|
|
||||||
|
Will run the command given, and return the stdout output of the command when the command is done.
|
||||||
|
|
||||||
|
#### REQnCliCommandCont
|
||||||
|
|
||||||
|
Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
||||||
|
|
||||||
|
Will run the command given, and return the stdout output of the command continously while the command runs.
|
||||||
|
|
||||||
#### REQTailFile
|
#### REQTailFile
|
||||||
|
|
||||||
Tail log files on some node, and get the result sent back in a reply message.
|
Tail log files on some node, and get the result sent back in a reply message.
|
||||||
|
@ -369,6 +377,33 @@ To start a process of a specified type on a node.
|
||||||
},
|
},
|
||||||
```
|
```
|
||||||
|
|
||||||
|
and another example
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"directory":"opcommand_logs",
|
||||||
|
"fileExtension": ".log",
|
||||||
|
"toNode": "ship2",
|
||||||
|
"data": [],
|
||||||
|
"method":"REQOpCommand",
|
||||||
|
"operation":{
|
||||||
|
"opCmd":"startProc",
|
||||||
|
"opArg": {
|
||||||
|
"method": "REQnCliCommandCont",
|
||||||
|
"allowedNodes": ["central"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"replyMethod":"REQToFileAppend",
|
||||||
|
"ACKTimeout":3,
|
||||||
|
"retries":3,
|
||||||
|
"replyACKTimeout":3,
|
||||||
|
"replyRetries":3,
|
||||||
|
"MethodTimeout": 7
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
NB: Both the keys and the values used are case sensitive.
|
NB: Both the keys and the values used are case sensitive.
|
||||||
|
|
||||||
#### Sending a command from one Node to Another Node
|
#### Sending a command from one Node to Another Node
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -83,6 +84,11 @@ const (
|
||||||
// The data field is a slice of strings where the first string
|
// The data field is a slice of strings where the first string
|
||||||
// value should be the command, and the following the arguments.
|
// value should be the command, and the following the arguments.
|
||||||
REQnCliCommand Method = "REQnCliCommand"
|
REQnCliCommand Method = "REQnCliCommand"
|
||||||
|
// REQnCliCommandCont same as normal Cli command, but can be used
|
||||||
|
// when running a command that will take longer time and you want
|
||||||
|
// to send the output of the command continually back as it is
|
||||||
|
// generated, and not just when the command is finished.
|
||||||
|
REQnCliCommandCont Method = "REQnCliCommandCont"
|
||||||
// Send text to be logged to the console.
|
// Send text to be logged to the console.
|
||||||
// The data field is a slice of strings where the first string
|
// The data field is a slice of strings where the first string
|
||||||
// value should be the command, and the following the arguments.
|
// value should be the command, and the following the arguments.
|
||||||
|
@ -143,6 +149,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
REQnCliCommand: methodREQnCliCommand{
|
REQnCliCommand: methodREQnCliCommand{
|
||||||
commandOrEvent: CommandACK,
|
commandOrEvent: CommandACK,
|
||||||
},
|
},
|
||||||
|
REQnCliCommandCont: methodREQnCliCommandCont{
|
||||||
|
commandOrEvent: CommandACK,
|
||||||
|
},
|
||||||
REQToConsole: methodREQToConsole{
|
REQToConsole: methodREQToConsole{
|
||||||
commandOrEvent: EventACK,
|
commandOrEvent: EventACK,
|
||||||
},
|
},
|
||||||
|
@ -967,3 +976,74 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
||||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
return ackMsg, nil
|
return ackMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---
|
||||||
|
|
||||||
|
// --- methodREQTailFile
|
||||||
|
|
||||||
|
type methodREQnCliCommandCont struct {
|
||||||
|
commandOrEvent CommandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQnCliCommandCont) getKind() CommandOrEvent {
|
||||||
|
return m.commandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
// handler to run a tailing of files with timeout context. The handler will
|
||||||
|
// return the output of the command run back to the calling publisher
|
||||||
|
// as a new message.
|
||||||
|
func (m methodREQnCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||||||
|
|
||||||
|
// Execute the CLI command in it's own go routine, so we are able
|
||||||
|
// to return immediately with an ack reply that the message was
|
||||||
|
// received, and we create a new message to send back to the calling
|
||||||
|
// node for the out put of the actual command.
|
||||||
|
go func() {
|
||||||
|
c := message.Data[0]
|
||||||
|
a := message.Data[1:]
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||||
|
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
cmd := exec.CommandContext(ctx, c, a...)
|
||||||
|
|
||||||
|
// Using cmd.StdoutPipe here so we are continuosly
|
||||||
|
// able to read the out put of the command.
|
||||||
|
outReader, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
log.Printf("error: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(outReader)
|
||||||
|
for scanner.Scan() {
|
||||||
|
outCh <- []byte(scanner.Text() + "\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Check if context timer or command output were received.
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
cancel()
|
||||||
|
er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
return
|
||||||
|
case out := <-outCh:
|
||||||
|
// Prepare and queue for sending a new message with the output
|
||||||
|
// of the action executed.
|
||||||
|
newReplyMessage(proc, message, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
|
return ackMsg, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue