mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-20 22:52:13 +00:00
setting correct method for reply to origin.
This commit is contained in:
parent
c8f8de71c5
commit
3b37df638d
2 changed files with 38 additions and 12 deletions
23
process.go
23
process.go
|
@ -323,21 +323,29 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
// ----------- HERE -------------
|
// ----------- HERE -------------
|
||||||
|
|
||||||
// Check if the previos message was a relayed message, and if true
|
// Check if the previous message was a relayed message, and if true
|
||||||
// make a copy of the current message where the to field is set to
|
// make a copy of the current message where the to field is set to
|
||||||
// the value of the previous message's RelayFromNode field, so we
|
// the value of the previous message's RelayFromNode field, so we
|
||||||
// also can send the a copy of the reply back to where it originated.
|
// also can send the a copy of the reply back to where it originated.
|
||||||
|
|
||||||
if message.PreviousMessage != nil && message.PreviousMessage.RelayViaNode != "" {
|
//fmt.Printf("\n *** DEBUG: process.subscriberHandler: message.previousMessage: %#v \n\n", message.PreviousMessage)
|
||||||
|
|
||||||
|
if message.PreviousMessage != nil && message.PreviousMessage.RelayOriginalViaNode != "" {
|
||||||
|
|
||||||
|
//fmt.Printf("\n *** DEBUG: process.subscriberHandler: got match on if sentence \n\n")
|
||||||
|
|
||||||
// make a copy of the message
|
// make a copy of the message
|
||||||
msgCopy := message
|
msgCopy := message
|
||||||
msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode
|
msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode
|
||||||
|
|
||||||
// If no RelayReplyMethod is set, we default to the replyMethod
|
// We set the replyMethod of the initial message.
|
||||||
// of the initial message.
|
// If no RelayReplyMethod was found, we default to the reply
|
||||||
|
// method of the previos message.
|
||||||
switch {
|
switch {
|
||||||
case msgCopy.PreviousMessage.RelayReplyMethod == "":
|
case msgCopy.PreviousMessage.RelayReplyMethod == "":
|
||||||
|
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previos message: %v ", msgCopy)
|
||||||
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
||||||
|
log.Printf("%v\n", er)
|
||||||
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
|
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
|
||||||
case msgCopy.PreviousMessage.RelayReplyMethod != "":
|
case msgCopy.PreviousMessage.RelayReplyMethod != "":
|
||||||
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
|
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
|
||||||
|
@ -345,10 +353,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
// Reset the previosMessage relay fields so the message don't loop.
|
// Reset the previosMessage relay fields so the message don't loop.
|
||||||
message.PreviousMessage.RelayViaNode = ""
|
message.PreviousMessage.RelayViaNode = ""
|
||||||
|
message.PreviousMessage.RelayOriginalViaNode = ""
|
||||||
|
|
||||||
|
// Create a SAM for the msg copy that will be sent back the where the
|
||||||
|
// relayed message originated from.
|
||||||
sam, err := newSubjectAndMessage(msgCopy)
|
sam, err := newSubjectAndMessage(msgCopy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message copy: %v", err, msgCopy)
|
er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy)
|
||||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
27
requests.go
27
requests.go
|
@ -264,14 +264,22 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new message for the reply containing the output of the
|
// newReplyMessage will create and send a reply message back to where
|
||||||
// action executed put in outData, and put it on the ringbuffer to
|
// the original provided message came from. The primary use of this
|
||||||
// be published.
|
// function is to report back to a node who sent a message with the
|
||||||
// The method to use for the reply message should initially be
|
// result of the request method of the original message.
|
||||||
// specified within the first message as the replyMethod, and we will
|
//
|
||||||
|
// The method to use for the reply message when reporting back should
|
||||||
|
// be specified within a message in the replyMethod field. We will
|
||||||
// pick up that value here, and use it as the method for the new
|
// pick up that value here, and use it as the method for the new
|
||||||
// request message. If no replyMethod is set we default to the
|
// request message. If no replyMethod is set we default to the
|
||||||
// REQToFileAppend method type.
|
// REQToFileAppend method type.
|
||||||
|
//
|
||||||
|
// There will also be a copy of the original message put in the
|
||||||
|
// previousMessage field. For the copy of the original message the data
|
||||||
|
// field will be set to nil before the whole message is put in the
|
||||||
|
// previousMessage field so we don't copy around the original data in
|
||||||
|
// the reply response when it is not needed anymore.
|
||||||
func newReplyMessage(proc process, message Message, outData []byte) {
|
func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
|
|
||||||
// If no replyMethod is set we default to writing to writing to
|
// If no replyMethod is set we default to writing to writing to
|
||||||
|
@ -280,6 +288,13 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
message.ReplyMethod = REQToFileAppend
|
message.ReplyMethod = REQToFileAppend
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make a copy of the message as it is right now to use
|
||||||
|
// in the previous message field, but set the data field
|
||||||
|
// to nil so we don't copy around the original data when
|
||||||
|
// we don't need to for the reply message.
|
||||||
|
thisMsg := message
|
||||||
|
thisMsg.Data = nil
|
||||||
|
|
||||||
// Create a new message for the reply, and put it on the
|
// Create a new message for the reply, and put it on the
|
||||||
// ringbuffer to be published.
|
// ringbuffer to be published.
|
||||||
newMsg := Message{
|
newMsg := Message{
|
||||||
|
@ -297,7 +312,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
|
|
||||||
// Put in a copy of the initial request message, so we can use it's properties if
|
// Put in a copy of the initial request message, so we can use it's properties if
|
||||||
// needed to for example create the file structure naming on the subscriber.
|
// needed to for example create the file structure naming on the subscriber.
|
||||||
PreviousMessage: &message,
|
PreviousMessage: &thisMsg,
|
||||||
}
|
}
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(newMsg)
|
sam, err := newSubjectAndMessage(newMsg)
|
||||||
|
|
Loading…
Add table
Reference in a new issue