1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
ctrl/requests.go
2021-12-15 15:13:48 +01:00

1806 lines
57 KiB
Go

// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
// type methodCommandCLICommandRequest struct {
// commandOrEvent CommandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
// return m.commandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
// ...
// ...
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
package steward
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus"
)
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
// ------------------------------------------------------------
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
// Initial parent method used to start other processes.
REQInitial Method = "REQInitial"
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
REQCliCommand Method = "REQCliCommand"
// REQCliCommandCont same as normal Cli command, but can be used
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
// generated, and not wait until the command is finished.
REQCliCommandCont Method = "REQCliCommandCont"
// Send text to be logged to the console.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
REQToConsole Method = "REQToConsole"
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
REQToFileAppend Method = "REQToFileAppend"
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
REQToFile Method = "REQToFile"
// Read the source file to be copied to some node.
REQCopyFileFrom Method = "REQCopyFileFrom"
// Write the destination copied to some node.
REQCopyFileTo Method = "REQCopyFileTo"
// Send Hello I'm here message.
REQHello Method = "REQHello"
// Error log methods to centralError node.
REQErrorLog Method = "REQErrorLog"
// Echo request will ask the subscriber for a
// reply generated as a new message, and sent back to where
// the initial request was made.
REQPing Method = "REQPing"
// Will generate a reply for a ECHORequest
REQPong Method = "REQPong"
// Http Get
REQHttpGet Method = "REQHttpGet"
// Tail file
REQTailFile Method = "REQTailFile"
// Write to steward socket
REQToSocket Method = "REQToSocket"
// Send a message via a node
REQRelay Method = "REQRelay"
// The method handler for the first step in a relay chain.
REQRelayInitial Method = "REQRelayInitial"
)
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
// Allowed values for the commandOrEvent field are:
// - CommandACK
// - CommandNACK
// - EventACK
// - EventNack
func (m Method) GetMethodsAvailable() MethodsAvailable {
// Command, Used to make a request to perform an action
// Event, Used to communicate that something have happened.
ma := MethodsAvailable{
Methodhandlers: map[Method]methodHandler{
REQInitial: methodREQInitial{
commandOrEvent: CommandACK,
},
REQOpProcessList: methodREQOpProcessList{
commandOrEvent: CommandACK,
},
REQOpProcessStart: methodREQOpProcessStart{
commandOrEvent: CommandACK,
},
REQOpProcessStop: methodREQOpProcessStop{
commandOrEvent: CommandACK,
},
REQCliCommand: methodREQCliCommand{
commandOrEvent: CommandACK,
},
REQCliCommandCont: methodREQCliCommandCont{
commandOrEvent: CommandACK,
},
REQToConsole: methodREQToConsole{
commandOrEvent: EventACK,
},
REQToFileAppend: methodREQToFileAppend{
commandOrEvent: EventACK,
},
REQToFile: methodREQToFile{
commandOrEvent: EventACK,
},
REQCopyFileFrom: methodREQCopyFileFrom{
commandOrEvent: EventACK,
},
REQCopyFileTo: methodREQCopyFileTo{
commandOrEvent: EventACK,
},
REQHello: methodREQHello{
commandOrEvent: EventNACK,
},
REQErrorLog: methodREQErrorLog{
commandOrEvent: EventACK,
},
REQPing: methodREQPing{
commandOrEvent: EventACK,
},
REQPong: methodREQPong{
commandOrEvent: EventACK,
},
REQHttpGet: methodREQHttpGet{
commandOrEvent: EventACK,
},
REQTailFile: methodREQTailFile{
commandOrEvent: EventACK,
},
REQToSocket: methodREQToSocket{
commandOrEvent: EventACK,
},
REQRelay: methodREQRelay{
commandOrEvent: EventACK,
},
REQRelayInitial: methodREQRelayInitial{
commandOrEvent: EventACK,
},
},
}
return ma
}
// Reply methods. The slice generated here is primarily used within
// the Stew client for knowing what of the req types are generally
// used as reply methods.
func (m Method) GetReplyMethods() []Method {
rm := []Method{REQToConsole, REQToFile, REQToFileAppend, REQToSocket}
return rm
}
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func (m Method) getHandler(method Method) methodHandler {
ma := m.GetMethodsAvailable()
mh := ma.Methodhandlers[method]
return mh
}
// The structure that works as a reference for all the methods and if
// they are of the command or event type, and also if it is a ACK or
// NACK message.
// ----
// Initial parent method used to start other processes.
type methodREQInitial struct {
commandOrEvent CommandOrEvent
}
func (m methodREQInitial) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodREQInitial) handler(proc process, message Message, node string) ([]byte, error) {
// proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
type MethodsAvailable struct {
Methodhandlers map[Method]methodHandler
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
mFunc, ok := ma.Methodhandlers[m]
if ok {
return mFunc, true
} else {
return nil, false
}
}
// newReplyMessage will create and send a reply message back to where
// the original provided message came from. The primary use of this
// function is to report back to a node who sent a message with the
// result of the request method of the original message.
//
// The method to use for the reply message when reporting back should
// be specified within a message in the replyMethod field. We will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
//
// There will also be a copy of the original message put in the
// previousMessage field. For the copy of the original message the data
// field will be set to nil before the whole message is put in the
// previousMessage field so we don't copy around the original data in
// the reply response when it is not needed anymore.
func newReplyMessage(proc process, message Message, outData []byte) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message.ReplyMethod == "" {
message.ReplyMethod = REQToFileAppend
}
// Make a copy of the message as it is right now to use
// in the previous message field, but set the data field
// to nil so we don't copy around the original data when
// we don't need to for the reply message.
thisMsg := message
thisMsg.Data = nil
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
FromNode: message.ToNode,
Data: []string{string(outData)},
Method: message.ReplyMethod,
MethodArgs: message.ReplyMethodArgs,
MethodTimeout: message.ReplyMethodTimeout,
IsReply: true,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
Directory: message.Directory,
FileName: message.FileName,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage: &thisMsg,
}
sam, err := newSubjectAndMessage(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming(message Message, proc process) (string, string) {
var fileName string
var folderTree string
switch {
case message.PreviousMessage == nil:
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode))
case message.PreviousMessage.ToNode != "":
fileName = message.PreviousMessage.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.PreviousMessage.ToNode))
case message.PreviousMessage.ToNode == "":
fileName = message.PreviousMessage.FileName
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.FromNode))
}
return fileName, folderTree
}
// ------------------------------------------------------------
// Subscriber method handlers
// ------------------------------------------------------------
// The methodHandler interface.
type methodHandler interface {
handler(proc process, message Message, node string) ([]byte, error)
getKind() CommandOrEvent
}
// -----
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessList) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process List
func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
proc.processes.active.mu.Lock()
for _, pTmp := range proc.processes.active.procNames {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
sb := []byte(s)
out = append(out, sb...)
}
proc.processes.active.mu.Unlock()
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStart) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process Start
func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var out []byte
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
m := message.MethodArgs[0]
method := Method(m)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// TODO: What should this look like ?
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- OpProcessStop
type methodREQOpProcessStop struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStop) getKind() CommandOrEvent {
return m.commandOrEvent
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var out []byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
// --- Parse and check the method arguments given.
// The Reason for also having the node as one of the arguments is
// that publisher processes are named by the node they are sending the
// message to. Subscriber processes names are named by the node name
// they are running on.
if v := len(message.MethodArgs); v != 3 {
er := fmt.Errorf("error: methodREQCliCommand: got <4 number methodArgs, want: method,node,kind")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
methodString := message.MethodArgs[0]
node := message.MethodArgs[1]
kind := message.MethodArgs[2]
method := Method(methodString)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(method, string(node))
processName := processNameGet(sub.name(), processKind(kind))
// Remove the process from the processes active map if found.
proc.processes.active.mu.Lock()
toStopProc, ok := proc.processes.active.procNames[processName]
if ok {
// Delete the process from the processes map
delete(proc.processes.active.procNames, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Remove the prometheus label
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
} else {
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}
proc.processes.active.mu.Unlock()
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQToFileAppend struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToFileAppend) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle appending data to file.
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// -----
type methodREQToFile struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToFile) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQCopyFileFrom struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCopyFileFrom) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
SrcFilePath := message.MethodArgs[0]
DstNode := message.MethodArgs[1]
DstFilePath := message.MethodArgs[2]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
defer cancel()
outCh := make(chan []byte)
errCh := make(chan error)
// Read the file, and put the result on the out channel to be sent when done reading.
proc.processes.wg.Add(1)
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
// Wait here until we got the data to send, then create a new message
// and send it.
// Also checking the ctx.Done which calls Cancel will allow us to
// kill all started go routines started by this message.
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case er := <-errCh:
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
// Prepare for sending a new message with the output
// TODO: Maybe changing the type of Message.Data to []byte ?
// Copy the original message to get the defaults for timeouts etc,
// and set new values for fields to change.
msg := message
msg.ToNode = Node(DstNode)
//msg.Method = REQToFile
msg.Method = REQCopyFileTo
msg.Data = []string{string(out)}
msg.Directory = dstDir
msg.FileName = dstFile
// Create SAM and put the message on the send new message channel.
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
// TODO: Should we also send a reply message with the result back
// to where the message originated ?
replyData := fmt.Sprintf("info: succesfully read the file %v, and sent the content to %v\n", SrcFilePath, DstNode)
newReplyMessage(proc, message, []byte(replyData))
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) {
defer wg.Done()
const natsMaxMsgSize = 1000000
fi, err := os.Stat(SrcFilePath)
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os.IsNotExist(err):
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
return
case fi.Size() > natsMaxMsgSize:
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
return
}
fh, err := os.Open(SrcFilePath)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
return
}
b, err := io.ReadAll(fh)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
return
}
select {
case outCh <- b:
fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
case <-ctx.Done():
return
}
}
// ----
type methodREQCopyFileTo struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCopyFileTo) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
defer cancel()
// Put data that should be the result of the action done in the inner
// go routine on the outCh.
outCh := make(chan []byte)
// Put errors from the inner go routine on the errCh.
errCh := make(chan error)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// ---
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// Pick up the values for the directory and filename for where
// to store the file.
DstFilePath := message.MethodArgs[2]
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
fileRealPath := path.Join(dstDir, dstFile)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(dstDir); os.IsNotExist(err) {
err := os.MkdirAll(dstDir, 0700)
if err != nil {
er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, dstDir, err)
errCh <- er
return
}
log.Printf("info: MethodREQCopyFileTo: Creating folders %v\n", dstDir)
}
// Open file and write data. Truncate and overwrite any existing files.
file := filepath.Join(dstDir, dstFile)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
errCh <- er
return
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err)
errCh <- er
}
}
// All went ok, send a signal to the outer select statement.
outCh <- []byte(fileRealPath)
// ---
}()
// Wait for messages received from the inner go routine.
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case err := <-errCh:
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
newReplyMessage(proc, message, []byte(replyData))
return
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQHello struct {
commandOrEvent CommandOrEvent
}
func (m methodREQHello) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler for receiving hello messages.
func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) {
data := fmt.Sprintf("%v, Received hello from %#v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)
fileName := message.FileName
folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.FromNode))
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Printf("error: methodREQHello.handler: failed to open file: %v\n", err)
return nil, err
}
defer f.Close()
_, err = f.Write([]byte(data))
f.Sync()
if err != nil {
log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err)
}
// --------------------------
// send the message to the procFuncCh which is running alongside the process
// and can hold registries and handle special things for an individual process.
proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQErrorLog struct {
commandOrEvent CommandOrEvent
}
func (m methodREQErrorLog) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle the writing of error logs.
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.metrics.promErrorMessagesReceivedTotal.Inc()
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Printf("error: methodREQErrorLog.handler: failed to open file: %v\n", err)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
log.Printf("error: methodEventTextLogging.handler: failed to write to file: %v\n", err)
}
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQPing struct {
commandOrEvent CommandOrEvent
}
func (m methodREQPing) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle receving a ping.
func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) {
// Write to file that we received a ping
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
// And write the data
d := fmt.Sprintf("%v, ping received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)
_, err = f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
newReplyMessage(proc, message, nil)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQPong struct {
commandOrEvent CommandOrEvent
}
func (m methodREQPong) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle receiving a pong.
func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) {
// Write to file that we received a pong
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := selectFileNaming(message, proc)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, er
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
// And write the data
d := fmt.Sprintf("%v, pong received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.PreviousMessage.ToNode)
_, err = f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQCliCommand struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCliCommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs)
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var a []string
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
case len(message.MethodArgs) >= 0:
a = message.MethodArgs[1:]
}
c := message.MethodArgs[0]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i, v := range message.MethodArgs {
if strings.Contains(v, "{{STEWARD_DATA}}") {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1)
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
envData = strings.Join(message.Data, "")
}
}
cmd := exec.CommandContext(ctx, c, a...)
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt.Sprintf("STEWARD_DATA=%v", envData)
cmd.Env = append(cmd.Env, envData)
}
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
select {
case outCh <- out.Bytes():
case <-ctx.Done():
return
}
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh:
cancel()
// NB: Not quite sure what is the best way to handle the below
// isReply right now. Implementing as send to central for now.
//
// If this is this a reply message swap the toNode and fromNode
// fields so the output of the command are sent to central node.
if message.IsReply {
message.ToNode, message.FromNode = message.FromNode, message.ToNode
}
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQToConsole struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToConsole) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to write directly to console.
func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) {
for _, v := range message.Data {
fmt.Fprintf(os.Stdout, "%v", string(v))
}
fmt.Println()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQHttpGet struct {
commandOrEvent CommandOrEvent
}
func (m methodREQHttpGet) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to do a Http Get.
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
url := message.MethodArgs[0]
client := http.Client{
Timeout: time.Duration(message.MethodTimeout),
}
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
cancel()
return
}
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
out := body
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh:
cancel()
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodREQTailFile
type methodREQTailFile struct {
commandOrEvent CommandOrEvent
}
func (m methodREQTailFile) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
fp := message.MethodArgs[0]
var ctx context.Context
var cancel context.CancelFunc
if message.MethodTimeout != 0 {
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
} else {
ctx, cancel = context.WithCancel(proc.ctx)
}
outCh := make(chan []byte)
t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{
Offset: 0,
Whence: os.SEEK_END,
}})
if err != nil {
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
}
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
for {
select {
case line := <-t.Lines:
outCh <- []byte(line.Text + "\n")
case <-ctx.Done():
return
}
}
}()
for {
select {
case <-ctx.Done():
cancel()
// Close the lines channel so we exit the reading lines
// go routine.
// close(t.Lines)
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQCliCommandCont struct {
commandOrEvent CommandOrEvent
}
func (m methodREQCliCommandCont) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to run REQCliCommandCont, which is the same as normal
// Cli command, but can be used when running a command that will take
// longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var a []string
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
case len(message.MethodArgs) >= 0:
a = message.MethodArgs[1:]
}
c := message.MethodArgs[0]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
outCh := make(chan []byte)
errCh := make(chan string)
proc.processes.wg.Add(1)
go func() {
proc.processes.wg.Done()
cmd := exec.CommandContext(ctx, c, a...)
// Using cmd.StdoutPipe here so we are continuosly
// able to read the out put of the command.
outReader, err := cmd.StdoutPipe()
if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("error: %v\n", er)
}
ErrorReader, err := cmd.StderrPipe()
if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
if err := cmd.Start(); err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
go func() {
scanner := bufio.NewScanner(ErrorReader)
for scanner.Scan() {
errCh <- scanner.Text()
}
}()
go func() {
scanner := bufio.NewScanner(outReader)
for scanner.Scan() {
outCh <- []byte(scanner.Text() + "\n")
}
}()
// NB: sending cancel to command context, so processes are killed.
// A github issue is filed on not killing all child processes when using pipes:
// https://github.com/golang/go/issues/23019
// TODO: Check in later if there are any progress on the issue.
// When testing the problem seems to appear when using sudo, or tcpdump without
// the -l option. So for now, don't use sudo, and remember to use -l with tcpdump
// which makes stdout line buffered.
<-ctx.Done()
cancel()
if err := cmd.Wait(); err != nil {
log.Printf(" --------------- * error: REQCliCommandCont: cmd.Wait: %v\n", err)
}
}()
// Check if context timer or command output were received.
for {
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
newReplyMessage(proc, message, out)
case out := <-errCh:
newReplyMessage(proc, message, []byte(out))
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQToSocket struct {
commandOrEvent CommandOrEvent
}
func (m methodREQToSocket) getKind() CommandOrEvent {
return m.commandOrEvent
}
// TODO:
// Handler to write to unix socket file.
func (m methodREQToSocket) handler(proc process, message Message, node string) ([]byte, error) {
for _, d := range message.Data {
// TODO: Write the data to the socket here.
fmt.Printf("Info: Data to write to socket: %v\n", d)
}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQRelayInitial struct {
commandOrEvent CommandOrEvent
}
func (m methodREQRelayInitial) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to relay messages via a host.
func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
defer cancel()
outCh := make(chan []byte)
errCh := make(chan error)
nothingCh := make(chan struct{}, 1)
var out []byte
// If the actual Method for the message is REQCopyFileFrom we need to
// do the actual file reading here so we can fill the data field of the
// message with the content of the file before relaying it.
switch {
case message.RelayOriginalMethod == REQCopyFileFrom:
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
SrcFilePath := message.MethodArgs[0]
//DstFilePath := message.MethodArgs[2]
// Read the file, and put the result on the out channel to be sent when done reading.
proc.processes.wg.Add(1)
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
// Since we now have read the source file we don't need the REQCopyFileFrom
// request method anymore, so we change the original method of the message
// so it will write the data after the relaying.
//dstDir := filepath.Dir(DstFilePath)
//dstFile := filepath.Base(DstFilePath)
message.RelayOriginalMethod = REQCopyFileTo
//message.FileName = dstFile
//message.Directory = dstDir
default:
// No request type that need special handling if relayed, so we should signal that
// there is nothing to do for the select below.
// We need to do this signaling in it's own go routine here, so we don't block here
// since the select below is in the same function.
go func() {
nothingCh <- struct{}{}
}()
}
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case er := <-errCh:
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case <-nothingCh:
// Do nothing.
case out = <-outCh:
}
// relay the message to the actual host here by prefixing the the RelayToNode
// to the subject.
relayTo := fmt.Sprintf("%v.%v", message.RelayToNode, message.RelayOriginalViaNode)
// message.ToNode = message.RelayOriginalViaNode
message.ToNode = Node(relayTo)
message.FromNode = Node(node)
message.Method = REQRelay
message.Data = []string{string(out)}
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}()
// Send back an ACK message.
ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQRelay struct {
commandOrEvent CommandOrEvent
}
func (m methodREQRelay) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to relay messages via a host.
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
// relay the message here to the actual host here.
message.ToNode = message.RelayToNode
message.FromNode = Node(node)
message.Method = message.RelayOriginalMethod
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
// Send back an ACK message.
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// ---- Template that can be used for creating request methods
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// defer cancel()
//
// // Put data that should be the result of the action done in the inner
// // go routine on the outCh.
// outCh := make(chan []byte)
// // Put errors from the inner go routine on the errCh.
// errCh := make(chan error)
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// // Do some work here....
//
// }()
//
// // Wait for messages received from the inner go routine.
// select {
// case <-ctx.Done():
// fmt.Printf(" ** DEBUG: got ctx.Done\n")
//
// er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs)
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case er := <-errCh:
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case out := <-outCh:
// replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
// newReplyMessage(proc, message, []byte(replyData))
// return
// }
//
// }()
//
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
// return ackMsg, nil
// }