mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
459 lines
13 KiB
Go
459 lines
13 KiB
Go
|
package steward
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"os"
|
||
|
"path"
|
||
|
"path/filepath"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/hpcloud/tail"
|
||
|
)
|
||
|
|
||
|
type methodREQToFileAppend struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQToFileAppend) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle appending data to file.
|
||
|
func (m methodREQToFileAppend) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
|
||
|
// If it was a request type message we want to check what the initial messages
|
||
|
// method, so we can use that in creating the file name to store the data.
|
||
|
fileName, folderTree := selectFileNaming(message, proc)
|
||
|
|
||
|
// Check if folder structure exist, if not create it.
|
||
|
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||
|
err := os.MkdirAll(folderTree, 0700)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||
|
}
|
||
|
|
||
|
// Open file and write data.
|
||
|
file := filepath.Join(folderTree, fileName)
|
||
|
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return nil, err
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
_, err = f.Write(message.Data)
|
||
|
f.Sync()
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|
||
|
|
||
|
// -----
|
||
|
|
||
|
type methodREQToFile struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQToFile) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle writing to a file. Will truncate any existing data if the file did already
|
||
|
// exist.
|
||
|
func (m methodREQToFile) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
|
||
|
// If it was a request type message we want to check what the initial messages
|
||
|
// method, so we can use that in creating the file name to store the data.
|
||
|
fileName, folderTree := selectFileNaming(message, proc)
|
||
|
|
||
|
// Check if folder structure exist, if not create it.
|
||
|
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||
|
err := os.MkdirAll(folderTree, 0700)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return nil, er
|
||
|
}
|
||
|
|
||
|
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||
|
}
|
||
|
|
||
|
// Open file and write data.
|
||
|
file := filepath.Join(folderTree, fileName)
|
||
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return nil, err
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
_, err = f.Write(message.Data)
|
||
|
f.Sync()
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|
||
|
|
||
|
// ----
|
||
|
|
||
|
type methodREQCopyFileFrom struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQCopyFileFrom) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle writing to a file. Will truncate any existing data if the file did already
|
||
|
// exist.
|
||
|
func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
switch {
|
||
|
case len(message.MethodArgs) < 3:
|
||
|
er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
SrcFilePath := message.MethodArgs[0]
|
||
|
DstNode := message.MethodArgs[1]
|
||
|
DstFilePath := message.MethodArgs[2]
|
||
|
|
||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||
|
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||
|
defer cancel()
|
||
|
|
||
|
outCh := make(chan []byte)
|
||
|
errCh := make(chan error)
|
||
|
|
||
|
// Read the file, and put the result on the out channel to be sent when done reading.
|
||
|
proc.processes.wg.Add(1)
|
||
|
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
|
||
|
|
||
|
// Wait here until we got the data to send, then create a new message
|
||
|
// and send it.
|
||
|
// Also checking the ctx.Done which calls Cancel will allow us to
|
||
|
// kill all started go routines started by this message.
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
case er := <-errCh:
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
case out := <-outCh:
|
||
|
dstDir := filepath.Dir(DstFilePath)
|
||
|
dstFile := filepath.Base(DstFilePath)
|
||
|
|
||
|
// Prepare for sending a new message with the output
|
||
|
|
||
|
// Copy the original message to get the defaults for timeouts etc,
|
||
|
// and set new values for fields to change.
|
||
|
msg := message
|
||
|
msg.ToNode = Node(DstNode)
|
||
|
//msg.Method = REQToFile
|
||
|
msg.Method = REQCopyFileTo
|
||
|
msg.Data = out
|
||
|
msg.Directory = dstDir
|
||
|
msg.FileName = dstFile
|
||
|
|
||
|
// Create SAM and put the message on the send new message channel.
|
||
|
|
||
|
sam, err := newSubjectAndMessage(msg)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||
|
|
||
|
replyData := fmt.Sprintf("info: succesfully read the file %v, and sent the content to %v\n", SrcFilePath, DstNode)
|
||
|
|
||
|
newReplyMessage(proc, message, []byte(replyData))
|
||
|
}
|
||
|
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|
||
|
|
||
|
// copyFileFrom will read a file to be copied from the specified SrcFilePath.
|
||
|
// The result of be delivered on the provided outCh.
|
||
|
func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) {
|
||
|
defer wg.Done()
|
||
|
|
||
|
const natsMaxMsgSize = 1000000
|
||
|
|
||
|
fi, err := os.Stat(SrcFilePath)
|
||
|
|
||
|
// Check if the src file exists, and that it is not bigger than
|
||
|
// the default limit used by nats which is 1MB.
|
||
|
switch {
|
||
|
case os.IsNotExist(err):
|
||
|
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
|
||
|
return
|
||
|
case fi.Size() > natsMaxMsgSize:
|
||
|
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
fh, err := os.Open(SrcFilePath)
|
||
|
if err != nil {
|
||
|
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
b, err := io.ReadAll(fh)
|
||
|
if err != nil {
|
||
|
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case outCh <- b:
|
||
|
// fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ----
|
||
|
|
||
|
type methodREQCopyFileTo struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQCopyFileTo) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle writing to a file. Will truncate any existing data if the file did already
|
||
|
// exist.
|
||
|
// Same as the REQToFile, but this requst type don't use the default data folder path
|
||
|
// for where to store files or add information about node names.
|
||
|
// This method also sends a msgReply back to the publisher if the method was done
|
||
|
// successfully, where REQToFile do not.
|
||
|
// This method will truncate and overwrite any existing files.
|
||
|
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||
|
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||
|
defer cancel()
|
||
|
|
||
|
// Put data that should be the result of the action done in the inner
|
||
|
// go routine on the outCh.
|
||
|
outCh := make(chan []byte)
|
||
|
// Put errors from the inner go routine on the errCh.
|
||
|
errCh := make(chan error)
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
// ---
|
||
|
switch {
|
||
|
case len(message.MethodArgs) < 3:
|
||
|
er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Pick up the values for the directory and filename for where
|
||
|
// to store the file.
|
||
|
DstFilePath := message.MethodArgs[2]
|
||
|
dstDir := filepath.Dir(DstFilePath)
|
||
|
dstFile := filepath.Base(DstFilePath)
|
||
|
|
||
|
fileRealPath := path.Join(dstDir, dstFile)
|
||
|
|
||
|
// Check if folder structure exist, if not create it.
|
||
|
if _, err := os.Stat(dstDir); os.IsNotExist(err) {
|
||
|
err := os.MkdirAll(dstDir, 0700)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, dstDir, err)
|
||
|
errCh <- er
|
||
|
return
|
||
|
}
|
||
|
|
||
|
{
|
||
|
er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir)
|
||
|
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Open file and write data. Truncate and overwrite any existing files.
|
||
|
file := filepath.Join(dstDir, dstFile)
|
||
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||
|
errCh <- er
|
||
|
return
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
_, err = f.Write(message.Data)
|
||
|
f.Sync()
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err)
|
||
|
errCh <- er
|
||
|
}
|
||
|
|
||
|
// All went ok, send a signal to the outer select statement.
|
||
|
outCh <- []byte(fileRealPath)
|
||
|
|
||
|
// ---
|
||
|
|
||
|
}()
|
||
|
|
||
|
// Wait for messages received from the inner go routine.
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return
|
||
|
|
||
|
case err := <-errCh:
|
||
|
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return
|
||
|
|
||
|
case out := <-outCh:
|
||
|
replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
|
||
|
newReplyMessage(proc, message, []byte(replyData))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|
||
|
|
||
|
// --- methodREQTailFile
|
||
|
|
||
|
type methodREQTailFile struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQTailFile) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// handler to run a tailing of files with timeout context. The handler will
|
||
|
// return the output of the command run back to the calling publisher
|
||
|
// as a new message.
|
||
|
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||
|
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
switch {
|
||
|
case len(message.MethodArgs) < 1:
|
||
|
er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs")
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
fp := message.MethodArgs[0]
|
||
|
|
||
|
// var ctx context.Context
|
||
|
// var cancel context.CancelFunc
|
||
|
|
||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||
|
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||
|
|
||
|
// Note: Replacing the 0 timeout with specific timeout.
|
||
|
// if message.MethodTimeout != 0 {
|
||
|
// ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||
|
// } else {
|
||
|
// ctx, cancel = context.WithCancel(proc.ctx)
|
||
|
// }
|
||
|
|
||
|
outCh := make(chan []byte)
|
||
|
t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{
|
||
|
Offset: 0,
|
||
|
Whence: os.SEEK_END,
|
||
|
}})
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case line := <-t.Lines:
|
||
|
outCh <- []byte(line.Text + "\n")
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
cancel()
|
||
|
// Close the lines channel so we exit the reading lines
|
||
|
// go routine.
|
||
|
// close(t.Lines)
|
||
|
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
|
||
|
proc.errorKernel.infoSend(proc, message, er)
|
||
|
|
||
|
return
|
||
|
case out := <-outCh:
|
||
|
|
||
|
// Prepare and queue for sending a new message with the output
|
||
|
// of the action executed.
|
||
|
newReplyMessage(proc, message, out)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|