From 3a0f3ee3bb2c88b2d526a2b1a2553cc19a404a71 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sat, 3 Apr 2021 06:31:48 +0200 Subject: [PATCH] fixed using previous message instead of subject --- message_and_subject.go | 14 ++++++++------ subscriber_method_types.go | 39 ++++++++++++++------------------------ 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/message_and_subject.go b/message_and_subject.go index 180ad81..4db8386 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -37,13 +37,15 @@ type Message struct { // Timeout for long a process should be allowed to operate MethodTimeout int `json:"methodTimeout" yaml:"methodTimeout"` - // PreviousMessageSubject are used for example if a reply - // message is generated and we also need a copy of thedetails - // of the the initial request message - PreviousMessageSubject Subject + // PreviousMessage are used for example if a reply message is + // generated and we also need a copy of thedetails of the the + // initial request message + PreviousMessage *Message + // done is used to signal when a message is fully processed. - // This is used when choosing when to move the message from - // the ringbuffer into the time series log. + // This is used for signaling back to the ringbuffer that we are + // done with processing a message, and the message can be removed + // from the ringbuffer and into the time series log. done chan struct{} } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 38fb95b..87d3a7e 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -285,28 +285,16 @@ func newReplyMessage(proc process, message Message, method Method, outData []byt // Create a new message for the reply, and put it on the // ringbuffer to be published. newMsg := Message{ - ToNode: message.FromNode, - Data: []string{string(outData)}, - Method: method, - Timeout: message.RequestTimeout, - Retries: message.RequestRetries, - PreviousMessageSubject: proc.subject, - } + ToNode: message.FromNode, + Data: []string{string(outData)}, + Method: method, + Timeout: message.RequestTimeout, + Retries: message.RequestRetries, - // The label field is part of the subject struct, but it is not - // used for creating subjects for Nats messages. - // The label field is set in the individual message that are brought - // into the system via the socket. - // since the process don't care or knows about the labels for the message - // handling we need to manually add it here on the message level, - // so the receiving subscriber gets that information. - // - // NB: This would probably be better handled if the Message rather - // contained the whole previous message instead of using the previous - // subject, but when testing it seemed to fail to unmarshal a *message - // field in the message struct. That is also why the previousSubject - // field were introduced to avoid having a field with *previousMessage. - newMsg.PreviousMessageSubject.Label = message.Label + // Put in a copy of the initial request message, so we can use it's properties if + // needed to for example create the file structure naming on the subscriber. + PreviousMessage: &message, + } nSAM, err := newSAM(newMsg) if err != nil { @@ -374,16 +362,17 @@ func (m methodTextLogging) handler(proc process, message Message, node string) ( // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. + fmt.Printf(" ** DEBUG: %v\n", message.PreviousMessage) var fileName string switch { - case message.PreviousMessageSubject.ToNode != "": - fileName = fmt.Sprintf("%v.%v.log", message.PreviousMessageSubject.ToNode, message.PreviousMessageSubject.Method) - case message.PreviousMessageSubject.ToNode == "": + case message.PreviousMessage.ToNode != "": + fileName = fmt.Sprintf("%v.%v.log", message.PreviousMessage.ToNode, message.PreviousMessage.Method) + case message.PreviousMessage.ToNode == "": fileName = fmt.Sprintf("%v.%v.log", message.FromNode, message.Method) } // Check if folder structure exist, if not create it. - folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessageSubject.Label, message.PreviousMessageSubject.ToNode) + folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Label, string(message.PreviousMessage.ToNode)) if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700)