diff --git a/example/toShip1-CLICommandRequest1.json b/example/toShip1-CLICommandRequest1.json new file mode 100644 index 0000000..e02a523 --- /dev/null +++ b/example/toShip1-CLICommandRequest1.json @@ -0,0 +1,11 @@ +[ + { + + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommandRequest", + "timeout":20, + "retries":3, + "MethodTimeout": 10 + } +] \ No newline at end of file diff --git a/example/toShip1-CLICommandRequest2.json b/example/toShip1-CLICommandRequest2.json new file mode 100644 index 0000000..672777a --- /dev/null +++ b/example/toShip1-CLICommandRequest2.json @@ -0,0 +1,11 @@ +[ + { + + "toNode": "ship1", + "data": ["bash","-c","tree ../"], + "method":"CLICommandRequestNOSEQ", + "timeout":10, + "retries":3, + "MethodTimeout": 0 + } +] \ No newline at end of file diff --git a/message-and-subject.go b/message-and-subject.go index b367e97..f82a7b1 100644 --- a/message-and-subject.go +++ b/message-and-subject.go @@ -17,12 +17,15 @@ type Message struct { // method, what is this message doing, etc. CLI, syslog, etc. Method Method `json:"method" yaml:"method"` FromNode node + // Reply wait timeout + Timeout int `json:"timeout" yaml:"timeout"` + // Resend retries + Retries int `json:"retries" yaml:"retries"` // done is used to signal when a message is fully processed. // This is used when choosing when to move the message from // the ringbuffer into the time series log. - Timeout int `json:"timeout" yaml:"timeout"` - Retries int `json:"retries" yaml:"retries"` - done chan struct{} + MethodTimeout int `json:"methodTimeout" yaml:"methodTimeout"` + done chan struct{} } // gobEncodePayload will encode the message structure along with its diff --git a/runProcessesAtStartup.go b/runProcessesAtStartup.go index 0b1c288..747a573 100644 --- a/runProcessesAtStartup.go +++ b/runProcessesAtStartup.go @@ -114,4 +114,28 @@ func (s *server) ProcessesStart() { proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) go proc.spawnWorker(s) } + + // Start a subscriber for CLICommandRequest messages + { + fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName) + sub := newSubject(CLICommandRequest, EventACK, s.nodeName) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s) + } + + // Start a subscriber for CLICommandRequest messages + { + fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName) + sub := newSubject(CLICommandRequestNOSEQ, EventACK, s.nodeName) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s) + } + + // Start a subscriber for CLICommandReply messages + { + fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName) + sub := newSubject(CLICommandReply, EventACK, s.nodeName) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s) + } } diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index 381b8f7..34102cd 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -38,6 +38,7 @@ import ( "os" "os/exec" "path/filepath" + "time" ) // ------------------------------------------------------------ @@ -46,6 +47,15 @@ import ( const ( // Shell command to be executed via f.ex. bash CLICommand Method = "CLICommand" + // Shell command to be executed via f.ex. bash + CLICommandRequest Method = "CLICommandRequest" + // Shell command to be executed via f.ex. bash + // The NOSEQ method will process messages as they are recived, + // and the reply back will be sent as soon as the process is + // done. No order are preserved. + CLICommandRequestNOSEQ Method = "CLICommandRequestNOSEQ" + // Will generate a reply for a CLICommandRequest + CLICommandReply Method = "CLICommandReply" // Send text logging to some host TextLogging Method = "TextLogging" // Send Hello I'm here message @@ -55,8 +65,7 @@ const ( // Echo request will ask the subscriber for a // reply generated as a new message ECHORequest Method = "ECHORequest" - // Echo reply will generate a response to a - // recived Echo request + // Will generate a reply for a ECHORequest ECHOReply Method = "ECHOReply" ) @@ -78,6 +87,15 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { CLICommand: methodSubscriberCLICommand{ commandOrEvent: CommandACK, }, + CLICommandRequest: methodSubscriberCLICommandRequest{ + commandOrEvent: EventACK, + }, + CLICommandRequestNOSEQ: methodSubscriberCLICommandRequestNOSEQ{ + commandOrEvent: EventACK, + }, + CLICommandReply: methodSubscriberCLICommandReply{ + commandOrEvent: EventACK, + }, TextLogging: methodSubscriberTextLogging{ commandOrEvent: EventACK, }, @@ -283,3 +301,107 @@ func (m methodSubscriberEchoReply) handler(proc process, message Message, node s ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +// --- methodSubscriberCLICommandRequest + +type methodSubscriberCLICommandRequest struct { + commandOrEvent CommandOrEvent +} + +func (m methodSubscriberCLICommandRequest) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodSubscriberCLICommandRequest) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + + c := message.Data[0] + a := message.Data[1:] + cmd := exec.Command(c, a...) + //cmd.Stdout = os.Stdout + out, err := cmd.CombinedOutput() + if err != nil { + log.Printf("error: execution of command failed: %v\n", err) + } + + time.Sleep(time.Second * time.Duration(message.MethodTimeout)) + + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{string(out)}, + Method: CLICommandReply, + Timeout: 3, + Retries: 3, + } + fmt.Printf("** %#v\n", newMsg) + proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)} + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// --- methodSubscriberCLICommandRequestNOSEQ + +type methodSubscriberCLICommandRequestNOSEQ struct { + commandOrEvent CommandOrEvent +} + +func (m methodSubscriberCLICommandRequestNOSEQ) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// The NOSEQ method will process messages as they are recived, +// and the reply back will be sent as soon as the process is +// done. No order are preserved. +func (m methodSubscriberCLICommandRequestNOSEQ) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + + go func() { + + c := message.Data[0] + a := message.Data[1:] + cmd := exec.Command(c, a...) + //cmd.Stdout = os.Stdout + out, err := cmd.CombinedOutput() + if err != nil { + log.Printf("error: execution of command failed: %v\n", err) + } + + time.Sleep(time.Second * time.Duration(message.MethodTimeout)) + + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{string(out)}, + Method: CLICommandReply, + Timeout: 3, + Retries: 3, + } + fmt.Printf("** %#v\n", newMsg) + proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)} + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// --- + +type methodSubscriberCLICommandReply struct { + commandOrEvent CommandOrEvent +} + +func (m methodSubscriberCLICommandReply) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodSubscriberCLICommandReply) handler(proc process, message Message, node string) ([]byte, error) { + fmt.Printf("### %v\n", message.Data) + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +}