// The structure of how to add new method types to the system. // ----------------------------------------------------------- // All methods need 3 things: // - A type definition // - The type needs a getKind method // - The type needs a handler method // Overall structure example shown below. // // --- // type methodCommandCLICommandRequest struct { // commandOrEvent CommandOrEvent // } // // func (m methodCommandCLICommandRequest) getKind() CommandOrEvent { // return m.commandOrEvent // } // // func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) { // ... // ... // ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) // return ackMsg, nil // } // // --- // You also need to make a constant for the Method, and add // that constant as the key in the map, where the value is // the actual type you want to map it to with a handler method. // You also specify if it is a Command or Event, and if it is // ACK or NACK. // Check out the existing code below for more examples. package steward import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "os" "os/exec" "path/filepath" "strconv" "strings" "time" "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" ) // Method is used to specify the actual function/method that // is represented in a typed manner. type Method string // ------------------------------------------------------------ // The constants that will be used throughout the system for // when specifying what kind of Method to send or work with. const ( // Initial parent method used to start other processes. REQInitial Method = "REQInitial" // Command for client operation request of the system. The op // command to execute shall be given in the data field of the // message as string value. For example "ps". REQOpCommand Method = "REQOpCommand" // Get a list of all the running processes. REQOpProcessList Method = "REQOpProcessList" // Start up a process. REQOpProcessStart Method = "REQOpProcessStart" // Stop up a process. REQOpProcessStop Method = "REQOpProcessStop" // Execute a CLI command in for example bash or cmd. // This is an event type, where a message will be sent to a // node with the command to execute and an ACK will be replied // if it was delivered succesfully. The output of the command // ran will be delivered back to the node where it was initiated // as a new message. // The data field is a slice of strings where the first string // value should be the command, and the following the arguments. REQCliCommand Method = "REQCliCommand" // REQCliCommandCont same as normal Cli command, but can be used // when running a command that will take longer time and you want // to send the output of the command continually back as it is // generated, and not wait until the command is finished. REQCliCommandCont Method = "REQCliCommandCont" // Send text to be logged to the console. // The data field is a slice of strings where the first string // value should be the command, and the following the arguments. REQToConsole Method = "REQToConsole" // Send text logging to some host by appending the output to a // file, if the file do not exist we create it. // A file with the full subject+hostName will be created on // the receiving end. // The data field is a slice of strings where the values of the // slice will be written to the log file. REQToFileAppend Method = "REQToFileAppend" // Send text to some host by overwriting the existing content of // the fileoutput to a file. If the file do not exist we create it. // A file with the full subject+hostName will be created on // the receiving end. // The data field is a slice of strings where the values of the // slice will be written to the file. REQToFile Method = "REQToFile" // Send Hello I'm here message. REQHello Method = "REQHello" // Error log methods to centralError node. REQErrorLog Method = "REQErrorLog" // Echo request will ask the subscriber for a // reply generated as a new message, and sent back to where // the initial request was made. REQPing Method = "REQPing" // Will generate a reply for a ECHORequest REQPong Method = "REQPong" // Http Get REQHttpGet Method = "REQHttpGet" // Tail file REQTailFile Method = "REQTailFile" // Write to steward socket REQToSocket Method = "REQToSocket" ) // The mapping of all the method constants specified, what type // it references, and the kind if it is an Event or Command, and // if it is ACK or NACK. // Allowed values for the commandOrEvent field are: // - CommandACK // - CommandNACK // - EventACK // - EventNack func (m Method) GetMethodsAvailable() MethodsAvailable { // Command, Used to make a request to perform an action // Event, Used to communicate that something have happened. ma := MethodsAvailable{ Methodhandlers: map[Method]methodHandler{ REQInitial: methodREQInitial{ commandOrEvent: CommandACK, }, REQOpCommand: methodREQOpCommand{ commandOrEvent: CommandACK, }, REQOpProcessList: methodREQOpProcessList{ commandOrEvent: CommandACK, }, REQOpProcessStart: methodREQOpProcessStart{ commandOrEvent: CommandACK, }, REQOpProcessStop: methodREQOpProcessStop{ commandOrEvent: CommandACK, }, REQCliCommand: methodREQCliCommand{ commandOrEvent: CommandACK, }, REQCliCommandCont: methodREQCliCommandCont{ commandOrEvent: CommandACK, }, REQToConsole: methodREQToConsole{ commandOrEvent: EventACK, }, REQToFileAppend: methodREQToFileAppend{ commandOrEvent: EventACK, }, REQToFile: methodREQToFile{ commandOrEvent: EventACK, }, REQHello: methodREQHello{ commandOrEvent: EventNACK, }, REQErrorLog: methodREQErrorLog{ commandOrEvent: EventACK, }, REQPing: methodREQPing{ commandOrEvent: EventACK, }, REQPong: methodREQPong{ commandOrEvent: EventACK, }, REQHttpGet: methodREQHttpGet{ commandOrEvent: EventACK, }, REQTailFile: methodREQTailFile{ commandOrEvent: EventACK, }, REQToSocket: methodREQToSocket{ commandOrEvent: EventACK, }, }, } return ma } // Reply methods. The slice generated here is primarily used within // the Stew client for knowing what of the req types are generally // used as reply methods. func (m Method) GetReplyMethods() []Method { rm := []Method{REQToConsole, REQToFile, REQToFileAppend, REQToSocket} return rm } // getHandler will check the methodsAvailable map, and return the // method handler for the method given // as input argument. func (m Method) getHandler(method Method) methodHandler { ma := m.GetMethodsAvailable() mh := ma.Methodhandlers[method] return mh } // The structure that works as a reference for all the methods and if // they are of the command or event type, and also if it is a ACK or // NACK message. // ---- // Initial parent method used to start other processes. type methodREQInitial struct { commandOrEvent CommandOrEvent } func (m methodREQInitial) getKind() CommandOrEvent { return m.commandOrEvent } func (m methodREQInitial) handler(proc process, message Message, node string) ([]byte, error) { // proc.procFuncCh <- message ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // ---- // MethodsAvailable holds a map of all the different method types and the // associated handler to that method type. type MethodsAvailable struct { Methodhandlers map[Method]methodHandler } // Check if exists will check if the Method is defined. If true the bool // value will be set to true, and the methodHandler function for that type // will be returned. func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { mFunc, ok := ma.Methodhandlers[m] if ok { // fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) return mFunc, true } else { // fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m) return nil, false } } // Create a new message for the reply containing the output of the // action executed put in outData, and put it on the ringbuffer to // be published. // The method to use for the reply message should initially be // specified within the first message as the replyMethod, and we will // pick up that value here, and use it as the method for the new // request message. If no replyMethod is set we default to the // REQToFileAppend method type. func newReplyMessage(proc process, message Message, outData []byte) { // If no replyMethod is set we default to writing to writing to // a log file. if message.ReplyMethod == "" { message.ReplyMethod = REQToFileAppend } fmt.Printf("\n * DEBUG * newReplyMessage: message.FromNode contains: %v\n", message.FromNode) fmt.Printf("\n * DEBUG * newReplyMessage: message.ToNode contains: %v\n", message.ToNode) // Create a new message for the reply, and put it on the // ringbuffer to be published. newMsg := Message{ ToNode: message.FromNode, FromNode: message.ToNode, Data: []string{string(outData)}, Method: message.ReplyMethod, MethodArgs: message.ReplyMethodArgs, MethodTimeout: message.ReplyMethodTimeout, IsReply: true, ACKTimeout: message.ReplyACKTimeout, Retries: message.ReplyRetries, Directory: message.Directory, FileName: message.FileName, // Put in a copy of the initial request message, so we can use it's properties if // needed to for example create the file structure naming on the subscriber. PreviousMessage: &message, } sam, err := newSubjectAndMessage(newMsg) if err != nil { // In theory the system should drop the message before it reaches here. er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } proc.toRingbufferCh <- []subjectAndMessage{sam} } // selectFileNaming will figure out the correct naming of the file // structure to use for the reply data. // It will return the filename, and the tree structure for the folders // to create. func selectFileNaming(message Message, proc process) (string, string) { var fileName string var folderTree string switch { case message.PreviousMessage == nil: // If this was a direct request there are no previous message to take // information from, so we use the one that are in the current mesage. fileName = message.FileName folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode)) case message.PreviousMessage.ToNode != "": fileName = message.PreviousMessage.FileName folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.PreviousMessage.ToNode)) case message.PreviousMessage.ToNode == "": fileName = message.PreviousMessage.FileName folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.FromNode)) } return fileName, folderTree } // ------------------------------------------------------------ // Subscriber method handlers // ------------------------------------------------------------ // The methodHandler interface. type methodHandler interface { handler(proc process, message Message, node string) ([]byte, error) getKind() CommandOrEvent } // ----- type methodREQOpCommand struct { commandOrEvent CommandOrEvent } func (m methodREQOpCommand) getKind() CommandOrEvent { return m.commandOrEvent } type OpCmdStartProc struct { Method Method `json:"method"` AllowedNodes []Node `json:"allowedNodes"` } type OpCmdStopProc struct { RecevingNode Node `json:"receivingNode"` Method Method `json:"method"` Kind processKind `json:"kind"` ID int `json:"id"` } // handler to run a CLI command with timeout context. The handler will // return the output of the command run back to the calling publisher // in the ack message. func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) { proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() out := []byte{} // unmarshal the json.RawMessage field called OpArgs. // // Dst interface is the generic type to Unmarshal OpArgs into, and we will // set the type it should contain depending on the value specified in Cmd. var dst interface{} switch message.Operation.OpCmd { case "ps": proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. for _, idMap := range proc.processes.active { for _, v := range idMap { s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) sb := []byte(s) out = append(out, sb...) } } proc.processes.mu.Unlock() case "startProc": // Set the empty interface type dst to &OpStart. dst = &OpCmdStartProc{} err := json.Unmarshal(message.Operation.OpArg, &dst) if err != nil { er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } // Assert it into the correct non pointer value. arg := *dst.(*OpCmdStartProc) //fmt.Printf(" ** Content of Arg = %#v\n", arg) if len(arg.AllowedNodes) == 0 { er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message)) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return } if arg.Method == "" { er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message)) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return } // Create the process and start it. sub := newSubject(arg.Method, proc.configuration.NodeName) procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) case "stopProc": // Set the interface type dst to &OpStart. dst = &OpCmdStopProc{} err := json.Unmarshal(message.Operation.OpArg, &dst) if err != nil { er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } // Assert it into the correct non pointer value. arg := *dst.(*OpCmdStopProc) // Based on the arg values received in the message we create a // processName structure as used in naming the real processes. // We can then use this processName to get the real values for the // actual process we want to stop. sub := newSubject(arg.Method, string(arg.RecevingNode)) processName := processNameGet(sub.name(), arg.Kind) // Check if the message contains an id. err = func() error { if arg.ID == 0 { er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return er } return nil }() if err != nil { er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return } proc.processes.mu.Lock() // Remove the process from the processes active map if found. toStopProc, ok := proc.processes.active[processName][arg.ID] if ok { // Delete the process from the processes map delete(proc.processes.active, processName) // Stop started go routines that belong to the process. toStopProc.ctxCancel() // Stop subscribing for messages on the process's subject. err := toStopProc.natsSubscription.Unsubscribe() if err != nil { er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } // Remove the prometheus label proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) newReplyMessage(proc, message, []byte(er.Error())) } else { er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) newReplyMessage(proc, message, []byte(er.Error())) } proc.processes.mu.Unlock() } newReplyMessage(proc, message, out) }() ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", proc.node, message.ID)) return ackMsg, nil } // ---- New operations // --- OpProcessList type methodREQOpProcessList struct { commandOrEvent CommandOrEvent } func (m methodREQOpProcessList) getKind() CommandOrEvent { return m.commandOrEvent } // Handle Op Process List func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) { proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() out := []byte{} proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. for _, idMap := range proc.processes.active { for _, v := range idMap { s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) sb := []byte(s) out = append(out, sb...) } } proc.processes.mu.Unlock() newReplyMessage(proc, message, out) }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- OpProcessStart type methodREQOpProcessStart struct { commandOrEvent CommandOrEvent } func (m methodREQOpProcessStart) getKind() CommandOrEvent { return m.commandOrEvent } // Handle Op Process Start func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) { proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() var out []byte // We need to create a tempory method type to look up the kind for the // real method for the message. var mt Method m := message.MethodArgs[0] method := Method(m) tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return } // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf(txt) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) // TODO: What should this look like ? out = []byte(txt + "\n") newReplyMessage(proc, message, out) }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- OpProcessStop type methodREQOpProcessStop struct { commandOrEvent CommandOrEvent } func (m methodREQOpProcessStop) getKind() CommandOrEvent { return m.commandOrEvent } // RecevingNode Node `json:"receivingNode"` // Method Method `json:"method"` // Kind processKind `json:"kind"` // ID int `json:"id"` // Handle Op Process Start func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) { proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() var out []byte // We need to create a tempory method type to use to look up the kind for the // real method for the message. var mt Method if v := len(message.MethodArgs); v != 4 { er := fmt.Errorf("error: OpProcessStop: methodArgs should contain 4 elements, found %v", v) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return } // --- Parse and check the method arguments given. // The Reason for also having the node as one of the arguments is // that publisher processes are named by the node they are sending the // message to. Subscriber processes names are named by the node name // they are running on. fmt.Printf(" * DEBUG : %v\n", message.MethodArgs) methodString := message.MethodArgs[0] node := message.MethodArgs[1] kind := message.MethodArgs[2] idString := message.MethodArgs[3] method := Method(methodString) tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return } // Check if id is a valid number. id, err := strconv.Atoi(idString) if err != nil { er := fmt.Errorf("error: OpProcessStop: id: %v, is not a number, check that the methodArgs are correct: %v", idString, err) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return } // --- Find, and stop process if found // Based on the arg values received in the message we create a // processName structure as used in naming the real processes. // We can then use this processName to get the real values for the // actual process we want to stop. sub := newSubject(method, string(node)) fmt.Printf(" * DEBUG : sub: %v\n", sub) processName := processNameGet(sub.name(), processKind(kind)) fmt.Printf(" * DEBUG : processName: %v\n", processName) proc.processes.mu.Lock() // Remove the process from the processes active map if found. toStopProc, ok := proc.processes.active[processName][id] if ok { // Delete the process from the processes map delete(proc.processes.active, processName) // Stop started go routines that belong to the process. toStopProc.ctxCancel() // Stop subscribing for messages on the process's subject. err := toStopProc.natsSubscription.Unsubscribe() if err != nil { er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } // Remove the prometheus label proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", id, sub, message.ToNode) er := fmt.Errorf(txt) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) out = []byte(txt + "\n") newReplyMessage(proc, message, out) } else { txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode) er := fmt.Errorf(txt) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) out = []byte(txt + "\n") newReplyMessage(proc, message, out) } proc.processes.mu.Unlock() }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // ---- type methodREQToFileAppend struct { commandOrEvent CommandOrEvent } func (m methodREQToFileAppend) getKind() CommandOrEvent { return m.commandOrEvent } // Handle appending data to file. func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) { // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. fileName, folderTree := selectFileNaming(message, proc) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file and write data. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, err } defer f.Close() for _, d := range message.Data { _, err := f.Write([]byte(d)) f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // ----- type methodREQToFile struct { commandOrEvent CommandOrEvent } func (m methodREQToFile) getKind() CommandOrEvent { return m.commandOrEvent } // Handle writing to a file. Will truncate any existing data if the file did already // exist. func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) { // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. fileName, folderTree := selectFileNaming(message, proc) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, er } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file and write data. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, err } defer f.Close() for _, d := range message.Data { _, err := f.Write([]byte(d)) f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // ---- type methodREQHello struct { commandOrEvent CommandOrEvent } func (m methodREQHello) getKind() CommandOrEvent { return m.commandOrEvent } // Handler for receiving hello messages. func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) { data := fmt.Sprintf("%v, Received hello from %#v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode) fileName := message.FileName folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.FromNode)) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err) } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file and write data. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { log.Printf("error: methodREQHello.handler: failed to open file: %v\n", err) return nil, err } defer f.Close() _, err = f.Write([]byte(data)) f.Sync() if err != nil { log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err) } // -------------------------- // send the message to the procFuncCh which is running alongside the process // and can hold registries and handle special things for an individual process. proc.procFuncCh <- message ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQErrorLog struct { commandOrEvent CommandOrEvent } func (m methodREQErrorLog) getKind() CommandOrEvent { return m.commandOrEvent } // Handle the writing of error logs. func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { proc.processes.metrics.promErrorMessagesReceivedTotal.Inc() // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. fileName, folderTree := selectFileNaming(message, proc) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err) } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file and write data. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { log.Printf("error: methodREQErrorLog.handler: failed to open file: %v\n", err) return nil, err } defer f.Close() for _, d := range message.Data { _, err := f.Write([]byte(d)) f.Sync() if err != nil { log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err) } } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQPing struct { commandOrEvent CommandOrEvent } func (m methodREQPing) getKind() CommandOrEvent { return m.commandOrEvent } // Handle receving a ping. func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) { // Write to file that we received a ping // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. fileName, folderTree := selectFileNaming(message, proc) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, er } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, err } defer f.Close() // And write the data d := fmt.Sprintf("%v, ping received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode) _, err = f.Write([]byte(d)) f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() newReplyMessage(proc, message, nil) }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQPong struct { commandOrEvent CommandOrEvent } func (m methodREQPong) getKind() CommandOrEvent { return m.commandOrEvent } // Handle receiving a pong. func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) { // Write to file that we received a pong // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. fileName, folderTree := selectFileNaming(message, proc) // Check if folder structure exist, if not create it. if _, err := os.Stat(folderTree); os.IsNotExist(err) { err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, er } log.Printf("info: Creating subscribers data folder at %v\n", folderTree) } // Open file. file := filepath.Join(folderTree, fileName) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) return nil, err } defer f.Close() // And write the data d := fmt.Sprintf("%v, pong received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.PreviousMessage.ToNode) _, err = f.Write([]byte(d)) f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQCliCommand struct { commandOrEvent CommandOrEvent } func (m methodREQCliCommand) getKind() CommandOrEvent { return m.commandOrEvent } // handler to run a CLI command with timeout context. The handler will // return the output of the command run back to the calling publisher // as a new message. func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.Data) // Execute the CLI command in it's own go routine, so we are able // to return immediately with an ack reply that the messag was // received, and we create a new message to send back to the calling // node for the out put of the actual command. proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() fmt.Printf("* DEBUG * handler: received message contains : %#v\n", message) c := message.MethodArgs[0] a := message.MethodArgs[1:] ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) outCh := make(chan []byte) proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() // Check if {{data}} is defined in the method arguments. If found put the // data payload there. var foundEnvData bool var envData string for i, v := range message.MethodArgs { if strings.Contains(v, "{{STEWARD_DATA}}") { foundEnvData = true // Replace the found env variable placeholder with an actual env variable message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1) // Put all the data which is a slice of string into a single // string so we can put it in a single env variable. envData = strings.Join(message.Data, "") } } cmd := exec.CommandContext(ctx, c, a...) // Check for the use of env variable for STEWARD_DATA, and set env if found. if foundEnvData { envData = fmt.Sprintf("STEWARD_DATA=%v", envData) cmd.Env = append(cmd.Env, envData) fmt.Printf("\n * DEBUG * cmd.Env contains: %v\n\n", cmd.Env) } var out bytes.Buffer var stderr bytes.Buffer cmd.Stdout = &out cmd.Stderr = &stderr err := cmd.Run() if err != nil { if err != nil { log.Printf("error: failed cmd.Run: %v\n", err) } er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String()) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } select { case outCh <- out.Bytes(): case <-ctx.Done(): return } }() select { case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) case out := <-outCh: cancel() // NB: Not quite sure what is the best way to handle the below // isReply right now. Implementing as send to central for now. // // If this is this a reply message swap the toNode and fromNode // fields so the output of the command are sent to central node. if message.IsReply { message.ToNode, message.FromNode = message.FromNode, message.ToNode } // Prepare and queue for sending a new message with the output // of the action executed. newReplyMessage(proc, message, out) } }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQToConsole struct { commandOrEvent CommandOrEvent } func (m methodREQToConsole) getKind() CommandOrEvent { return m.commandOrEvent } // Handler to write directly to console. func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) { fmt.Printf("<--- methodCLICommandReply: %v\n", message.Data) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQHttpGet struct { commandOrEvent CommandOrEvent } func (m methodREQHttpGet) getKind() CommandOrEvent { return m.commandOrEvent } // handler to do a Http Get. func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() url := message.MethodArgs[0] client := http.Client{ Timeout: time.Second * 5, } ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) cancel() return } outCh := make(chan []byte) proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() resp, err := client.Do(req) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return } defer resp.Body.Close() if resp.StatusCode != 200 { cancel() er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return } body, err := io.ReadAll(resp.Body) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } out := body select { case outCh <- out: case <-ctx.Done(): return } }() select { case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) case out := <-outCh: cancel() // Prepare and queue for sending a new message with the output // of the action executed. newReplyMessage(proc, message, out) } }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- methodREQTailFile type methodREQTailFile struct { commandOrEvent CommandOrEvent } func (m methodREQTailFile) getKind() CommandOrEvent { return m.commandOrEvent } // handler to run a tailing of files with timeout context. The handler will // return the output of the command run back to the calling publisher // as a new message. func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data) proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() fp := message.MethodArgs[0] var ctx context.Context var cancel context.CancelFunc if message.MethodTimeout != 0 { ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) } else { ctx, cancel = context.WithCancel(proc.ctx) } outCh := make(chan []byte) t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{ Offset: 0, Whence: os.SEEK_END, }}) if err != nil { er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err) log.Printf("%v\n", er) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) } proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() for { select { case line := <-t.Lines: outCh <- []byte(line.Text + "\n") case <-ctx.Done(): return } } }() for { select { case <-ctx.Done(): cancel() // Close the lines channel so we exit the reading lines // go routine. // close(t.Lines) er := fmt.Errorf("info: method timeout reached, canceling: %v", message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return case out := <-outCh: // Prepare and queue for sending a new message with the output // of the action executed. newReplyMessage(proc, message, out) } } }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQCliCommandCont struct { commandOrEvent CommandOrEvent } func (m methodREQCliCommandCont) getKind() CommandOrEvent { return m.commandOrEvent } // Handler to run REQCliCommandCont, which is the same as normal // Cli command, but can be used when running a command that will take // longer time and you want to send the output of the command continually // back as it is generated, and not just when the command is finished. func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) // Execute the CLI command in it's own go routine, so we are able // to return immediately with an ack reply that the message was // received, and we create a new message to send back to the calling // node for the out put of the actual command. proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() c := message.MethodArgs[0] a := message.MethodArgs[1:] ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) outCh := make(chan []byte) proc.processes.wg.Add(1) go func() { proc.processes.wg.Done() cmd := exec.CommandContext(ctx, c, a...) // Using cmd.StdoutPipe here so we are continuosly // able to read the out put of the command. outReader, err := cmd.StdoutPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) log.Printf("error: %v\n", err) } ErrorReader, err := cmd.StderrPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) log.Printf("error: %v\n", err) } if err := cmd.Start(); err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v", err, message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) } // Also send error messages that might happen during the time // cmd.Start runs. // Putting the scanner.Text value on a channel so we can make // the scanner non-blocking, and also check context cancelation. go func() { errCh := make(chan string, 1) scanner := bufio.NewScanner(ErrorReader) for scanner.Scan() { select { case errCh <- scanner.Text(): er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v, error_output: %v", err, message, <-errCh) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) log.Printf("%v\n", er) case <-ctx.Done(): return } } }() scanner := bufio.NewScanner(outReader) for scanner.Scan() { select { case outCh <- []byte(scanner.Text() + "\n"): case <-ctx.Done(): return } } }() // Check if context timer or command output were received. for { select { case <-ctx.Done(): cancel() er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: %v", message) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) return case out := <-outCh: // Prepare and queue for sending a new message with the output // of the action executed. newReplyMessage(proc, message, out) } } }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // --- type methodREQToSocket struct { commandOrEvent CommandOrEvent } func (m methodREQToSocket) getKind() CommandOrEvent { return m.commandOrEvent } // TODO: // Handler to write to unix socket file. func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) { for _, d := range message.Data { // Write the data to the socket here. fmt.Printf("Info: Data to write to socket: %v\n", d) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } // ----