1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 04:49:17 +00:00

implemented newReplyMessage for all request types

This commit is contained in:
postmannen 2021-03-31 13:29:55 +02:00
parent 66f89cd731
commit 1b17f8e345
5 changed files with 44 additions and 73 deletions

View file

@ -6,6 +6,6 @@
"method":"OpCommand", "method":"OpCommand",
"timeout":3, "timeout":3,
"retries":3, "retries":3,
"MethodTimeout": 7 "methodTimeout": 7
} }
] ]

View file

@ -2,10 +2,12 @@
{ {
"toNode": "ship1", "toNode": "ship1",
"data": ["psa"], "data": ["ps"],
"method":"OpCommandRequest", "method":"OpCommandRequest",
"timeout":3, "timeout":3,
"retries":3, "retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7 "MethodTimeout": 7
} }
] ]

View file

@ -6,6 +6,6 @@
"method":"CLICommandRequest", "method":"CLICommandRequest",
"timeout":10, "timeout":10,
"retries":3, "retries":3,
"MethodTimeout": 7 "methodTimeout": 4
} }
] ]

View file

@ -6,6 +6,6 @@
"method":"CLICommandRequestNOSEQ", "method":"CLICommandRequestNOSEQ",
"timeout":10, "timeout":10,
"retries":3, "retries":3,
"MethodTimeout": 0 "methodTimeout": 0
} }
] ]

View file

@ -270,29 +270,39 @@ func (m methodOpCommandRequest) handler(proc process, message Message, node stri
return return
} }
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, OpCommandReply, out)
}()
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID))
return ackMsg, nil
}
//--
// Create a new message for the reply containing the output of the
// action executed put in outData, and put it on the ringbuffer to
// be published.
func newReplyMessage(proc process, message Message, method Method, outData []byte) {
//-- //--
// Create a new message for the reply, and put it on the // Create a new message for the reply, and put it on the
// ringbuffer to be published. // ringbuffer to be published.
newMsg := Message{ newMsg := Message{
ToNode: message.FromNode, ToNode: message.FromNode,
Data: []string{string(out)}, Data: []string{string(outData)},
Method: OpCommandReply, Method: method,
Timeout: 3, Timeout: message.RequestTimeout,
Retries: 3, Retries: message.RequestRetries,
} }
fmt.Printf("** %#v\n", newMsg) fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg) nSAM, err := newSAM(newMsg)
if err != nil { if err != nil {
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err) log.Printf("error: %v: %v\n", message.Method, err)
} }
proc.toRingbufferCh <- []subjectAndMessage{nSAM} proc.toRingbufferCh <- []subjectAndMessage{nSAM}
//-- //--
}()
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID))
return ackMsg, nil
} }
// ----- // -----
@ -442,22 +452,9 @@ func (m methodEchoRequest) getKind() CommandOrEvent {
func (m methodEchoRequest) handler(proc process, message Message, node string) ([]byte, error) { func (m methodEchoRequest) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- ECHO REQUEST received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- ECHO REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
// Create a new message for the reply, and put it on the // Prepare and queue for sending a new message with the output
// ringbuffer to be published. // of the action executed.
newMsg := Message{ newReplyMessage(proc, message, ECHOReply, []byte{})
ToNode: message.FromNode,
Data: []string{""},
Method: ECHOReply,
Timeout: 3,
Retries: 3,
}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodEchoRequest: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -525,23 +522,9 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str
case out := <-outCh: case out := <-outCh:
cancel() cancel()
// Create a new message for the reply, and put it on the // Prepare and queue for sending a new message with the output
// ringbuffer to be published. // of the action executed.
newMsg := Message{ newReplyMessage(proc, message, CLICommandReply, out)
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
} }
}() }()
@ -598,23 +581,9 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod
case out := <-outCh: case out := <-outCh:
cancel() cancel()
// Create a new message for the reply, and put it on the // Prepare and queue for sending a new message with the output
// ringbuffer to be published. // of the action executed.
newMsg := Message{ newReplyMessage(proc, message, CLICommandReply, out)
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
} }
}() }()