Mirror of https://github.com/postmannen/ctrl.git (synced 2024-12-15 17:51:15 +00:00)
split request code into separate files

parent 9899485ddc
commit 2951689e1e

9 changed files with 2009 additions and 1970 deletions
requests.go (1970 deletions)
File diff suppressed because it is too large.
requests_cli.go (255 additions, new file)

@@ -0,0 +1,255 @@
package steward

import (
	"bufio"
	"bytes"
	"fmt"
	"os/exec"
	"strings"
)

type methodREQCliCommand struct {
	event Event
}

func (m methodREQCliCommand) getKind() Event {
	return m.event
}

// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
	inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs)
	proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)

	// Execute the CLI command in its own goroutine, so we are able
	// to return immediately with an ack reply that the message was
	// received, and we create a new message to send back to the calling
	// node with the output of the actual command.
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		var a []string

		switch {
		case len(message.MethodArgs) < 1:
			er := fmt.Errorf("error: methodREQCliCommand: got <1 methodArgs")
			proc.errorKernel.errSend(proc, message, er)

			return
		default:
			a = message.MethodArgs[1:]
		}

		c := message.MethodArgs[0]

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)

		outCh := make(chan []byte)

		proc.processes.wg.Add(1)
		go func() {
			defer proc.processes.wg.Done()

			// Check if {{STEWARD_DATA}} is defined in the method arguments. If found, put the
			// data payload there.
			var foundEnvData bool
			var envData string
			for i, v := range message.MethodArgs {
				if strings.Contains(v, "{{STEWARD_DATA}}") {
					foundEnvData = true
					// Replace the found placeholder with a reference to an actual env variable.
					message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1)

					// Put all the data into a single string so we can pass it
					// in a single env variable.
					envData = string(message.Data)
				}
			}

			cmd := exec.CommandContext(ctx, c, a...)

			// Check for the use of the STEWARD_DATA env variable, and set the env if found.
			if foundEnvData {
				envData = fmt.Sprintf("STEWARD_DATA=%v", envData)
				cmd.Env = append(cmd.Env, envData)
			}

			var out bytes.Buffer
			var stderr bytes.Buffer
			cmd.Stdout = &out
			cmd.Stderr = &stderr

			err := cmd.Run()
			if err != nil {
				er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed: %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
				proc.errorKernel.errSend(proc, message, er)
			}

			select {
			case outCh <- out.Bytes():
			case <-ctx.Done():
				return
			}
		}()

		select {
		case <-ctx.Done():
			cancel()
			er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
		case out := <-outCh:
			cancel()

			// NB: Not quite sure what is the best way to handle the below
			// isReply right now. Implementing as send to central for now.
			//
			// If this is a reply message, swap the toNode and fromNode
			// fields so the output of the command is sent to the central node.
			if message.IsReply {
				message.ToNode, message.FromNode = message.FromNode, message.ToNode
			}

			// Prepare and queue for sending a new message with the output
			// of the action executed.
			newReplyMessage(proc, message, out)
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
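
Every handler in this commit calls getContextForMethodTimeout, which is not part of the diff. Below is a minimal sketch of what it plausibly does, assuming message.MethodTimeout is a number of seconds and that a zero value means no deadline (the http.Client construction in requests_http.go further down uses the field as seconds). The real implementation lives elsewhere in the repository and may differ; imports of context and time are omitted here.

// Sketch only, not from the diff: derive a per-request context from the
// process context. Assumption: MethodTimeout is seconds, and 0 means no deadline.
func getContextForMethodTimeoutSketch(ctx context.Context, message Message) (context.Context, context.CancelFunc) {
	if message.MethodTimeout != 0 {
		return context.WithTimeout(ctx, time.Second*time.Duration(message.MethodTimeout))
	}
	return context.WithCancel(ctx)
}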

// ---

type methodREQCliCommandCont struct {
	event Event
}

func (m methodREQCliCommandCont) getKind() Event {
	return m.event
}

// Handler to run REQCliCommandCont, which is the same as a normal
// CLI command, but can be used when running a command that will take
// a longer time and you want to send the output of the command continually
// back as it is generated, and not just when the command is finished.
func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
	inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
	proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)

	// Execute the CLI command in its own goroutine, so we are able
	// to return immediately with an ack reply that the message was
	// received, and we create a new message to send back to the calling
	// node with the output of the actual command.
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		var a []string

		switch {
		case len(message.MethodArgs) < 1:
			er := fmt.Errorf("error: methodREQCliCommandCont: got <1 methodArgs")
			proc.errorKernel.errSend(proc, message, er)

			return
		default:
			a = message.MethodArgs[1:]
		}

		c := message.MethodArgs[0]

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)

		outCh := make(chan []byte)
		errCh := make(chan string)

		proc.processes.wg.Add(1)
		go func() {
			defer proc.processes.wg.Done()

			cmd := exec.CommandContext(ctx, c, a...)

			// Using cmd.StdoutPipe here so we are continuously
			// able to read the output of the command.
			outReader, err := cmd.StdoutPipe()
			if err != nil {
				er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed: %v, methodArgs: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
			}

			errorReader, err := cmd.StderrPipe()
			if err != nil {
				er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed: %v, methodArgs: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
			}

			if err := cmd.Start(); err != nil {
				er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed: %v, methodArgs: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
			}

			go func() {
				scanner := bufio.NewScanner(errorReader)
				for scanner.Scan() {
					errCh <- scanner.Text()
				}
			}()

			go func() {
				scanner := bufio.NewScanner(outReader)
				for scanner.Scan() {
					outCh <- []byte(scanner.Text() + "\n")
				}
			}()

			// NB: sending cancel to the command context, so processes are killed.
			// A GitHub issue is filed on not killing all child processes when using pipes:
			// https://github.com/golang/go/issues/23019
			// TODO: Check in later if there is any progress on the issue.
			// When testing, the problem seems to appear when using sudo, or tcpdump without
			// the -l option. So for now, don't use sudo, and remember to use -l with tcpdump,
			// which makes stdout line buffered.

			<-ctx.Done()
			cancel()

			if err := cmd.Wait(); err != nil {
				er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err)
				proc.errorKernel.errSend(proc, message, er)
			}
		}()

		// Check if the context timer fired, or if command output was received.
		for {
			select {
			case <-ctx.Done():
				cancel()
				er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
				proc.errorKernel.infoSend(proc, message, er)
				return
			case out := <-outCh:
				newReplyMessage(proc, message, out)
			case out := <-errCh:
				newReplyMessage(proc, message, []byte(out))
			}
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
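
As a side note on the {{STEWARD_DATA}} mechanism in methodREQCliCommand above: the placeholder in the arguments is rewritten to a shell variable reference, and the payload travels through the child's environment. A standalone illustration of the same technique (not from the repository; the command and payload are made up):

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	args := []string{"bash", "-c", "echo {{STEWARD_DATA}}"}
	payload := []byte("hello from the message payload")

	foundEnvData := false
	for i, v := range args {
		if strings.Contains(v, "{{STEWARD_DATA}}") {
			foundEnvData = true
			// Rewrite the placeholder to a shell variable reference.
			args[i] = strings.Replace(v, "{{STEWARD_DATA}}", "$STEWARD_DATA", -1)
		}
	}

	cmd := exec.Command(args[0], args[1:]...)
	if foundEnvData {
		// Note: cmd.Env was nil, so the child receives only this one
		// variable. The handler above has the same behavior.
		cmd.Env = append(cmd.Env, fmt.Sprintf("STEWARD_DATA=%s", payload))
	}

	out, err := cmd.CombinedOutput()
	if err != nil {
		fmt.Println("run failed:", err)
		return
	}
	fmt.Printf("%s", out)
}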
requests_file_handling.go (458 additions, new file)

@@ -0,0 +1,458 @@
package steward

import (
	"context"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"sync"

	"github.com/hpcloud/tail"
)

type methodREQToFileAppend struct {
	event Event
}

func (m methodREQToFileAppend) getKind() Event {
	return m.event
}

// Handle appending data to file.
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {

	// If it was a request type message we want to check what the initial message's
	// method was, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if the folder structure exists, and create it if not.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree: %v, subject: %v, %v", folderTree, proc.subject, err)
			proc.errorKernel.errSend(proc, message, er)
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open the file and write the data.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
	if err != nil {
		er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
		proc.errorKernel.errSend(proc, message, er)
		return nil, err
	}
	defer f.Close()

	_, err = f.Write(message.Data)
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to write to file: %v, %v", file, err)
		proc.errorKernel.errSend(proc, message, er)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// -----

type methodREQToFile struct {
	event Event
}

func (m methodREQToFile) getKind() Event {
	return m.event
}

// Handle writing to a file. Will truncate any existing data if the file
// already exists.
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {

	// If it was a request type message we want to check what the initial message's
	// method was, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if the folder structure exists, and create it if not.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject: %v, folderTree: %v, %v", proc.subject, folderTree, err)
			proc.errorKernel.errSend(proc, message, er)

			return nil, er
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open the file and write the data.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
	if err != nil {
		er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
		proc.errorKernel.errSend(proc, message, er)

		return nil, err
	}
	defer f.Close()

	_, err = f.Write(message.Data)
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQToFile.handler: failed to write to file: file: %v, %v", file, err)
		proc.errorKernel.errSend(proc, message, er)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
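
The difference between the two handlers above comes down to the os.OpenFile flags: REQToFileAppend appends with O_APPEND and O_SYNC, while REQToFile discards existing content with O_TRUNC. A small standalone contrast (file names are made up):

package main

import "os"

func main() {
	// Append mode, synced to disk on every write (as in REQToFileAppend).
	fAppend, err := os.OpenFile("append.log", os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
	if err != nil {
		panic(err)
	}
	defer fAppend.Close()
	fAppend.Write([]byte("appended line\n"))

	// Truncate mode: any previous content is discarded (as in REQToFile).
	fTrunc, err := os.OpenFile("state.txt", os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
	if err != nil {
		panic(err)
	}
	defer fTrunc.Close()
	fTrunc.Write([]byte("fresh content\n"))
}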

// ----

type methodREQCopyFileFrom struct {
	event Event
}

func (m methodREQCopyFileFrom) getKind() Event {
	return m.event
}

// Handle reading a file on this node, and sending the content to be written
// on a destination node.
func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) {

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		switch {
		case len(message.MethodArgs) < 3:
			er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 methodArgs: want srcFilePath,dstNode,dstFilePath")
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		SrcFilePath := message.MethodArgs[0]
		DstNode := message.MethodArgs[1]
		DstFilePath := message.MethodArgs[2]

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
		defer cancel()

		outCh := make(chan []byte)
		errCh := make(chan error)

		// Read the file, and put the result on the out channel to be sent when done reading.
		proc.processes.wg.Add(1)
		go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)

		// Wait here until we have the data to send, then create a new message
		// and send it.
		// Also checking ctx.Done, which fires on Cancel, allows us to
		// kill all goroutines started by this message.
		select {
		case <-ctx.Done():
			er := fmt.Errorf("error: methodREQCopyFileFrom: got <-ctx.Done(): %v", message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)

			return
		case er := <-errCh:
			proc.errorKernel.errSend(proc, message, er)

			return
		case out := <-outCh:
			dstDir := filepath.Dir(DstFilePath)
			dstFile := filepath.Base(DstFilePath)

			// Prepare for sending a new message with the output.

			// Copy the original message to get the defaults for timeouts etc.,
			// and set new values for the fields to change.
			msg := message
			msg.ToNode = Node(DstNode)
			msg.Method = REQCopyFileTo
			msg.Data = out
			msg.Directory = dstDir
			msg.FileName = dstFile

			// Create a SAM and put the message on the send-new-message channel.
			sam, err := newSubjectAndMessage(msg)
			if err != nil {
				er := fmt.Errorf("error: newSubjectAndMessage: %v, message: %v", err, message)
				proc.errorKernel.errSend(proc, message, er)
			}

			proc.toRingbufferCh <- []subjectAndMessage{sam}

			replyData := fmt.Sprintf("info: successfully read the file %v, and sent the content to %v\n", SrcFilePath, DstNode)

			newReplyMessage(proc, message, []byte(replyData))
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// copyFileFrom will read the file to be copied from the specified SrcFilePath.
// The result will be delivered on the provided outCh.
func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) {
	defer wg.Done()

	const natsMaxMsgSize = 1000000

	fi, err := os.Stat(SrcFilePath)

	// Check that the src file exists, and that it is not bigger than
	// the default limit used by NATS, which is 1MB.
	switch {
	case os.IsNotExist(err):
		errCh <- fmt.Errorf("error: copyFileFrom: src file not found: %v", SrcFilePath)
		return
	case err != nil:
		// Guard against other stat errors, where fi would be nil.
		errCh <- fmt.Errorf("error: copyFileFrom: failed to stat src file: %v, %v", SrcFilePath, err)
		return
	case fi.Size() > natsMaxMsgSize:
		errCh <- fmt.Errorf("error: copyFileFrom: src file too big, max size: %v", natsMaxMsgSize)
		return
	}

	fh, err := os.Open(SrcFilePath)
	if err != nil {
		errCh <- fmt.Errorf("error: copyFileFrom: failed to open file: %v, %v", SrcFilePath, err)
		return
	}

	b, err := io.ReadAll(fh)
	if err != nil {
		errCh <- fmt.Errorf("error: copyFileFrom: failed to read file: %v, %v", SrcFilePath, err)
		return
	}

	select {
	case outCh <- b:
	case <-ctx.Done():
		return
	}
}
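
The size guard in copyFileFrom is useful on its own: the file size is checked before the whole file is read into memory, since the content has to fit in a single NATS message. A standalone sketch of the same pattern (the path in main is made up):

package main

import (
	"fmt"
	"io"
	"os"
)

const natsMaxMsgSize = 1000000

func readCapped(path string) ([]byte, error) {
	fi, err := os.Stat(path)
	switch {
	case os.IsNotExist(err):
		return nil, fmt.Errorf("src file not found: %v", path)
	case err != nil:
		return nil, err
	case fi.Size() > natsMaxMsgSize:
		return nil, fmt.Errorf("src file too big, max size: %v", natsMaxMsgSize)
	}

	fh, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fh.Close()

	return io.ReadAll(fh)
}

func main() {
	b, err := readCapped("/etc/hostname")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("read %d bytes\n", len(b))
}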

// ----

type methodREQCopyFileTo struct {
	event Event
}

func (m methodREQCopyFileTo) getKind() Event {
	return m.event
}

// Handle writing to a file. Will truncate any existing data if the file
// already exists.
// Same as REQToFile, but this request type does not use the default data
// folder path for where to store files, or add information about node names.
// This method also sends a msgReply back to the publisher if the method was
// done successfully, where REQToFile does not.
// This method will truncate and overwrite any existing files.
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
		defer cancel()

		// Put data that should be the result of the action done in the inner
		// goroutine on the outCh.
		outCh := make(chan []byte)
		// Put errors from the inner goroutine on the errCh.
		errCh := make(chan error)

		proc.processes.wg.Add(1)
		go func() {
			defer proc.processes.wg.Done()

			switch {
			case len(message.MethodArgs) < 3:
				er := fmt.Errorf("error: methodREQCopyFileTo: got <3 methodArgs: want srcFilePath,dstNode,dstFilePath")
				proc.errorKernel.errSend(proc, message, er)

				return
			}

			// Pick up the values for the directory and filename for where
			// to store the file.
			DstFilePath := message.MethodArgs[2]
			dstDir := filepath.Dir(DstFilePath)
			dstFile := filepath.Base(DstFilePath)

			fileRealPath := path.Join(dstDir, dstFile)

			// Check if the folder structure exists, and create it if not.
			if _, err := os.Stat(dstDir); os.IsNotExist(err) {
				err := os.MkdirAll(dstDir, 0700)
				if err != nil {
					er := fmt.Errorf("failed to create toFile directory tree: subject: %v, folderTree: %v, %v", proc.subject, dstDir, err)
					errCh <- er
					return
				}

				{
					er := fmt.Errorf("info: methodREQCopyFileTo: Creating folders %v", dstDir)
					proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
				}
			}

			// Open the file and write the data. Truncate and overwrite any existing files.
			file := filepath.Join(dstDir, dstFile)
			f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
			if err != nil {
				er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
				errCh <- er
				return
			}
			defer f.Close()

			_, err = f.Write(message.Data)
			f.Sync()
			if err != nil {
				er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err)
				errCh <- er
			}

			// All went OK, send a signal to the outer select statement.
			outCh <- []byte(fileRealPath)
		}()

		// Wait for messages received from the inner goroutine.
		select {
		case <-ctx.Done():
			er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
			return

		case err := <-errCh:
			er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
			proc.errorKernel.errSend(proc, message, er)
			return

		case out := <-outCh:
			replyData := fmt.Sprintf("info: successfully created and wrote the file %v\n", out)
			newReplyMessage(proc, message, []byte(replyData))
			return
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// --- methodREQTailFile

type methodREQTailFile struct {
	event Event
}

func (m methodREQTailFile) getKind() Event {
	return m.event
}

// handler to run a tailing of files with timeout context. The handler will
// return the output of the command run back to the calling publisher
// as a new message.
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
	inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
	proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		switch {
		case len(message.MethodArgs) < 1:
			er := fmt.Errorf("error: methodREQTailFile: got <1 methodArgs")
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		fp := message.MethodArgs[0]

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)

		outCh := make(chan []byte)
		t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{
			Offset: 0,
			Whence: io.SeekEnd,
		}})
		if err != nil {
			er := fmt.Errorf("error: methodREQTailFile: tailFile: %v", err)
			proc.errorKernel.errSend(proc, message, er)
		}

		proc.processes.wg.Add(1)
		go func() {
			defer proc.processes.wg.Done()

			for {
				select {
				case line := <-t.Lines:
					outCh <- []byte(line.Text + "\n")
				case <-ctx.Done():
					return
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				cancel()
				er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
				proc.errorKernel.infoSend(proc, message, er)

				return
			case out := <-outCh:
				// Prepare and queue for sending a new message with the output
				// of the action executed.
				newReplyMessage(proc, message, out)
			}
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
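
For reference, the hpcloud/tail usage above in a minimal standalone form (the log path is made up). It follows the file from its current end, which is what the Whence setting in the handler does; io.SeekEnd is the non-deprecated equivalent of os.SEEK_END.

package main

import (
	"fmt"
	"io"

	"github.com/hpcloud/tail"
)

func main() {
	t, err := tail.TailFile("/var/log/syslog", tail.Config{
		Follow: true,
		Location: &tail.SeekInfo{
			Offset: 0,
			Whence: io.SeekEnd, // start at the end of the file, like the handler above
		},
	})
	if err != nil {
		panic(err)
	}

	// Print each new line as it is appended to the file.
	for line := range t.Lines {
		fmt.Println(line.Text)
	}
}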
requests_http.go (257 additions, new file)

@@ -0,0 +1,257 @@
package steward

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"time"
)

type methodREQHttpGet struct {
	event Event
}

func (m methodREQHttpGet) getKind() Event {
	return m.event
}

// handler to do an HTTP GET.
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
	inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
	proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		switch {
		case len(message.MethodArgs) < 1:
			er := fmt.Errorf("error: methodREQHttpGet: got <1 methodArgs")
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		url := message.MethodArgs[0]

		client := http.Client{
			Timeout: time.Second * time.Duration(message.MethodTimeout),
		}

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
			cancel()
			return
		}

		outCh := make(chan []byte)

		proc.processes.wg.Add(1)
		go func() {
			defer proc.processes.wg.Done()

			resp, err := client.Do(req)
			if err != nil {
				er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
				return
			}
			defer resp.Body.Close()

			if resp.StatusCode != 200 {
				cancel()
				er := fmt.Errorf("error: methodREQHttpGet: status code not 200, was: %v, bailing out: %v", resp.StatusCode, message)
				proc.errorKernel.errSend(proc, message, er)
				return
			}

			body, err := io.ReadAll(resp.Body)
			if err != nil {
				er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed: %v, methodArgs: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
			}

			out := body

			select {
			case outCh <- out:
			case <-ctx.Done():
				return
			}
		}()

		select {
		case <-ctx.Done():
			cancel()
			er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
		case out := <-outCh:
			cancel()

			// Prepare and queue for sending a new message with the output
			// of the action executed.
			newReplyMessage(proc, message, out)
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
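
The core of the handler above, stripped of the steward plumbing, is a context-bounded GET. A minimal standalone version (the URL and timeout are made up):

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	// Bound the whole request by a context deadline, like the handler does
	// with getContextForMethodTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com", nil)
	if err != nil {
		panic(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Println("unexpected status:", resp.StatusCode)
		return
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes\n", len(body))
}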

// ---

type methodREQHttpGetScheduled struct {
	event Event
}

func (m methodREQHttpGetScheduled) getKind() Event {
	return m.event
}

// handler to do a scheduled HTTP GET.
// The second element of the MethodArgs slice holds the interval defined in seconds,
// and the third element holds the total time in minutes the scheduler should run for.
func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) {
	inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
	proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		// --- Check and prepare the methodArgs.

		switch {
		case len(message.MethodArgs) < 3:
			er := fmt.Errorf("error: methodREQHttpGetScheduled: got <3 methodArgs. Want URL, schedule interval in seconds, and the total time in minutes the scheduler should run for")
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		url := message.MethodArgs[0]

		scheduleInterval, err := strconv.Atoi(message.MethodArgs[1])
		if err != nil {
			er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval is not a valid int given as a string value of seconds: %v, bailing out: %v", err, message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
			return
		}

		schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2])
		if err != nil {
			er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time is not a valid int given as a string value of minutes: %v, bailing out: %v", err, message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)
			return
		}

		// --- Prepare and start the scheduler.

		outCh := make(chan []byte)

		ticker := time.NewTicker(time.Second * time.Duration(scheduleInterval))

		// Prepare a context for the schedule as a whole.
		// NB: Individual HTTP GETs will create their own contexts
		// derived from this one.
		ctxScheduler, cancel := context.WithTimeout(proc.ctx, time.Minute*time.Duration(schedulerTotalTime))

		go func() {
			// Prepare the HTTP client.
			client := http.Client{
				Timeout: time.Second * time.Duration(message.MethodTimeout),
			}

			for {
				select {
				case <-ticker.C:
					proc.processes.wg.Add(1)

					// Get a context with the timeout specified in message.MethodTimeout
					// for the individual HTTP requests.
					ctx, cancel := getContextForMethodTimeout(proc.ctx, message)

					req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
					if err != nil {
						er := fmt.Errorf("error: methodREQHttpGetScheduled: NewRequest failed: %v, error: %v", err, message.MethodArgs)
						proc.errorKernel.errSend(proc, message, er)
						cancel()
						return
					}

					// Run each individual HTTP GET in its own goroutine, and
					// deliver the result on the outCh.
					go func() {
						defer proc.processes.wg.Done()

						resp, err := client.Do(req)
						if err != nil {
							er := fmt.Errorf("error: methodREQHttpGetScheduled: client.Do failed: %v, error: %v", err, message.MethodArgs)
							proc.errorKernel.errSend(proc, message, er)
							return
						}
						defer resp.Body.Close()

						if resp.StatusCode != 200 {
							cancel()
							er := fmt.Errorf("error: methodREQHttpGetScheduled: status code not 200, was: %v, error: %v", resp.StatusCode, message)
							proc.errorKernel.errSend(proc, message, er)
							return
						}

						body, err := io.ReadAll(resp.Body)
						if err != nil {
							er := fmt.Errorf("error: methodREQHttpGetScheduled: io.ReadAll failed: %v, methodArgs: %v", err, message.MethodArgs)
							proc.errorKernel.errSend(proc, message, er)
						}

						out := body

						select {
						case outCh <- out:
						case <-ctx.Done():
							return
						case <-ctxScheduler.Done():
							// If the scheduler context is done, then we also want to kill
							// all running HTTP requests.
							cancel()
							return
						}
					}()

				case <-ctxScheduler.Done():
					cancel()
					return
				}
			}
		}()

		for {
			select {
			case <-ctxScheduler.Done():
				cancel()
				er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule context timed out: %v", message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
				return
			case out := <-outCh:
				// Prepare and queue for sending a new message with the output
				// of the action executed.
				newReplyMessage(proc, message, out)
			}
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
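
The scheduling skeleton used above reduces to two moving parts: a ticker for the per-run interval and an outer context for the total lifetime of the schedule. A standalone sketch (the intervals are made up):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	interval := 2 * time.Second
	totalTime := 10 * time.Second

	// The ticker fires the work at the configured interval.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The outer context caps the total lifetime of the whole schedule.
	ctxScheduler, cancel := context.WithTimeout(context.Background(), totalTime)
	defer cancel()

	for {
		select {
		case <-ticker.C:
			// The real handler starts an HTTP GET here, each run with its own
			// per-request timeout derived from message.MethodTimeout.
			fmt.Println("tick: run the scheduled work")
		case <-ctxScheduler.Done():
			fmt.Println("schedule finished")
			return
		}
	}
}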
requests_operator.go (204 additions, new file)

@@ -0,0 +1,204 @@
package steward

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// --- OpProcessList

type methodREQOpProcessList struct {
	event Event
}

func (m methodREQOpProcessList) getKind() Event {
	return m.event
}

// Handle Op Process List
func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) {

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		out := []byte{}

		// Loop over the processes map, and find all that are active, to
		// be returned in the reply message.

		proc.processes.active.mu.Lock()
		for _, pTmp := range proc.processes.active.procNames {
			s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
			sb := []byte(s)
			out = append(out, sb...)
		}
		proc.processes.active.mu.Unlock()

		newReplyMessage(proc, message, out)
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// --- OpProcessStart

type methodREQOpProcessStart struct {
	event Event
}

func (m methodREQOpProcessStart) getKind() Event {
	return m.event
}

// Handle Op Process Start
func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) {
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()
		var out []byte

		// We need to create a temporary method type to look up the kind for the
		// real method for the message.
		var mt Method

		switch {
		case len(message.MethodArgs) < 1:
			er := fmt.Errorf("error: methodREQOpProcessStart: got <1 methodArgs")
			proc.errorKernel.errSend(proc, message, er)
			return
		}

		m := message.MethodArgs[0]
		method := Method(m)
		tmpH := mt.getHandler(method)
		if tmpH == nil {
			er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v", m)
			proc.errorKernel.errSend(proc, message, er)
			return
		}

		// Create the process and start it.
		sub := newSubject(method, proc.configuration.NodeName)
		procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil)
		go procNew.spawnWorker()

		txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
		er := fmt.Errorf(txt)
		proc.errorKernel.errSend(proc, message, er)

		out = []byte(txt + "\n")
		newReplyMessage(proc, message, out)
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// --- OpProcessStop

type methodREQOpProcessStop struct {
	event Event
}

func (m methodREQOpProcessStop) getKind() Event {
	return m.event
}

// Handle Op Process Stop
func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) {
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()
		var out []byte

		// We need to create a temporary method type to use to look up the kind for the
		// real method for the message.
		var mt Method

		// --- Parse and check the method arguments given.
		// The reason for also having the node as one of the arguments is
		// that publisher processes are named by the node they are sending the
		// message to. Subscriber processes are named by the node name
		// they are running on.

		if v := len(message.MethodArgs); v != 3 {
			er := fmt.Errorf("error: methodREQOpProcessStop: got %v methodArgs, want 3: method,node,kind", v)
			proc.errorKernel.errSend(proc, message, er)
			// NOTE (editor): the original did not return here, which would
			// index out of range below when fewer than 3 args were given.
			return
		}

		methodString := message.MethodArgs[0]
		node := message.MethodArgs[1]
		kind := message.MethodArgs[2]

		method := Method(methodString)
		tmpH := mt.getHandler(method)
		if tmpH == nil {
			er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct", methodString)
			proc.errorKernel.errSend(proc, message, er)
			return
		}

		// --- Find, and stop the process if found.

		// Based on the arg values received in the message we create a
		// processName structure as used in naming the real processes.
		// We can then use this processName to get the real values for the
		// actual process we want to stop.
		sub := newSubject(method, string(node))
		processName := processNameGet(sub.name(), processKind(kind))

		// Remove the process from the processes active map if found.
		proc.processes.active.mu.Lock()
		toStopProc, ok := proc.processes.active.procNames[processName]

		if ok {
			// Delete the process from the processes map.
			delete(proc.processes.active.procNames, processName)
			// Stop started goroutines that belong to the process.
			toStopProc.ctxCancel()
			// Stop subscribing for messages on the process's subject.
			err := toStopProc.natsSubscription.Unsubscribe()
			if err != nil {
				er := fmt.Errorf("error: methodREQOpProcessStop failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
				proc.errorKernel.errSend(proc, message, er)
			}

			// Remove the prometheus label.
			proc.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})

			txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
			er := fmt.Errorf(txt)
			proc.errorKernel.errSend(proc, message, er)

			out = []byte(txt + "\n")
			newReplyMessage(proc, message, out)

		} else {
			txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
			er := fmt.Errorf(txt)
			proc.errorKernel.errSend(proc, message, er)

			out = []byte(txt + "\n")
			newReplyMessage(proc, message, out)
		}

		proc.processes.active.mu.Unlock()

	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// ----
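
On the promProcessesAllRunning cleanup in OpProcessStop above: deleting the label set removes the per-process series, so stopped processes do not linger in the exported metrics. A standalone sketch of the same GaugeVec pattern (the metric name and example label value are made up, only the "processName" label matches the code above):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	running := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "processes_all_running", // hypothetical name for illustration
		Help: "Currently running processes, one series per process name.",
	}, []string{"processName"})
	prometheus.MustRegister(running)

	// A process starts: create and set its labeled series.
	running.With(prometheus.Labels{"processName": "subscriber.REQCliCommand.node1"}).Set(1)

	// The process stops: remove the series entirely so it is no longer exported.
	deleted := running.Delete(prometheus.Labels{"processName": "subscriber.REQCliCommand.node1"})
	fmt.Println("series deleted:", deleted)
}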
requests_public_keys.go (350 additions, new file)

@@ -0,0 +1,350 @@
|
||||||
|
package steward
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/fxamacker/cbor/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---
|
||||||
|
|
||||||
|
type methodREQPublicKey struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQPublicKey) getKind() Event {
|
||||||
|
return m.event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler to get the public ed25519 key from a node.
|
||||||
|
func (m methodREQPublicKey) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||||||
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||||
|
|
||||||
|
proc.processes.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer proc.processes.wg.Done()
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
||||||
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
||||||
|
// In this case this go func and the below select is not needed, but keeping it so the
|
||||||
|
// structure is the same as the other handlers.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case outCh <- proc.nodeAuth.SignPublicKey:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
case out := <-outCh:
|
||||||
|
|
||||||
|
// Prepare and queue for sending a new message with the output
|
||||||
|
// of the action executed.
|
||||||
|
newReplyMessage(proc, message, out)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send back an ACK message.
|
||||||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
|
return ackMsg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
type methodREQPublicKeysGet struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQPublicKeysGet) getKind() Event {
|
||||||
|
return m.event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler to get all the public ed25519 keys from a central server.
|
||||||
|
func (m methodREQPublicKeysGet) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// - Since this is implemented as a NACK message we could implement a
|
||||||
|
// metric thats shows the last time a node did a key request.
|
||||||
|
// - We could also implement a metrics on the receiver showing the last
|
||||||
|
// time a node had done an update.
|
||||||
|
|
||||||
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||||
|
|
||||||
|
proc.processes.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer proc.processes.wg.Done()
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
||||||
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
||||||
|
// In this case this go func and the below select is not needed, but keeping it so the
|
||||||
|
// structure is the same as the other handlers.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// TODO: Should we receive a hash of he current keys from the node here
|
||||||
|
// to verify if we need to update or not ?
|
||||||
|
case outCh <- []byte{}:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// case out := <-outCh:
|
||||||
|
case <-outCh:
|
||||||
|
// Using a func here to set the scope of the lock, and then be able to
|
||||||
|
// defer the unlock when leaving that scope.
|
||||||
|
func() {
|
||||||
|
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||||
|
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
||||||
|
// TODO: We should probably create a hash of the current map content,
|
||||||
|
// store it alongside the KeyMap, and send both the KeyMap and hash
|
||||||
|
// back. We can then later send that hash when asking for keys, compare
|
||||||
|
// it with the current one for the KeyMap, and know if we need to send
|
||||||
|
// and update back to the node who published the request to here.
|
||||||
|
|
||||||
|
fmt.Printf(" <---- methodREQPublicKeysGet: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data)
|
||||||
|
|
||||||
|
// Check if the received hash is the same as the one currently active,
|
||||||
|
if bytes.Equal(proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:], message.Data) {
|
||||||
|
fmt.Printf("\n ------------ NODE AND CENTRAL ARE EQUAL, NOTHING TO DO, EXITING HANDLER\n\n")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\n ------------ NODE AND CENTRAL WERE NOT EQUAL, PREPARING TO SEND NEW VERSION OF KEYS\n\n")
|
||||||
|
|
||||||
|
fmt.Printf(" * methodREQPublicKeysGet: marshalling new keys and hash to send: map=%v, hash=%v\n\n", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
|
||||||
|
|
||||||
|
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
}
|
||||||
|
fmt.Printf("\n ----> methodREQPublicKeysGet: SENDING KEYS TO NODE=%v\n", message.FromNode)
|
||||||
|
newReplyMessage(proc, message, b)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// NB: We're not sending an ACK message for this request type.
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
type methodREQPublicKeysToNode struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQPublicKeysToNode) getKind() Event {
|
||||||
|
return m.event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler to put the public key replies received from a central server.
|
||||||
|
func (m methodREQPublicKeysToNode) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// - Since this is implemented as a NACK message we could implement a
|
||||||
|
// metric thats shows the last time keys were updated.
|
||||||
|
|
||||||
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||||
|
|
||||||
|
proc.processes.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer proc.processes.wg.Done()
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Normally we would do some logic here, where the result is passed to outCh when done,
|
||||||
|
// so we can split up the working logic, and f.ex. sending a reply logic.
|
||||||
|
// In this case this go func and the below select is not needed, but keeping it so the
|
||||||
|
// structure is the same as the other handlers.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// TODO: Should we receive a hash of he current keys from the node here ?
|
||||||
|
case outCh <- []byte{}:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-outCh:
|
||||||
|
|
||||||
|
proc.nodeAuth.publicKeys.mu.Lock()
|
||||||
|
|
||||||
|
err := json.Unmarshal(message.Data, proc.nodeAuth.publicKeys.keysAndHash)
|
||||||
|
fmt.Printf("\n <---- REQPublicKeysToNode: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", proc.nodeAuth.publicKeys.keysAndHash)
|
||||||
|
|
||||||
|
proc.nodeAuth.publicKeys.mu.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v", err, message)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO TOMORROW: The hash is not sent with the requests to get public keys, and
|
||||||
|
// the reason is that the hash is not stored on the nodes ?
|
||||||
|
// Idea: We need to also persist the hash on the receiving nodes. We can then load
|
||||||
|
// that key upon startup, and send it along when we do a public keys get.
|
||||||
|
|
||||||
|
err = proc.nodeAuth.publicKeys.saveToFile()
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: REQPublicKeysToNode : save to file failed: %v, message: %v", err, message)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare and queue for sending a new message with the output
|
||||||
|
// of the action executed.
|
||||||
|
// newReplyMessage(proc, message, out)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send back an ACK message.
|
||||||
|
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
// TODO: We should also add a request method methodREQPublicKeysRevoke
|
||||||
|
|
||||||
|
type methodREQPublicKeysAllow struct {
	event Event
}

func (m methodREQPublicKeysAllow) getKind() Event {
	return m.event
}

// Handler to allow new public keys into the database on central auth.
// Nodes will send their public key in the REQHello messages. When they
// are received on the central server they are put into a temporary key
// map, and we need to acknowledge them before they are moved into the
// main key map, and then allowed to be sent out to other nodes.
func (m methodREQPublicKeysAllow) handler(proc process, message Message, node string) ([]byte, error) {
	// Get a context with the timeout specified in message.MethodTimeout.
	ctx, _ := getContextForMethodTimeout(proc.ctx, message)

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()
		outCh := make(chan []byte)

		go func() {
			// Normally we would do some logic here, where the result is passed to outCh
			// when done, so we can split up the working logic and, f.ex., the logic for
			// sending a reply. In this case this go func and the select below are not
			// needed, but we keep them so the structure is the same as in the other handlers.
			select {
			case <-ctx.Done():
			case outCh <- []byte{}:
			}
		}()

		select {
		case <-ctx.Done():
		case <-outCh:
			proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Lock()
			defer proc.centralAuth.pki.nodeNotAckedPublicKeys.mu.Unlock()

			// Range over all the MethodArgs, where each element represents a node to allow,
			// and move the node from the notAcked map to the allowed map.
			for _, n := range message.MethodArgs {
				key, ok := proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap[Node(n)]
				if ok {
					func() {
						proc.centralAuth.pki.nodesAcked.mu.Lock()
						defer proc.centralAuth.pki.nodesAcked.mu.Unlock()

						// Store/update the node and public key in the allowed pubKey map.
						proc.centralAuth.pki.nodesAcked.keysAndHash.Keys[Node(n)] = key
					}()

					// Add the key to persistent storage.
					proc.centralAuth.pki.dbUpdatePublicKey(string(n), key)

					// Delete the key from the notAcked map.
					delete(proc.centralAuth.pki.nodeNotAckedPublicKeys.KeyMap, Node(n))

					er := fmt.Errorf("info: REQPublicKeysAllow : allowed new/updated public key for %v in the allowed public key map", n)
					proc.errorKernel.infoSend(proc, message, er)
				}
			}

			// All new elements are now added, and we can create a new hash
			// representing the current keys in the allowed map.
			func() {
				proc.centralAuth.pki.nodesAcked.mu.Lock()
				defer proc.centralAuth.pki.nodesAcked.mu.Unlock()

				type NodesAndKeys struct {
					Node Node
					Key  []byte
				}

				// Create a slice of all the map keys and their values.
				sortedNodesAndKeys := []NodesAndKeys{}

				// Range the map, and add each k/v to the slice, to be sorted next.
				for k, v := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
					nk := NodesAndKeys{
						Node: k,
						Key:  v,
					}
					sortedNodesAndKeys = append(sortedNodesAndKeys, nk)
				}

				// Sort the slice based on the node name.
				sort.SliceStable(sortedNodesAndKeys, func(i, j int) bool {
					return sortedNodesAndKeys[i].Node < sortedNodesAndKeys[j].Node
				})

				// Then create a hash based on the sorted slice.
				b, err := cbor.Marshal(sortedNodesAndKeys)
				if err != nil {
					er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err)
					proc.errorKernel.errSend(proc, message, er)
					log.Printf(" * DEBUG: %v\n", er)

					return
				}

				// Store the hash of the current keys on the keysAndHash struct.
				hash := sha256.Sum256(b)
				proc.centralAuth.pki.nodesAcked.keysAndHash.Hash = hash

				// Store the hash in the db for persistence.
				err = proc.centralAuth.pki.dbUpdateHash(hash[:])
				if err != nil {
					er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to store the hash into the db: %v", err)
					proc.errorKernel.errSend(proc, message, er)
					log.Printf(" * DEBUG: %v\n", er)

					return
				}
			}()
		}
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
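// For illustration, a message that would trigger this handler could look like
// the sketch below. The field names match the Message fields used above; the
// node names, the REQPublicKeysAllow constant reference, and the timeout are
// illustrative values only.
//
// allowMsg := Message{
// 	ToNode:        "central",
// 	Method:        REQPublicKeysAllow,
// 	MethodArgs:    []string{"node1", "node2"},
// 	MethodTimeout: 10,
// }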
434
requests_std.go
Normal file

@ -0,0 +1,434 @@
package steward

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// -----

type methodREQHello struct {
	event Event
}

func (m methodREQHello) getKind() Event {
	return m.event
}

// Handler for receiving hello messages.
func (m methodREQHello) handler(proc process, message Message, node string) ([]byte, error) {
	data := fmt.Sprintf("%v, Received hello from %#v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)

	fileName := message.FileName
	folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.FromNode))

	// Check if the folder structure exists, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open file and write data.
	file := filepath.Join(folderTree, fileName)
	//f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
	f, err := os.OpenFile(file, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
	if err != nil {
		er := fmt.Errorf("error: methodREQHello.handler: failed to open file: %v", err)
		return nil, er
	}
	defer f.Close()

	_, err = f.Write([]byte(data))
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQHello.handler: failed to write to file: %v", err)
		proc.errorKernel.errSend(proc, message, er)
	}

	// --------------------------

	// Send the message to the procFuncCh which is running alongside the process,
	// and can hold registries and handle special things for an individual process.
	proc.procFuncCh <- message

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
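// For illustration, a hello message handled above could be composed like the
// sketch below; all values are illustrative, and the REQHello constant is
// assumed to be defined elsewhere in the package.
//
// helloMsg := Message{
// 	ToNode:    "central",
// 	FromNode:  "node1",
// 	Method:    REQHello,
// 	FileName:  "hello.log",
// 	Directory: "hello-messages",
// }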

// ---

type methodREQErrorLog struct {
	event Event
}

func (m methodREQErrorLog) getKind() Event {
	return m.event
}

// Handle the writing of error logs.
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
	proc.metrics.promErrorMessagesReceivedTotal.Inc()

	// If it was a request type message we want to check what the initial message's
	// method was, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if the folder structure exists, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open file and write data.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
	if err != nil {
		er := fmt.Errorf("error: methodREQErrorLog.handler: failed to open file: %v", err)
		return nil, er
	}
	defer f.Close()

	_, err = f.Write(message.Data)
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQErrorLog.handler: failed to write to file: %v", err)
		proc.errorKernel.errSend(proc, message, er)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// ---

type methodREQPing struct {
	event Event
}

func (m methodREQPing) getKind() Event {
	return m.event
}

// Handle receiving a ping.
func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) {
	// Write to file that we received a ping.

	// If it was a request type message we want to check what the initial message's
	// method was, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if the folder structure exists, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
			proc.errorKernel.errSend(proc, message, er)

			return nil, er
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open file.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
	if err != nil {
		er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
		proc.errorKernel.errSend(proc, message, er)

		return nil, err
	}
	defer f.Close()

	// And write the data.
	d := fmt.Sprintf("%v, ping received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)
	_, err = f.Write([]byte(d))
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
		proc.errorKernel.errSend(proc, message, er)
	}

	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		newReplyMessage(proc, message, nil)
	}()

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
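// For illustration, a ping of node2 could be requested with a message like the
// sketch below; values are illustrative, and the REQPing constant is assumed
// to be defined elsewhere in the package.
//
// pingMsg := Message{
// 	ToNode:        "node2",
// 	Method:        REQPing,
// 	FileName:      "ping.log",
// 	Directory:     "ping-logs",
// 	MethodTimeout: 5,
// }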

// ---

type methodREQPong struct {
	event Event
}

func (m methodREQPong) getKind() Event {
	return m.event
}

// Handle receiving a pong.
func (m methodREQPong) handler(proc process, message Message, node string) ([]byte, error) {
	// Write to file that we received a pong.

	// If it was a request type message we want to check what the initial message's
	// method was, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if the folder structure exists, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0700)
		if err != nil {
			er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
			proc.errorKernel.errSend(proc, message, er)

			return nil, er
		}

		er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
		proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
	}

	// Open file.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
	if err != nil {
		er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
		proc.errorKernel.errSend(proc, message, er)

		return nil, err
	}
	defer f.Close()

	// And write the data.
	d := fmt.Sprintf("%v, pong received from %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.PreviousMessage.ToNode)
	_, err = f.Write([]byte(d))
	f.Sync()
	if err != nil {
		er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
		proc.errorKernel.errSend(proc, message, er)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// ---

type methodREQRelayInitial struct {
	event Event
}

func (m methodREQRelayInitial) getKind() Event {
	return m.event
}

// Handler to relay messages via a host.
func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) {
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		// Get a context with the timeout specified in message.MethodTimeout.
		ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
		defer cancel()

		outCh := make(chan []byte)
		errCh := make(chan error)
		nothingCh := make(chan struct{}, 1)

		var out []byte

		// If the actual Method of the message is REQCopyFileFrom we need to
		// do the actual file reading here, so we can fill the data field of the
		// message with the content of the file before relaying it.
		switch {
		case message.RelayOriginalMethod == REQCopyFileFrom:
			switch {
			case len(message.MethodArgs) < 3:
				er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
				proc.errorKernel.errSend(proc, message, er)

				return
			}

			SrcFilePath := message.MethodArgs[0]
			//DstFilePath := message.MethodArgs[2]

			// Read the file, and put the result on the out channel to be sent when done reading.
			proc.processes.wg.Add(1)
			go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)

			// Since we have now read the source file we don't need the REQCopyFileFrom
			// request method anymore, so we change the original method of the message
			// so it will write the data after the relaying.
			//dstDir := filepath.Dir(DstFilePath)
			//dstFile := filepath.Base(DstFilePath)
			message.RelayOriginalMethod = REQCopyFileTo
			//message.FileName = dstFile
			//message.Directory = dstDir
		default:
			// No request type that needs special handling when relayed, so we signal
			// that there is nothing to do for the select below.
			// We need to do this signaling in its own goroutine so we don't block here,
			// since the select below is in the same function.
			go func() {
				nothingCh <- struct{}{}
			}()
		}

		select {
		case <-ctx.Done():
			er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs)
			proc.errorKernel.errSend(proc, message, er)

			return
		case er := <-errCh:
			proc.errorKernel.errSend(proc, message, er)

			return
		case <-nothingCh:
			// Do nothing.
		case out = <-outCh:
		}

		// Relay the message to the actual host by prefixing the RelayToNode
		// to the subject.
		relayTo := fmt.Sprintf("%v.%v", message.RelayToNode, message.RelayOriginalViaNode)
		// message.ToNode = message.RelayOriginalViaNode
		message.ToNode = Node(relayTo)
		message.FromNode = Node(node)
		message.Method = REQRelay
		message.Data = out

		sam, err := newSubjectAndMessage(message)
		if err != nil {
			er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		proc.toRingbufferCh <- []subjectAndMessage{sam}
	}()

	// Send back an ACK message.
	ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
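// For illustration of the subject rewrite done above (hypothetical values):
// with RelayToNode "node2" and RelayOriginalViaNode "relayhost", the message
// is readdressed to the combined node "node2.relayhost".
//
// m := Message{RelayToNode: "node2", RelayOriginalViaNode: "relayhost"}
// to := fmt.Sprintf("%v.%v", m.RelayToNode, m.RelayOriginalViaNode) // "node2.relayhost"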

// ----

type methodREQRelay struct {
	event Event
}

func (m methodREQRelay) getKind() Event {
	return m.event
}

// Handler to relay messages via a host.
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
	// Relay the message to the actual host.
	proc.processes.wg.Add(1)
	go func() {
		defer proc.processes.wg.Done()

		message.ToNode = message.RelayToNode
		message.FromNode = Node(node)
		message.Method = message.RelayOriginalMethod

		sam, err := newSubjectAndMessage(message)
		if err != nil {
			er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
			proc.errorKernel.errSend(proc, message, er)

			return
		}

		select {
		case proc.toRingbufferCh <- []subjectAndMessage{sam}:
		case <-proc.ctx.Done():
		}
	}()

	// Send back an ACK message.
	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// ---

type methodREQToConsole struct {
	event Event
}

func (m methodREQToConsole) getKind() Event {
	return m.event
}

// Handler to write directly to console.
// This handler handles the writing to console for both TUI and shell clients.
func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) {
	switch {
	case proc.configuration.EnableTUI:
		if proc.processes.tui.toConsoleCh != nil {
			proc.processes.tui.toConsoleCh <- message.Data
		} else {
			er := fmt.Errorf("error: no tui client started")
			proc.errorKernel.errSend(proc, message, er)
		}
	default:
		fmt.Fprintf(os.Stdout, "%v", string(message.Data))
		fmt.Println()
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
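// For illustration, writing to the console of node1 could be requested with a
// message like the sketch below; values are illustrative, and the REQToConsole
// constant is assumed to be defined elsewhere in the package.
//
// consoleMsg := Message{
// 	ToNode: "node1",
// 	Method: REQToConsole,
// 	Data:   []byte("hello console\n"),
// }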

// ---

type methodREQTuiToConsole struct {
	event Event
}

func (m methodREQTuiToConsole) getKind() Event {
	return m.event
}

// Handler to write directly to the TUI console.
// DEPRECATED
func (m methodREQTuiToConsole) handler(proc process, message Message, node string) ([]byte, error) {
	if proc.processes.tui.toConsoleCh != nil {
		proc.processes.tui.toConsoleCh <- message.Data
	} else {
		er := fmt.Errorf("error: no tui client started")
		proc.errorKernel.errSend(proc, message, er)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}

// ---
51
requests_template.go
Normal file

@ -0,0 +1,51 @@
package steward

// ---- Template that can be used for creating request methods

// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
//
// 	proc.processes.wg.Add(1)
// 	go func() {
// 		defer proc.processes.wg.Done()
//
// 		ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// 		defer cancel()
//
// 		// Put data that should be the result of the action done in the inner
// 		// go routine on the outCh.
// 		outCh := make(chan []byte)
// 		// Put errors from the inner go routine on the errCh.
// 		errCh := make(chan error)
//
// 		proc.processes.wg.Add(1)
// 		go func() {
// 			defer proc.processes.wg.Done()
//
// 			// Do some work here....
//
// 		}()
//
// 		// Wait for messages received from the inner go routine.
// 		select {
// 		case <-ctx.Done():
// 			fmt.Printf(" ** DEBUG: got ctx.Done\n")
//
// 			er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs)
// 			sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// 			return
//
// 		case er := <-errCh:
// 			sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// 			return
//
// 		case out := <-outCh:
// 			replyData := fmt.Sprintf("info: successfully created and wrote the file %v\n", out)
// 			newReplyMessage(proc, message, []byte(replyData))
// 			return
// 		}
//
// 	}()
//
// 	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
// 	return ackMsg, nil
// }