mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
removed REQnCliCommand request type
This commit is contained in:
parent
d5082cc7a4
commit
855275d19f
5 changed files with 1 additions and 170 deletions
25
README.md
25
README.md
|
@ -15,15 +15,12 @@ The idea behind Steward is to help out with exactly that, and let you handle the
|
|||
- [Logical structure](#logical-structure)
|
||||
- [Terminology](#terminology)
|
||||
- [Features](#features)
|
||||
- [Messages in order](#messages-in-order)
|
||||
- [Messages not in order](#messages-not-in-order)
|
||||
- [Error messages from nodes](#error-messages-from-nodes)
|
||||
- [Message handling and threads](#message-handling-and-threads)
|
||||
- [Timeouts and retries](#timeouts-and-retries)
|
||||
- [Flags and configuration file](#flags-and-configuration-file)
|
||||
- [Request Methods](#request-methods)
|
||||
- [REQCliCommand](#reqclicommand)
|
||||
- [REQnCliCommand](#reqnclicommand)
|
||||
- [REQnCliCommandCont](#reqnclicommandcont)
|
||||
- [REQTailFile](#reqtailfile)
|
||||
- [REQHttpGet](#reqhttpget)
|
||||
|
@ -100,8 +97,6 @@ Send Shell Commands, HTTP Get, or Tail log files to control your servers by pass
|
|||
|
||||
Steward uses Nats as message passing architecture for the commands back and forth from nodes, where delivery is guaranteed, and where all of the processes in the system are running concurrently so if something breaks or some process is slow it will not affect the handling and delivery of the other messages in the system.
|
||||
|
||||
By default the system guarantees that the order of the messages are handled by the subscriber in the order they where sent. There have also been implemented a special type `NOSEQ` which will allow messages within that process to be handles in a not sequential manner. This is handy for jobs that will run for a long time, and where other messages are not dependent on it's result.
|
||||
|
||||
A node can be a server running any host operating system, a container living in the cloud somewhere, a rapsberry pi, or something else that needs to be controlled that have an operating system installed.
|
||||
|
||||
## Inspiration
|
||||
|
@ -156,16 +151,6 @@ If one process hangs on a long running message method it will not affect the res
|
|||
|
||||
## Features
|
||||
|
||||
### Messages in order
|
||||
|
||||
- By default the system guarantees that the order of the messages are handled by the subscriber in the order they where sent. So if a network link is down when the message is being sent, it will automatically be rescheduled at the specified interval with the given number of retries.
|
||||
|
||||
These types of messages have method starting with `REQ<Method name>`
|
||||
|
||||
### Messages not in order
|
||||
|
||||
- There have been implemented a special method type `REQn<Method name>` which will allow messages to be handled within that process in a not sequential manner. This is handy for jobs that will run for a long time, and where other messages are not dependent on it's result.
|
||||
|
||||
### Error messages from nodes
|
||||
|
||||
- Error messages will be sent back to the central error handler upon failure on a node.
|
||||
|
@ -180,7 +165,7 @@ These types of messages have method starting with `REQ<Method name>`
|
|||
|
||||
- Publishing processes will potentially be able to send to all nodes. It is the subscribing nodes who will limit from where and what they will receive from.
|
||||
|
||||
- Messages not fully processed or not started yet will be automatically handled in chronological order if the service is restarted since the current state of all the messages being processed are stored on the local node in a key value store until they are finished.
|
||||
- Messages not fully processed or not started yet will be automatically rehandled if the service is restarted since the current state of all the messages being processed are stored on the local node in a key value store until they are finished.
|
||||
|
||||
- All messages processed by a publisher will be written to a log file as they are processed, with all the information needed to recreate the same message if needed, or it can be used for auditing.
|
||||
|
||||
|
@ -215,12 +200,6 @@ Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
|||
|
||||
Will run the command given, and return the stdout output of the command when the command is done.
|
||||
|
||||
#### REQnCliCommand
|
||||
|
||||
Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
||||
|
||||
Will run the command given without the execution order guarantee, and return the stdout output of the command when the command is done.
|
||||
|
||||
#### REQnCliCommandCont
|
||||
|
||||
Run CLI command on a node. Linux/Windows/Mac/Docker-container or other.
|
||||
|
@ -343,8 +322,6 @@ The location of the config file are given via an env variable at startup (defaul
|
|||
StartSubREQPong bool
|
||||
// Subscriber for CLICommandRequest
|
||||
StartSubREQCliCommand bool
|
||||
// Subscriber for REQnCliCommand
|
||||
StartSubREQnCliCommand bool
|
||||
// Subscriber for REQToConsole
|
||||
StartSubREQToConsole bool
|
||||
// Subscriber for REQHttpGet
|
||||
|
|
|
@ -72,8 +72,6 @@ type Configuration struct {
|
|||
StartSubREQPong bool
|
||||
// Subscriber for CLICommandRequest
|
||||
StartSubREQCliCommand bool
|
||||
// Subscriber for REQnCliCommand
|
||||
StartSubREQnCliCommand bool
|
||||
// Subscriber for REQToConsole
|
||||
StartSubREQToConsole bool
|
||||
// Subscriber for REQHttpGet
|
||||
|
@ -159,7 +157,6 @@ func newConfigurationDefaults() Configuration {
|
|||
StartSubREQPing: true,
|
||||
StartSubREQPong: true,
|
||||
StartSubREQCliCommand: true,
|
||||
StartSubREQnCliCommand: true,
|
||||
StartSubREQToConsole: true,
|
||||
StartSubREQHttpGet: true,
|
||||
StartSubREQTailFile: true,
|
||||
|
@ -309,11 +306,6 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
|||
} else {
|
||||
conf.StartSubREQCliCommand = *cf.StartSubREQCliCommand
|
||||
}
|
||||
if cf.StartSubREQnCliCommand == nil {
|
||||
conf.StartSubREQnCliCommand = cd.StartSubREQnCliCommand
|
||||
} else {
|
||||
conf.StartSubREQnCliCommand = *cf.StartSubREQnCliCommand
|
||||
}
|
||||
if cf.StartSubREQToConsole == nil {
|
||||
conf.StartSubREQToConsole = cd.StartSubREQToConsole
|
||||
} else {
|
||||
|
@ -395,7 +387,6 @@ func (c *Configuration) CheckFlags() error {
|
|||
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQnCliCommand, "startSubREQnCliCommand", fc.StartSubREQnCliCommand, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQToConsole, "startSubREQToConsole", fc.StartSubREQToConsole, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", fc.StartSubREQHttpGet, "true/false")
|
||||
flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false")
|
||||
|
|
12
processes.go
12
processes.go
|
@ -98,11 +98,6 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subREQCliCommand(proc)
|
||||
}
|
||||
|
||||
// Start a subscriber for Not In Order Cli Command Request messages
|
||||
if proc.configuration.StartSubREQnCliCommand {
|
||||
proc.startup.subREQnCliCommand(proc)
|
||||
}
|
||||
|
||||
// Start a subscriber for CLICommandReply messages
|
||||
if proc.configuration.StartSubREQToConsole {
|
||||
proc.startup.subREQToConsole(proc)
|
||||
|
@ -214,13 +209,6 @@ func (s startup) subREQToConsole(p process) {
|
|||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQnCliCommand(p process) {
|
||||
log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQnCliCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQCliCommand(p process) {
|
||||
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCliCommand, string(p.node))
|
||||
|
|
95
requests.go
95
requests.go
|
@ -73,18 +73,6 @@ const (
|
|||
// The data field is a slice of strings where the first string
|
||||
// value should be the command, and the following the arguments.
|
||||
REQCliCommand Method = "REQCliCommand"
|
||||
// Execute a CLI command in for example bash or cmd.
|
||||
// This is an event type, where a message will be sent to a
|
||||
// node with the command to execute and an ACK will be replied
|
||||
// if it was delivered succesfully. The output of the command
|
||||
// ran will be delivered back to the node where it was initiated
|
||||
// as a new message.
|
||||
// The NOSEQ method will process messages as they are recived,
|
||||
// and the reply back will be sent as soon as the process is
|
||||
// done. No order are preserved.
|
||||
// The data field is a slice of strings where the first string
|
||||
// value should be the command, and the following the arguments.
|
||||
REQnCliCommand Method = "REQnCliCommand"
|
||||
// REQnCliCommandCont same as normal Cli command, but can be used
|
||||
// when running a command that will take longer time and you want
|
||||
// to send the output of the command continually back as it is
|
||||
|
@ -149,9 +137,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
REQCliCommand: methodREQCliCommand{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
REQnCliCommand: methodREQnCliCommand{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
REQnCliCommandCont: methodREQnCliCommandCont{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
|
@ -913,86 +898,6 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// --- methodCLICommandRequestNOSEQ
|
||||
|
||||
type methodREQnCliCommand struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodREQnCliCommand) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
// handler to run a CLI command with timeout context. The handler will
|
||||
// return the output of the command run back to the calling publisher
|
||||
// as a new message.
|
||||
// The NOSEQ method will process messages as they are recived,
|
||||
// and the reply back will be sent as soon as the process is
|
||||
// done. No order are preserved.
|
||||
func (m methodREQnCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
log.Printf("<--- nCLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
|
||||
// Execute the CLI command in it's own go routine, so we are able
|
||||
// to return immediately with an ack reply that the messag was
|
||||
// received, and we create a new message to send back to the calling
|
||||
// node for the out put of the actual command.
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
c := message.MethodArgs[0]
|
||||
a := message.MethodArgs[1:]
|
||||
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
|
||||
outCh := make(chan []byte)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
cmd := exec.CommandContext(ctx, c, a...)
|
||||
var out bytes.Buffer
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stdout = &out
|
||||
cmd.Stderr = &stderr
|
||||
err := cmd.Run()
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
log.Printf("error: failed to io.ReadAll of stderr: %v\n", err)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String())
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
select {
|
||||
case outCh <- out.Bytes():
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQnCliCommand: method timed out %v", message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
||||
// Prepare and queue for sending a new message with the output
|
||||
// of the action executed.
|
||||
newReplyMessage(proc, message, out)
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
type methodREQToConsole struct {
|
||||
|
|
|
@ -56,7 +56,6 @@ func TestStewardServer(t *testing.T) {
|
|||
DefaultMessageTimeout: 3,
|
||||
|
||||
StartSubREQCliCommand: true,
|
||||
StartSubREQnCliCommand: true,
|
||||
StartSubREQnCliCommandCont: true,
|
||||
StartSubREQToConsole: true,
|
||||
StartSubREQToFileAppend: true,
|
||||
|
@ -83,7 +82,6 @@ func TestStewardServer(t *testing.T) {
|
|||
funcs := []testFunc{
|
||||
checkREQOpCommandTest,
|
||||
checkREQCliCommandTest,
|
||||
checkREQnCliCommandTest,
|
||||
checkREQnCliCommandContTest,
|
||||
// checkREQToConsoleTest(conf, t), NB: No tests will be made for console ouput.
|
||||
// checkREQToFileAppendTest(conf, t), NB: Already tested via others
|
||||
|
@ -177,34 +175,6 @@ func checkREQCliCommandTest(stewardServer *server, conf *Configuration, t *testi
|
|||
return nil
|
||||
}
|
||||
|
||||
// The non-sequential sending of CLI Commands.
|
||||
func checkREQnCliCommandTest(stewardServer *server, conf *Configuration, t *testing.T) error {
|
||||
m := `[
|
||||
{
|
||||
"directory":"commands-executed",
|
||||
"fileName":"fileName.result",
|
||||
"toNode": "central",
|
||||
"data": ["bash","-c","echo apekatt"],
|
||||
"replyMethod":"REQToFileAppend",
|
||||
"method":"REQnCliCommand",
|
||||
"ACKTimeout":3,
|
||||
"retries":3,
|
||||
"methodTimeout": 10
|
||||
}
|
||||
]`
|
||||
|
||||
writeToSocketTest(conf, m, t)
|
||||
|
||||
resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "fileName.result")
|
||||
_, err := findStringInFileTest("apekatt", resultFile, conf, t)
|
||||
if err != nil {
|
||||
return fmt.Errorf(" [FAILED] : checkREQnCliCommandTest: %v", err)
|
||||
}
|
||||
|
||||
t.Logf(" \U0001f600 [SUCCESS] : checkREQnCliCommandTest\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
// The continous non-sequential sending of CLI Commands.
|
||||
func checkREQnCliCommandContTest(stewardServer *server, conf *Configuration, t *testing.T) error {
|
||||
m := `[
|
||||
|
|
Loading…
Add table
Reference in a new issue