2021-03-01 16:08:40 +00:00
// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
2021-04-04 05:33:18 +00:00
// type methodCommandCLICommandRequest struct {
2021-03-01 16:08:40 +00:00
// commandOrEvent CommandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
2021-03-01 16:08:40 +00:00
// return m.commandOrEvent
// }
//
2021-04-04 05:33:18 +00:00
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
2021-03-01 16:08:40 +00:00
// ...
// ...
2021-03-11 05:34:36 +00:00
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
2021-03-01 16:08:40 +00:00
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
2021-02-11 14:39:19 +00:00
package steward
import (
2021-06-08 18:52:45 +00:00
"bufio"
2021-09-08 03:50:45 +00:00
"bytes"
2021-03-26 15:04:01 +00:00
"context"
2022-05-12 04:22:11 +00:00
"crypto/sha256"
2022-04-07 07:34:06 +00:00
"encoding/json"
2021-02-11 14:39:19 +00:00
"fmt"
2021-04-06 17:42:03 +00:00
"io"
2022-05-12 04:22:11 +00:00
"log"
2021-04-06 17:42:03 +00:00
"net/http"
2021-03-02 12:46:02 +00:00
"os"
2021-02-11 14:39:19 +00:00
"os/exec"
2021-11-18 08:34:16 +00:00
"path"
2021-03-02 12:46:02 +00:00
"path/filepath"
2022-05-12 04:22:11 +00:00
"sort"
2022-02-11 06:27:51 +00:00
"strconv"
2021-09-21 21:19:55 +00:00
"strings"
2021-11-22 08:26:56 +00:00
"sync"
2021-03-11 11:07:09 +00:00
"time"
2021-04-12 13:35:20 +00:00
2022-05-12 04:22:11 +00:00
"github.com/fxamacker/cbor/v2"
2021-04-13 09:28:52 +00:00
"github.com/hpcloud/tail"
2021-04-12 13:35:20 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-02-11 14:39:19 +00:00
)
2021-03-11 16:14:43 +00:00
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-03-01 16:08:40 +00:00
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
REQInitial Method = "REQInitial"
2021-09-20 04:40:34 +00:00
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
2021-09-20 09:53:17 +00:00
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
2021-03-11 16:14:43 +00:00
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-04 09:19:17 +00:00
REQCliCommand Method = "REQCliCommand"
2021-09-17 08:17:10 +00:00
// REQCliCommandCont same as normal Cli command, but can be used
2021-06-08 18:52:45 +00:00
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
2021-09-17 08:17:10 +00:00
// generated, and not wait until the command is finished.
REQCliCommandCont Method = "REQCliCommandCont"
2021-04-05 06:17:04 +00:00
// Send text to be logged to the console.
2021-03-11 16:14:43 +00:00
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
2021-04-13 15:22:25 +00:00
REQToConsole Method = "REQToConsole"
2022-01-12 06:42:41 +00:00
// REQTuiToConsole
REQTuiToConsole Method = "REQTuiToConsole"
2021-04-05 06:17:04 +00:00
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
2021-03-11 16:14:43 +00:00
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
2021-04-13 13:54:04 +00:00
REQToFileAppend Method = "REQToFileAppend"
2021-04-06 12:05:47 +00:00
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
2021-04-13 15:15:13 +00:00
REQToFile Method = "REQToFile"
2022-03-04 14:02:43 +00:00
// REQToFileNACK same as REQToFile but NACK.
REQToFileNACK Method = "REQToFileNACK"
2021-11-17 12:02:48 +00:00
// Read the source file to be copied to some node.
REQCopyFileFrom Method = "REQCopyFileFrom"
// Write the destination copied to some node.
REQCopyFileTo Method = "REQCopyFileTo"
2021-03-11 16:14:43 +00:00
// Send Hello I'm here message.
2021-04-06 03:46:07 +00:00
REQHello Method = "REQHello"
2021-03-11 16:14:43 +00:00
// Error log methods to centralError node.
2021-04-06 05:56:49 +00:00
REQErrorLog Method = "REQErrorLog"
2021-03-11 05:34:36 +00:00
// Echo request will ask the subscriber for a
2021-03-11 16:14:43 +00:00
// reply generated as a new message, and sent back to where
// the initial request was made.
2021-04-06 04:08:26 +00:00
REQPing Method = "REQPing"
2021-03-11 11:07:09 +00:00
// Will generate a reply for a ECHORequest
2021-04-06 04:08:26 +00:00
REQPong Method = "REQPong"
2021-04-06 17:42:03 +00:00
// Http Get
REQHttpGet Method = "REQHttpGet"
2022-02-11 06:27:51 +00:00
// Http Get Scheduled
// The second element of the MethodArgs slice holds the timer defined in seconds.
REQHttpGetScheduled Method = "REQHttpGetScheduled"
2021-04-13 09:28:52 +00:00
// Tail file
REQTailFile Method = "REQTailFile"
2021-07-01 08:05:34 +00:00
// Write to steward socket
2021-11-10 05:22:03 +00:00
REQRelay Method = "REQRelay"
2021-11-19 08:35:53 +00:00
// The method handler for the first step in a relay chain.
REQRelayInitial Method = "REQRelayInitial"
2022-01-05 07:47:06 +00:00
// REQNone is used when there should be no reply.
REQNone Method = "REQNone"
2022-04-07 07:34:06 +00:00
// REQPublicKey will get the public ed25519 key from a node.
2022-02-08 10:49:32 +00:00
REQPublicKey Method = "REQPublicKey"
2022-04-07 07:34:06 +00:00
// REQPublicKeysGet will get all the public keys from central.
REQPublicKeysGet Method = "REQPublicKeysGet"
2022-04-20 04:26:01 +00:00
// REQPublicKeysToNode will put all the public received from central.
REQPublicKeysToNode Method = "REQPublicKeysToNode"
2022-04-20 16:33:52 +00:00
// REQAuthPublicKeysAllow
REQPublicKeysAllow Method = "REQPublicKeysAllow"
2022-05-18 12:43:35 +00:00
// REQAclAddCommand
REQAclAddCommand = "REQAclAddCommand"
// REQAclDeleteCommand
REQAclDeleteCommand = "REQAclDeleteCommand"
2021-03-01 16:08:40 +00:00
)
2021-02-11 14:39:19 +00:00
2021-03-01 16:08:40 +00:00
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
2022-01-27 13:25:24 +00:00
// Allowed values for the Event field are:
2021-03-01 16:08:40 +00:00
// - EventACK
// - EventNack
2021-02-11 14:39:19 +00:00
func ( m Method ) GetMethodsAvailable ( ) MethodsAvailable {
2021-04-03 05:14:39 +00:00
2021-02-11 14:39:19 +00:00
ma := MethodsAvailable {
2021-06-29 06:21:42 +00:00
Methodhandlers : map [ Method ] methodHandler {
2021-04-08 11:43:47 +00:00
REQInitial : methodREQInitial {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-04-08 11:43:47 +00:00
} ,
2021-09-20 04:40:34 +00:00
REQOpProcessList : methodREQOpProcessList {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-09-20 04:40:34 +00:00
} ,
REQOpProcessStart : methodREQOpProcessStart {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-09-20 04:40:34 +00:00
} ,
2021-09-20 09:53:17 +00:00
REQOpProcessStop : methodREQOpProcessStop {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-09-20 09:53:17 +00:00
} ,
2021-04-04 09:19:17 +00:00
REQCliCommand : methodREQCliCommand {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-11 11:07:09 +00:00
} ,
2021-09-17 08:17:10 +00:00
REQCliCommandCont : methodREQCliCommandCont {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-06-08 18:52:45 +00:00
} ,
2021-04-13 15:22:25 +00:00
REQToConsole : methodREQToConsole {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-11 11:07:09 +00:00
} ,
2022-01-12 06:42:41 +00:00
REQTuiToConsole : methodREQTuiToConsole {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2022-01-12 06:42:41 +00:00
} ,
2021-04-13 13:54:04 +00:00
REQToFileAppend : methodREQToFileAppend {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-01 16:08:40 +00:00
} ,
2021-04-13 15:15:13 +00:00
REQToFile : methodREQToFile {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-04-06 12:05:47 +00:00
} ,
2022-03-04 14:02:43 +00:00
REQToFileNACK : methodREQToFile {
event : EventNACK ,
} ,
2021-11-17 12:02:48 +00:00
REQCopyFileFrom : methodREQCopyFileFrom {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-11-17 12:02:48 +00:00
} ,
REQCopyFileTo : methodREQCopyFileTo {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-11-17 12:02:48 +00:00
} ,
2021-04-06 03:46:07 +00:00
REQHello : methodREQHello {
2022-01-27 09:06:06 +00:00
event : EventNACK ,
2021-03-01 16:08:40 +00:00
} ,
2021-04-06 05:56:49 +00:00
REQErrorLog : methodREQErrorLog {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-01 16:08:40 +00:00
} ,
2021-04-06 04:08:26 +00:00
REQPing : methodREQPing {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-11 05:34:36 +00:00
} ,
2021-04-06 04:08:26 +00:00
REQPong : methodREQPong {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-03-11 05:34:36 +00:00
} ,
2021-04-06 17:42:03 +00:00
REQHttpGet : methodREQHttpGet {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-04-06 17:42:03 +00:00
} ,
2022-02-11 06:27:51 +00:00
REQHttpGetScheduled : methodREQHttpGetScheduled {
event : EventACK ,
} ,
2021-04-13 09:28:52 +00:00
REQTailFile : methodREQTailFile {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-04-13 09:28:52 +00:00
} ,
2021-11-10 05:22:03 +00:00
REQRelay : methodREQRelay {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-11-10 05:22:03 +00:00
} ,
2021-11-19 08:35:53 +00:00
REQRelayInitial : methodREQRelayInitial {
2022-01-27 09:06:06 +00:00
event : EventACK ,
2021-11-19 08:35:53 +00:00
} ,
2022-02-08 04:17:38 +00:00
REQPublicKey : methodREQPublicKey {
event : EventACK ,
} ,
2022-04-07 07:34:06 +00:00
REQPublicKeysGet : methodREQPublicKeysGet {
event : EventNACK ,
} ,
2022-04-20 04:26:01 +00:00
REQPublicKeysToNode : methodREQPublicKeysToNode {
2022-04-07 07:34:06 +00:00
event : EventNACK ,
} ,
2022-04-20 16:33:52 +00:00
REQPublicKeysAllow : methodREQPublicKeysAllow {
event : EventACK ,
} ,
2022-05-18 12:43:35 +00:00
REQAclAddCommand : methodREQAclAddCommand {
event : EventACK ,
} ,
REQAclDeleteCommand : methodREQAclDeleteCommand {
2022-05-18 09:26:06 +00:00
event : EventACK ,
} ,
2021-02-11 14:39:19 +00:00
} ,
}
return ma
}
2021-07-01 05:42:45 +00:00
// Reply methods. The slice generated here is primarily used within
// the Stew client for knowing what of the req types are generally
// used as reply methods.
2021-06-29 06:21:42 +00:00
func ( m Method ) GetReplyMethods ( ) [ ] Method {
2022-04-20 04:41:45 +00:00
rm := [ ] Method { REQToConsole , REQTuiToConsole , REQCliCommand , REQCliCommandCont , REQToFile , REQToFileAppend , REQNone }
2021-06-16 19:38:33 +00:00
return rm
}
2021-03-01 16:08:40 +00:00
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func ( m Method ) getHandler ( method Method ) methodHandler {
ma := m . GetMethodsAvailable ( )
2021-06-29 06:21:42 +00:00
mh := ma . Methodhandlers [ method ]
2021-03-01 16:08:40 +00:00
return mh
}
2021-02-11 14:39:19 +00:00
2022-01-26 14:35:31 +00:00
// getContextForMethodTimeout, will return a context with cancel function
// with the timeout set to the method timeout in the message.
// If the value of timeout is set to -1, we don't want it to stop, so we
// return a context with a timeout set to 200 years.
func getContextForMethodTimeout ( ctx context . Context , message Message ) ( context . Context , context . CancelFunc ) {
// If methodTimeout == -1, which means we don't want a timeout, set the
// time out to 200 years.
if message . MethodTimeout == - 1 {
return context . WithTimeout ( ctx , time . Hour * time . Duration ( 8760 * 200 ) )
}
return context . WithTimeout ( ctx , time . Second * time . Duration ( message . MethodTimeout ) )
}
2021-04-08 11:43:47 +00:00
// ----
2021-08-16 11:01:12 +00:00
// Initial parent method used to start other processes.
2021-04-08 11:43:47 +00:00
type methodREQInitial struct {
2022-01-27 09:06:06 +00:00
event Event
2021-04-08 11:43:47 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQInitial ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-04-08 11:43:47 +00:00
}
func ( m methodREQInitial ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// proc.procFuncCh <- message
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-08-16 11:01:12 +00:00
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
2021-02-11 14:39:19 +00:00
type MethodsAvailable struct {
2021-06-29 06:21:42 +00:00
Methodhandlers map [ Method ] methodHandler
2021-02-11 14:39:19 +00:00
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func ( ma MethodsAvailable ) CheckIfExists ( m Method ) ( methodHandler , bool ) {
2021-06-29 06:21:42 +00:00
mFunc , ok := ma . Methodhandlers [ m ]
2021-02-11 14:39:19 +00:00
if ok {
return mFunc , true
} else {
return nil , false
}
}
2021-11-22 03:24:15 +00:00
// newReplyMessage will create and send a reply message back to where
// the original provided message came from. The primary use of this
// function is to report back to a node who sent a message with the
// result of the request method of the original message.
//
// The method to use for the reply message when reporting back should
// be specified within a message in the replyMethod field. We will
2021-08-16 11:01:12 +00:00
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
2021-11-22 03:24:15 +00:00
//
// There will also be a copy of the original message put in the
// previousMessage field. For the copy of the original message the data
// field will be set to nil before the whole message is put in the
// previousMessage field so we don't copy around the original data in
// the reply response when it is not needed anymore.
2021-08-16 11:01:12 +00:00
func newReplyMessage ( proc process , message Message , outData [ ] byte ) {
2022-01-05 07:47:06 +00:00
// If REQNone is specified, we don't want to send a reply message
// so we silently just return without sending anything.
if message . ReplyMethod == "REQNone" {
return
}
2021-08-16 11:01:12 +00:00
// If no replyMethod is set we default to writing to writing to
// a log file.
if message . ReplyMethod == "" {
message . ReplyMethod = REQToFileAppend
}
2021-11-22 03:24:15 +00:00
// Make a copy of the message as it is right now to use
// in the previous message field, but set the data field
// to nil so we don't copy around the original data when
// we don't need to for the reply message.
thisMsg := message
thisMsg . Data = nil
2021-08-16 11:01:12 +00:00
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
2022-04-07 07:34:06 +00:00
// TODO: Check that we still got all the fields present that are needed here.
2021-08-16 11:01:12 +00:00
newMsg := Message {
2021-09-16 10:37:46 +00:00
ToNode : message . FromNode ,
2021-09-22 13:25:40 +00:00
FromNode : message . ToNode ,
2022-01-31 07:49:46 +00:00
Data : outData ,
2021-09-16 10:37:46 +00:00
Method : message . ReplyMethod ,
MethodArgs : message . ReplyMethodArgs ,
MethodTimeout : message . ReplyMethodTimeout ,
2021-09-22 14:08:55 +00:00
IsReply : true ,
2021-09-16 10:37:46 +00:00
ACKTimeout : message . ReplyACKTimeout ,
Retries : message . ReplyRetries ,
2021-09-22 13:25:40 +00:00
Directory : message . Directory ,
FileName : message . FileName ,
2021-08-16 11:01:12 +00:00
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
2021-11-22 03:24:15 +00:00
PreviousMessage : & thisMsg ,
2021-08-16 11:01:12 +00:00
}
2021-08-25 06:56:44 +00:00
sam , err := newSubjectAndMessage ( newMsg )
2021-08-16 11:01:12 +00:00
if err != nil {
// In theory the system should drop the message before it reaches here.
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-16 11:01:12 +00:00
}
2022-01-03 09:40:27 +00:00
2021-08-25 06:56:44 +00:00
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2021-08-16 11:01:12 +00:00
}
2021-09-07 04:24:21 +00:00
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming ( message Message , proc process ) ( string , string ) {
var fileName string
var folderTree string
switch {
case message . PreviousMessage == nil :
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . ToNode ) )
case message . PreviousMessage . ToNode != "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . PreviousMessage . ToNode ) )
case message . PreviousMessage . ToNode == "" :
fileName = message . PreviousMessage . FileName
folderTree = filepath . Join ( proc . configuration . SubscribersDataFolder , message . PreviousMessage . Directory , string ( message . FromNode ) )
}
return fileName , folderTree
}
2021-02-18 13:27:53 +00:00
// ------------------------------------------------------------
// Subscriber method handlers
2021-02-11 14:39:19 +00:00
// ------------------------------------------------------------
2021-08-16 11:01:12 +00:00
// The methodHandler interface.
2021-02-11 14:39:19 +00:00
type methodHandler interface {
2021-03-08 13:09:14 +00:00
handler ( proc process , message Message , node string ) ( [ ] byte , error )
2022-01-27 06:19:04 +00:00
getKind ( ) Event
2021-02-11 14:39:19 +00:00
}
2021-03-01 16:08:40 +00:00
// -----
2021-09-20 04:40:34 +00:00
// --- OpProcessList
type methodREQOpProcessList struct {
2022-01-27 09:06:06 +00:00
event Event
2021-09-20 04:40:34 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQOpProcessList ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-09-20 04:40:34 +00:00
}
// Handle Op Process List
func ( m methodREQOpProcessList ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
out := [ ] byte { }
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Lock ( )
2021-11-16 18:07:24 +00:00
for _ , pTmp := range proc . processes . active . procNames {
s := fmt . Sprintf ( "%v, process: %v, id: %v, name: %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , pTmp . processKind , pTmp . processID , pTmp . subject . name ( ) )
sb := [ ] byte ( s )
out = append ( out , sb ... )
2021-09-20 04:40:34 +00:00
}
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Unlock ( )
2021-09-20 04:40:34 +00:00
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
2022-01-27 09:06:06 +00:00
event Event
2021-09-20 04:40:34 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQOpProcessStart ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-09-20 04:40:34 +00:00
}
// Handle Op Process Start
func ( m methodREQOpProcessStart ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-09-20 09:53:17 +00:00
var out [ ] byte
2021-09-20 04:40:34 +00:00
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQOpProcessStart: got <1 number methodArgs" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-23 10:52:59 +00:00
return
}
2021-09-20 04:40:34 +00:00
m := message . MethodArgs [ 0 ]
method := Method ( m )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStart: no such request type defined: %v" + m )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 04:40:34 +00:00
return
}
// Create the process and start it.
sub := newSubject ( method , proc . configuration . NodeName )
2022-04-01 05:09:55 +00:00
procNew := newProcess ( proc . ctx , proc . server , sub , processKindSubscriber , nil )
go procNew . spawnWorker ( )
2021-09-20 04:40:34 +00:00
2021-09-20 09:53:17 +00:00
txt := fmt . Sprintf ( "info: OpProcessStart: started id: %v, subject: %v: node: %v" , procNew . processID , sub , message . ToNode )
er := fmt . Errorf ( txt )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 04:40:34 +00:00
2021-09-20 09:53:17 +00:00
out = [ ] byte ( txt + "\n" )
2021-09-20 04:40:34 +00:00
newReplyMessage ( proc , message , out )
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 09:53:17 +00:00
// --- OpProcessStop
type methodREQOpProcessStop struct {
2022-01-27 09:06:06 +00:00
event Event
2021-09-20 09:53:17 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQOpProcessStop ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-09-20 09:53:17 +00:00
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func ( m methodREQOpProcessStop ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
var out [ ] byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
// --- Parse and check the method arguments given.
2021-09-21 05:10:41 +00:00
// The Reason for also having the node as one of the arguments is
// that publisher processes are named by the node they are sending the
// message to. Subscriber processes names are named by the node name
// they are running on.
2021-09-23 10:52:59 +00:00
2021-11-16 18:39:42 +00:00
if v := len ( message . MethodArgs ) ; v != 3 {
2022-01-21 07:35:08 +00:00
er := fmt . Errorf ( "error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-23 10:52:59 +00:00
}
2021-09-20 09:53:17 +00:00
methodString := message . MethodArgs [ 0 ]
2021-09-21 05:10:41 +00:00
node := message . MethodArgs [ 1 ]
kind := message . MethodArgs [ 2 ]
2021-09-20 09:53:17 +00:00
method := Method ( methodString )
tmpH := mt . getHandler ( Method ( method ) )
if tmpH == nil {
er := fmt . Errorf ( "error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 09:53:17 +00:00
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject ( method , string ( node ) )
processName := processNameGet ( sub . name ( ) , processKind ( kind ) )
// Remove the process from the processes active map if found.
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Lock ( )
2021-11-16 18:07:24 +00:00
toStopProc , ok := proc . processes . active . procNames [ processName ]
2021-10-08 10:07:10 +00:00
2021-09-20 09:53:17 +00:00
if ok {
// Delete the process from the processes map
2021-11-16 09:21:44 +00:00
delete ( proc . processes . active . procNames , processName )
2021-09-20 09:53:17 +00:00
// Stop started go routines that belong to the process.
toStopProc . ctxCancel ( )
// Stop subscribing for messages on the process's subject.
err := toStopProc . natsSubscription . Unsubscribe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 09:53:17 +00:00
}
// Remove the prometheus label
2022-04-01 06:51:14 +00:00
proc . metrics . promProcessesAllRunning . Delete ( prometheus . Labels { "processName" : string ( processName ) } )
2021-09-20 09:53:17 +00:00
2021-11-16 18:39:42 +00:00
txt := fmt . Sprintf ( "info: OpProcessStop: process stopped id: %v, method: %v on: %v" , toStopProc . processID , sub , message . ToNode )
2021-09-20 09:53:17 +00:00
er := fmt . Errorf ( txt )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 09:53:17 +00:00
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
} else {
txt := fmt . Sprintf ( "error: OpProcessStop: did not find process to stop: %v on %v" , sub , message . ToNode )
er := fmt . Errorf ( txt )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-20 09:53:17 +00:00
out = [ ] byte ( txt + "\n" )
newReplyMessage ( proc , message , out )
}
2021-11-16 09:21:44 +00:00
proc . processes . active . mu . Unlock ( )
2021-09-20 09:53:17 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-09-20 04:40:34 +00:00
// ----
2021-03-31 11:29:55 +00:00
2021-04-13 13:54:04 +00:00
type methodREQToFileAppend struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-01 16:08:40 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQToFileAppend ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-01 16:08:40 +00:00
}
2021-02-11 14:39:19 +00:00
2021-08-16 11:01:12 +00:00
// Handle appending data to file.
2021-04-13 13:54:04 +00:00
func ( m methodREQToFileAppend ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-02 20:15:52 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-03-02 12:46:02 +00:00
2021-04-02 20:15:52 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v" , folderTree , proc . subject , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-02 20:15:52 +00:00
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-04-02 20:15:52 +00:00
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-09 12:41:31 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-03-02 12:46:02 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFileAppend.handler: failed to open file: %v, %v" , file , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-03-02 12:46:02 +00:00
return nil , err
}
defer f . Close ( )
2022-01-31 07:49:46 +00:00
_ , err = f . Write ( message . Data )
f . Sync ( )
if err != nil {
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file : %v, %v" , file , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-02-11 14:39:19 +00:00
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-11 14:39:19 +00:00
}
2021-02-18 13:27:53 +00:00
// -----
2021-04-13 15:15:13 +00:00
type methodREQToFile struct {
2022-01-27 09:06:06 +00:00
event Event
2021-04-06 12:05:47 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQToFile ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-04-06 12:05:47 +00:00
}
2021-08-16 11:01:12 +00:00
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
2021-04-13 15:15:13 +00:00
func ( m methodREQToFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-04-06 12:05:47 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 12:05:47 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , folderTree , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-06-09 07:40:35 +00:00
return nil , er
2021-04-06 12:05:47 +00:00
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-04-06 12:05:47 +00:00
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-04-06 17:42:03 +00:00
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
2021-04-06 12:05:47 +00:00
if err != nil {
2021-11-17 12:02:48 +00:00
er := fmt . Errorf ( "error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-11-17 12:02:48 +00:00
return nil , err
}
defer f . Close ( )
2022-01-31 07:49:46 +00:00
_ , err = f . Write ( message . Data )
f . Sync ( )
if err != nil {
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file: file: %v, %v" , file , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-17 12:02:48 +00:00
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
type methodREQCopyFileFrom struct {
2022-01-27 09:06:06 +00:00
event Event
2021-11-17 12:02:48 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQCopyFileFrom ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-11-17 12:02:48 +00:00
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func ( m methodREQCopyFileFrom ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 3 :
2022-01-21 07:35:08 +00:00
er := fmt . Errorf ( "error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-11-17 12:02:48 +00:00
return
}
SrcFilePath := message . MethodArgs [ 0 ]
DstNode := message . MethodArgs [ 1 ]
DstFilePath := message . MethodArgs [ 2 ]
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2021-11-17 13:07:35 +00:00
defer cancel ( )
2021-11-17 12:02:48 +00:00
outCh := make ( chan [ ] byte )
errCh := make ( chan error )
// Read the file, and put the result on the out channel to be sent when done reading.
proc . processes . wg . Add ( 1 )
2021-11-22 08:26:56 +00:00
go copyFileFrom ( ctx , & proc . processes . wg , SrcFilePath , errCh , outCh )
2021-11-17 12:02:48 +00:00
// Wait here until we got the data to send, then create a new message
// and send it.
// Also checking the ctx.Done which calls Cancel will allow us to
// kill all started go routines started by this message.
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQCopyFile: got <-ctx.Done(): %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-17 12:02:48 +00:00
return
case er := <- errCh :
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-17 12:02:48 +00:00
return
case out := <- outCh :
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
// Prepare for sending a new message with the output
// Copy the original message to get the defaults for timeouts etc,
// and set new values for fields to change.
msg := message
msg . ToNode = Node ( DstNode )
2021-11-18 08:34:16 +00:00
//msg.Method = REQToFile
msg . Method = REQCopyFileTo
2022-01-31 07:49:46 +00:00
msg . Data = out
2021-11-17 12:02:48 +00:00
msg . Directory = dstDir
msg . FileName = dstFile
// Create SAM and put the message on the send new message channel.
sam , err := newSubjectAndMessage ( msg )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-17 12:02:48 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2021-11-17 13:07:35 +00:00
replyData := fmt . Sprintf ( "info: succesfully read the file %v, and sent the content to %v\n" , SrcFilePath , DstNode )
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2021-11-17 12:02:48 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-01-03 09:40:27 +00:00
// copyFileFrom will read a file to be copied from the specified SrcFilePath.
// The result of be delivered on the provided outCh.
2021-11-22 08:26:56 +00:00
func copyFileFrom ( ctx context . Context , wg * sync . WaitGroup , SrcFilePath string , errCh chan error , outCh chan [ ] byte ) {
defer wg . Done ( )
const natsMaxMsgSize = 1000000
fi , err := os . Stat ( SrcFilePath )
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os . IsNotExist ( err ) :
errCh <- fmt . Errorf ( "error: methodREQCopyFile: src file not found: %v" , SrcFilePath )
return
case fi . Size ( ) > natsMaxMsgSize :
errCh <- fmt . Errorf ( "error: methodREQCopyFile: src file to big. max size: %v" , natsMaxMsgSize )
return
}
fh , err := os . Open ( SrcFilePath )
if err != nil {
errCh <- fmt . Errorf ( "error: methodREQCopyFile: failed to open file: %v, %v" , SrcFilePath , err )
return
}
b , err := io . ReadAll ( fh )
if err != nil {
errCh <- fmt . Errorf ( "error: methodREQCopyFile: failed to read file: %v, %v" , SrcFilePath , err )
return
}
select {
case outCh <- b :
2022-01-26 14:35:31 +00:00
// fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
2021-11-22 08:26:56 +00:00
case <- ctx . Done ( ) :
return
}
}
2021-11-17 12:02:48 +00:00
// ----
type methodREQCopyFileTo struct {
2022-01-27 09:06:06 +00:00
event Event
2021-11-17 12:02:48 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQCopyFileTo ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-11-17 12:02:48 +00:00
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
2021-11-18 08:34:16 +00:00
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
2021-11-17 12:02:48 +00:00
func ( m methodREQCopyFileTo ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-11-18 08:34:16 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-17 12:02:48 +00:00
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2021-11-18 08:34:16 +00:00
defer cancel ( )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
// Put data that should be the result of the action done in the inner
// go routine on the outCh.
outCh := make ( chan [ ] byte )
// Put errors from the inner go routine on the errCh.
errCh := make ( chan error )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-17 12:02:48 +00:00
2021-11-18 08:34:16 +00:00
// ---
2021-11-22 16:09:20 +00:00
switch {
case len ( message . MethodArgs ) < 3 :
2022-01-21 07:35:08 +00:00
er := fmt . Errorf ( "error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-11-22 16:09:20 +00:00
return
}
2021-04-06 12:05:47 +00:00
2021-11-22 16:09:20 +00:00
// Pick up the values for the directory and filename for where
// to store the file.
DstFilePath := message . MethodArgs [ 2 ]
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
fileRealPath := path . Join ( dstDir , dstFile )
2021-11-18 08:34:16 +00:00
// Check if folder structure exist, if not create it.
2021-11-22 16:09:20 +00:00
if _ , err := os . Stat ( dstDir ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( dstDir , 0700 )
2021-11-18 08:34:16 +00:00
if err != nil {
2021-11-22 16:09:20 +00:00
er := fmt . Errorf ( "failed to create toFile directory tree: subject:%v, folderTree: %v, %v" , proc . subject , dstDir , err )
2021-11-18 08:34:16 +00:00
errCh <- er
return
}
2022-02-18 08:51:11 +00:00
{
er := fmt . Errorf ( "info: MethodREQCopyFileTo: Creating folders %v" , dstDir )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-02-18 08:51:11 +00:00
}
2021-11-18 08:34:16 +00:00
}
// Open file and write data. Truncate and overwrite any existing files.
2021-11-22 16:09:20 +00:00
file := filepath . Join ( dstDir , dstFile )
2021-11-18 08:34:16 +00:00
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
er := fmt . Errorf ( "failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
errCh <- er
return
}
defer f . Close ( )
2022-01-31 07:49:46 +00:00
_ , err = f . Write ( message . Data )
f . Sync ( )
if err != nil {
er := fmt . Errorf ( "failed to write to file: file: %v, error: %v" , file , err )
errCh <- er
2021-11-18 08:34:16 +00:00
}
// All went ok, send a signal to the outer select statement.
outCh <- [ ] byte ( fileRealPath )
// ---
} ( )
// Wait for messages received from the inner go routine.
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQCopyFileTo: got <-ctx.Done(): %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-18 08:34:16 +00:00
return
case err := <- errCh :
er := fmt . Errorf ( "error: methodREQCopyFileTo: %v" , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-18 08:34:16 +00:00
return
case out := <- outCh :
replyData := fmt . Sprintf ( "info: succesfully created and wrote the file %v\n" , out )
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
return
2021-04-06 12:05:47 +00:00
}
2021-11-18 08:34:16 +00:00
} ( )
2021-04-06 12:05:47 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-04-06 03:46:07 +00:00
type methodREQHello struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-01 16:08:40 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQHello ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-01 16:08:40 +00:00
}
2021-02-18 13:27:53 +00:00
2021-08-16 11:01:12 +00:00
// Handler for receiving hello messages.
2021-04-06 03:46:07 +00:00
func ( m methodREQHello ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-27 10:27:38 +00:00
data := fmt . Sprintf ( "%v, Received hello from %#v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-03-09 03:55:51 +00:00
2021-08-24 12:05:44 +00:00
fileName := message . FileName
2021-09-08 02:11:35 +00:00
folderTree := filepath . Join ( proc . configuration . SubscribersDataFolder , message . Directory , string ( message . FromNode ) )
2021-08-10 08:53:15 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-08-10 08:53:15 +00:00
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2022-01-04 06:50:14 +00:00
//f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
f , err := os . OpenFile ( file , os . O_TRUNC | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-08-10 08:53:15 +00:00
if err != nil {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: methodREQHello.handler: failed to open file: %v" , err )
return nil , er
2021-08-10 08:53:15 +00:00
}
defer f . Close ( )
_ , err = f . Write ( [ ] byte ( data ) )
f . Sync ( )
if err != nil {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file: %v" , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-10 08:53:15 +00:00
}
// --------------------------
2021-03-04 15:27:55 +00:00
// send the message to the procFuncCh which is running alongside the process
// and can hold registries and handle special things for an individual process.
proc . procFuncCh <- message
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-18 13:27:53 +00:00
}
2021-02-24 14:43:31 +00:00
// ---
2021-04-06 05:56:49 +00:00
type methodREQErrorLog struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-01 16:08:40 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQErrorLog ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-01 16:08:40 +00:00
}
2021-02-24 14:43:31 +00:00
2021-08-16 11:01:12 +00:00
// Handle the writing of error logs.
2021-04-06 05:56:49 +00:00
func ( m methodREQErrorLog ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-04-01 06:51:14 +00:00
proc . metrics . promErrorMessagesReceivedTotal . Inc ( )
2021-04-06 05:56:49 +00:00
2021-04-06 07:06:26 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-04-06 07:06:26 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-05-20 10:27:25 +00:00
return nil , fmt . Errorf ( "error: failed to create errorLog directory tree %v: %v" , folderTree , err )
2021-04-06 07:06:26 +00:00
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-04-06 07:06:26 +00:00
}
// Open file and write data.
file := filepath . Join ( folderTree , fileName )
2021-08-12 10:27:47 +00:00
f , err := os . OpenFile ( file , os . O_APPEND | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
2021-04-06 07:06:26 +00:00
if err != nil {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: methodREQErrorLog.handler: failed to open file: %v" , err )
return nil , er
2021-04-06 07:06:26 +00:00
}
defer f . Close ( )
2021-04-06 05:56:49 +00:00
2022-01-31 07:49:46 +00:00
_ , err = f . Write ( message . Data )
f . Sync ( )
if err != nil {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: methodEventTextLogging.handler: failed to write to file: %v" , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 07:06:26 +00:00
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
2021-02-24 14:43:31 +00:00
}
2021-03-11 05:34:36 +00:00
// ---
2021-04-06 04:08:26 +00:00
type methodREQPing struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-11 05:34:36 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQPing ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-11 05:34:36 +00:00
}
2021-08-16 11:01:12 +00:00
// Handle receving a ping.
2021-04-06 04:08:26 +00:00
func ( m methodREQPing ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a ping
2021-03-11 05:34:36 +00:00
2021-08-26 06:40:33 +00:00
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to create toFile directory tree: %v, %v" , folderTree , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-26 06:40:33 +00:00
return nil , er
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-08-26 06:40:33 +00:00
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-08-26 06:40:33 +00:00
return nil , err
}
defer f . Close ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, ping received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . FromNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-26 06:40:33 +00:00
}
2021-08-26 05:01:55 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 05:31:50 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-26 05:01:55 +00:00
2021-08-26 06:40:33 +00:00
newReplyMessage ( proc , message , nil )
2021-04-06 05:31:50 +00:00
} ( )
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-06 04:08:26 +00:00
type methodREQPong struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-11 05:34:36 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQPong ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-11 05:34:36 +00:00
}
2021-08-16 11:01:12 +00:00
// Handle receiving a pong.
2021-04-06 04:08:26 +00:00
func ( m methodREQPong ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-08-26 06:40:33 +00:00
// Write to file that we received a pong
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
2021-09-07 04:24:21 +00:00
fileName , folderTree := selectFileNaming ( message , proc )
2021-08-26 06:40:33 +00:00
// Check if folder structure exist, if not create it.
if _ , err := os . Stat ( folderTree ) ; os . IsNotExist ( err ) {
err := os . MkdirAll ( folderTree , 0700 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to create toFile directory tree %v: %v" , folderTree , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-26 06:40:33 +00:00
return nil , er
}
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "info: Creating subscribers data folder at %v" , folderTree )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2021-08-26 06:40:33 +00:00
}
// Open file.
file := filepath . Join ( folderTree , fileName )
f , err := os . OpenFile ( file , os . O_CREATE | os . O_RDWR | os . O_TRUNC , 0755 )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-08-26 06:40:33 +00:00
return nil , err
}
defer f . Close ( )
// And write the data
2021-08-27 10:27:38 +00:00
d := fmt . Sprintf ( "%v, pong received from %v\n" , time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) , message . PreviousMessage . ToNode )
2021-08-26 06:40:33 +00:00
_ , err = f . Write ( [ ] byte ( d ) )
f . Sync ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v" , message . Directory , message . FileName , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-08-26 06:40:33 +00:00
}
2021-03-11 05:34:36 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-03-11 11:07:09 +00:00
2021-03-11 11:51:16 +00:00
// ---
2021-03-11 11:07:09 +00:00
2021-04-04 09:19:17 +00:00
type methodREQCliCommand struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-11 11:07:09 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQCliCommand ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-11 11:07:09 +00:00
}
2021-03-29 04:53:34 +00:00
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
2021-04-04 09:19:17 +00:00
func ( m methodREQCliCommand ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-18 08:51:11 +00:00
inf := fmt . Errorf ( "<--- CLICommandREQUEST received from: %v, containing: %v" , message . FromNode , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2021-03-11 11:07:09 +00:00
2021-03-29 04:53:34 +00:00
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
var a [ ] string
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <1 number methodArgs" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-09-23 10:52:59 +00:00
return
case len ( message . MethodArgs ) >= 0 :
a = message . MethodArgs [ 1 : ]
}
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
2021-03-11 11:07:09 +00:00
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2021-03-26 15:04:01 +00:00
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-03-26 15:04:01 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-22 14:58:23 +00:00
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i , v := range message . MethodArgs {
if strings . Contains ( v , "{{STEWARD_DATA}}" ) {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message . MethodArgs [ i ] = strings . Replace ( message . MethodArgs [ i ] , "{{STEWARD_DATA}}" , "$STEWARD_DATA" , - 1 )
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
2022-01-31 07:49:46 +00:00
envData = string ( message . Data )
2021-09-22 14:58:23 +00:00
}
}
2021-03-26 15:04:01 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
2021-09-08 03:50:45 +00:00
2021-09-22 13:25:40 +00:00
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt . Sprintf ( "STEWARD_DATA=%v" , envData )
cmd . Env = append ( cmd . Env , envData )
}
2021-09-08 03:50:45 +00:00
var out bytes . Buffer
var stderr bytes . Buffer
cmd . Stdout = & out
cmd . Stderr = & stderr
2021-11-10 15:10:51 +00:00
2021-09-08 03:50:45 +00:00
err := cmd . Run ( )
2021-03-26 15:04:01 +00:00
if err != nil {
2021-11-10 15:10:51 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v" , err , message . MethodArgs , stderr . String ( ) )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-03-26 15:04:01 +00:00
}
2022-01-26 14:35:31 +00:00
2021-08-12 10:27:47 +00:00
select {
2021-09-08 03:50:45 +00:00
case outCh <- out . Bytes ( ) :
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-03-26 15:04:01 +00:00
} ( )
select {
case <- ctx . Done ( ) :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommand: method timed out: %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-03-26 15:04:01 +00:00
case out := <- outCh :
2021-03-29 04:53:34 +00:00
cancel ( )
2021-03-26 15:04:01 +00:00
2021-09-22 14:58:23 +00:00
// NB: Not quite sure what is the best way to handle the below
// isReply right now. Implementing as send to central for now.
//
2021-09-22 14:08:55 +00:00
// If this is this a reply message swap the toNode and fromNode
// fields so the output of the command are sent to central node.
if message . IsReply {
message . ToNode , message . FromNode = message . FromNode , message . ToNode
}
2021-03-31 11:29:55 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-03-26 15:04:01 +00:00
}
2021-03-12 11:08:11 +00:00
2021-03-26 15:04:01 +00:00
} ( )
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-13 15:22:25 +00:00
type methodREQToConsole struct {
2022-01-27 09:06:06 +00:00
event Event
2021-03-11 11:07:09 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQToConsole ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-03-11 11:07:09 +00:00
}
2021-08-16 11:01:12 +00:00
// Handler to write directly to console.
2022-01-17 14:13:14 +00:00
// This handler handles the writing to console both for TUI and shell clients.
2021-04-13 15:22:25 +00:00
func ( m methodREQToConsole ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-03-11 11:07:09 +00:00
2022-01-17 14:13:14 +00:00
switch {
case proc . configuration . EnableTUI :
if proc . processes . tui . toConsoleCh != nil {
proc . processes . tui . toConsoleCh <- message . Data
} else {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: no tui client started" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-01-17 14:13:14 +00:00
}
default :
2022-01-31 07:49:46 +00:00
fmt . Fprintf ( os . Stdout , "%v" , string ( message . Data ) )
2022-01-17 14:13:14 +00:00
fmt . Println ( )
}
2021-11-11 12:43:32 +00:00
2021-03-11 11:07:09 +00:00
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-06 17:42:03 +00:00
// ---
2022-01-12 06:42:41 +00:00
type methodREQTuiToConsole struct {
2022-01-27 09:06:06 +00:00
event Event
2022-01-12 06:42:41 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQTuiToConsole ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2022-01-12 06:42:41 +00:00
}
// Handler to write directly to console.
2022-01-17 14:13:14 +00:00
// DEPRECATED
2022-01-12 06:42:41 +00:00
func ( m methodREQTuiToConsole ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
if proc . processes . tui . toConsoleCh != nil {
proc . processes . tui . toConsoleCh <- message . Data
} else {
2022-02-18 08:51:11 +00:00
er := fmt . Errorf ( "error: no tui client started" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-01-12 06:42:41 +00:00
}
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
2021-04-06 17:42:03 +00:00
type methodREQHttpGet struct {
2022-01-27 09:06:06 +00:00
event Event
2021-04-06 17:42:03 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQHttpGet ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-04-06 17:42:03 +00:00
}
2021-08-16 11:01:12 +00:00
// handler to do a Http Get.
2021-04-06 17:42:03 +00:00
func ( m methodREQHttpGet ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-18 08:51:11 +00:00
inf := fmt . Errorf ( "<--- REQHttpGet received from: %v, containing: %v" , message . FromNode , message . Data )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2021-04-06 17:42:03 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQHttpGet: got <1 number methodArgs" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-09-23 10:52:59 +00:00
return
}
2021-09-16 09:51:34 +00:00
url := message . MethodArgs [ 0 ]
2021-04-06 17:42:03 +00:00
client := http . Client {
2022-01-28 07:36:09 +00:00
Timeout : time . Second * time . Duration ( message . MethodTimeout ) ,
2021-04-06 17:42:03 +00:00
}
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2021-04-06 17:42:03 +00:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 17:42:03 +00:00
cancel ( )
return
}
outCh := make ( chan [ ] byte )
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-06 17:42:03 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-04-06 17:42:03 +00:00
resp , err := client . Do ( req )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, bailing out: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 17:42:03 +00:00
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
2022-02-11 06:27:51 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, were %#v, bailing out: %v" , resp . StatusCode , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 17:42:03 +00:00
return
}
2021-08-24 07:25:43 +00:00
body , err := io . ReadAll ( resp . Body )
2021-04-06 17:42:03 +00:00
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 17:42:03 +00:00
}
2021-08-24 07:25:43 +00:00
out := body
2021-08-12 10:27:47 +00:00
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
}
2021-04-06 17:42:03 +00:00
} ( )
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQHttpGet: method timed out: %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-06 17:42:03 +00:00
case out := <- outCh :
cancel ( )
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-06 17:42:03 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-13 09:28:52 +00:00
2022-02-11 06:27:51 +00:00
// ---
type methodREQHttpGetScheduled struct {
event Event
}
func ( m methodREQHttpGetScheduled ) getKind ( ) Event {
return m . event
}
// handler to do a Http Get Scheduled.
// The second element of the MethodArgs slice holds the timer defined in seconds.
func ( m methodREQHttpGetScheduled ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-18 08:51:11 +00:00
inf := fmt . Errorf ( "<--- REQHttpGetScheduled received from: %v, containing: %v" , message . FromNode , message . Data )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2022-02-11 06:27:51 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
// --- Check and prepare the methodArgs
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2022-02-11 06:27:51 +00:00
return
}
url := message . MethodArgs [ 0 ]
scheduleInterval , err := strconv . Atoi ( message . MethodArgs [ 1 ] )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
return
}
2022-02-23 11:32:21 +00:00
schedulerTotalTime , err := strconv . Atoi ( message . MethodArgs [ 2 ] )
2022-02-11 06:27:51 +00:00
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
return
}
// --- Prepare and start the scheduler.
outCh := make ( chan [ ] byte )
ticker := time . NewTicker ( time . Second * time . Duration ( scheduleInterval ) )
// Prepare a context that will be for the schedule as a whole.
// NB: Individual http get's will create their own context's
// derived from this one.
2022-02-23 11:32:21 +00:00
ctxScheduler , cancel := context . WithTimeout ( proc . ctx , time . Minute * time . Duration ( schedulerTotalTime ) )
2022-02-11 06:27:51 +00:00
go func ( ) {
// Prepare the http request.
client := http . Client {
Timeout : time . Second * time . Duration ( message . MethodTimeout ) ,
}
for {
select {
case <- ticker . C :
proc . processes . wg . Add ( 1 )
// Get a context with the timeout specified in message.MethodTimeout
// for the individual http requests.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, error: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
cancel ( )
return
}
// Run each individual http get in it's own go routine, and
// deliver the result on the outCh.
go func ( ) {
defer proc . processes . wg . Done ( )
resp , err := client . Do ( req )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, error: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, were %#v, error: %v" , resp . StatusCode , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
return
}
body , err := io . ReadAll ( resp . Body )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
}
out := body
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
case <- ctxScheduler . Done ( ) :
// If the scheduler context is done then we also want to kill
// all running http request.
cancel ( )
return
}
} ( )
case <- ctxScheduler . Done ( ) :
cancel ( )
return
}
}
} ( )
for {
select {
case <- ctxScheduler . Done ( ) :
2022-02-11 08:04:14 +00:00
// fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
2022-02-11 06:27:51 +00:00
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: schedule context timed out: %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-11 06:27:51 +00:00
return
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-04-13 09:28:52 +00:00
// --- methodREQTailFile
type methodREQTailFile struct {
2022-01-27 09:06:06 +00:00
event Event
2021-04-13 09:28:52 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQTailFile ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-04-13 09:28:52 +00:00
}
// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func ( m methodREQTailFile ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-18 08:51:11 +00:00
inf := fmt . Errorf ( "<--- TailFile REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2021-04-13 09:28:52 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-09-23 10:52:59 +00:00
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQTailFile: got <1 number methodArgs" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-09-23 10:52:59 +00:00
return
}
2021-09-16 09:51:34 +00:00
fp := message . MethodArgs [ 0 ]
2021-04-13 09:28:52 +00:00
2022-01-26 14:35:31 +00:00
// var ctx context.Context
// var cancel context.CancelFunc
2021-04-16 21:58:43 +00:00
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
// Note: Replacing the 0 timeout with specific timeout.
// if message.MethodTimeout != 0 {
// ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// } else {
// ctx, cancel = context.WithCancel(proc.ctx)
// }
2021-04-13 09:28:52 +00:00
outCh := make ( chan [ ] byte )
2021-04-16 21:58:43 +00:00
t , err := tail . TailFile ( fp , tail . Config { Follow : true , Location : & tail . SeekInfo {
Offset : 0 ,
Whence : os . SEEK_END ,
} } )
2021-04-13 09:28:52 +00:00
if err != nil {
2021-06-09 07:40:35 +00:00
er := fmt . Errorf ( "error: methodREQToTailFile: tailFile: %v" , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-04-13 09:28:52 +00:00
}
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-04-13 09:28:52 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2021-08-13 10:03:30 +00:00
for {
2021-08-12 10:27:47 +00:00
select {
2021-08-13 10:03:30 +00:00
case line := <- t . Lines :
outCh <- [ ] byte ( line . Text + "\n" )
2021-08-12 10:27:47 +00:00
case <- ctx . Done ( ) :
return
}
2021-04-13 09:28:52 +00:00
}
} ( )
for {
select {
case <- ctx . Done ( ) :
cancel ( )
// Close the lines channel so we exit the reading lines
// go routine.
2021-04-16 21:58:43 +00:00
// close(t.Lines)
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: method timeout reached REQTailFile, canceling: %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . infoSend ( proc , message , er )
2021-08-13 10:03:30 +00:00
2021-04-13 09:28:52 +00:00
return
case out := <- outCh :
2021-08-12 14:39:09 +00:00
2021-04-13 09:28:52 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
2021-04-13 13:28:54 +00:00
newReplyMessage ( proc , message , out )
2021-04-13 09:28:52 +00:00
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-06-08 18:52:45 +00:00
// ---
2021-09-17 08:17:10 +00:00
type methodREQCliCommandCont struct {
2022-01-27 09:06:06 +00:00
event Event
2021-06-08 18:52:45 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQCliCommandCont ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-06-08 18:52:45 +00:00
}
2021-09-17 08:17:10 +00:00
// Handler to run REQCliCommandCont, which is the same as normal
2021-08-10 10:49:42 +00:00
// Cli command, but can be used when running a command that will take
// longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
2021-09-17 08:17:10 +00:00
func ( m methodREQCliCommandCont ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-18 08:51:11 +00:00
inf := fmt . Errorf ( "<--- CLInCommandCont REQUEST received from: %v, containing: %v" , message . FromNode , message . Data )
2022-04-01 06:43:14 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2021-06-08 18:52:45 +00:00
// Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2021-08-12 10:27:47 +00:00
defer proc . processes . wg . Done ( )
2022-01-26 14:35:31 +00:00
defer func ( ) {
2022-02-11 08:04:14 +00:00
// fmt.Printf(" * DONE *\n")
2022-01-26 14:35:31 +00:00
} ( )
2021-09-23 10:52:59 +00:00
var a [ ] string
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQCliCommand: got <1 number methodArgs" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-09-23 10:52:59 +00:00
return
case len ( message . MethodArgs ) >= 0 :
a = message . MethodArgs [ 1 : ]
}
2021-09-16 09:51:34 +00:00
c := message . MethodArgs [ 0 ]
2021-06-08 18:52:45 +00:00
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
// deadline, _ := ctx.Deadline()
// fmt.Printf(" * DEBUG * deadline : %v\n", deadline)
2021-06-08 18:52:45 +00:00
outCh := make ( chan [ ] byte )
2021-09-27 12:45:49 +00:00
errCh := make ( chan string )
2021-06-08 18:52:45 +00:00
2021-08-12 10:27:47 +00:00
proc . processes . wg . Add ( 1 )
2021-06-08 18:52:45 +00:00
go func ( ) {
2022-04-20 17:14:24 +00:00
defer proc . processes . wg . Done ( )
2021-08-12 10:27:47 +00:00
2021-06-08 18:52:45 +00:00
cmd := exec . CommandContext ( ctx , c , a ... )
// Using cmd.StdoutPipe here so we are continuosly
// able to read the out put of the command.
outReader , err := cmd . StdoutPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-06-08 18:52:45 +00:00
}
2021-09-08 03:50:45 +00:00
ErrorReader , err := cmd . StderrPipe ( )
if err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-09-08 03:50:45 +00:00
}
2021-06-08 18:52:45 +00:00
if err := cmd . Start ( ) ; err != nil {
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v" , err , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-06-08 18:52:45 +00:00
}
2021-09-08 03:50:45 +00:00
go func ( ) {
scanner := bufio . NewScanner ( ErrorReader )
for scanner . Scan ( ) {
2021-09-27 12:45:49 +00:00
errCh <- scanner . Text ( )
2021-09-08 03:50:45 +00:00
}
} ( )
2021-09-27 12:45:49 +00:00
go func ( ) {
scanner := bufio . NewScanner ( outReader )
for scanner . Scan ( ) {
outCh <- [ ] byte ( scanner . Text ( ) + "\n" )
2021-08-12 10:27:47 +00:00
}
2021-09-27 12:45:49 +00:00
} ( )
2021-08-12 10:27:47 +00:00
2021-09-27 12:45:49 +00:00
// NB: sending cancel to command context, so processes are killed.
// A github issue is filed on not killing all child processes when using pipes:
// https://github.com/golang/go/issues/23019
// TODO: Check in later if there are any progress on the issue.
// When testing the problem seems to appear when using sudo, or tcpdump without
// the -l option. So for now, don't use sudo, and remember to use -l with tcpdump
// which makes stdout line buffered.
<- ctx . Done ( )
cancel ( )
if err := cmd . Wait ( ) ; err != nil {
2022-01-26 23:06:30 +00:00
er := fmt . Errorf ( "info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v" , message . MethodArgs , err )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-06-08 18:52:45 +00:00
}
} ( )
// Check if context timer or command output were received.
for {
select {
case <- ctx . Done ( ) :
cancel ( )
2021-09-23 06:19:53 +00:00
er := fmt . Errorf ( "info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . infoSend ( proc , message , er )
2021-06-08 18:52:45 +00:00
return
case out := <- outCh :
2022-02-11 08:04:14 +00:00
// fmt.Printf(" * out: %v\n", string(out))
2021-06-08 18:52:45 +00:00
newReplyMessage ( proc , message , out )
2021-09-27 12:45:49 +00:00
case out := <- errCh :
newReplyMessage ( proc , message , [ ] byte ( out ) )
2021-06-08 18:52:45 +00:00
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2021-07-01 08:05:34 +00:00
// ---
2021-11-19 08:35:53 +00:00
type methodREQRelayInitial struct {
2022-01-27 09:06:06 +00:00
event Event
2021-11-19 08:35:53 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQRelayInitial ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-11-19 08:35:53 +00:00
}
// Handler to relay messages via a host.
func ( m methodREQRelayInitial ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2021-11-22 15:02:40 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2021-11-19 08:35:53 +00:00
2022-01-26 14:35:31 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2021-11-22 15:02:40 +00:00
defer cancel ( )
2021-11-19 08:35:53 +00:00
2021-11-22 15:02:40 +00:00
outCh := make ( chan [ ] byte )
errCh := make ( chan error )
2021-11-23 18:49:48 +00:00
nothingCh := make ( chan struct { } , 1 )
2021-11-22 15:02:40 +00:00
var out [ ] byte
// If the actual Method for the message is REQCopyFileFrom we need to
// do the actual file reading here so we can fill the data field of the
// message with the content of the file before relaying it.
switch {
case message . RelayOriginalMethod == REQCopyFileFrom :
switch {
case len ( message . MethodArgs ) < 3 :
2022-01-21 07:35:08 +00:00
er := fmt . Errorf ( "error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2021-11-22 15:02:40 +00:00
return
}
SrcFilePath := message . MethodArgs [ 0 ]
2021-11-22 16:09:20 +00:00
//DstFilePath := message.MethodArgs[2]
2021-11-22 15:02:40 +00:00
// Read the file, and put the result on the out channel to be sent when done reading.
proc . processes . wg . Add ( 1 )
go copyFileFrom ( ctx , & proc . processes . wg , SrcFilePath , errCh , outCh )
// Since we now have read the source file we don't need the REQCopyFileFrom
// request method anymore, so we change the original method of the message
// so it will write the data after the relaying.
2021-11-22 16:09:20 +00:00
//dstDir := filepath.Dir(DstFilePath)
//dstFile := filepath.Base(DstFilePath)
2021-11-22 15:02:40 +00:00
message . RelayOriginalMethod = REQCopyFileTo
2021-11-22 16:09:20 +00:00
//message.FileName = dstFile
//message.Directory = dstDir
2021-11-22 15:02:40 +00:00
default :
2021-11-23 15:06:06 +00:00
// No request type that need special handling if relayed, so we should signal that
// there is nothing to do for the select below.
// We need to do this signaling in it's own go routine here, so we don't block here
// since the select below is in the same function.
go func ( ) {
2021-11-23 18:49:48 +00:00
nothingCh <- struct { } { }
2021-11-23 15:06:06 +00:00
} ( )
2021-11-22 15:02:40 +00:00
}
select {
case <- ctx . Done ( ) :
er := fmt . Errorf ( "error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v" , message . MethodArgs )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-22 15:02:40 +00:00
return
case er := <- errCh :
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-22 15:02:40 +00:00
return
2021-11-23 18:49:48 +00:00
case <- nothingCh :
// Do nothing.
2021-11-22 15:02:40 +00:00
case out = <- outCh :
}
2021-12-06 13:57:02 +00:00
// relay the message to the actual host here by prefixing the the RelayToNode
// to the subject.
relayTo := fmt . Sprintf ( "%v.%v" , message . RelayToNode , message . RelayOriginalViaNode )
// message.ToNode = message.RelayOriginalViaNode
message . ToNode = Node ( relayTo )
2021-11-22 15:02:40 +00:00
message . FromNode = Node ( node )
message . Method = REQRelay
2022-01-31 07:49:46 +00:00
message . Data = out
2021-11-22 15:02:40 +00:00
sam , err := newSubjectAndMessage ( message )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2021-11-22 15:02:40 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
} ( )
2021-11-19 08:35:53 +00:00
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed REQRelay from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-10 05:22:03 +00:00
type methodREQRelay struct {
2022-01-27 09:06:06 +00:00
event Event
2021-11-10 05:22:03 +00:00
}
2022-01-27 06:19:04 +00:00
func ( m methodREQRelay ) getKind ( ) Event {
2022-01-27 09:06:06 +00:00
return m . event
2021-11-10 05:22:03 +00:00
}
// Handler to relay messages via a host.
func ( m methodREQRelay ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// relay the message here to the actual host here.
2022-02-08 04:27:39 +00:00
proc . processes . wg . Add ( 1 )
2022-02-08 04:17:38 +00:00
go func ( ) {
2022-02-08 04:27:39 +00:00
defer proc . processes . wg . Done ( )
2022-02-08 04:17:38 +00:00
message . ToNode = message . RelayToNode
message . FromNode = Node ( node )
message . Method = message . RelayOriginalMethod
2021-11-10 11:58:23 +00:00
2022-02-08 04:17:38 +00:00
sam , err := newSubjectAndMessage ( message )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
2022-04-01 06:43:14 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-02-18 08:51:11 +00:00
2022-02-08 04:17:38 +00:00
return
}
2021-11-10 11:58:23 +00:00
2022-02-08 04:27:39 +00:00
select {
case proc . toRingbufferCh <- [ ] subjectAndMessage { sam } :
case <- proc . ctx . Done ( ) :
}
2022-02-08 04:17:38 +00:00
} ( )
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
type methodREQPublicKey struct {
event Event
}
func ( m methodREQPublicKey ) getKind ( ) Event {
return m . event
}
// Handler to get the public ed25519 key from a node.
func ( m methodREQPublicKey ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-02-08 10:49:32 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
2022-04-20 17:18:16 +00:00
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
2022-02-08 10:49:32 +00:00
select {
case <- ctx . Done ( ) :
2022-04-21 11:21:36 +00:00
case outCh <- proc . nodeAuth . SignPublicKey :
2022-02-08 10:49:32 +00:00
}
} ( )
select {
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <- ctx . Done ( ) :
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
} ( )
2021-11-10 10:21:38 +00:00
2021-11-10 05:22:03 +00:00
// Send back an ACK message.
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-18 08:34:16 +00:00
2022-04-07 07:34:06 +00:00
type methodREQPublicKeysGet struct {
event Event
}
func ( m methodREQPublicKeysGet ) getKind ( ) Event {
return m . event
}
// Handler to get all the public ed25519 keys from a central server.
func ( m methodREQPublicKeysGet ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// Get a context with the timeout specified in message.MethodTimeout.
// TODO:
// - Since this is implemented as a NACK message we could implement a
// metric thats shows the last time a node did a key request.
// - We could also implement a metrics on the receiver showing the last
// time a node had done an update.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
2022-04-20 17:18:16 +00:00
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
2022-04-20 16:33:52 +00:00
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
2022-04-07 07:34:06 +00:00
select {
case <- ctx . Done ( ) :
// TODO: Should we receive a hash of he current keys from the node here
// to verify if we need to update or not ?
case outCh <- [ ] byte { } :
}
} ( )
select {
case <- ctx . Done ( ) :
// case out := <-outCh:
case <- outCh :
2022-05-16 09:20:39 +00:00
// Using a func here to set the scope of the lock, and then be able to
// defer the unlock when leaving that scope.
func ( ) {
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
// TODO: We should probably create a hash of the current map content,
// store it alongside the KeyMap, and send both the KeyMap and hash
// back. We can then later send that hash when asking for keys, compare
// it with the current one for the KeyMap, and know if we need to send
// and update back to the node who published the request to here.
fmt . Printf ( " <---- methodREQPublicKeysGet: received hash from NODE=%v, HASH=%v\n" , message . FromNode , message . Data )
// Check if the received hash is the same as the one currently active,
if bytes . Equal ( proc . centralAuth . pki . nodesAcked . keysAndHash . Hash [ : ] , message . Data ) {
fmt . Printf ( "\n ------------ NODE AND CENTRAL ARE EQUAL, NOTHING TO DO, EXITING HANDLER\n\n" )
return
}
2022-05-16 05:15:38 +00:00
2022-05-16 09:20:39 +00:00
fmt . Printf ( "\n ------------ NODE AND CENTRAL WERE NOT EQUAL, PREPARING TO SEND NEW VERSION OF KEYS\n\n" )
2022-05-16 05:15:38 +00:00
2022-05-16 09:20:39 +00:00
fmt . Printf ( " * methodREQPublicKeysGet: marshalling new keys and hash to send: map=%v, hash=%v\n\n" , proc . centralAuth . pki . nodesAcked . keysAndHash . Keys , proc . centralAuth . pki . nodesAcked . keysAndHash . Hash )
2022-05-16 05:15:38 +00:00
2022-05-16 09:20:39 +00:00
b , err := json . Marshal ( proc . centralAuth . pki . nodesAcked . keysAndHash )
2022-05-12 09:32:12 +00:00
2022-05-16 09:20:39 +00:00
if err != nil {
er := fmt . Errorf ( "error: REQPublicKeysGet, failed to marshal keys map: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
}
fmt . Printf ( "\n ----> methodREQPublicKeysGet: SENDING KEYS TO NODE=%v\n" , message . FromNode )
newReplyMessage ( proc , message , b )
} ( )
2022-04-07 07:34:06 +00:00
}
} ( )
// NB: We're not sending an ACK message for this request type.
return nil , nil
}
// ----
2022-04-20 04:26:01 +00:00
type methodREQPublicKeysToNode struct {
2022-04-07 07:34:06 +00:00
event Event
}
2022-04-20 04:26:01 +00:00
func ( m methodREQPublicKeysToNode ) getKind ( ) Event {
2022-04-07 07:34:06 +00:00
return m . event
}
// Handler to put the public key replies received from a central server.
2022-04-20 04:26:01 +00:00
func ( m methodREQPublicKeysToNode ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-04-07 07:34:06 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
// TODO:
// - Since this is implemented as a NACK message we could implement a
// metric thats shows the last time keys were updated.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
2022-04-20 17:18:16 +00:00
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
2022-04-20 16:33:52 +00:00
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
2022-04-07 07:34:06 +00:00
select {
case <- ctx . Done ( ) :
2022-04-20 05:13:47 +00:00
// TODO: Should we receive a hash of he current keys from the node here ?
2022-04-07 07:34:06 +00:00
case outCh <- [ ] byte { } :
}
} ( )
select {
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <- ctx . Done ( ) :
case <- outCh :
2022-05-12 09:32:12 +00:00
2022-04-21 11:21:36 +00:00
proc . nodeAuth . publicKeys . mu . Lock ( )
2022-05-16 05:15:38 +00:00
err := json . Unmarshal ( message . Data , proc . nodeAuth . publicKeys . keysAndHash )
fmt . Printf ( "\n <---- REQPublicKeysToNode: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n" , proc . nodeAuth . publicKeys . keysAndHash )
proc . nodeAuth . publicKeys . mu . Unlock ( )
2022-04-20 16:33:52 +00:00
if err != nil {
er := fmt . Errorf ( "error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
}
2022-05-12 09:32:12 +00:00
2022-05-16 05:15:38 +00:00
// TODO TOMORROW: The hash is not sent with the requests to get public keys, and
// the reason is that the hash is not stored on the nodes ?
// Idea: We need to also persist the hash on the receiving nodes. We can then load
// that key upon startup, and send it along when we do a public keys get.
2022-04-07 07:34:06 +00:00
2022-04-21 11:21:36 +00:00
err = proc . nodeAuth . publicKeys . saveToFile ( )
if err != nil {
er := fmt . Errorf ( "error: REQPublicKeysToNode : save to file failed: %v, message: %v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
}
2022-04-07 19:43:00 +00:00
2022-04-07 07:34:06 +00:00
// Prepare and queue for sending a new message with the output
// of the action executed.
// newReplyMessage(proc, message, out)
}
} ( )
// Send back an ACK message.
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return nil , nil
}
// ----
2022-05-11 12:23:59 +00:00
// TODO: We should also add a request method methodREQPublicKeysRevoke
2022-04-20 16:33:52 +00:00
type methodREQPublicKeysAllow struct {
event Event
}
func ( m methodREQPublicKeysAllow ) getKind ( ) Event {
return m . event
}
2022-05-11 11:45:49 +00:00
// Handler to allow new public keys into the database on central auth.
// Nodes will send the public key in the REQHello messages. When they
// are recived on the central server they will be put into a temp key
// map, and we need to acknowledge them before they are moved into the
// main key map, and then allowed to be sent out to other nodes.
2022-04-20 16:33:52 +00:00
func ( m methodREQPublicKeysAllow ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
// Get a context with the timeout specified in message.MethodTimeout.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
outCh := make ( chan [ ] byte )
go func ( ) {
2022-04-20 17:18:16 +00:00
// Normally we would do some logic here, where the result is passed to outCh when done,
// so we can split up the working logic, and f.ex. sending a reply logic.
2022-04-20 16:33:52 +00:00
// In this case this go func and the below select is not needed, but keeping it so the
// structure is the same as the other handlers.
select {
case <- ctx . Done ( ) :
case outCh <- [ ] byte { } :
}
} ( )
select {
case <- ctx . Done ( ) :
case <- outCh :
2022-05-12 07:25:10 +00:00
proc . centralAuth . pki . nodeNotAckedPublicKeys . mu . Lock ( )
defer proc . centralAuth . pki . nodeNotAckedPublicKeys . mu . Unlock ( )
2022-04-20 16:33:52 +00:00
2022-05-12 04:22:11 +00:00
// Range over all the MethodArgs, where each element represents a node to allow,
// and move the node from the notAcked map to the allowed map.
2022-04-20 16:33:52 +00:00
for _ , n := range message . MethodArgs {
2022-05-12 07:25:10 +00:00
key , ok := proc . centralAuth . pki . nodeNotAckedPublicKeys . KeyMap [ Node ( n ) ]
2022-04-20 16:33:52 +00:00
if ok {
2022-05-12 04:22:11 +00:00
func ( ) {
2022-05-16 05:15:38 +00:00
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
2022-05-12 04:22:11 +00:00
// Store/update the node and public key on the allowed pubKey map.
2022-05-16 05:15:38 +00:00
proc . centralAuth . pki . nodesAcked . keysAndHash . Keys [ Node ( n ) ] = key
2022-05-12 04:22:11 +00:00
} ( )
2022-04-20 16:33:52 +00:00
// Add key to persistent storage.
2022-05-12 07:25:10 +00:00
proc . centralAuth . pki . dbUpdatePublicKey ( string ( n ) , key )
2022-04-20 16:33:52 +00:00
// Delete the key from the NotAcked map
2022-05-12 07:25:10 +00:00
delete ( proc . centralAuth . pki . nodeNotAckedPublicKeys . KeyMap , Node ( n ) )
2022-04-20 16:33:52 +00:00
er := fmt . Errorf ( "info: REQPublicKeysAllow : allowed new/updated public key for %v to allowed public key map" , n )
proc . errorKernel . infoSend ( proc , message , er )
}
}
2022-05-12 04:22:11 +00:00
// All new elements are now added, and we can create a new hash
// representing the current keys in the allowed map.
func ( ) {
2022-05-16 05:15:38 +00:00
proc . centralAuth . pki . nodesAcked . mu . Lock ( )
defer proc . centralAuth . pki . nodesAcked . mu . Unlock ( )
2022-05-12 04:22:11 +00:00
type NodesAndKeys struct {
Node Node
Key [ ] byte
}
// Create a slice of all the map keys, and its value.
sortedNodesAndKeys := [ ] NodesAndKeys { }
// Range the map, and add each k/v to the sorted slice, to be sorted later.
2022-05-16 05:15:38 +00:00
for k , v := range proc . centralAuth . pki . nodesAcked . keysAndHash . Keys {
2022-05-12 04:22:11 +00:00
nk := NodesAndKeys {
Node : k ,
Key : v ,
}
sortedNodesAndKeys = append ( sortedNodesAndKeys , nk )
}
// sort the slice based on the node name.
// Sort all the commands.
sort . SliceStable ( sortedNodesAndKeys , func ( i , j int ) bool {
return sortedNodesAndKeys [ i ] . Node < sortedNodesAndKeys [ j ] . Node
} )
// Then create a hash based on the sorted slice.
b , err := cbor . Marshal ( sortedNodesAndKeys )
if err != nil {
er := fmt . Errorf ( "error: methodREQPublicKeysAllow, failed to marshal slice, and will not update hash for public keys: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-05-16 05:15:38 +00:00
log . Printf ( " * DEBUG: %v\n" , er )
2022-05-12 04:22:11 +00:00
return
}
2022-05-16 05:15:38 +00:00
// Store the key in the key value map.
hash := sha256 . Sum256 ( b )
proc . centralAuth . pki . nodesAcked . keysAndHash . Hash = hash
// Store the key to the db for persistence.
proc . centralAuth . pki . dbUpdateHash ( hash [ : ] )
if err != nil {
er := fmt . Errorf ( "error: methodREQPublicKeysAllow, failed to store the hash into the db: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
log . Printf ( " * DEBUG: %v\n" , er )
return
}
2022-05-12 04:22:11 +00:00
} ( )
2022-04-20 16:33:52 +00:00
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ----
2021-11-18 08:34:16 +00:00
// ---- Template that can be used for creating request methods
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// defer cancel()
//
// // Put data that should be the result of the action done in the inner
// // go routine on the outCh.
// outCh := make(chan []byte)
// // Put errors from the inner go routine on the errCh.
// errCh := make(chan error)
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// // Do some work here....
//
// }()
//
// // Wait for messages received from the inner go routine.
// select {
// case <-ctx.Done():
// fmt.Printf(" ** DEBUG: got ctx.Done\n")
//
// er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs)
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case er := <-errCh:
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case out := <-outCh:
// replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
// newReplyMessage(proc, message, []byte(replyData))
// return
// }
//
// }()
//
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
// return ackMsg, nil
// }