1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Added REQCopyFileTo request type

This commit is contained in:
postmannen 2021-11-18 09:34:16 +01:00
parent 81bd4d0589
commit f387eb76e1
3 changed files with 155 additions and 35 deletions

View file

@ -74,6 +74,8 @@ type Configuration struct {
StartSubREQToFile bool
// Subscriber for reading files to copy
StartSubREQCopyFileFrom bool
// Subscriber for writing copied files to disk
StartSubREQCopyFileTo bool
// Subscriber for Echo Request
StartSubREQPing bool
// Subscriber for Echo Reply
@ -123,6 +125,7 @@ type ConfigurationFromFile struct {
StartSubREQToFileAppend *bool
StartSubREQToFile *bool
StartSubREQCopyFileFrom *bool
StartSubREQCopyFileTo *bool
StartSubREQPing *bool
StartSubREQPong *bool
StartSubREQCliCommand *bool
@ -168,6 +171,7 @@ func newConfigurationDefaults() Configuration {
StartSubREQToFileAppend: true,
StartSubREQToFile: true,
StartSubREQCopyFileFrom: true,
StartSubREQCopyFileTo: true,
StartSubREQPing: true,
StartSubREQPong: true,
StartSubREQCliCommand: true,
@ -316,6 +320,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom
}
if cf.StartSubREQCopyFileTo == nil {
conf.StartSubREQCopyFileTo = cd.StartSubREQCopyFileTo
} else {
conf.StartSubREQCopyFileTo = *cf.StartSubREQCopyFileTo
}
if cf.StartSubREQPing == nil {
conf.StartSubREQPing = cd.StartSubREQPing
} else {
@ -416,6 +425,7 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false")
flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false")
flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false")
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false")
flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false")

View file

@ -112,6 +112,11 @@ func (p *processes) Start(proc process) {
proc.startup.subREQCopyFileFrom(proc)
}
// Start a subscriber for writing copied file to disk
if proc.configuration.StartSubREQCopyFileTo {
proc.startup.subREQCopyFileTo(proc)
}
// Start a subscriber for Hello messages
if proc.configuration.StartSubREQHello {
proc.startup.subREQHello(proc)
@ -333,6 +338,14 @@ func (s startup) subREQCopyFileFrom(p process) {
go proc.spawnWorker(p.processes, p.natsConn)
}
func (s startup) subREQCopyFileTo(p process) {
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
sub := newSubject(REQCopyFileTo, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn)
}
func (s startup) subREQToFileAppend(p process) {
log.Printf("Starting text logging subscriber: %#v\n", p.node)
sub := newSubject(REQToFileAppend, string(p.node))

View file

@ -42,6 +42,7 @@ import (
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"time"
@ -764,8 +765,8 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
// and set new values for fields to change.
msg := message
msg.ToNode = Node(DstNode)
msg.Method = REQToFile
// TODO: msg.Method = REQCopyFileTo
//msg.Method = REQToFile
msg.Method = REQCopyFileTo
msg.Data = []string{string(out)}
msg.Directory = dstDir
msg.FileName = dstFile
@ -813,46 +814,92 @@ func (m methodREQCopyFileTo) getKind() CommandOrEvent {
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
// If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data.
fileName, folderTree := message.FileName, message.Directory
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
defer cancel()
// Put data that should be the result of the action done in the inner
// go routine on the outCh.
outCh := make(chan []byte)
// Put errors from the inner go routine on the errCh.
errCh := make(chan error)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// ---
fileName, folderTree := message.FileName, message.Directory
fileRealPath := path.Join(folderTree, fileName)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
err := os.MkdirAll(folderTree, 0700)
if err != nil {
er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
errCh <- er
return
}
log.Printf("info: MethodREQCopyFileTo: Creating folders %v\n", folderTree)
}
// Open file and write data. Truncate and overwrite any existing files.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
errCh <- er
return
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err)
errCh <- er
}
}
// All went ok, send a signal to the outer select statement.
outCh <- []byte(fileRealPath)
// ---
}()
// Wait for messages received from the inner go routine.
select {
case <-ctx.Done():
fmt.Printf(" ** DEBUG: got ctx.Done\n")
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
return nil, er
case err := <-errCh:
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
case out := <-outCh:
replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
newReplyMessage(proc, message, []byte(replyData))
return
}
log.Printf("info: Creating subscribers data folder at %v\n", folderTree)
}
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil, err
}
defer f.Close()
for _, d := range message.Data {
_, err := f.Write([]byte(d))
f.Sync()
if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
@ -1586,3 +1633,53 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b
}
// ----
// ---- Template that can be used for creating request methods
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// defer cancel()
//
// // Put data that should be the result of the action done in the inner
// // go routine on the outCh.
// outCh := make(chan []byte)
// // Put errors from the inner go routine on the errCh.
// errCh := make(chan error)
//
// proc.processes.wg.Add(1)
// go func() {
// defer proc.processes.wg.Done()
//
// // Do some work here....
//
// }()
//
// // Wait for messages received from the inner go routine.
// select {
// case <-ctx.Done():
// fmt.Printf(" ** DEBUG: got ctx.Done\n")
//
// er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs)
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case er := <-errCh:
// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// return
//
// case out := <-outCh:
// replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
// newReplyMessage(proc, message, []byte(replyData))
// return
// }
//
// }()
//
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
// return ackMsg, nil
// }