diff --git a/example/OpCommandRequest.json b/example/OpCommandRequest.json index 87ce9ed..83eb47d 100644 --- a/example/OpCommandRequest.json +++ b/example/OpCommandRequest.json @@ -1,6 +1,6 @@ [ { - + "label":"opcommand_logs", "toNode": "ship1", "data": ["ps"], "method":"OpCommandRequest", diff --git a/message_and_subject.go b/message_and_subject.go index 8dc45f2..180ad81 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -16,7 +16,7 @@ type Message struct { // information that can be used on the subscriber side to for // example create specific folders using the Format name to // logically group data recevied. - Label string + Label string `json:"label" yaml:"label"` // The node to send the message to ToNode node `json:"toNode" yaml:"toNode"` // The Unique ID of the message @@ -78,6 +78,9 @@ type Subject struct { // from routeMessagesToPublish in *server. // This channel is only used for publishing processes. messageCh chan Message + // Label, single word used to describe the data content. For example + // syslog, metrics, etc. + Label string } // newSubject will return a new variable of the type subject, and insert diff --git a/read_socket.go b/read_socket.go index be22b90..981c284 100644 --- a/read_socket.go +++ b/read_socket.go @@ -104,6 +104,7 @@ func newSAM(m Message) (subjectAndMessage, error) { ToNode: string(m.ToNode), CommandOrEvent: tmpH.getKind(), Method: m.Method, + Label: m.Label, } sm := subjectAndMessage{ diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 0570b35..38fb95b 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -252,6 +252,8 @@ func (m methodOpCommandRequest) handler(proc process, message Message, node stri switch { case message.Data[0] == "ps": proc.processes.mu.Lock() + // Loop the the processes map, and find all that is active to + // be returned in the reply message. for _, v := range proc.processes.active { s := fmt.Sprintf("* proc - : %v, id: %v, name: %v, allowed from: %s\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers) sb := []byte(s) @@ -290,7 +292,21 @@ func newReplyMessage(proc process, message Message, method Method, outData []byt Retries: message.RequestRetries, PreviousMessageSubject: proc.subject, } - fmt.Printf(" ** %#v\n", newMsg) + + // The label field is part of the subject struct, but it is not + // used for creating subjects for Nats messages. + // The label field is set in the individual message that are brought + // into the system via the socket. + // since the process don't care or knows about the labels for the message + // handling we need to manually add it here on the message level, + // so the receiving subscriber gets that information. + // + // NB: This would probably be better handled if the Message rather + // contained the whole previous message instead of using the previous + // subject, but when testing it seemed to fail to unmarshal a *message + // field in the message struct. That is also why the previousSubject + // field were introduced to avoid having a field with *previousMessage. + newMsg.PreviousMessageSubject.Label = message.Label nSAM, err := newSAM(newMsg) if err != nil { @@ -355,8 +371,9 @@ func (m methodTextLogging) getKind() CommandOrEvent { } func (m methodTextLogging) handler(proc process, message Message, node string) ([]byte, error) { - // Recreate the subject structure for the message, so we can use - // it in the naming of the files to create. + + // If it was a request type message we want to check what the initial messages + // method, so we can use that in creating the file name to store the data. var fileName string switch { case message.PreviousMessageSubject.ToNode != "": @@ -365,8 +382,21 @@ func (m methodTextLogging) handler(proc process, message Message, node string) ( fileName = fmt.Sprintf("%v.%v.log", message.FromNode, message.Method) } - logFile := filepath.Join(proc.configuration.SubscribersDataFolder, fileName) - f, err := os.OpenFile(logFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend) + // Check if folder structure exist, if not create it. + folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessageSubject.Label, message.PreviousMessageSubject.ToNode) + + if _, err := os.Stat(folderTree); os.IsNotExist(err) { + err := os.MkdirAll(folderTree, 0700) + if err != nil { + return nil, fmt.Errorf("error: failed to create directory %v: %v", folderTree, err) + } + + log.Printf("info: Creating subscribers data folder at %v\n", folderTree) + } + + // Open file and write data. + file := filepath.Join(folderTree, fileName) + f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend) if err != nil { log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err) return nil, err