1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 12:59:15 +00:00

method ECHO

This commit is contained in:
postmannen 2021-03-11 06:34:36 +01:00
parent 8fdf82f8a2
commit 48399e1958
13 changed files with 93 additions and 31 deletions

View file

@ -0,0 +1,7 @@
[
{
"toNode": "ship1",
"data": [""],
"method":"ECHORequest"
}
]

View file

@ -1,10 +0,0 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
}
]

View file

@ -1,8 +0,0 @@
[
{
"toNode": "*",
"data": ["bash","-c","tree ../"],
"method":"CLICommand"
}
]

View file

@ -71,17 +71,17 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
// Range over all the messages parsed from json, and create a subject for // Range over all the messages parsed from json, and create a subject for
// each message. // each message.
for _, m := range MsgSlice { for _, m := range MsgSlice {
sm := createSAMfromMessage(m) sm := newSAM(m)
sam = append(sam, sm) sam = append(sam, sm)
} }
return sam, nil return sam, nil
} }
// createSAMfromMessage will look up the correct values and value types to // newSAM will look up the correct values and value types to
// be used in a subject for a Message, and return the a combined structure // be used in a subject for a Message, and return the a combined structure
// of type subjectAndMessage. // of type subjectAndMessage.
func createSAMfromMessage(m Message) subjectAndMessage { func newSAM(m Message) subjectAndMessage {
// We need to create a tempory method type to look up the kind for the // We need to create a tempory method type to look up the kind for the
// real method for the message. // real method for the message.
var mt Method var mt Method

View file

@ -225,7 +225,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
continue continue
} }
} }
log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data) log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data)
} }
return return
} }

View file

@ -91,11 +91,27 @@ func (s *server) ProcessesStart() {
Method: SayHello, Method: SayHello,
} }
sam := createSAMfromMessage(m) sam := newSAM(m)
proc.newMessagesCh <- []subjectAndMessage{sam} proc.newMessagesCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello)) time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello))
} }
}) })
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
// Start a subscriber for ECHORequest messages
{
fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHORequest, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
// Start a subscriber for ECHOReply messages
{
fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHOReply, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
} }

View file

@ -18,8 +18,8 @@
// func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) { // func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) {
// ... // ...
// ... // ...
// outMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) // ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return outMsg, nil // return ackMsg, nil
// } // }
// //
// --- // ---
@ -52,6 +52,12 @@ const (
SayHello Method = "SayHello" SayHello Method = "SayHello"
// Error log methods to centralError // Error log methods to centralError
ErrorLog Method = "ErrorLog" ErrorLog Method = "ErrorLog"
// Echo request will ask the subscriber for a
// reply generated as a new message
ECHORequest Method = "ECHORequest"
// Echo reply will generate a response to a
// recived Echo request
ECHOReply Method = "ECHOReply"
) )
// Method is used to specify the actual function/method that // Method is used to specify the actual function/method that
@ -81,6 +87,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
ErrorLog: methodSubscriberErrorLog{ ErrorLog: methodSubscriberErrorLog{
commandOrEvent: EventACK, commandOrEvent: EventACK,
}, },
ECHORequest: methodSubscriberEchoRequest{
commandOrEvent: EventACK,
},
ECHOReply: methodSubscriberEchoReply{
commandOrEvent: EventACK,
},
}, },
} }
@ -147,8 +159,8 @@ func (m methodSubscriberCLICommand) handler(proc process, message Message, node
log.Printf("error: execution of command failed: %v\n", err) log.Printf("error: execution of command failed: %v\n", err)
} }
outMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
return outMsg, nil return ackMsg, nil
} }
// ----- // -----
@ -186,8 +198,8 @@ func (m methodSubscriberTextLogging) handler(proc process, message Message, node
//s.subscriberServices.logCh <- []byte(d) //s.subscriberServices.logCh <- []byte(d)
} }
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return outMsg, nil return ackMsg, nil
} }
// ----- // -----
@ -208,8 +220,8 @@ func (m methodSubscriberSayHello) handler(proc process, message Message, node st
// and can hold registries and handle special things for an individual process. // and can hold registries and handle special things for an individual process.
proc.procFuncCh <- message proc.procFuncCh <- message
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return outMsg, nil return ackMsg, nil
} }
// --- // ---
@ -226,3 +238,48 @@ func (m methodSubscriberErrorLog) handler(proc process, message Message, node st
log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data)
return nil, nil return nil, nil
} }
// ---
type methodSubscriberEchoRequest struct {
commandOrEvent CommandOrEvent
}
func (m methodSubscriberEchoRequest) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodSubscriberEchoRequest) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- ECHO REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{""},
Method: ECHOReply,
Timeout: 3,
Retries: 3,
}
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodSubscriberEchoReply struct {
commandOrEvent CommandOrEvent
}
func (m methodSubscriberEchoReply) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodSubscriberEchoReply) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- ECHO Reply received from: %v, containing: %v", message.FromNode, message.Data)
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}