diff --git a/process.go b/process.go index 89f171a..ff72dea 100644 --- a/process.go +++ b/process.go @@ -323,21 +323,29 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // ----------- HERE ------------- - // Check if the previos message was a relayed message, and if true + // Check if the previous message was a relayed message, and if true // make a copy of the current message where the to field is set to // the value of the previous message's RelayFromNode field, so we // also can send the a copy of the reply back to where it originated. - if message.PreviousMessage != nil && message.PreviousMessage.RelayViaNode != "" { + //fmt.Printf("\n *** DEBUG: process.subscriberHandler: message.previousMessage: %#v \n\n", message.PreviousMessage) + + if message.PreviousMessage != nil && message.PreviousMessage.RelayOriginalViaNode != "" { + + //fmt.Printf("\n *** DEBUG: process.subscriberHandler: got match on if sentence \n\n") + // make a copy of the message msgCopy := message msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode - // If no RelayReplyMethod is set, we default to the replyMethod - // of the initial message. - + // We set the replyMethod of the initial message. + // If no RelayReplyMethod was found, we default to the reply + // method of the previos message. switch { case msgCopy.PreviousMessage.RelayReplyMethod == "": + er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previos message: %v ", msgCopy) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er) + log.Printf("%v\n", er) msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod case msgCopy.PreviousMessage.RelayReplyMethod != "": msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod @@ -345,10 +353,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Reset the previosMessage relay fields so the message don't loop. message.PreviousMessage.RelayViaNode = "" + message.PreviousMessage.RelayOriginalViaNode = "" + // Create a SAM for the msg copy that will be sent back the where the + // relayed message originated from. sam, err := newSubjectAndMessage(msgCopy) if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage : %v, message copy: %v", err, msgCopy) + er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy) sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er) log.Printf("%v\n", er) } diff --git a/requests.go b/requests.go index 57ba97b..672b2aa 100644 --- a/requests.go +++ b/requests.go @@ -264,14 +264,22 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { } } -// Create a new message for the reply containing the output of the -// action executed put in outData, and put it on the ringbuffer to -// be published. -// The method to use for the reply message should initially be -// specified within the first message as the replyMethod, and we will +// newReplyMessage will create and send a reply message back to where +// the original provided message came from. The primary use of this +// function is to report back to a node who sent a message with the +// result of the request method of the original message. +// +// The method to use for the reply message when reporting back should +// be specified within a message in the replyMethod field. We will // pick up that value here, and use it as the method for the new // request message. If no replyMethod is set we default to the // REQToFileAppend method type. +// +// There will also be a copy of the original message put in the +// previousMessage field. For the copy of the original message the data +// field will be set to nil before the whole message is put in the +// previousMessage field so we don't copy around the original data in +// the reply response when it is not needed anymore. func newReplyMessage(proc process, message Message, outData []byte) { // If no replyMethod is set we default to writing to writing to @@ -280,6 +288,13 @@ func newReplyMessage(proc process, message Message, outData []byte) { message.ReplyMethod = REQToFileAppend } + // Make a copy of the message as it is right now to use + // in the previous message field, but set the data field + // to nil so we don't copy around the original data when + // we don't need to for the reply message. + thisMsg := message + thisMsg.Data = nil + // Create a new message for the reply, and put it on the // ringbuffer to be published. newMsg := Message{ @@ -297,7 +312,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { // Put in a copy of the initial request message, so we can use it's properties if // needed to for example create the file structure naming on the subscriber. - PreviousMessage: &message, + PreviousMessage: &thisMsg, } sam, err := newSubjectAndMessage(newMsg)