diff --git a/example/toShip1TwoMessages.json b/example/toShip1-CLICommand-Two.json similarity index 100% rename from example/toShip1TwoMessages.json rename to example/toShip1-CLICommand-Two.json diff --git a/example/toShip1ManyMessages.json b/example/toShip1-CLICommand-many.json similarity index 100% rename from example/toShip1ManyMessages.json rename to example/toShip1-CLICommand-many.json diff --git a/example/toShip1.json b/example/toShip1-CLICommand.json similarity index 100% rename from example/toShip1.json rename to example/toShip1-CLICommand.json diff --git a/example/toShip1-ECHORequest.json b/example/toShip1-ECHORequest.json new file mode 100644 index 0000000..28099a0 --- /dev/null +++ b/example/toShip1-ECHORequest.json @@ -0,0 +1,7 @@ +[ + { + "toNode": "ship1", + "data": [""], + "method":"ECHORequest" + } +] \ No newline at end of file diff --git a/example/toShip1FromShip2.json b/example/toShip1FromShip2.json deleted file mode 100644 index 757b2ab..0000000 --- a/example/toShip1FromShip2.json +++ /dev/null @@ -1,10 +0,0 @@ -[ - { - - "toNode": "ship1", - "data": ["bash","-c","netstat -an|grep -i listen"], - "method":"CLICommand", - "timeout":3, - "retries":3 - } -] \ No newline at end of file diff --git a/example/toShip1and2.json b/example/toShip1and2.json deleted file mode 100644 index ca7f2e2..0000000 --- a/example/toShip1and2.json +++ /dev/null @@ -1,8 +0,0 @@ -[ - { - "toNode": "*", - "data": ["bash","-c","tree ../"], - "method":"CLICommand" - - } -] \ No newline at end of file diff --git a/example/toShip2ManyMessages.json b/example/toShip2-CLICommand-many.json similarity index 100% rename from example/toShip2ManyMessages.json rename to example/toShip2-CLICommand-many.json diff --git a/example/toShip2FromShip1.json b/example/toShip2-CLICommand-timeouts.json similarity index 100% rename from example/toShip2FromShip1.json rename to example/toShip2-CLICommand-timeouts.json diff --git a/example/toShip2.json b/example/toShip2-CLICommand.json similarity index 100% rename from example/toShip2.json rename to example/toShip2-CLICommand.json diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 3d4004f..9864a03 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -71,17 +71,17 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) { // Range over all the messages parsed from json, and create a subject for // each message. for _, m := range MsgSlice { - sm := createSAMfromMessage(m) + sm := newSAM(m) sam = append(sam, sm) } return sam, nil } -// createSAMfromMessage will look up the correct values and value types to +// newSAM will look up the correct values and value types to // be used in a subject for a Message, and return the a combined structure // of type subjectAndMessage. -func createSAMfromMessage(m Message) subjectAndMessage { +func newSAM(m Message) subjectAndMessage { // We need to create a tempory method type to look up the kind for the // real method for the message. var mt Method diff --git a/process.go b/process.go index 77bb2d9..7b0421e 100644 --- a/process.go +++ b/process.go @@ -225,7 +225,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { continue } } - log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data) + log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data) } return } diff --git a/processesToStart.go b/runProcessesAtStartup.go similarity index 83% rename from processesToStart.go rename to runProcessesAtStartup.go index 8d00654..0b1c288 100644 --- a/processesToStart.go +++ b/runProcessesAtStartup.go @@ -91,11 +91,27 @@ func (s *server) ProcessesStart() { Method: SayHello, } - sam := createSAMfromMessage(m) + sam := newSAM(m) proc.newMessagesCh <- []subjectAndMessage{sam} time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello)) } }) go proc.spawnWorker(s) } + + // Start a subscriber for ECHORequest messages + { + fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName) + sub := newSubject(ECHORequest, EventACK, s.nodeName) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s) + } + + // Start a subscriber for ECHOReply messages + { + fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName) + sub := newSubject(ECHOReply, EventACK, s.nodeName) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s) + } } diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index 67f23f5..381b8f7 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -18,8 +18,8 @@ // func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) { // ... // ... -// outMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) -// return outMsg, nil +// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) +// return ackMsg, nil // } // // --- @@ -52,6 +52,12 @@ const ( SayHello Method = "SayHello" // Error log methods to centralError ErrorLog Method = "ErrorLog" + // Echo request will ask the subscriber for a + // reply generated as a new message + ECHORequest Method = "ECHORequest" + // Echo reply will generate a response to a + // recived Echo request + ECHOReply Method = "ECHOReply" ) // Method is used to specify the actual function/method that @@ -81,6 +87,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { ErrorLog: methodSubscriberErrorLog{ commandOrEvent: EventACK, }, + ECHORequest: methodSubscriberEchoRequest{ + commandOrEvent: EventACK, + }, + ECHOReply: methodSubscriberEchoReply{ + commandOrEvent: EventACK, + }, }, } @@ -147,8 +159,8 @@ func (m methodSubscriberCLICommand) handler(proc process, message Message, node log.Printf("error: execution of command failed: %v\n", err) } - outMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) - return outMsg, nil + ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) + return ackMsg, nil } // ----- @@ -186,8 +198,8 @@ func (m methodSubscriberTextLogging) handler(proc process, message Message, node //s.subscriberServices.logCh <- []byte(d) } - outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) - return outMsg, nil + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil } // ----- @@ -208,8 +220,8 @@ func (m methodSubscriberSayHello) handler(proc process, message Message, node st // and can hold registries and handle special things for an individual process. proc.procFuncCh <- message - outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) - return outMsg, nil + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil } // --- @@ -226,3 +238,48 @@ func (m methodSubscriberErrorLog) handler(proc process, message Message, node st log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) return nil, nil } + +// --- + +type methodSubscriberEchoRequest struct { + commandOrEvent CommandOrEvent +} + +func (m methodSubscriberEchoRequest) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodSubscriberEchoRequest) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- ECHO REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{""}, + Method: ECHOReply, + Timeout: 3, + Retries: 3, + } + proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)} + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// --- + +type methodSubscriberEchoReply struct { + commandOrEvent CommandOrEvent +} + +func (m methodSubscriberEchoReply) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodSubscriberEchoReply) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- ECHO Reply received from: %v, containing: %v", message.FromNode, message.Data) + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +}