1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

request subscriber and non sequential processing of messages implemented.

This commit is contained in:
postmannen 2021-03-11 12:07:09 +01:00
parent 48399e1958
commit 6864d7a946
5 changed files with 176 additions and 5 deletions

View file

@ -0,0 +1,11 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommandRequest",
"timeout":20,
"retries":3,
"MethodTimeout": 10
}
]

View file

@ -0,0 +1,11 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","tree ../"],
"method":"CLICommandRequestNOSEQ",
"timeout":10,
"retries":3,
"MethodTimeout": 0
}
]

View file

@ -17,12 +17,15 @@ type Message struct {
// method, what is this message doing, etc. CLI, syslog, etc.
Method Method `json:"method" yaml:"method"`
FromNode node
// Reply wait timeout
Timeout int `json:"timeout" yaml:"timeout"`
// Resend retries
Retries int `json:"retries" yaml:"retries"`
// done is used to signal when a message is fully processed.
// This is used when choosing when to move the message from
// the ringbuffer into the time series log.
Timeout int `json:"timeout" yaml:"timeout"`
Retries int `json:"retries" yaml:"retries"`
done chan struct{}
MethodTimeout int `json:"methodTimeout" yaml:"methodTimeout"`
done chan struct{}
}
// gobEncodePayload will encode the message structure along with its

View file

@ -114,4 +114,28 @@ func (s *server) ProcessesStart() {
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
// Start a subscriber for CLICommandRequest messages
{
fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequest, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
// Start a subscriber for CLICommandRequest messages
{
fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequestNOSEQ, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
// Start a subscriber for CLICommandReply messages
{
fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandReply, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
}

View file

@ -38,6 +38,7 @@ import (
"os"
"os/exec"
"path/filepath"
"time"
)
// ------------------------------------------------------------
@ -46,6 +47,15 @@ import (
const (
// Shell command to be executed via f.ex. bash
CLICommand Method = "CLICommand"
// Shell command to be executed via f.ex. bash
CLICommandRequest Method = "CLICommandRequest"
// Shell command to be executed via f.ex. bash
// The NOSEQ method will process messages as they are recived,
// and the reply back will be sent as soon as the process is
// done. No order are preserved.
CLICommandRequestNOSEQ Method = "CLICommandRequestNOSEQ"
// Will generate a reply for a CLICommandRequest
CLICommandReply Method = "CLICommandReply"
// Send text logging to some host
TextLogging Method = "TextLogging"
// Send Hello I'm here message
@ -55,8 +65,7 @@ const (
// Echo request will ask the subscriber for a
// reply generated as a new message
ECHORequest Method = "ECHORequest"
// Echo reply will generate a response to a
// recived Echo request
// Will generate a reply for a ECHORequest
ECHOReply Method = "ECHOReply"
)
@ -78,6 +87,15 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
CLICommand: methodSubscriberCLICommand{
commandOrEvent: CommandACK,
},
CLICommandRequest: methodSubscriberCLICommandRequest{
commandOrEvent: EventACK,
},
CLICommandRequestNOSEQ: methodSubscriberCLICommandRequestNOSEQ{
commandOrEvent: EventACK,
},
CLICommandReply: methodSubscriberCLICommandReply{
commandOrEvent: EventACK,
},
TextLogging: methodSubscriberTextLogging{
commandOrEvent: EventACK,
},
@ -283,3 +301,107 @@ func (m methodSubscriberEchoReply) handler(proc process, message Message, node s
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodSubscriberCLICommandRequest
type methodSubscriberCLICommandRequest struct {
commandOrEvent CommandOrEvent
}
func (m methodSubscriberCLICommandRequest) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodSubscriberCLICommandRequest) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
c := message.Data[0]
a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
time.Sleep(time.Second * time.Duration(message.MethodTimeout))
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodSubscriberCLICommandRequestNOSEQ
type methodSubscriberCLICommandRequestNOSEQ struct {
commandOrEvent CommandOrEvent
}
func (m methodSubscriberCLICommandRequestNOSEQ) getKind() CommandOrEvent {
return m.commandOrEvent
}
// The NOSEQ method will process messages as they are recived,
// and the reply back will be sent as soon as the process is
// done. No order are preserved.
func (m methodSubscriberCLICommandRequestNOSEQ) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
go func() {
c := message.Data[0]
a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
time.Sleep(time.Second * time.Duration(message.MethodTimeout))
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodSubscriberCLICommandReply struct {
commandOrEvent CommandOrEvent
}
func (m methodSubscriberCLICommandReply) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodSubscriberCLICommandReply) handler(proc process, message Message, node string) ([]byte, error) {
fmt.Printf("### %v\n", message.Data)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}