1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
ctrl/requests.go
2021-09-23 05:46:25 +02:00

1484 lines
48 KiB
Go

// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
// type methodCommandCLICommandRequest struct {
// commandOrEvent CommandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
// return m.commandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
// ...
// ...
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
package steward
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus"
)
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
// ------------------------------------------------------------
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
// Initial parent method used to start other processes.
REQInitial Method = "REQInitial"
// Command for client operation request of the system. The op
// command to execute shall be given in the data field of the
// message as string value. For example "ps".
REQOpCommand Method = "REQOpCommand"
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
REQCliCommand Method = "REQCliCommand"
// REQCliCommandCont same as normal Cli command, but can be used
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
// generated, and not wait until the command is finished.
REQCliCommandCont Method = "REQCliCommandCont"
// Send text to be logged to the console.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
REQToConsole Method = "REQToConsole"
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
REQToFileAppend Method = "REQToFileAppend"
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
REQToFile Method = "REQToFile"
// Send Hello I'm here message.
REQHello Method = "REQHello"
// Error log methods to centralError node.
REQErrorLog Method = "REQErrorLog"
// Echo request will ask the subscriber for a
// reply generated as a new message, and sent back to where
// the initial request was made.
REQPing Method = "REQPing"
// Will generate a reply for a ECHORequest
REQPong Method = "REQPong"
// Http Get
REQHttpGet Method = "REQHttpGet"
// Tail file
REQTailFile Method = "REQTailFile"
// Write to steward socket
REQToSocket Method = "REQToSocket"
)
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
// Allowed values for the commandOrEvent field are:
// - CommandACK
// - CommandNACK
// - EventACK
// - EventNack
func (m Method) GetMethodsAvailable() MethodsAvailable {
// Command, Used to make a request to perform an action
// Event, Used to communicate that something have happened.
ma := MethodsAvailable{
Methodhandlers: map[Method]methodHandler{
REQInitial: methodREQInitial{
commandOrEvent: CommandACK,
},
REQOpCommand: methodREQOpCommand{
commandOrEvent: CommandACK,
},
REQOpProcessList: methodREQOpProcessList{
commandOrEvent: CommandACK,
},
REQOpProcessStart: methodREQOpProcessStart{
commandOrEvent: CommandACK,
},
REQOpProcessStop: methodREQOpProcessStop{
commandOrEvent: CommandACK,
},
REQCliCommand: methodREQCliCommand{
commandOrEvent: CommandACK,
},
REQCliCommandCont: methodREQCliCommandCont{
commandOrEvent: CommandACK,
},
REQToConsole: methodREQToConsole{
commandOrEvent: EventACK,
},
REQToFileAppend: methodREQToFileAppend{
commandOrEvent: EventACK,
},
REQToFile: methodREQToFile{
commandOrEvent: EventACK,
},
REQHello: methodREQHello{
commandOrEvent: EventNACK,
},
REQErrorLog: methodREQErrorLog{
commandOrEvent: EventACK,
},
REQPing: methodREQPing{
commandOrEvent: EventACK,
},
REQPong: methodREQPong{
commandOrEvent: EventACK,
},
REQHttpGet: methodREQHttpGet{
commandOrEvent: EventACK,
},
REQTailFile: methodREQTailFile{
commandOrEvent: EventACK,
},
REQToSocket: methodREQToSocket{
commandOrEvent: EventACK,
},
},
}
return ma
}
// Reply methods. The slice generated here is primarily used within
// the Stew client for knowing what of the req types are generally
// used as reply methods.
func (m Method) GetReplyMethods() []Method {
rm := []Method{REQToConsole, REQToFile, REQToFileAppend, REQToSocket}
return rm
}
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func (m Method) getHandler(method Method) methodHandler {
ma := m.GetMethodsAvailable()
mh := ma.Methodhandlers[method]
return mh
}
// The structure that works as a reference for all the methods and if
// they are of the command or event type, and also if it is a ACK or
// NACK message.
// ----
// Initial parent method used to start other processes.
type methodREQInitial struct {
commandOrEvent CommandOrEvent
}
func (m methodREQInitial) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodREQInitial) handler(proc process, message Message, node string) ([]byte, error) {
// proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
type MethodsAvailable struct {
Methodhandlers map[Method]methodHandler
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
mFunc, ok := ma.Methodhandlers[m]
if ok {
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
return mFunc, true
} else {
// fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m)
return nil, false
}
}
// Create a new message for the reply containing the output of the
// action executed put in outData, and put it on the ringbuffer to
// be published.
// The method to use for the reply message should initially be
// specified within the first message as the replyMethod, and we will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
func newReplyMessage(proc process, message Message, outData []byte) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message.ReplyMethod == "" {
message.ReplyMethod = REQToFileAppend
}
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
FromNode: message.ToNode,
Data: []string{string(outData)},
Method: message.ReplyMethod,
MethodArgs: message.ReplyMethodArgs,
MethodTimeout: message.ReplyMethodTimeout,
IsReply: true,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
Directory: message.Directory,
FileName: message.FileName,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage: &message,
}
sam, err := newSubjectAndMessage(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming(message Message, proc process) (string, string) {
var fileName string
var folderTree string
switch {
case message.PreviousMessage == nil:
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode))
case message.PreviousMessage.ToNode != "":
fileName = message.PreviousMessage.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.PreviousMessage.ToNode))
case message.PreviousMessage.ToNode == "":
fileName = message.PreviousMessage.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.FromNode))
}
return fileName, folderTree
}
// ------------------------------------------------------------
// Subscriber method handlers
// ------------------------------------------------------------
// The methodHandler interface.
type methodHandler interface {
handler(proc process, message Message, node string) ([]byte, error)
getKind() CommandOrEvent
}
// -----
type methodREQOpCommand struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpCommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
type OpCmdStartProc struct {
Method Method `json:"method"`
AllowedNodes []Node `json:"allowedNodes"`
}
type OpCmdStopProc struct {
RecevingNode Node `json:"receivingNode"`
Method Method `json:"method"`
Kind processKind `json:"kind"`
ID int `json:"id"`
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
// unmarshal the json.RawMessage field called OpArgs.
//
// Dst interface is the generic type to Unmarshal OpArgs into, and we will
// set the type it should contain depending on the value specified in Cmd.
var dst interface{}
switch message.Operation.OpCmd {
case "ps":
proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
for _, idMap := range proc.processes.active {
for _, v := range idMap {
s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name())
sb := []byte(s)
out = append(out, sb...)
}
}
proc.processes.mu.Unlock()
case "startProc":
// Set the empty interface type dst to &OpStart.
dst = &OpCmdStartProc{}
err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStartProc)
//fmt.Printf(" ** Content of Arg = %#v\n", arg)
if len(arg.AllowedNodes) == 0 {
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
if arg.Method == "" {
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// Create the process and start it.
sub := newSubject(arg.Method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case "stopProc":
// Set the interface type dst to &OpStart.
dst = &OpCmdStopProc{}
err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStopProc)
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(arg.Method, string(arg.RecevingNode))
processName := processNameGet(sub.name(), arg.Kind)
// Check if the message contains an id.
err = func() error {
if arg.ID == 0 {
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return er
}
return nil
}()
if err != nil {
er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
proc.processes.mu.Lock()
// Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][arg.ID]
if ok {
// Delete the process from the processes map
delete(proc.processes.active, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Remove the prometheus label
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error()))
} else {
er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error()))
}
proc.processes.mu.Unlock()
}
newReplyMessage(proc, message, out)
}()
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", proc.node, message.ID))
return ackMsg, nil
}
// ---- New operations
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessList) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process List
func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
for _, idMap := range proc.processes.active {
for _, v := range idMap {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name())
sb := []byte(s)
out = append(out, sb...)
}
}
proc.processes.mu.Unlock()
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStart) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process Start
func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var out []byte
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
m := message.MethodArgs[0]
method := Method(m)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// TODO: What should this look like ?
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- OpProcessStop
type methodREQOpProcessStop struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStop) getKind() CommandOrEvent {
return m.commandOrEvent
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var out []byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
if v := len(message.MethodArgs); v != 4 {
er := fmt.Errorf("error: OpProcessStop: methodArgs should contain 4 elements, found %v", v)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// --- Parse and check the method arguments given.
// The Reason for also having the node as one of the arguments is
// that publisher processes are named by the node they are sending the
// message to. Subscriber processes names are named by the node name
// they are running on.
methodString := message.MethodArgs[0]
node := message.MethodArgs[1]
kind := message.MethodArgs[2]
idString := message.MethodArgs[3]
method := Method(methodString)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// Check if id is a valid number.
id, err := strconv.Atoi(idString)
if err != nil {
er := fmt.Errorf("error: OpProcessStop: id: %v, is not a number, check that the methodArgs are correct: %v", idString, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(method, string(node))
processName := processNameGet(sub.name(), processKind(kind))
proc.processes.mu.Lock()
// Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][id]
if ok {
// Delete the process from the processes map
delete(proc.processes.active, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Remove the prometheus label
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", id, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
} else {
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}
proc.processes.mu.Unlock()
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQToFileAppend struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToFileAppend) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle appending data to file.
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// -----
type methodREQToFile struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToFile) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQHello struct {
commandOrEvent CommandOrEvent
}
func (m methodREQHello) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler for receiving hello messages.
func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) {
data := fmt.Sprintf("%v, Received hello from %#v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)
fileName := message.FileName
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.FromNode))
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Printf("error: methodREQHello.handler: failed to open file: %v\n", err)
return nil, err
}
defer f.Close()
_, err = f.Write([]byte(data))
f.Sync()
if err != nil {
log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err)
}
// --------------------------
// send the message to the procFuncCh which is running alongside the process
// and can hold registries and handle special things for an individual process.
proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQErrorLog struct {
commandOrEvent CommandOrEvent
}
func (m methodREQErrorLog) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle the writing of error logs.
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.metrics.promErrorMessagesReceivedTotal.Inc()
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Printf("error: methodREQErrorLog.handler: failed to open file: %v\n", err)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQPing struct {
commandOrEvent CommandOrEvent
}
func (m methodREQPing) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle receving a ping.
func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) {
// Write to file that we received a ping
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
// And write the data
d := fmt.Sprintf("%v, ping received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)
_, err = f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
newReplyMessage(proc, message, nil)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQPong struct {
commandOrEvent CommandOrEvent
}
func (m methodREQPong) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle receiving a pong.
func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) {
// Write to file that we received a pong
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
// And write the data
d := fmt.Sprintf("%v, pong received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.PreviousMessage.ToNode)
_, err = f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQCliCommand struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCliCommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.Data)
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
c := message.MethodArgs[0]
a := message.MethodArgs[1:]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i, v := range message.MethodArgs {
if strings.Contains(v, "{{STEWARD_DATA}}") {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1)
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
envData = strings.Join(message.Data, "")
}
}
cmd := exec.CommandContext(ctx, c, a...)
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt.Sprintf("STEWARD_DATA=%v", envData)
cmd.Env = append(cmd.Env, envData)
}
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
if err != nil {
log.Printf("error: failed cmd.Run: %v\n", err)
}
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String())
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
select {
case outCh <- out.Bytes():
case <-ctx.Done():
return
}
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh:
cancel()
// NB: Not quite sure what is the best way to handle the below
// isReply right now. Implementing as send to central for now.
//
// If this is this a reply message swap the toNode and fromNode
// fields so the output of the command are sent to central node.
if message.IsReply {
message.ToNode, message.FromNode = message.FromNode, message.ToNode
}
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQToConsole struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToConsole) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to write directly to console.
func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) {
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQHttpGet struct {
commandOrEvent CommandOrEvent
}
func (m methodREQHttpGet) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to do a Http Get.
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
url := message.MethodArgs[0]
client := http.Client{
Timeout: time.Second * 5,
}
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
cancel()
return
}
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
out := body
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh:
cancel()
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodREQTailFile
type methodREQTailFile struct {
commandOrEvent CommandOrEvent
}
func (m methodREQTailFile) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
fp := message.MethodArgs[0]
var ctx context.Context
var cancel context.CancelFunc
if message.MethodTimeout != 0 {
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
} else {
ctx, cancel = context.WithCancel(proc.ctx)
}
outCh := make(chan []byte)
t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{
Offset: 0,
Whence: os.SEEK_END,
}})
if err != nil {
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
}
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
for {
select {
case line := <-t.Lines:
outCh <- []byte(line.Text + "\n")
case <-ctx.Done():
return
}
}
}()
for {
select {
case <-ctx.Done():
cancel()
// Close the lines channel so we exit the reading lines
// go routine.
// close(t.Lines)
er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQCliCommandCont struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCliCommandCont) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to run REQCliCommandCont, which is the same as normal
// Cli command, but can be used when running a command that will take
// longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
c := message.MethodArgs[0]
a := message.MethodArgs[1:]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
proc.processes.wg.Done()
cmd := exec.CommandContext(ctx, c, a...)
// Using cmd.StdoutPipe here so we are continuosly
// able to read the out put of the command.
outReader, err := cmd.StdoutPipe()
if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
log.Printf("error: %v\n", err)
}
ErrorReader, err := cmd.StderrPipe()
if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
log.Printf("error: %v\n", err)
}
if err := cmd.Start(); err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Also send error messages that might happen during the time
// cmd.Start runs.
// Putting the scanner.Text value on a channel so we can make
// the scanner non-blocking, and also check context cancelation.
go func() {
errCh := make(chan string, 1)
scanner := bufio.NewScanner(ErrorReader)
for scanner.Scan() {
select {
case errCh <- scanner.Text():
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v, error_output: %v", err, message, <-errCh)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
case <-ctx.Done():
return
}
}
}()
scanner := bufio.NewScanner(outReader)
for scanner.Scan() {
select {
case outCh <- []byte(scanner.Text() + "\n"):
case <-ctx.Done():
return
}
}
}()
// Check if context timer or command output were received.
for {
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: %v", message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQToSocket struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToSocket) getKind() CommandOrEvent {
return m.commandOrEvent
}
// TODO:
// Handler to write to unix socket file.
func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) {
for _, d := range message.Data {
// TODO: Write the data to the socket here.
fmt.Printf("Info: Data to write to socket: %v\n", d)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----