mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 13:49:29 +00:00
fixed using previous message instead of subject
This commit is contained in:
parent
26078cffa2
commit
3a0f3ee3bb
2 changed files with 22 additions and 31 deletions
|
@ -37,13 +37,15 @@ type Message struct {
|
||||||
// Timeout for long a process should be allowed to operate
|
// Timeout for long a process should be allowed to operate
|
||||||
MethodTimeout int `json:"methodTimeout" yaml:"methodTimeout"`
|
MethodTimeout int `json:"methodTimeout" yaml:"methodTimeout"`
|
||||||
|
|
||||||
// PreviousMessageSubject are used for example if a reply
|
// PreviousMessage are used for example if a reply message is
|
||||||
// message is generated and we also need a copy of thedetails
|
// generated and we also need a copy of thedetails of the the
|
||||||
// of the the initial request message
|
// initial request message
|
||||||
PreviousMessageSubject Subject
|
PreviousMessage *Message
|
||||||
|
|
||||||
// done is used to signal when a message is fully processed.
|
// done is used to signal when a message is fully processed.
|
||||||
// This is used when choosing when to move the message from
|
// This is used for signaling back to the ringbuffer that we are
|
||||||
// the ringbuffer into the time series log.
|
// done with processing a message, and the message can be removed
|
||||||
|
// from the ringbuffer and into the time series log.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -285,28 +285,16 @@ func newReplyMessage(proc process, message Message, method Method, outData []byt
|
||||||
// Create a new message for the reply, and put it on the
|
// Create a new message for the reply, and put it on the
|
||||||
// ringbuffer to be published.
|
// ringbuffer to be published.
|
||||||
newMsg := Message{
|
newMsg := Message{
|
||||||
ToNode: message.FromNode,
|
ToNode: message.FromNode,
|
||||||
Data: []string{string(outData)},
|
Data: []string{string(outData)},
|
||||||
Method: method,
|
Method: method,
|
||||||
Timeout: message.RequestTimeout,
|
Timeout: message.RequestTimeout,
|
||||||
Retries: message.RequestRetries,
|
Retries: message.RequestRetries,
|
||||||
PreviousMessageSubject: proc.subject,
|
|
||||||
}
|
|
||||||
|
|
||||||
// The label field is part of the subject struct, but it is not
|
// Put in a copy of the initial request message, so we can use it's properties if
|
||||||
// used for creating subjects for Nats messages.
|
// needed to for example create the file structure naming on the subscriber.
|
||||||
// The label field is set in the individual message that are brought
|
PreviousMessage: &message,
|
||||||
// into the system via the socket.
|
}
|
||||||
// since the process don't care or knows about the labels for the message
|
|
||||||
// handling we need to manually add it here on the message level,
|
|
||||||
// so the receiving subscriber gets that information.
|
|
||||||
//
|
|
||||||
// NB: This would probably be better handled if the Message rather
|
|
||||||
// contained the whole previous message instead of using the previous
|
|
||||||
// subject, but when testing it seemed to fail to unmarshal a *message
|
|
||||||
// field in the message struct. That is also why the previousSubject
|
|
||||||
// field were introduced to avoid having a field with *previousMessage.
|
|
||||||
newMsg.PreviousMessageSubject.Label = message.Label
|
|
||||||
|
|
||||||
nSAM, err := newSAM(newMsg)
|
nSAM, err := newSAM(newMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -374,16 +362,17 @@ func (m methodTextLogging) handler(proc process, message Message, node string) (
|
||||||
|
|
||||||
// If it was a request type message we want to check what the initial messages
|
// If it was a request type message we want to check what the initial messages
|
||||||
// method, so we can use that in creating the file name to store the data.
|
// method, so we can use that in creating the file name to store the data.
|
||||||
|
fmt.Printf(" ** DEBUG: %v\n", message.PreviousMessage)
|
||||||
var fileName string
|
var fileName string
|
||||||
switch {
|
switch {
|
||||||
case message.PreviousMessageSubject.ToNode != "":
|
case message.PreviousMessage.ToNode != "":
|
||||||
fileName = fmt.Sprintf("%v.%v.log", message.PreviousMessageSubject.ToNode, message.PreviousMessageSubject.Method)
|
fileName = fmt.Sprintf("%v.%v.log", message.PreviousMessage.ToNode, message.PreviousMessage.Method)
|
||||||
case message.PreviousMessageSubject.ToNode == "":
|
case message.PreviousMessage.ToNode == "":
|
||||||
fileName = fmt.Sprintf("%v.%v.log", message.FromNode, message.Method)
|
fileName = fmt.Sprintf("%v.%v.log", message.FromNode, message.Method)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if folder structure exist, if not create it.
|
// Check if folder structure exist, if not create it.
|
||||||
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessageSubject.Label, message.PreviousMessageSubject.ToNode)
|
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Label, string(message.PreviousMessage.ToNode))
|
||||||
|
|
||||||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||||
err := os.MkdirAll(folderTree, 0700)
|
err := os.MkdirAll(folderTree, 0700)
|
||||||
|
|
Loading…
Add table
Reference in a new issue