2021-03-01 16:08:40 +00:00
// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
2021-04-04 05:33:18 +00:00
// type methodCommandCLICommandRequest struct {
2021-03-01 16:08:40 +00:00
// commandOrEvent CommandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
2021-03-01 16:08:40 +00:00
// return m.commandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
2021-03-01 16:08:40 +00:00
// ...
// ...
2021-03-11 05:34:36 +00:00
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
2021-03-01 16:08:40 +00:00
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
2021-02-11 14:39:19 +00:00
package steward
import (
2021-06-08 18:52:45 +00:00
"bufio"
2021-09-08 03:50:45 +00:00
"bytes"
2021-03-26 15:04:01 +00:00
"context"
2021-02-11 14:39:19 +00:00
"fmt"
2021-04-06 17:42:03 +00:00
"io"
2021-02-11 14:39:19 +00:00
"log"
2021-04-06 17:42:03 +00:00
"net/http"
2021-03-02 12:46:02 +00:00
"os"
2021-02-11 14:39:19 +00:00
"os/exec"
2021-11-18 08:34:16 +00:00
"path"
2021-03-02 12:46:02 +00:00
"path/filepath"
2021-09-21 21:19:55 +00:00
"strings"
2021-11-22 08:26:56 +00:00
"sync"
2021-03-11 11:07:09 +00:00
"time"
2021-04-12 13:35:20 +00:00
2021-04-13 09:28:52 +00:00
"github.com/hpcloud/tail"
2021-04-12 13:35:20 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-02-11 14:39:19 +00:00
)
2021-03-11 16:14:43 +00:00
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-03-01 16:08:40 +00:00
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
REQInitial Method = "REQInitial"
2021-09-20 04:40:34 +00:00
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
2021-09-20 09:53:17 +00:00
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
2021-03-11 16:14:43 +00:00
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-04 09:19:17 +00:00
REQCliCommand Method = "REQCliCommand"
2021-09-17 08:17:10 +00:00
// REQCliCommandCont same as normal Cli command, but can be used
2021-06-08 18:52:45 +00:00
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
2021-09-17 08:17:10 +00:00
// generated, and not wait until the command is finished.
REQCliCommandCont Method = "REQCliCommandCont"
2021-04-05 06:17:04 +00:00
// Send text to be logged to the console.
2021-03-11 16:14:43 +00:00
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-13 15:22:25 +00:00
REQToConsole Method = "REQToConsole"
2021-04-05 06:17:04 +00:00
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
2021-03-11 16:14:43 +00:00
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
2021-04-13 13:54:04 +00:00
REQToFileAppend Method = "REQToFileAppend"
2021-04-06 12:05:47 +00:00
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
2021-04-13 15:15:13 +00:00
REQToFile Method = "REQToFile"
2021-11-17 12:02:48 +00:00
// Read the source file to be copied to some node.
REQCopyFileFrom Method = "REQCopyFileFrom"
// Write the destination copied to some node.
REQCopyFileTo Method = "REQCopyFileTo"
2021-03-11 16:14:43 +00:00
// Send Hello I'm here message.
2021-04-06 03:46:07 +00:00
REQHello Method = "REQHello"
2021-03-11 16:14:43 +00:00
// Error log methods to centralError node.
2021-04-06 05:56:49 +00:00
REQErrorLog Method = "REQErrorLog"
2021-03-11 05:34:36 +00:00
// Echo request will ask the subscriber for a
2021-03-11 16:14:43 +00:00
// reply generated as a new message, and sent back to where
// the initial request was made.
2021-04-06 04:08:26 +00:00
REQPing Method = "REQPing"
2021-03-11 11:07:09 +00:00
// Will generate a reply for a ECHORequest
2021-04-06 04:08:26 +00:00
REQPong Method = "REQPong"
2021-04-06 17:42:03 +00:00
// Http Get
REQHttpGet Method = "REQHttpGet"
2021-04-13 09:28:52 +00:00
// Tail file
REQTailFile Method = "REQTailFile"
2021-07-01 08:05:34 +00:00
// Write to steward socket
REQToSocket Method = "REQToSocket"
2021-11-10 05:22:03 +00:00
// Send a message via a node
REQRelay Method = "REQRelay"
2021-11-19 08:35:53 +00:00
// The method handler for the first step in a relay chain.
REQRelayInitial Method = "REQRelayInitial"
2021-03-01 16:08:40 +00:00
)
2021-02-11 14:39:19 +00:00
2021-03-01 16:08:40 +00:00
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
// Allowed values for the commandOrEvent field are:
// - CommandACK
// - CommandNACK
// - EventACK
// - EventNack
2021-02-11 14:39:19 +00:00
func ( m Method ) GetMethodsAvailable ( ) MethodsAvailable {
2021-04-03 05:14:39 +00:00
// Command, Used to make a request to perform an action
2021-08-16 11:01:12 +00:00
// Event, Used to communicate that something have happened.
2021-02-11 14:39:19 +00:00
ma := MethodsAvailable {
2021-06-29 06:21:42 +00:00
Methodhandlers : map [ Method ] methodHandler {
2021-04-08 11:43:47 +00:00
REQInitial : methodREQInitial {
commandOrEvent : CommandACK ,
} ,
2021-09-20 04:40:34 +00:00
REQOpProcessList : methodREQOpProcessList {
commandOrEvent : CommandACK ,
} ,
REQOpProcessStart : methodREQOpProcessStart {
commandOrEvent : CommandACK ,
} ,
2021-09-20 09:53:17 +00:00
REQOpProcessStop : methodREQOpProcessStop {
commandOrEvent : CommandACK ,
} ,
2021-04-04 09:19:17 +00:00
REQCliCommand : methodREQCliCommand {
2021-04-03 05:14:39 +00:00
commandOrEvent : CommandACK ,
2021-03-11 11:07:09 +00:00
} ,
2021-09-17 08:17:10 +00:00
REQCliCommandCont : methodREQCliCommandCont {
2021-06-08 18:52:45 +00:00
commandOrEvent : CommandACK ,
} ,
2021-04-13 15:22:25 +00:00
REQToConsole : methodREQToConsole {
2021-03-11 11:07:09 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-13 13:54:04 +00:00
REQToFileAppend : methodREQToFileAppend {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-13 15:15:13 +00:00
REQToFile : methodREQToFile {
2021-04-06 12:05:47 +00:00
commandOrEvent : EventACK ,
} ,
2021-11-17 12:02:48 +00:00
REQCopyFileFrom : methodREQCopyFileFrom {
commandOrEvent : EventACK ,
} ,
REQCopyFileTo : methodREQCopyFileTo {
commandOrEvent : EventACK ,
} ,
2021-04-06 03:46:07 +00:00
REQHello : methodREQHello {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventNACK ,
} ,
2021-04-06 05:56:49 +00:00
REQErrorLog : methodREQErrorLog {
2021-03-01 16:08:40 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 04:08:26 +00:00
REQPing : methodREQPing {
2021-03-11 05:34:36 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 04:08:26 +00:00
REQPong : methodREQPong {
2021-03-11 05:34:36 +00:00
commandOrEvent : EventACK ,
} ,
2021-04-06 17:42:03 +00:00
REQHttpGet : methodREQHttpGet {
commandOrEvent : EventACK ,
} ,
2021-04-13 09:28:52 +00:00
REQTailFile : methodREQTailFile {
commandOrEvent : EventACK ,
} ,
2021-07-01 08:05:34 +00:00
REQToSocket : methodREQToSocket {
commandOrEvent : EventACK ,
} ,
2021-11-10 05:22:03 +00:00
REQRelay : methodREQRelay {
commandOrEvent : EventACK ,
} ,
2021-11-19 08:35:53 +00:00
REQRelayInitial : methodREQRelayInitial {
commandOrEvent : EventACK ,
} ,
2021-02-11 14:39:19 +00:00
} ,
}
return ma
}
2021-07-01 05:42:45 +00:00
// Reply methods. The slice generated here is primarily used within
// the Stew client for knowing what of the req types are generally
// used as reply methods.
2021-06-29 06:21:42 +00:00
func ( m Method ) GetReplyMethods ( ) [ ] Method {
2021-07-01 08:05:34 +00:00
rm := [ ] Method { REQToConsole , REQToFile , REQToFileAppend , REQToSocket }
2021-06-16 19:38:33 +00:00
return rm
}
2021-03-01 16:08:40 +00:00
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func ( m Method ) getHandler ( method Method ) methodHandler {
ma := m . GetMethodsAvailable ( )
2021-06-29 06:21:42 +00:00
mh := ma . Methodhandlers [ method ]
2021-03-01 16:08:40 +00:00
return mh
}
2021-02-11 14:39:19 +00:00
2021-03-11 16:14:43 +00:00
// The structure that works as a reference for all the methods and if
// they are of the command or event type, and also if it is a ACK or
// NACK message.
2021-04-08 11:43:47 +00:00
// ----
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
type methodREQInitial struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQInitial ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
func ( m methodREQInitial ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// proc.procFuncCh <- message
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-08-16 11:01:12 +00:00
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
2021-02-11 14:39:19 +00:00
type MethodsAvailable struct {
2021-06-29 06:21:42 +00:00
Methodhandlers map [ Method ] methodHandler
2021-02-11 14:39:19 +00:00
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func ( ma MethodsAvailable ) CheckIfExists ( m Method ) ( methodHandler , bool ) {
2021-06-29 06:21:42 +00:00
mFunc , ok := ma . Methodhandlers [ m ]
2021-02-11 14:39:19 +00:00
if ok {
return mFunc , true
} else {
return nil , false
}
}
2021-11-22 03:24:15 +00:00
// newReplyMessage will create and send a reply message back to where
// the original provided message came from. The primary use of this
// function is to report back to a node who sent a message with the
// result of the request method of the original message.
//
// The method to use for the reply message when reporting back should
// be specified within a message in the replyMethod field. We will
2021-08-16 11:01:12 +00:00
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
2021-11-22 03:24:15 +00:00
//
// There will also be a copy of the original message put in the
// previousMessage field. For the copy of the original message the data
// field will be set to nil before the whole message is put in the
// previousMessage field so we don't copy around the original data in
// the reply response when it is not needed anymore.
2021-08-16 11:01:12 +00:00
func newReplyMessage ( proc process , message Message , outData [ ] byte ) {
// If no replyMethod is set we default to writing to writing to
// a log file.
if message . ReplyMethod == "" {
message . ReplyMethod = REQToFileAppend
}
2021-11-22 03:24:15 +00:00
// Make a copy of the message as it is right now to use
// in the previous message field, but set the data field
// to nil so we don't copy around the original data when
// we don't need to for the reply message.
thisMsg := message
thisMsg . Data = nil
2021-08-16 11:01:12 +00:00
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message {
2021-09-16 10:37:46 +00:00
ToNode : message . FromNode ,
2021-09-22 13:25:40 +00:00
FromNode : message . ToNode ,
2021-09-16 10:37:46 +00:00
Data : [ ] string { string ( outData ) } ,
Method : message . ReplyMethod ,
MethodArgs : message . ReplyMethodArgs ,
MethodTimeout : message . ReplyMethodTimeout ,
2021-09-22 14:08:55 +00:00
IsReply : true ,
2021-09-16 10:37:46 +00:00
ACKTimeout : message . ReplyACKTimeout ,
Retries : message . ReplyRetries ,
2021-09-22 13:25:40 +00:00
Directory : message . Directory ,
FileName : message . FileName ,
2021-08-16 11:01:12 +00:00
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
2021-11-22 03:24:15 +00:00
PreviousMessage : & thisMsg ,
2021-08-16 11:01:12 +00:00
}
2021-08-25 06:56:44 +00:00
sam , err := newSubjectAndMessage ( newMsg )
2021-08-16 11:01:12 +00:00
if err != nil {
// In theory the system should drop the message before it reaches here.
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-16 11:01:12 +00:00
log . Printf ( "%v\n" , er )
}
2022-01-03 09:40:27 +00:00
2021-08-25 06:56:44 +00:00
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2021-08-16 11:01:12 +00:00
}
2021-09-07 04:24:21 +00:00
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming ( message Message , proc process ) ( string , string ) {
var fileName string
var folderTree string
switch {
case message . PreviousMessage == nil :
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . ToNode ) )
case message . PreviousMessage . ToNode != "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . PreviousMessage . ToNode ) )
case message . PreviousMessage . ToNode == "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . FromNode ) )
}
return fileName , folderTree
}
2021-02-18 13:27:53 +00:00
// ------------------------------------------------------------
// Subscriber method handlers
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-08-16 11:01:12 +00:00
// The methodHandler interface.
2021-02-11 14:39:19 +00:00
type methodHandler interface {
2021-03-08 13:09:14 +00:00
handler ( proc process , message Message , node string ) ( [ ] byte , error )
2021-03-01 16:08:40 +00:00
getKind ( ) CommandOrEvent
2021-02-11 14:39:19 +00:00
}
2021-03-01 16:08:40 +00:00
// -----
2021-09-20 04:40:34 +00:00
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessList ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle Op Process List
func ( m methodREQOpProcessList ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
out := [ ] byte { }
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Lock ( )
2021-11-16 18:07:24 +00:00
for _ , pTmp := range proc . processes . active . procNames {
s := fmt . Sprintf ( "%v, process: %v, id: %v, name: %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , pTmp . processKind , pTmp . processID , pTmp . subject . name ( ) )
sb := [ ] byte ( s )
out = append ( out , sb ... )
2021-09-20 04:40:34 +00:00
}
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Unlock ( )
2021-09-20 04:40:34 +00:00
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessStart ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle Op Process Start
func ( m methodREQOpProcessStart ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-09-20 09:53:17 +00:00
var out [ ] byte
2021-09-20 04:40:34 +00:00
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQOpProcessStart: got <1 number methodArgs" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
2021-09-20 04:40:34 +00:00
m := message . MethodArgs [ 0 ]
method := Method ( m )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStart: no such request type defined: %v" + m )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
}
// Create the process and start it.
sub := newSubject ( method , proc . configuration . NodeName )
procNew := newProcess ( proc . ctx , proc . processes . metrics , proc . natsConn , proc . processes , proc . toRingbufferCh , proc . configuration , sub , proc . errorCh , processKindSubscriber , nil )
go procNew . spawnWorker ( proc . processes , proc . natsConn )
2021-09-20 09:53:17 +00:00
txt := fmt . Sprintf ( "info: OpProcessStart: started id: %v, subject: %v: node: %v" , procNew . processID , sub , message . ToNode )
er := fmt . Errorf ( txt )
2021-09-20 04:40:34 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-09-20 09:53:17 +00:00
// TODO: What should this look like ?
out = [ ] byte ( txt + "\n" )
2021-09-20 04:40:34 +00:00
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 09:53:17 +00:00
// --- OpProcessStop
type methodREQOpProcessStop struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQOpProcessStop ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func ( m methodREQOpProcessStop ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
var out [ ] byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
// --- Parse and check the method arguments given.
2021-09-21 05:10:41 +00:00
// The Reason for also having the node as one of the arguments is
// that publisher processes are named by the node they are sending the
// message to. Subscriber processes names are named by the node name
// they are running on.
2021-09-23 10:52:59 +00:00
2021-11-16 18:39:42 +00:00
if v := len ( message . MethodArgs ) ; v != 3 {
er := fmt . Errorf ( "error: methodREQCliCommand: got <4 number methodArgs, want: method,node,kind" )
2021-09-23 10:52:59 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
2021-09-20 09:53:17 +00:00
methodString := message . MethodArgs [ 0 ]
2021-09-21 05:10:41 +00:00
node := message . MethodArgs [ 1 ]
kind := message . MethodArgs [ 2 ]
2021-09-20 09:53:17 +00:00
method := Method ( methodString )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject ( method , string ( node ) )
processName := processNameGet ( sub . name ( ) , processKind ( kind ) )
// Remove the process from the processes active map if found.
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Lock ( )
2021-11-16 18:07:24 +00:00
toStopProc , ok := proc . processes . active . procNames [ processName ]
2021-10-08 10:07:10 +00:00
2021-09-20 09:53:17 +00:00
if ok {
// Delete the process from the processes map
2021-11-16 09:21:44 +00:00
delete ( proc . processes . active . procNames , processName )
2021-09-20 09:53:17 +00:00
// Stop started go routines that belong to the process.
toStopProc . ctxCancel ( )
// Stop subscribing for messages on the process's subject.
err := toStopProc . natsSubscription . Unsubscribe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-20 09:53:17 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
// Remove the prometheus label
proc . processes . metrics . promProcessesAllRunning . Delete ( prometheus . Labels { "processName" : string ( processName ) } )
2021-11-16 18:39:42 +00:00
txt := fmt . Sprintf ( "info: OpProcessStop: process stopped id: %v, method: %v on: %v" , toStopProc . processID , sub , message . ToNode )
2021-09-20 09:53:17 +00:00
er := fmt . Errorf ( txt )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
} else {
txt := fmt . Sprintf ( "error: OpProcessStop: did not find process to stop: %v on %v" , sub , message . ToNode )
er := fmt . Errorf ( txt )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
}
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Unlock ( )
2021-09-20 09:53:17 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 04:40:34 +00:00
// ----
2021-03-31 11:29:55 +00:00
2021-04-13 13:54:04 +00:00
type methodREQToFileAppend struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 13:54:04 +00:00
func ( m methodREQToFileAppend ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-11 14:39:19 +00:00
2021-08-16 11:01:12 +00:00
// Handle appending data to file.
2021-04-13 13:54:04 +00:00
func ( m methodREQToFileAppend ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-02 20:15:52 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-03-02 12:46:02 +00:00
2021-04-02 20:15:52 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v" , folderTree , proc . subject , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-02 20:15:52 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-09 12:41:31 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-03-02 12:46:02 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend.handler: failed to open file: %v, %v" , file , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-02 12:46:02 +00:00
return nil , err
}
defer f . Close ( )
2021-02-11 14:39:19 +00:00
for _ , d := range message . Data {
2021-03-02 12:46:02 +00:00
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file : %v, %v" , file , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-02 12:46:02 +00:00
}
2021-02-11 14:39:19 +00:00
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-11 14:39:19 +00:00
}
2021-02-18 13:27:53 +00:00
// -----
2021-04-13 15:15:13 +00:00
type methodREQToFile struct {
2021-04-06 12:05:47 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 15:15:13 +00:00
func ( m methodREQToFile ) getKind ( ) CommandOrEvent {
2021-04-06 12:05:47 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
2021-04-13 15:15:13 +00:00
func ( m methodREQToFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-06 12:05:47 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 12:05:47 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
return nil , er
2021-04-06 12:05:47 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-04-06 17:42:03 +00:00
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
2021-04-06 12:05:47 +00:00
if err != nil {
2021-11-17 12:02:48 +00:00
er := fmt . Errorf ( "error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return nil , err
}
defer f . Close ( )
for _ , d := range message . Data {
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file: file: %v, %v" , file , err )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
type methodREQCopyFileFrom struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQCopyFileFrom ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func ( m methodREQCopyFileFrom ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
SrcFilePath := message . MethodArgs [ 0 ]
DstNode := message . MethodArgs [ 1 ]
DstFilePath := message . MethodArgs [ 2 ]
2021-11-17 13:07:35 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
defer cancel ( )
2021-11-17 12:02:48 +00:00
outCh := make ( chan [ ] byte )
errCh := make ( chan error )
// Read the file, and put the result on the out channel to be sent when done reading.
proc . processes . wg . Add ( 1 )
2021-11-22 08:26:56 +00:00
go copyFileFrom ( ctx , & proc . processes . wg , SrcFilePath , errCh , outCh )
2021-11-17 12:02:48 +00:00
// Wait here until we got the data to send, then create a new message
// and send it.
// Also checking the ctx.Done which calls Cancel will allow us to
// kill all started go routines started by this message.
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQCopyFile: got <-ctx.Done(): %v" , message . MethodArgs )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
case er := <- errCh :
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
case out := <- outCh :
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
// Prepare for sending a new message with the output
// Copy the original message to get the defaults for timeouts etc,
// and set new values for fields to change.
msg := message
msg . ToNode = Node ( DstNode )
2021-11-18 08:34:16 +00:00
//msg.Method = REQToFile
msg . Method = REQCopyFileTo
2021-11-17 12:02:48 +00:00
msg . Data = [ ] string { string ( out ) }
msg . Directory = dstDir
msg . FileName = dstFile
// Create SAM and put the message on the send new message channel.
sam , err := newSubjectAndMessage ( msg )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
// TODO: Should we also send a reply message with the result back
// to where the message originated ?
2021-11-17 13:07:35 +00:00
replyData := fmt . Sprintf ( "info: succesfully read the file %v, and sent the content to %v\n" , SrcFilePath , DstNode )
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2021-11-17 12:02:48 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-01-03 09:40:27 +00:00
// copyFileFrom will read a file to be copied from the specified SrcFilePath.
// The result of be delivered on the provided outCh.
2021-11-22 08:26:56 +00:00
func copyFileFrom ( ctx context . Context , wg * sync . WaitGroup , SrcFilePath string , errCh chan error , outCh chan [ ] byte ) {
defer wg . Done ( )
const natsMaxMsgSize = 1000000
fi , err := os . Stat ( SrcFilePath )
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os . IsNotExist ( err ) :
errCh <- fmt . Errorf ( "error: methodREQCopyFile: src file not found: %v" , SrcFilePath )
return
case fi . Size ( ) > natsMaxMsgSize :
errCh <- fmt . Errorf ( "error: methodREQCopyFile: src file to big. max size: %v" , natsMaxMsgSize )
return
}
fh , err := os . Open ( SrcFilePath )
if err != nil {
errCh <- fmt . Errorf ( "error: methodREQCopyFile: failed to open file: %v, %v" , SrcFilePath , err )
return
}
b , err := io . ReadAll ( fh )
if err != nil {
errCh <- fmt . Errorf ( "error: methodREQCopyFile: failed to read file: %v, %v" , SrcFilePath , err )
return
}
select {
case outCh <- b :
fmt . Printf ( " * DEBUG: after io.ReadAll: outCh <- b\n" )
case <- ctx . Done ( ) :
return
}
}
2021-11-17 12:02:48 +00:00
// ----
type methodREQCopyFileTo struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQCopyFileTo ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
2021-11-18 08:34:16 +00:00
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
2021-11-17 12:02:48 +00:00
func ( m methodREQCopyFileTo ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-11-18 08:34:16 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
defer cancel ( )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
// Put data that should be the result of the action done in the inner
// go routine on the outCh.
outCh := make ( chan [ ] byte )
// Put errors from the inner go routine on the errCh.
errCh := make ( chan error )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
// ---
2021-11-22 16:09:20 +00:00
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
2021-04-06 12:05:47 +00:00
2021-11-22 16:09:20 +00:00
// Pick up the values for the directory and filename for where
// to store the file.
DstFilePath := message . MethodArgs [ 2 ]
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
fileRealPath := path . Join ( dstDir , dstFile )
2021-11-18 08:34:16 +00:00
// Check if folder structure exist, if not create it.
2021-11-22 16:09:20 +00:00
if _ , err := os . Stat ( dstDir ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( dstDir , 0700 )
2021-11-18 08:34:16 +00:00
if err != nil {
2021-11-22 16:09:20 +00:00
er := fmt . Errorf ( "failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , dstDir , err )
2021-11-18 08:34:16 +00:00
errCh <- er
return
}
2021-11-22 16:09:20 +00:00
log . Printf ( "info: MethodREQCopyFileTo: Creating folders %v\n" , dstDir )
2021-11-18 08:34:16 +00:00
}
// Open file and write data. Truncate and overwrite any existing files.
2021-11-22 16:09:20 +00:00
file := filepath . Join ( dstDir , dstFile )
2021-11-18 08:34:16 +00:00
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
er := fmt . Errorf ( "failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
errCh <- er
return
}
defer f . Close ( )
for _ , d := range message . Data {
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
er := fmt . Errorf ( "failed to write to file: file: %v, error: %v" , file , err )
errCh <- er
}
}
// All went ok, send a signal to the outer select statement.
outCh <- [ ] byte ( fileRealPath )
// ---
} ( )
// Wait for messages received from the inner go routine.
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQCopyFileTo: got <-ctx.Done(): %v" , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-11-18 08:34:16 +00:00
return
case err := <- errCh :
er := fmt . Errorf ( "error: methodREQCopyFileTo: %v" , err )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
case out := <- outCh :
replyData := fmt . Sprintf ( "info: succesfully created and wrote the file %v\n" , out )
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
return
2021-04-06 12:05:47 +00:00
}
2021-11-18 08:34:16 +00:00
} ( )
2021-04-06 12:05:47 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-04-06 03:46:07 +00:00
type methodREQHello struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 03:46:07 +00:00
func ( m methodREQHello ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-18 13:27:53 +00:00
2021-08-16 11:01:12 +00:00
// Handler for receiving hello messages.
2021-04-06 03:46:07 +00:00
func ( m methodREQHello ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-27 10:27:38 +00:00
data := fmt . Sprintf ( "%v, Received hello from %#v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-03-09 03:55:51 +00:00
2021-08-24 12:05:44 +00:00
fileName := message . FileName
2021-09-08 02:11:35 +00:00
folderTree := filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . FromNode ) )
2021-08-10 08:53:15 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
if err != nil {
2021-08-24 13:29:56 +00:00
log . Printf ( "error: methodREQHello.handler: failed to open file: %v\n" , err )
2021-08-10 08:53:15 +00:00
return nil , err
}
defer f . Close ( )
_ , err = f . Write ( [ ] byte ( data ) )
f . Sync ( )
if err != nil {
log . Printf ( "error: methodEventTextLogging.handler: failed to write to file: %v\n" , err )
}
// --------------------------
2021-03-04 15:27:55 +00:00
// send the message to the procFuncCh which is running alongside the process
// and can hold registries and handle special things for an individual process.
proc . procFuncCh <- message
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-18 13:27:53 +00:00
}
2021-02-24 14:43:31 +00:00
// ---
2021-04-06 05:56:49 +00:00
type methodREQErrorLog struct {
2021-03-01 16:08:40 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 05:56:49 +00:00
func ( m methodREQErrorLog ) getKind ( ) CommandOrEvent {
2021-03-01 16:08:40 +00:00
return m . commandOrEvent
}
2021-02-24 14:43:31 +00:00
2021-08-16 11:01:12 +00:00
// Handle the writing of error logs.
2021-04-06 05:56:49 +00:00
func ( m methodREQErrorLog ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 09:41:46 +00:00
proc . processes . metrics . promErrorMessagesReceivedTotal . Inc ( )
2021-04-06 05:56:49 +00:00
2021-04-06 07:06:26 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 07:06:26 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
2021-04-06 07:06:26 +00:00
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-12 10:27:47 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-04-06 07:06:26 +00:00
if err != nil {
2021-08-24 13:29:56 +00:00
log . Printf ( "error: methodREQErrorLog.handler: failed to open file: %v\n" , err )
2021-04-06 07:06:26 +00:00
return nil , err
}
defer f . Close ( )
2021-04-06 05:56:49 +00:00
2021-04-06 07:06:26 +00:00
for _ , d := range message . Data {
_ , err := f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
log . Printf ( "error: methodEventTextLogging.handler: failed to write to file: %v\n" , err )
}
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-24 14:43:31 +00:00
}
2021-03-11 05:34:36 +00:00
// ---
2021-04-06 04:08:26 +00:00
type methodREQPing struct {
2021-03-11 05:34:36 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 04:08:26 +00:00
func ( m methodREQPing ) getKind ( ) CommandOrEvent {
2021-03-11 05:34:36 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle receving a ping.
2021-04-06 04:08:26 +00:00
func ( m methodREQPing ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a ping
2021-03-11 05:34:36 +00:00
2021-08-26 06:40:33 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to create toFile directory tree: %v, %v" , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , er
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , err
}
defer f . Close ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, ping received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
}
2021-08-26 05:01:55 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 05:31:50 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
newReplyMessage ( proc , message , nil )
2021-04-06 05:31:50 +00:00
} ( )
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-06 04:08:26 +00:00
type methodREQPong struct {
2021-03-11 05:34:36 +00:00
commandOrEvent CommandOrEvent
}
2021-04-06 04:08:26 +00:00
func ( m methodREQPong ) getKind ( ) CommandOrEvent {
2021-03-11 05:34:36 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handle receiving a pong.
2021-04-06 04:08:26 +00:00
func ( m methodREQPong ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a pong
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to create toFile directory tree %v: %v" , folderTree , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , er
}
log . Printf ( "info: Creating subscribers data folder at %v\n" , folderTree )
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
return nil , err
}
defer f . Close ( )
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, pong received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . PreviousMessage . ToNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-26 06:40:33 +00:00
log . Printf ( "%v\n" , er )
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-03-11 11:07:09 +00:00
2021-03-11 11:51:16 +00:00
// ---
2021-03-11 11:07:09 +00:00
2021-04-04 09:19:17 +00:00
type methodREQCliCommand struct {
2021-03-11 11:07:09 +00:00
commandOrEvent CommandOrEvent
}
2021-04-04 09:19:17 +00:00
func ( m methodREQCliCommand ) getKind ( ) CommandOrEvent {
2021-03-11 11:07:09 +00:00
return m . commandOrEvent
}
2021-03-29 04:53:34 +00:00
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
2021-04-04 09:19:17 +00:00
func ( m methodREQCliCommand ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-11-10 11:58:23 +00:00
log . Printf ( "<--- CLICommandREQUEST received from: %v, containing: %v" , message . FromNode , message . MethodArgs )
2021-03-11 11:07:09 +00:00
2021-03-29 04:53:34 +00:00
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
var a [ ] string
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <1 number methodArgs" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
case len ( message . MethodArgs ) >= 0 :
a = message . MethodArgs [ 1 : ]
}
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
2021-03-11 11:07:09 +00:00
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-03-26 15:04:01 +00:00
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-22 14:58:23 +00:00
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i , v := range message . MethodArgs {
if strings . Contains ( v , "{{STEWARD_DATA}}" ) {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message . MethodArgs [ i ] = strings . Replace ( message . MethodArgs [ i ] , "{{STEWARD_DATA}}" , "$STEWARD_DATA" , - 1 )
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
envData = strings . Join ( message . Data , "" )
}
}
2021-03-26 15:04:01 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
2021-09-08 03:50:45 +00:00
2021-09-22 13:25:40 +00:00
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt . Sprintf ( "STEWARD_DATA=%v" , envData )
cmd . Env = append ( cmd . Env , envData )
}
2021-09-08 03:50:45 +00:00
var out bytes . Buffer
var stderr bytes . Buffer
cmd . Stdout = & out
cmd . Stderr = & stderr
2021-11-10 15:10:51 +00:00
2021-09-08 03:50:45 +00:00
err := cmd . Run ( )
2021-03-26 15:04:01 +00:00
if err != nil {
2021-11-10 15:10:51 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v" , err , message . MethodArgs , stderr . String ( ) )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-03-26 15:04:01 +00:00
}
2021-08-12 10:27:47 +00:00
select {
2021-09-08 03:50:45 +00:00
case outCh <- out . Bytes ( ) :
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-03-26 15:04:01 +00:00
} ( )
select {
case <- ctx . Done ( ) :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: method timed out: %v" , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-03-26 15:04:01 +00:00
case out := <- outCh :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-03-26 15:04:01 +00:00
2021-09-22 14:58:23 +00:00
// NB: Not quite sure what is the best way to handle the below
// isReply right now. Implementing as send to central for now.
//
2021-09-22 14:08:55 +00:00
// If this is this a reply message swap the toNode and fromNode
// fields so the output of the command are sent to central node.
if message . IsReply {
message . ToNode , message . FromNode = message . FromNode , message . ToNode
}
2021-03-31 11:29:55 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-03-26 15:04:01 +00:00
}
2021-03-12 11:08:11 +00:00
2021-03-26 15:04:01 +00:00
} ( )
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-13 15:22:25 +00:00
type methodREQToConsole struct {
2021-03-11 11:07:09 +00:00
commandOrEvent CommandOrEvent
}
2021-04-13 15:22:25 +00:00
func ( m methodREQToConsole ) getKind ( ) CommandOrEvent {
2021-03-11 11:07:09 +00:00
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// Handler to write directly to console.
2021-04-13 15:22:25 +00:00
func ( m methodREQToConsole ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-03-11 11:07:09 +00:00
2021-11-11 12:43:32 +00:00
for _ , v := range message . Data {
fmt . Fprintf ( os . Stdout , "%v" , string ( v ) )
}
fmt . Println ( )
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-06 17:42:03 +00:00
// ---
type methodREQHttpGet struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQHttpGet ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
2021-08-16 11:01:12 +00:00
// handler to do a Http Get.
2021-04-06 17:42:03 +00:00
func ( m methodREQHttpGet ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
log . Printf ( "<--- REQHttpGet received from: %v, containing: %v" , message . FromNode , message . Data )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQHttpGet: got <1 number methodArgs" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
2021-09-16 09:51:34 +00:00
url := message . MethodArgs [ 0 ]
2021-04-06 17:42:03 +00:00
client := http . Client {
2021-12-15 14:13:48 +00:00
Timeout : time . Duration ( message . MethodTimeout ) ,
2021-04-06 17:42:03 +00:00
}
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-04-06 17:42:03 +00:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
cancel ( )
return
}
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-04-06 17:42:03 +00:00
resp , err := client . Do ( req )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, bailing out: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, where %#v, bailing out: %v" , resp . StatusCode , message )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
return
}
2021-08-24 07:25:43 +00:00
body , err := io . ReadAll ( resp . Body )
2021-04-06 17:42:03 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-04-06 17:42:03 +00:00
}
2021-08-24 07:25:43 +00:00
out := body
2021-08-12 10:27:47 +00:00
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
}
2021-04-06 17:42:03 +00:00
} ( )
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: method timed out: %v" , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-06 17:42:03 +00:00
case out := <- outCh :
cancel ( )
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-06 17:42:03 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-13 09:28:52 +00:00
// --- methodREQTailFile
type methodREQTailFile struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQTailFile ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func ( m methodREQTailFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-12 14:39:09 +00:00
log . Printf ( "<--- TailFile REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2021-04-13 09:28:52 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQTailFile: got <1 number methodArgs" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
2021-09-16 09:51:34 +00:00
fp := message . MethodArgs [ 0 ]
2021-04-13 09:28:52 +00:00
2021-04-16 21:58:43 +00:00
var ctx context . Context
var cancel context . CancelFunc
if message . MethodTimeout != 0 {
2021-07-02 09:26:52 +00:00
ctx , cancel = context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-04-16 21:58:43 +00:00
} else {
2021-07-02 09:26:52 +00:00
ctx , cancel = context . WithCancel ( proc . ctx )
2021-04-16 21:58:43 +00:00
}
2021-04-13 09:28:52 +00:00
outCh := make ( chan [ ] byte )
2021-04-16 21:58:43 +00:00
t , err := tail . TailFile ( fp , tail . Config { Follow : true , Location : & tail . SeekInfo {
Offset : 0 ,
Whence : os . SEEK_END ,
} } )
2021-04-13 09:28:52 +00:00
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQToTailFile: tailFile: %v" , err )
2021-04-13 09:28:52 +00:00
log . Printf ( "%v\n" , er )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-04-13 09:28:52 +00:00
}
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-13 10:03:30 +00:00
for {
2021-08-12 10:27:47 +00:00
select {
2021-08-13 10:03:30 +00:00
case line := <- t . Lines :
outCh <- [ ] byte ( line . Text + "\n" )
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-04-13 09:28:52 +00:00
}
} ( )
for {
select {
case <- ctx . Done ( ) :
cancel ( )
// Close the lines channel so we exit the reading lines
// go routine.
2021-04-16 21:58:43 +00:00
// close(t.Lines)
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: method timeout reached REQTailFile, canceling: %v" , message . MethodArgs )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-08-13 10:03:30 +00:00
2021-04-13 09:28:52 +00:00
return
case out := <- outCh :
2021-08-12 14:39:09 +00:00
2021-04-13 09:28:52 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-13 09:28:52 +00:00
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-06-08 18:52:45 +00:00
// ---
2021-09-17 08:17:10 +00:00
type methodREQCliCommandCont struct {
2021-06-08 18:52:45 +00:00
commandOrEvent CommandOrEvent
}
2021-09-17 08:17:10 +00:00
func ( m methodREQCliCommandCont ) getKind ( ) CommandOrEvent {
2021-06-08 18:52:45 +00:00
return m . commandOrEvent
}
2021-09-17 08:17:10 +00:00
// Handler to run REQCliCommandCont, which is the same as normal
2021-08-10 10:49:42 +00:00
// Cli command, but can be used when running a command that will take
// longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
2021-09-17 08:17:10 +00:00
func ( m methodREQCliCommandCont ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-06-08 18:52:45 +00:00
log . Printf ( "<--- CLInCommandCont REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
var a [ ] string
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <1 number methodArgs" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
case len ( message . MethodArgs ) >= 0 :
a = message . MethodArgs [ 1 : ]
}
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
2021-06-08 18:52:45 +00:00
2021-07-02 09:26:52 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
2021-06-08 18:52:45 +00:00
outCh := make ( chan [ ] byte )
2021-09-27 12:45:49 +00:00
errCh := make ( chan string )
2021-06-08 18:52:45 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
proc . processes . wg . Done ( )
2021-06-08 18:52:45 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
// Using cmd.StdoutPipe here so we are continuosly
// able to read the out put of the command.
outReader , err := cmd . StdoutPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-09-27 12:45:49 +00:00
log . Printf ( "error: %v\n" , er )
2021-06-08 18:52:45 +00:00
}
2021-09-08 03:50:45 +00:00
ErrorReader , err := cmd . StderrPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-08 03:50:45 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
2021-06-08 18:52:45 +00:00
if err := cmd . Start ( ) ; err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v" , err , message . MethodArgs )
2021-09-07 07:43:54 +00:00
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-09 07:40:35 +00:00
log . Printf ( "%v\n" , er )
2021-06-08 18:52:45 +00:00
}
2021-09-08 03:50:45 +00:00
go func ( ) {
scanner := bufio . NewScanner ( ErrorReader )
for scanner . Scan ( ) {
2021-09-27 12:45:49 +00:00
errCh <- scanner . Text ( )
2021-09-08 03:50:45 +00:00
}
} ( )
2021-09-27 12:45:49 +00:00
go func ( ) {
scanner := bufio . NewScanner ( outReader )
for scanner . Scan ( ) {
outCh <- [ ] byte ( scanner . Text ( ) + "\n" )
2021-08-12 10:27:47 +00:00
}
2021-09-27 12:45:49 +00:00
} ( )
2021-08-12 10:27:47 +00:00
2021-09-27 12:45:49 +00:00
// NB: sending cancel to command context, so processes are killed.
// A github issue is filed on not killing all child processes when using pipes:
// https://github.com/golang/go/issues/23019
// TODO: Check in later if there are any progress on the issue.
// When testing the problem seems to appear when using sudo, or tcpdump without
// the -l option. So for now, don't use sudo, and remember to use -l with tcpdump
// which makes stdout line buffered.
<- ctx . Done ( )
cancel ( )
if err := cmd . Wait ( ) ; err != nil {
log . Printf ( " --------------- * error: REQCliCommandCont: cmd.Wait: %v\n" , err )
2021-06-08 18:52:45 +00:00
}
} ( )
// Check if context timer or command output were received.
for {
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v" , message . MethodArgs )
2021-09-23 06:31:30 +00:00
sendInfoLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
2021-06-08 18:52:45 +00:00
return
case out := <- outCh :
newReplyMessage ( proc , message , out )
2021-09-27 12:45:49 +00:00
case out := <- errCh :
newReplyMessage ( proc , message , [ ] byte ( out ) )
2021-06-08 18:52:45 +00:00
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-07-01 08:05:34 +00:00
// ---
type methodREQToSocket struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQToSocket ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
2022-01-03 06:36:38 +00:00
// TODO: Not implemented.
2021-08-16 11:01:12 +00:00
// Handler to write to unix socket file.
2021-07-01 08:05:34 +00:00
func ( m methodREQToSocket ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
for _ , d := range message . Data {
2021-09-23 03:46:25 +00:00
// TODO: Write the data to the socket here.
2021-07-01 08:05:34 +00:00
fmt . Printf ( "Info: Data to write to socket: %v\n" , d )
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-10 05:22:03 +00:00
2021-11-19 08:35:53 +00:00
type methodREQRelayInitial struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQRelayInitial ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handler to relay messages via a host.
func ( m methodREQRelayInitial ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-11-22 15:02:40 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-19 08:35:53 +00:00
2021-11-22 15:02:40 +00:00
ctx , cancel := context . WithTimeout ( proc . ctx , time . Second * time . Duration ( message . MethodTimeout ) )
defer cancel ( )
2021-11-19 08:35:53 +00:00
2021-11-22 15:02:40 +00:00
outCh := make ( chan [ ] byte )
errCh := make ( chan error )
2021-11-23 18:49:48 +00:00
nothingCh := make ( chan struct { } , 1 )
2021-11-22 15:02:40 +00:00
var out [ ] byte
// If the actual Method for the message is REQCopyFileFrom we need to
// do the actual file reading here so we can fill the data field of the
// message with the content of the file before relaying it.
switch {
case message . RelayOriginalMethod == REQCopyFileFrom :
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
return
}
SrcFilePath := message . MethodArgs [ 0 ]
2021-11-22 16:09:20 +00:00
//DstFilePath := message.MethodArgs[2]
2021-11-22 15:02:40 +00:00
// Read the file, and put the result on the out channel to be sent when done reading.
proc . processes . wg . Add ( 1 )
go copyFileFrom ( ctx , & proc . processes . wg , SrcFilePath , errCh , outCh )
// Since we now have read the source file we don't need the REQCopyFileFrom
// request method anymore, so we change the original method of the message
// so it will write the data after the relaying.
2021-11-22 16:09:20 +00:00
//dstDir := filepath.Dir(DstFilePath)
//dstFile := filepath.Base(DstFilePath)
2021-11-22 15:02:40 +00:00
message . RelayOriginalMethod = REQCopyFileTo
2021-11-22 16:09:20 +00:00
//message.FileName = dstFile
//message.Directory = dstDir
2021-11-22 15:02:40 +00:00
default :
2021-11-23 15:06:06 +00:00
// No request type that need special handling if relayed, so we should signal that
// there is nothing to do for the select below.
// We need to do this signaling in it's own go routine here, so we don't block here
// since the select below is in the same function.
go func ( ) {
2021-11-23 18:49:48 +00:00
nothingCh <- struct { } { }
2021-11-23 15:06:06 +00:00
} ( )
2021-11-22 15:02:40 +00:00
}
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v" , message . MethodArgs )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
case er := <- errCh :
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
return
2021-11-23 18:49:48 +00:00
case <- nothingCh :
// Do nothing.
2021-11-22 15:02:40 +00:00
case out = <- outCh :
}
2021-12-06 13:57:02 +00:00
// relay the message to the actual host here by prefixing the the RelayToNode
// to the subject.
relayTo := fmt . Sprintf ( "%v.%v" , message . RelayToNode , message . RelayOriginalViaNode )
// message.ToNode = message.RelayOriginalViaNode
message . ToNode = Node ( relayTo )
2021-11-22 15:02:40 +00:00
message . FromNode = Node ( node )
message . Method = REQRelay
message . Data = [ ] string { string ( out ) }
sam , err := newSubjectAndMessage ( message )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
} ( )
2021-11-19 08:35:53 +00:00
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed REQRelay from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-10 05:22:03 +00:00
type methodREQRelay struct {
commandOrEvent CommandOrEvent
}
func ( m methodREQRelay ) getKind ( ) CommandOrEvent {
return m . commandOrEvent
}
// Handler to relay messages via a host.
func ( m methodREQRelay ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// relay the message here to the actual host here.
2021-11-10 11:58:23 +00:00
message . ToNode = message . RelayToNode
message . FromNode = Node ( node )
message . Method = message . RelayOriginalMethod
sam , err := newSubjectAndMessage ( message )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
sendErrorLogMessage ( proc . configuration , proc . processes . metrics , proc . toRingbufferCh , proc . node , er )
log . Printf ( "%v\n" , er )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2021-11-10 10:21:38 +00:00
2021-11-10 05:22:03 +00:00
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-18 08:34:16 +00:00
// ---- Template that can be used for creating request methods
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// defer cancel()
//
// // Put data that should be the result of the action done in the inner
// // go routine on the outCh.
// outCh := make(chan []byte)
// // Put errors from the inner go routine on the errCh.
// errCh := make(chan error)
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// // Do some work here....
//
// }()
//
// // Wait for messages received from the inner go routine.
// select {
// case <-ctx.Done():
// fmt.Printf(" ** DEBUG: got ctx.Done\n")
//
// er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs)
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case er := <-errCh:
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case out := <-outCh:
// replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
// newReplyMessage(proc, message, []byte(replyData))
// return
// }
//
// }()
//
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
// return ackMsg, nil
// }