mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 06:46:48 +00:00
labels are used to create folder structue on subscriber
This commit is contained in:
parent
f58c12bf80
commit
26078cffa2
4 changed files with 41 additions and 7 deletions
|
@ -1,6 +1,6 @@
|
|||
[
|
||||
{
|
||||
|
||||
"label":"opcommand_logs",
|
||||
"toNode": "ship1",
|
||||
"data": ["ps"],
|
||||
"method":"OpCommandRequest",
|
||||
|
|
|
@ -16,7 +16,7 @@ type Message struct {
|
|||
// information that can be used on the subscriber side to for
|
||||
// example create specific folders using the Format name to
|
||||
// logically group data recevied.
|
||||
Label string
|
||||
Label string `json:"label" yaml:"label"`
|
||||
// The node to send the message to
|
||||
ToNode node `json:"toNode" yaml:"toNode"`
|
||||
// The Unique ID of the message
|
||||
|
@ -78,6 +78,9 @@ type Subject struct {
|
|||
// from routeMessagesToPublish in *server.
|
||||
// This channel is only used for publishing processes.
|
||||
messageCh chan Message
|
||||
// Label, single word used to describe the data content. For example
|
||||
// syslog, metrics, etc.
|
||||
Label string
|
||||
}
|
||||
|
||||
// newSubject will return a new variable of the type subject, and insert
|
||||
|
|
|
@ -104,6 +104,7 @@ func newSAM(m Message) (subjectAndMessage, error) {
|
|||
ToNode: string(m.ToNode),
|
||||
CommandOrEvent: tmpH.getKind(),
|
||||
Method: m.Method,
|
||||
Label: m.Label,
|
||||
}
|
||||
|
||||
sm := subjectAndMessage{
|
||||
|
|
|
@ -252,6 +252,8 @@ func (m methodOpCommandRequest) handler(proc process, message Message, node stri
|
|||
switch {
|
||||
case message.Data[0] == "ps":
|
||||
proc.processes.mu.Lock()
|
||||
// Loop the the processes map, and find all that is active to
|
||||
// be returned in the reply message.
|
||||
for _, v := range proc.processes.active {
|
||||
s := fmt.Sprintf("* proc - : %v, id: %v, name: %v, allowed from: %s\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
|
||||
sb := []byte(s)
|
||||
|
@ -290,7 +292,21 @@ func newReplyMessage(proc process, message Message, method Method, outData []byt
|
|||
Retries: message.RequestRetries,
|
||||
PreviousMessageSubject: proc.subject,
|
||||
}
|
||||
fmt.Printf(" ** %#v\n", newMsg)
|
||||
|
||||
// The label field is part of the subject struct, but it is not
|
||||
// used for creating subjects for Nats messages.
|
||||
// The label field is set in the individual message that are brought
|
||||
// into the system via the socket.
|
||||
// since the process don't care or knows about the labels for the message
|
||||
// handling we need to manually add it here on the message level,
|
||||
// so the receiving subscriber gets that information.
|
||||
//
|
||||
// NB: This would probably be better handled if the Message rather
|
||||
// contained the whole previous message instead of using the previous
|
||||
// subject, but when testing it seemed to fail to unmarshal a *message
|
||||
// field in the message struct. That is also why the previousSubject
|
||||
// field were introduced to avoid having a field with *previousMessage.
|
||||
newMsg.PreviousMessageSubject.Label = message.Label
|
||||
|
||||
nSAM, err := newSAM(newMsg)
|
||||
if err != nil {
|
||||
|
@ -355,8 +371,9 @@ func (m methodTextLogging) getKind() CommandOrEvent {
|
|||
}
|
||||
|
||||
func (m methodTextLogging) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
// Recreate the subject structure for the message, so we can use
|
||||
// it in the naming of the files to create.
|
||||
|
||||
// If it was a request type message we want to check what the initial messages
|
||||
// method, so we can use that in creating the file name to store the data.
|
||||
var fileName string
|
||||
switch {
|
||||
case message.PreviousMessageSubject.ToNode != "":
|
||||
|
@ -365,8 +382,21 @@ func (m methodTextLogging) handler(proc process, message Message, node string) (
|
|||
fileName = fmt.Sprintf("%v.%v.log", message.FromNode, message.Method)
|
||||
}
|
||||
|
||||
logFile := filepath.Join(proc.configuration.SubscribersDataFolder, fileName)
|
||||
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend)
|
||||
// Check if folder structure exist, if not create it.
|
||||
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessageSubject.Label, message.PreviousMessageSubject.ToNode)
|
||||
|
||||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error: failed to create directory %v: %v", folderTree, err)
|
||||
}
|
||||
|
||||
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
|
||||
}
|
||||
|
||||
// Open file and write data.
|
||||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend)
|
||||
if err != nil {
|
||||
log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err)
|
||||
return nil, err
|
||||
|
|
Loading…
Add table
Reference in a new issue