diff --git a/example/OpCommand.json b/example/OpCommand.json index 6150174..7beb3bc 100644 --- a/example/OpCommand.json +++ b/example/OpCommand.json @@ -6,6 +6,6 @@ "method":"OpCommand", "timeout":3, "retries":3, - "MethodTimeout": 7 + "methodTimeout": 7 } ] \ No newline at end of file diff --git a/example/OpCommandRequest.json b/example/OpCommandRequest.json index 290e009..87ce9ed 100644 --- a/example/OpCommandRequest.json +++ b/example/OpCommandRequest.json @@ -2,10 +2,12 @@ { "toNode": "ship1", - "data": ["psa"], + "data": ["ps"], "method":"OpCommandRequest", "timeout":3, "retries":3, + "requestTimeout":3, + "requestRetries":3, "MethodTimeout": 7 } ] \ No newline at end of file diff --git a/example/toShip1-CLICommandRequest1.json b/example/toShip1-CLICommandRequest1.json index 129851a..37d9c7c 100644 --- a/example/toShip1-CLICommandRequest1.json +++ b/example/toShip1-CLICommandRequest1.json @@ -6,6 +6,6 @@ "method":"CLICommandRequest", "timeout":10, "retries":3, - "MethodTimeout": 7 + "methodTimeout": 4 } ] \ No newline at end of file diff --git a/example/toShip1-CLICommandRequest2.json b/example/toShip1-CLICommandRequest2.json index 672777a..4de58e9 100644 --- a/example/toShip1-CLICommandRequest2.json +++ b/example/toShip1-CLICommandRequest2.json @@ -6,6 +6,6 @@ "method":"CLICommandRequestNOSEQ", "timeout":10, "retries":3, - "MethodTimeout": 0 + "methodTimeout": 0 } ] \ No newline at end of file diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 03ff58d..e7a578e 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -270,31 +270,41 @@ func (m methodOpCommandRequest) handler(proc process, message Message, node stri return } - //-- - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(out)}, - Method: OpCommandReply, - Timeout: 3, - Retries: 3, - } - fmt.Printf("** %#v\n", newMsg) - - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: methodCLICommandRequest: %v\n", err) - } - proc.toRingbufferCh <- []subjectAndMessage{nSAM} - //-- + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, OpCommandReply, out) }() ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID)) return ackMsg, nil } +//-- +// Create a new message for the reply containing the output of the +// action executed put in outData, and put it on the ringbuffer to +// be published. +func newReplyMessage(proc process, message Message, method Method, outData []byte) { + //-- + // Create a new message for the reply, and put it on the + // ringbuffer to be published. + newMsg := Message{ + ToNode: message.FromNode, + Data: []string{string(outData)}, + Method: method, + Timeout: message.RequestTimeout, + Retries: message.RequestRetries, + } + fmt.Printf("** %#v\n", newMsg) + + nSAM, err := newSAM(newMsg) + if err != nil { + // In theory the system should drop the message before it reaches here. + log.Printf("error: %v: %v\n", message.Method, err) + } + proc.toRingbufferCh <- []subjectAndMessage{nSAM} + //-- +} + // ----- type methodOpCommandReply struct { @@ -442,22 +452,9 @@ func (m methodEchoRequest) getKind() CommandOrEvent { func (m methodEchoRequest) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- ECHO REQUEST received from: %v, containing: %v", message.FromNode, message.Data) - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{""}, - Method: ECHOReply, - Timeout: 3, - Retries: 3, - } - - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: methodEchoRequest: %v\n", err) - } - proc.toRingbufferCh <- []subjectAndMessage{nSAM} + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, ECHOReply, []byte{}) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil @@ -525,23 +522,9 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str case out := <-outCh: cancel() - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(out)}, - Method: CLICommandReply, - Timeout: 3, - Retries: 3, - } - fmt.Printf("** %#v\n", newMsg) - - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: methodCLICommandRequest: %v\n", err) - } - proc.toRingbufferCh <- []subjectAndMessage{nSAM} + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, CLICommandReply, out) } }() @@ -598,23 +581,9 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod case out := <-outCh: cancel() - // Create a new message for the reply, and put it on the - // ringbuffer to be published. - newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(out)}, - Method: CLICommandReply, - Timeout: 3, - Retries: 3, - } - fmt.Printf("** %#v\n", newMsg) - - nSAM, err := newSAM(newMsg) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: methodCLICommandRequest: %v\n", err) - } - proc.toRingbufferCh <- []subjectAndMessage{nSAM} + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, CLICommandReply, out) } }()