2021-03-01 16:08:40 +00:00
// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
2021-04-04 05:33:18 +00:00
// type methodCommandCLICommandRequest struct {
2021-03-01 16:08:40 +00:00
// commandOrEvent CommandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
2021-03-01 16:08:40 +00:00
// return m.commandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
2021-03-01 16:08:40 +00:00
// ...
// ...
2021-03-11 05:34:36 +00:00
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
2021-03-01 16:08:40 +00:00
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
2021-02-11 14:39:19 +00:00
package steward
import (
2021-06-08 18:52:45 +00:00
"bufio"
2021-09-08 03:50:45 +00:00
"bytes"
2021-03-26 15:04:01 +00:00
"context"
2021-04-09 16:20:04 +00:00
"encoding/json"
2021-02-11 14:39:19 +00:00
"fmt"
2021-04-06 17:42:03 +00:00
"io"
2021-02-11 14:39:19 +00:00
"log"
2021-04-06 17:42:03 +00:00
"net/http"
2021-03-02 12:46:02 +00:00
"os"
2021-02-11 14:39:19 +00:00
"os/exec"
2021-03-02 12:46:02 +00:00
"path/filepath"
2021-09-20 09:53:17 +00:00
"strconv"
2021-09-21 21:19:55 +00:00
"strings"
2021-03-11 11:07:09 +00:00
"time"
2021-04-12 13:35:20 +00:00
2021-04-13 09:28:52 +00:00
"github.com/hpcloud/tail"
2021-04-12 13:35:20 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-02-11 14:39:19 +00:00
)
2021-03-11 16:14:43 +00:00
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-03-01 16:08:40 +00:00
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
REQInitial Method = "REQInitial"
2021-04-06 09:02:58 +00:00
// Command for client operation request of the system. The op
// command to execute shall be given in the data field of the
// message as string value. For example "ps".
2021-04-04 05:55:07 +00:00
REQOpCommand Method = "REQOpCommand"
2021-09-20 04:40:34 +00:00
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
2021-09-20 09:53:17 +00:00
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
2021-03-11 16:14:43 +00:00
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-04 09:19:17 +00:00
REQCliCommand Method = "REQCliCommand"
2021-09-17 08:17:10 +00:00
// REQCliCommandCont same as normal Cli command, but can be used
2021-06-08 18:52:45 +00:00
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
2021-09-17 08:17:10 +00:00
// generated, and not wait until the command is finished.
REQCliCommandCont Method = "REQCliCommandCont"
2021-04-05 06:17:04 +00:00
// Send text to be logged to the console.
2021-03-11 16:14:43 +00:00
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-13 15:22:25 +00:00
REQToConsole Method = "REQToConsole"
2021-04-05 06:17:04 +00:00
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
2021-03-11 16:14:43 +00:00
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
2021-04-13 13:54:04 +00:00
REQToFileAppend Method = "REQToFileAppend"
2021-04-06 12:05:47 +00:00
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
2021-04-13 15:15:13 +00:00
REQToFile Method = "REQToFile"
2021-03-11 16:14:43 +00:00
// Send Hello I'm here message.
2021-04-06 03:46:07 +00:00
REQHello Method = "REQHello"
2021-03-11 16:14:43 +00:00
// Error log methods to centralError node.
2021-04-06 05:56:49 +00:00
REQErrorLog Method = "REQErrorLog"
2021-03-11 05:34:36 +00:00
// Echo request will ask the subscriber for a
2021-03-11 16:14:43 +00:00
// reply generated as a new message, and sent back to where
// the initial request was made.
2021-04-06 04:08:26 +00:00
REQPing Method = "REQPing"
2021-03-11 11:07:09 +00:00
// Will generate a reply for a ECHORequest
2021-04-06 04:08:26 +00:00
REQPong Method = "REQPong"
2021-04-06 17:42:03 +00:00
// Http Get
REQHttpGet Method = "REQHttpGet"
2021-04-13 09:28:52 +00:00
// Tail file
REQTailFile Method = "REQTailFile"
2021-07-01 08:05:34 +00:00
// Write to steward socket
REQToSocket Method = "REQToSocket"
2021-03-01 16:08:40 +00:00
)
2021-02-11 14:39:19 +00:00
2021-03-01 16:08:40 +00:00
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
// Allowed values for the commandOrEvent field are:
// - CommandACK
// - CommandNACK
// - EventACK
// - EventNack
2021-02-11 14:39:19 +00:00
func ( m Method ) GetMethodsAvailable ( ) MethodsAvailable {
2021-04-03 05:14:39 +00:00
// Command, Used to make a request to perform an action
2021-08-16 11:01:12 +00:00
// Event, Used to communicate that something have happened.
2021-02-11 14:39:19 +00:00
ma := MethodsAvailable {
2021-06-29 06:21:42 +00:00
Methodhandlers : map [ Method ] methodHandler {
2021-04-08 11:43:47 +00:00
REQInitial : methodREQInitial {
commandOrEvent : CommandACK ,
} ,
2021-04-04 05:55:07 +00:00
REQOpCommand : methodREQOpCommand {
2021-03-31 10:26:28 +00:00
commandOrEvent : CommandACK ,
} ,
2021-09-20 04:40:34 +00:00
REQOpProcessList : methodREQOpProcessList {
commandOrEvent : CommandACK ,
} ,
REQOpProcessStart : methodREQOpProcessStart {
commandOrEvent : CommandACK ,
} ,
2021-09-20 09:53:17 +00:00
REQOpProcessStop : methodREQOpProcessStop {
commandOrEvent : CommandACK ,
} ,
2021-04-04 09:19:17 +00:00
REQCliCommand : methodREQCliCommand {
2021-04-03 05:14:39 +00:00
commandOrEvent : CommandACK ,
2021-03-11 11:07:09 +00:00
} ,
2021-09-17 08:17:10 +00:00
REQCliCommandCont : methodREQCliCommandCont {
2021-06-08 18:52:45 +00:00
commandOrEvent : CommandACK ,
} ,
2021-04-13 15:22:25 +00:00
REQToConsole : methodREQToConsole {
2021-03-11 11:07:09 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-13 13:54:04 +00:00
REQToFileAppend : methodREQToFileAppend {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-13 15:15:13 +00:00
REQToFile : methodREQToFile {
2021-04-06 12:05:47 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 03:46:07 +00:00
REQHello : methodREQHello {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventNACK ,
} ,
2021-04-06 05:56:49 +00:00
REQErrorLog : methodREQErrorLog {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 04:08:26 +00:00
REQPing : methodREQPing {
2021-03-11 05:34:36 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 04:08:26 +00:00
REQPong : methodREQPong {
2021-03-11 05:34:36 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 17:42:03 +00:00
REQHttpGet : methodREQHttpGet {
commandOrEvent : EventACK ,
} ,
2021-04-13 09:28:52 +00:00
REQTailFile : methodREQTailFile {
commandOrEvent : EventACK ,
} ,
2021-07-01 08:05:34 +00:00
REQToSocket : methodREQToSocket {
commandOrEvent : EventACK ,
} ,
2021-02-11 14:39:19 +00:00
} ,
}
return ma
}
2021-07-01 05:42:45 +00:00
// Reply methods. The slice generated here is primarily used within
// the Stew client for knowing what of the req types are generally
// used as reply methods.
2021-06-29 06:21:42 +00:00
func ( m Method ) GetReplyMethods ( ) [ ] Method {
2021-07-01 08:05:34 +00:00
rm := [ ] Method { REQToConsole , REQToFile , REQToFileAppend , REQToSocket }
2021-06-16 19:38:33 +00:00
return rm
}
2021-03-01 16:08:40 +00:00
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func ( m Method ) getHandler ( method Method ) methodHandler {
ma := m . GetMethodsAvailable ( )
2021-06-29 06:21:42 +00:00
mh := ma . Methodhandlers [ method ]
2021-03-01 16:08:40 +00:00
return mh
}
2021-02-11 14:39:19 +00:00
2021-03-11 16:14:43 +00:00
// The structure that works as a reference for all the methods and if
// they are of the command or event type, and also if it is a ACK or
// NACK message.
2021-04-08 11:43:47 +00:00
// ----
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
type methodREQInitial struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQInitial ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
func ( m methodREQInitial ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// proc.procFuncCh <- message
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-08-16 11:01:12 +00:00
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
2021-02-11 14:39:19 +00:00
type MethodsAvailable struct {
2021-06-29 06:21:42 +00:00
Methodhandlers map [ Method ] methodHandler
2021-02-11 14:39:19 +00:00
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func ( ma MethodsAvailable ) CheckIfExists ( m Method ) ( methodHandler , bool ) {
2021-06-29 06:21:42 +00:00
mFunc , ok := ma . Methodhandlers [ m ]
2021-02-11 14:39:19 +00:00
if ok {
2021-02-16 03:57:54 +00:00
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
2021-02-11 14:39:19 +00:00
return mFunc , true
} else {
2021-02-16 03:57:54 +00:00
// fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m)
2021-02-11 14:39:19 +00:00
return nil , false
}
}
2021-08-16 11:01:12 +00:00
// Create a new message for the reply containing the output of the
// action executed put in outData, and put it on the ringbuffer to
// be published.
// The method to use for the reply message should initially be
// specified within the first message as the replyMethod, and we will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
func newReplyMessage ( proc process , message Message , outData [ ] byte ) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message . ReplyMethod == "" {
message . ReplyMethod = REQToFileAppend
}
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message {
2021-09-16 10:37:46 +00:00
ToNode : message . FromNode ,
2021-09-22 13:25:40 +00:00
FromNode : message . ToNode ,
2021-09-16 10:37:46 +00:00
Data : [ ] string { string ( outData ) } ,
Method : message . ReplyMethod ,
MethodArgs : message . ReplyMethodArgs ,
MethodTimeout : message . ReplyMethodTimeout ,
2021-09-22 14:08:55 +00:00
IsReply : true ,
2021-09-16 10:37:46 +00:00
ACKTimeout : message . ReplyACKTimeout ,
Retries : message . ReplyRetries ,
2021-09-22 13:25:40 +00:00
Directory : message . Directory ,
FileName : message . FileName ,
2021-08-16 11:01:12 +00:00
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage : & message ,
}
2021-08-25 06:56:44 +00:00
sam , err := newSubjectAndMessage ( newMsg )
2021-08-16 11:01:12 +00:00
if err != nil {
// In theory the system should drop the message before it reaches here.
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-16 11:01:12 +00:00
log . Printf ( "%v\n" , er )
}
2021-08-25 06:56:44 +00:00
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2021-08-16 11:01:12 +00:00
}
2021-09-07 04:24:21 +00:00
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming ( message Message , proc process ) ( string , string ) {
var fileName string
var folderTree string
switch {
case message . PreviousMessage == nil :
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . ToNode ) )
case message . PreviousMessage . ToNode != "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . PreviousMessage . ToNode ) )
case message . PreviousMessage . ToNode == "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . FromNode ) )
}
return fileName , folderTree
}
2021-02-18 13:27:53 +00:00
// ------------------------------------------------------------
// Subscriber method handlers
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-08-16 11:01:12 +00:00
// The methodHandler interface.
2021-02-11 14:39:19 +00:00
type methodHandler interface {
2021-03-08 13:09:14 +00:00
handler ( proc process , message Message , node string ) ( [ ] byte , error )
2021-03-01 16:08:40 +00:00
getKind ( ) CommandOrEvent
2021-02-11 14:39:19 +00:00
}
2021-03-01 16:08:40 +00:00
// -----
2021-04-04 05:55:07 +00:00
type methodREQOpCommand struct {
2021-03-31 10:26:28 +00:00
commandOrEvent CommandOrEvent
}
2021-04-04 05:55:07 +00:00
func ( m methodREQOpCommand ) getKind ( ) CommandOrEvent {
2021-03-31 10:26:28 +00:00
return m . commandOrEvent
}
2021-04-09 16:20:04 +00:00
type OpCmdStartProc struct {
Method Method ` json:"method" `
2021-06-29 06:21:42 +00:00
AllowedNodes [ ] Node ` json:"allowedNodes" `
2021-04-09 16:20:04 +00:00
}
type OpCmdStopProc struct {
2021-06-29 06:21:42 +00:00
RecevingNode Node ` json:"receivingNode" `
2021-04-09 16:20:04 +00:00
Method Method ` json:"method" `
Kind processKind ` json:"kind" `
2021-06-08 11:56:31 +00:00
ID int ` json:"id" `
2021-04-09 16:20:04 +00:00
}
2021-03-31 10:26:28 +00:00
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
2021-04-07 14:45:51 +00:00
func ( m methodREQOpCommand ) handler ( proc process , message Message , nodeName string ) ( [ ] byte , error ) {
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-31 10:26:28 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-03-31 10:26:28 +00:00
out := [ ] byte { }
2021-04-09 16:20:04 +00:00
// unmarshal the json.RawMessage field called OpArgs.
//
// Dst interface is the generic type to Unmarshal OpArgs into, and we will
// set the type it should contain depending on the value specified in Cmd.
var dst interface { }
2021-04-03 05:14:39 +00:00
2021-04-09 16:20:04 +00:00
switch message . Operation . OpCmd {
case "ps" :
2021-03-31 10:26:28 +00:00
proc . processes . mu . Lock ( )
2021-04-02 20:15:52 +00:00
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
2021-06-08 11:56:31 +00:00
for _ , idMap := range proc . processes . active {
for _ , v := range idMap {
2021-09-20 10:22:34 +00:00
s := fmt . Sprintf ( "%v, proc: %v, id: %v, name: %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , v . processKind , v . processID , v . subject . name ( ) )
2021-06-08 11:56:31 +00:00
sb := [ ] byte ( s )
out = append ( out , sb ... )
}
2021-03-31 10:26:28 +00:00
}
proc . processes . mu . Unlock ( )
2021-04-09 16:20:04 +00:00
case "startProc" :
2021-08-16 11:01:12 +00:00
// Set the empty interface type dst to &OpStart.
2021-04-09 16:20:04 +00:00
dst = & OpCmdStartProc { }
err := json . Unmarshal ( message . Operation . OpArg , & dst )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQOpCommand startProc json.Umarshal failed : %v, OpArg: %v" , err , message . Operation . OpArg )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-09 16:20:04 +00:00
}
// Assert it into the correct non pointer value.
arg := * dst . ( * OpCmdStartProc )
//fmt.Printf(" ** Content of Arg = %#v\n", arg)
if len ( arg . AllowedNodes ) == 0 {
2021-04-07 14:45:51 +00:00
er := fmt . Errorf ( "error: startProc: no allowed publisher nodes specified: %v" + fmt . Sprint ( message ) )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-07 14:45:51 +00:00
return
}
2021-04-09 16:20:04 +00:00
if arg . Method == "" {
er := fmt . Errorf ( "error: startProc: no method specified: %v" + fmt . Sprint ( message ) )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-09 16:20:04 +00:00
return
2021-04-07 14:45:51 +00:00
}
2021-04-09 16:20:04 +00:00
// Create the process and start it.
sub := newSubject ( arg . Method , proc . configuration . NodeName )
2021-09-08 15:57:21 +00:00
procNew := newProcess ( proc . ctx , proc . processes . metrics , proc . natsConn , proc . processes , proc . toRingbufferCh , proc . configuration , sub , proc . errorCh , processKindSubscriber , nil )
2021-04-07 14:45:51 +00:00
go procNew . spawnWorker ( proc . processes , proc . natsConn )
2021-06-08 04:02:08 +00:00
er := fmt . Errorf ( "info: startProc: started id: %v, subject: %v: node: %v" , procNew . processID , sub , message . ToNode )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-09 09:30:40 +00:00
2021-04-09 16:20:04 +00:00
case "stopProc" :
// Set the interface type dst to &OpStart.
dst = & OpCmdStopProc { }
err := json . Unmarshal ( message . Operation . OpArg , & dst )
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v" , err , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-09 16:20:04 +00:00
}
// Assert it into the correct non pointer value.
arg := * dst . ( * OpCmdStopProc )
2021-08-16 11:01:12 +00:00
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
2021-08-12 07:21:56 +00:00
// We can then use this processName to get the real values for the
// actual process we want to stop.
2021-04-09 16:20:04 +00:00
sub := newSubject ( arg . Method , string ( arg . RecevingNode ) )
processName := processNameGet ( sub . name ( ) , arg . Kind )
2021-04-08 05:07:13 +00:00
2021-06-08 11:56:31 +00:00
// Check if the message contains an id.
err = func ( ) error {
if arg . ID == 0 {
er := fmt . Errorf ( "error: stopProc: did not find process to stop: %v on %v" , sub , message . ToNode )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-08 11:56:31 +00:00
return er
}
return nil
} ( )
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: stopProc: err was not nil: %v : %v on %v" , err , sub , message . ToNode )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-06-08 11:56:31 +00:00
return
}
2021-04-08 05:07:13 +00:00
proc . processes . mu . Lock ( )
2021-04-08 10:51:54 +00:00
2021-08-16 11:01:12 +00:00
// Remove the process from the processes active map if found.
2021-06-08 11:56:31 +00:00
toStopProc , ok := proc . processes . active [ processName ] [ arg . ID ]
2021-04-08 05:07:13 +00:00
if ok {
2021-04-08 10:51:54 +00:00
// Delete the process from the processes map
2021-04-08 05:07:13 +00:00
delete ( proc . processes . active , processName )
2021-04-08 10:51:54 +00:00
// Stop started go routines that belong to the process.
2021-04-08 05:07:13 +00:00
toStopProc . ctxCancel ( )
2021-04-08 10:51:54 +00:00
// Stop subscribing for messages on the process's subject.
err := toStopProc . natsSubscription . Unsubscribe ( )
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v" , err , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-08 10:51:54 +00:00
}
2021-04-09 16:20:04 +00:00
2021-04-12 13:35:20 +00:00
// Remove the prometheus label
2021-08-18 13:41:53 +00:00
proc . processes . metrics . promProcessesAllRunning . Delete ( prometheus . Labels { "processName" : string ( processName ) } )
2021-04-12 13:35:20 +00:00
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "info: stopProc: stopped %v on %v" , sub , message . ToNode )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-09 16:20:04 +00:00
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , [ ] byte ( er . Error ( ) ) )
2021-04-09 16:20:04 +00:00
} else {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v" , sub , message . ToNode )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-09 16:20:04 +00:00
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , [ ] byte ( er . Error ( ) ) )
2021-04-08 05:07:13 +00:00
}
2021-04-09 16:20:04 +00:00
proc . processes . mu . Unlock ( )
2021-04-09 09:30:40 +00:00
2021-03-31 10:26:28 +00:00
}
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-03-31 10:26:28 +00:00
} ( )
2021-04-07 14:45:51 +00:00
ackMsg := [ ] byte ( fmt . Sprintf ( "confirmed from node: %v: messageID: %v\n---\n" , proc . node , message . ID ) )
2021-03-31 10:26:28 +00:00
return ackMsg , nil
}
2021-09-20 04:40:34 +00:00
// ---- New operations
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessList ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle Op Process List
func ( m methodREQOpProcessList ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
out := [ ] byte { }
proc . processes . mu . Lock ( )
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
for _ , idMap := range proc . processes . active {
for _ , v := range idMap {
2021-09-20 10:22:34 +00:00
s := fmt . Sprintf ( "%v, process: %v, id: %v, name: %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , v . processKind , v . processID , v . subject . name ( ) )
2021-09-20 04:40:34 +00:00
sb := [ ] byte ( s )
out = append ( out , sb ... )
}
}
proc . processes . mu . Unlock ( )
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessStart ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle Op Process Start
func ( m methodREQOpProcessStart ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-09-20 09:53:17 +00:00
var out [ ] byte
2021-09-20 04:40:34 +00:00
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
m := message . MethodArgs [ 0 ]
method := Method ( m )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStart: no such request type defined: %v" + m )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
}
// Create the process and start it.
sub := newSubject ( method , proc . configuration . NodeName )
procNew := newProcess ( proc . ctx , proc . processes . metrics , proc . natsConn , proc . processes , proc . toRingbufferCh , proc . configuration , sub , proc . errorCh , processKindSubscriber , nil )
go procNew . spawnWorker ( proc . processes , proc . natsConn )
2021-09-20 09:53:17 +00:00
txt := fmt . Sprintf ( "info: OpProcessStart: started id: %v, subject: %v: node: %v" , procNew . processID , sub , message . ToNode )
er := fmt . Errorf ( txt )
2021-09-20 04:40:34 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-09-20 09:53:17 +00:00
// TODO: What should this look like ?
out = [ ] byte ( txt + "\n" )
2021-09-20 04:40:34 +00:00
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 09:53:17 +00:00
// --- OpProcessStop
type methodREQOpProcessStop struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessStop ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func ( m methodREQOpProcessStop ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
var out [ ] byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
2021-09-21 05:10:41 +00:00
if v := len ( message . MethodArgs ) ; v != 4 {
2021-09-20 09:53:17 +00:00
er := fmt . Errorf ( "error: OpProcessStop: methodArgs should contain 4 elements, found %v" , v )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
}
// --- Parse and check the method arguments given.
2021-09-21 05:10:41 +00:00
// The Reason for also having the node as one of the arguments is
// that publisher processes are named by the node they are sending the
// message to. Subscriber processes names are named by the node name
// they are running on.
2021-09-20 09:53:17 +00:00
methodString := message . MethodArgs [ 0 ]
2021-09-21 05:10:41 +00:00
node := message . MethodArgs [ 1 ]
kind := message . MethodArgs [ 2 ]
idString := message . MethodArgs [ 3 ]
2021-09-20 09:53:17 +00:00
method := Method ( methodString )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
}
// Check if id is a valid number.
id , err := strconv . Atoi ( idString )
if err != nil {
er := fmt . Errorf ( "error: OpProcessStop: id: %v, is not a number, check that the methodArgs are correct: %v" , idString , err )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject ( method , string ( node ) )
processName := processNameGet ( sub . name ( ) , processKind ( kind ) )
proc . processes . mu . Lock ( )
// Remove the process from the processes active map if found.
toStopProc , ok := proc . processes . active [ processName ] [ id ]
if ok {
// Delete the process from the processes map
delete ( proc . processes . active , processName )
// Stop started go routines that belong to the process.
toStopProc . ctxCancel ( )
// Stop subscribing for messages on the process's subject.
err := toStopProc . natsSubscription . Unsubscribe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-20 09:53:17 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
// Remove the prometheus label
proc . processes . metrics . promProcessesAllRunning . Delete ( prometheus . Labels { "processName" : string ( processName ) } )
txt := fmt . Sprintf ( "info: OpProcessStop: process stopped id: %v, method: %v on: %v" , id , sub , message . ToNode )
er := fmt . Errorf ( txt )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
} else {
txt := fmt . Sprintf ( "error: OpProcessStop: did not find process to stop: %v on %v" , sub , message . ToNode )
er := fmt . Errorf ( txt )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
}
proc . processes . mu . Unlock ( )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 04:40:34 +00:00
// ----
2021-03-31 11:29:55 +00:00
2021-04-13 13:54:04 +00:00
type methodREQToFileAppend struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 13:54:04 +00:00
func ( m methodREQToFileAppend ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-11 14:39:19 +00:00
2021-08-16 11:01:12 +00:00
// Handle appending data to file.
2021-04-13 13:54:04 +00:00
func ( m methodREQToFileAppend ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-02 20:15:52 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-03-02 12:46:02 +00:00
2021-04-02 20:15:52 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v" , folderTree , proc . subject , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-02 20:15:52 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-09 12:41:31 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-03-02 12:46:02 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend.handler: failed to open file: %v, %v" , file , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-02 12:46:02 +00:00
return nil , err
}
defer f . Close ( )
2021-02-11 14:39:19 +00:00
for _ , d := range message . Data {
2021-03-02 12:46:02 +00:00
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file : %v, %v" , file , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-02 12:46:02 +00:00
}
2021-02-11 14:39:19 +00:00
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-11 14:39:19 +00:00
}
2021-02-18 13:27:53 +00:00
// -----
2021-04-13 15:15:13 +00:00
type methodREQToFile struct {
2021-04-06 12:05:47 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 15:15:13 +00:00
func ( m methodREQToFile ) getKind ( ) CommandOrEvent {
2021-04-06 12:05:47 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
2021-04-13 15:15:13 +00:00
func ( m methodREQToFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-06 12:05:47 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 12:05:47 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
return nil , er
2021-04-06 12:05:47 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-04-06 17:42:03 +00:00
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
2021-04-06 12:05:47 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-06 12:05:47 +00:00
return nil , err
}
defer f . Close ( )
for _ , d := range message . Data {
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file: file: %v, %v" , file , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-06 12:05:47 +00:00
}
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-04-06 03:46:07 +00:00
type methodREQHello struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 03:46:07 +00:00
func ( m methodREQHello ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-18 13:27:53 +00:00
2021-08-16 11:01:12 +00:00
// Handler for receiving hello messages.
2021-04-06 03:46:07 +00:00
func ( m methodREQHello ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-27 10:27:38 +00:00
data := fmt . Sprintf ( "%v, Received hello from %#v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-03-09 03:55:51 +00:00
2021-08-24 12:05:44 +00:00
fileName := message . FileName
2021-09-08 02:11:35 +00:00
folderTree := filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . FromNode ) )
2021-08-10 08:53:15 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
if err != nil {
2021-08-24 13:29:56 +00:00
log . Printf ( "error: methodREQHello.handler: failed to open file: %v\n" , err )
2021-08-10 08:53:15 +00:00
return nil , err
}
defer f . Close ( )
_ , err = f . Write ( [ ] byte ( data ) )
f . Sync ( )
if err != nil {
log . Printf ( "error: methodEventTextLogging.handler: failed to write to file: %v\n" , err )
}
// --------------------------
2021-03-04 15:27:55 +00:00
// send the message to the procFuncCh which is running alongside the process
// and can hold registries and handle special things for an individual process.
proc . procFuncCh <- message
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-18 13:27:53 +00:00
}
2021-02-24 14:43:31 +00:00
// ---
2021-04-06 05:56:49 +00:00
type methodREQErrorLog struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 05:56:49 +00:00
func ( m methodREQErrorLog ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-24 14:43:31 +00:00
2021-08-16 11:01:12 +00:00
// Handle the writing of error logs.
2021-04-06 05:56:49 +00:00
func ( m methodREQErrorLog ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 09:41:46 +00:00
proc . processes . metrics . promErrorMessagesReceivedTotal . Inc ( )
2021-04-06 05:56:49 +00:00
2021-04-06 07:06:26 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 07:06:26 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
2021-04-06 07:06:26 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-12 10:27:47 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-04-06 07:06:26 +00:00
if err != nil {
2021-08-24 13:29:56 +00:00
log . Printf ( "error: methodREQErrorLog.handler: failed to open file: %v\n" , err )
2021-04-06 07:06:26 +00:00
return nil , err
}
defer f . Close ( )
2021-04-06 05:56:49 +00:00
2021-04-06 07:06:26 +00:00
for _ , d := range message . Data {
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
log . Printf ( "error: methodEventTextLogging.handler: failed to write to file: %v\n" , err )
}
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-24 14:43:31 +00:00
}
2021-03-11 05:34:36 +00:00
// ---
2021-04-06 04:08:26 +00:00
type methodREQPing struct {
2021-03-11 05:34:36 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 04:08:26 +00:00
func ( m methodREQPing ) getKind ( ) CommandOrEvent {
2021-03-11 05:34:36 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle receving a ping.
2021-04-06 04:08:26 +00:00
func ( m methodREQPing ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a ping
2021-03-11 05:34:36 +00:00
2021-08-26 06:40:33 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to create toFile directory tree: %v, %v" , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , er
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , err
}
defer f . Close ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, ping received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
}
2021-08-26 05:01:55 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 05:31:50 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
newReplyMessage ( proc , message , nil )
2021-04-06 05:31:50 +00:00
} ( )
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-06 04:08:26 +00:00
type methodREQPong struct {
2021-03-11 05:34:36 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 04:08:26 +00:00
func ( m methodREQPong ) getKind ( ) CommandOrEvent {
2021-03-11 05:34:36 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle receiving a pong.
2021-04-06 04:08:26 +00:00
func ( m methodREQPong ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a pong
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to create toFile directory tree %v: %v" , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , er
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , err
}
defer f . Close ( )
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, pong received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . PreviousMessage . ToNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-03-11 11:07:09 +00:00
2021-03-11 11:51:16 +00:00
// ---
2021-03-11 11:07:09 +00:00
2021-04-04 09:19:17 +00:00
type methodREQCliCommand struct {
2021-03-11 11:07:09 +00:00
commandOrEvent CommandOrEvent
}
2021-04-04 09:19:17 +00:00
func ( m methodREQCliCommand ) getKind ( ) CommandOrEvent {
2021-03-11 11:07:09 +00:00
return m . commandOrEvent
}
2021-03-29 04:53:34 +00:00
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
2021-04-04 09:19:17 +00:00
func ( m methodREQCliCommand ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-03-29 04:53:34 +00:00
log . Printf ( "<--- CLICommandREQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2021-03-11 11:07:09 +00:00
2021-03-29 04:53:34 +00:00
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
a := message . MethodArgs [ 1 : ]
2021-03-11 11:07:09 +00:00
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-03-26 15:04:01 +00:00
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-22 14:58:23 +00:00
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i , v := range message . MethodArgs {
if strings . Contains ( v , "{{STEWARD_DATA}}" ) {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message . MethodArgs [ i ] = strings . Replace ( message . MethodArgs [ i ] , "{{STEWARD_DATA}}" , "$STEWARD_DATA" , - 1 )
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
envData = strings . Join ( message . Data , "" )
}
}
2021-03-26 15:04:01 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
2021-09-08 03:50:45 +00:00
2021-09-22 13:25:40 +00:00
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt . Sprintf ( "STEWARD_DATA=%v" , envData )
cmd . Env = append ( cmd . Env , envData )
}
2021-09-08 03:50:45 +00:00
var out bytes . Buffer
var stderr bytes . Buffer
cmd . Stdout = & out
cmd . Stderr = & stderr
err := cmd . Run ( )
2021-03-26 15:04:01 +00:00
if err != nil {
2021-09-08 03:50:45 +00:00
if err != nil {
2021-09-22 13:25:40 +00:00
log . Printf ( "error: failed cmd.Run: %v\n" , err )
2021-09-08 03:50:45 +00:00
}
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: cmd.Run : %v, methodArgs: %v, error_output: %v" , err , message . MethodArgs , stderr . String ( ) )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-26 15:04:01 +00:00
}
2021-08-12 10:27:47 +00:00
select {
2021-09-08 03:50:45 +00:00
case outCh <- out . Bytes ( ) :
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-03-26 15:04:01 +00:00
} ( )
select {
case <- ctx . Done ( ) :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: method timed out: %v" , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-03-26 15:04:01 +00:00
case out := <- outCh :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-03-26 15:04:01 +00:00
2021-09-22 14:58:23 +00:00
// NB: Not quite sure what is the best way to handle the below
// isReply right now. Implementing as send to central for now.
//
2021-09-22 14:08:55 +00:00
// If this is this a reply message swap the toNode and fromNode
// fields so the output of the command are sent to central node.
if message . IsReply {
message . ToNode , message . FromNode = message . FromNode , message . ToNode
}
2021-03-31 11:29:55 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-03-26 15:04:01 +00:00
}
2021-03-12 11:08:11 +00:00
2021-03-26 15:04:01 +00:00
} ( )
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-13 15:22:25 +00:00
type methodREQToConsole struct {
2021-03-11 11:07:09 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 15:22:25 +00:00
func ( m methodREQToConsole ) getKind ( ) CommandOrEvent {
2021-03-11 11:07:09 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handler to write directly to console.
2021-04-13 15:22:25 +00:00
func ( m methodREQToConsole ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-06 17:42:03 +00:00
// ---
type methodREQHttpGet struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQHttpGet ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// handler to do a Http Get.
2021-04-06 17:42:03 +00:00
func ( m methodREQHttpGet ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
log . Printf ( "<--- REQHttpGet received from: %v, containing: %v" , message . FromNode , message . Data )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-16 09:51:34 +00:00
url := message . MethodArgs [ 0 ]
2021-04-06 17:42:03 +00:00
client := http . Client {
Timeout : time . Second * 5 ,
}
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-04-06 17:42:03 +00:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
cancel ( )
return
}
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-04-06 17:42:03 +00:00
resp , err := client . Do ( req )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, bailing out: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, where %#v, bailing out: %v" , resp . StatusCode , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
return
}
2021-08-24 07:25:43 +00:00
body , err := io . ReadAll ( resp . Body )
2021-04-06 17:42:03 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-06 17:42:03 +00:00
}
2021-08-24 07:25:43 +00:00
out := body
2021-08-12 10:27:47 +00:00
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
}
2021-04-06 17:42:03 +00:00
} ( )
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: method timed out: %v" , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
case out := <- outCh :
cancel ( )
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-06 17:42:03 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-13 09:28:52 +00:00
// --- methodREQTailFile
type methodREQTailFile struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQTailFile ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func ( m methodREQTailFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-12 14:39:09 +00:00
log . Printf ( "<--- TailFile REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2021-04-13 09:28:52 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-16 09:51:34 +00:00
fp := message . MethodArgs [ 0 ]
2021-04-13 09:28:52 +00:00
2021-04-16 21:58:43 +00:00
var ctx context . Context
var cancel context . CancelFunc
if message . MethodTimeout != 0 {
2021-07-02 09:26:52 +00:00
ctx , cancel = context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-04-16 21:58:43 +00:00
} else {
2021-07-02 09:26:52 +00:00
ctx , cancel = context . WithCancel ( proc . ctx )
2021-04-16 21:58:43 +00:00
}
2021-04-13 09:28:52 +00:00
outCh := make ( chan [ ] byte )
2021-04-16 21:58:43 +00:00
t , err := tail . TailFile ( fp , tail . Config { Follow : true , Location : & tail . SeekInfo {
Offset : 0 ,
Whence : os . SEEK_END ,
} } )
2021-04-13 09:28:52 +00:00
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQToTailFile: tailFile: %v" , err )
2021-04-13 09:28:52 +00:00
log . Printf ( "%v\n" , er )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-13 09:28:52 +00:00
}
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-13 10:03:30 +00:00
for {
2021-08-12 10:27:47 +00:00
select {
2021-08-13 10:03:30 +00:00
case line := <- t . Lines :
outCh <- [ ] byte ( line . Text + "\n" )
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-04-13 09:28:52 +00:00
}
} ( )
for {
select {
case <- ctx . Done ( ) :
cancel ( )
// Close the lines channel so we exit the reading lines
// go routine.
2021-04-16 21:58:43 +00:00
// close(t.Lines)
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: method timeout reached REQTailFile, canceling: %v" , message . MethodArgs )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-13 10:03:30 +00:00
2021-04-13 09:28:52 +00:00
return
case out := <- outCh :
2021-08-12 14:39:09 +00:00
2021-04-13 09:28:52 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-13 09:28:52 +00:00
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-06-08 18:52:45 +00:00
// ---
2021-09-17 08:17:10 +00:00
type methodREQCliCommandCont struct {
2021-06-08 18:52:45 +00:00
commandOrEvent CommandOrEvent
}
2021-09-17 08:17:10 +00:00
func ( m methodREQCliCommandCont ) getKind ( ) CommandOrEvent {
2021-06-08 18:52:45 +00:00
return m . commandOrEvent
}
2021-09-17 08:17:10 +00:00
// Handler to run REQCliCommandCont, which is the same as normal
2021-08-10 10:49:42 +00:00
// Cli command, but can be used when running a command that will take
// longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
2021-09-17 08:17:10 +00:00
func ( m methodREQCliCommandCont ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-06-08 18:52:45 +00:00
log . Printf ( "<--- CLInCommandCont REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
a := message . MethodArgs [ 1 : ]
2021-06-08 18:52:45 +00:00
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-06-08 18:52:45 +00:00
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
proc . processes . wg . Done ( )
2021-06-08 18:52:45 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
// Using cmd.StdoutPipe here so we are continuosly
// able to read the out put of the command.
outReader , err := cmd . StdoutPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-06-08 18:52:45 +00:00
log . Printf ( "error: %v\n" , err )
}
2021-09-08 03:50:45 +00:00
ErrorReader , err := cmd . StderrPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-08 03:50:45 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
log . Printf ( "error: %v\n" , err )
}
2021-06-08 18:52:45 +00:00
if err := cmd . Start ( ) ; err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-06-08 18:52:45 +00:00
}
2021-09-08 03:50:45 +00:00
// Also send error messages that might happen during the time
// cmd.Start runs.
// Putting the scanner.Text value on a channel so we can make
// the scanner non-blocking, and also check context cancelation.
go func ( ) {
errCh := make ( chan string , 1 )
scanner := bufio . NewScanner ( ErrorReader )
for scanner . Scan ( ) {
select {
case errCh <- scanner . Text ( ) :
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v, error_output: %v" , err , message . MethodArgs , <- errCh )
2021-09-08 03:50:45 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
case <- ctx . Done ( ) :
return
}
}
} ( )
2021-06-08 18:52:45 +00:00
scanner := bufio . NewScanner ( outReader )
for scanner . Scan ( ) {
2021-08-12 10:27:47 +00:00
select {
case outCh <- [ ] byte ( scanner . Text ( ) + "\n" ) :
case <- ctx . Done ( ) :
return
}
2021-06-08 18:52:45 +00:00
}
} ( )
// Check if context timer or command output were received.
for {
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v" , message . MethodArgs )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-08 18:52:45 +00:00
return
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-07-01 08:05:34 +00:00
// ---
type methodREQToSocket struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQToSocket ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// TODO:
// Handler to write to unix socket file.
2021-07-01 08:05:34 +00:00
func ( m methodREQToSocket ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
for _ , d := range message . Data {
2021-09-23 03:46:25 +00:00
// TODO: Write the data to the socket here.
2021-07-01 08:05:34 +00:00
fmt . Printf ( "Info: Data to write to socket: %v\n" , d )
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----