diff --git a/configuration_flags.go b/configuration_flags.go index e2cbc90..b1b0ddc 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -74,6 +74,8 @@ type Configuration struct { StartSubREQToFile bool // Subscriber for reading files to copy StartSubREQCopyFileFrom bool + // Subscriber for writing copied files to disk + StartSubREQCopyFileTo bool // Subscriber for Echo Request StartSubREQPing bool // Subscriber for Echo Reply @@ -123,6 +125,7 @@ type ConfigurationFromFile struct { StartSubREQToFileAppend *bool StartSubREQToFile *bool StartSubREQCopyFileFrom *bool + StartSubREQCopyFileTo *bool StartSubREQPing *bool StartSubREQPong *bool StartSubREQCliCommand *bool @@ -168,6 +171,7 @@ func newConfigurationDefaults() Configuration { StartSubREQToFileAppend: true, StartSubREQToFile: true, StartSubREQCopyFileFrom: true, + StartSubREQCopyFileTo: true, StartSubREQPing: true, StartSubREQPong: true, StartSubREQCliCommand: true, @@ -316,6 +320,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom } + if cf.StartSubREQCopyFileTo == nil { + conf.StartSubREQCopyFileTo = cd.StartSubREQCopyFileTo + } else { + conf.StartSubREQCopyFileTo = *cf.StartSubREQCopyFileTo + } if cf.StartSubREQPing == nil { conf.StartSubREQPing = cd.StartSubREQPing } else { @@ -416,6 +425,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false") flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false") + flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false") flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false") flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false") flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false") diff --git a/processes.go b/processes.go index 5ad0950..1814461 100644 --- a/processes.go +++ b/processes.go @@ -112,6 +112,11 @@ func (p *processes) Start(proc process) { proc.startup.subREQCopyFileFrom(proc) } + // Start a subscriber for writing copied file to disk + if proc.configuration.StartSubREQCopyFileTo { + proc.startup.subREQCopyFileTo(proc) + } + // Start a subscriber for Hello messages if proc.configuration.StartSubREQHello { proc.startup.subREQHello(proc) @@ -333,6 +338,14 @@ func (s startup) subREQCopyFileFrom(p process) { go proc.spawnWorker(p.processes, p.natsConn) } +func (s startup) subREQCopyFileTo(p process) { + log.Printf("Starting copy file to subscriber: %#v\n", p.node) + sub := newSubject(REQCopyFileTo, string(p.node)) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + + go proc.spawnWorker(p.processes, p.natsConn) +} + func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) diff --git a/requests.go b/requests.go index 87cc037..cc6c8c3 100644 --- a/requests.go +++ b/requests.go @@ -42,6 +42,7 @@ import ( "net/http" "os" "os/exec" + "path" "path/filepath" "strings" "time" @@ -764,8 +765,8 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin // and set new values for fields to change. msg := message msg.ToNode = Node(DstNode) - msg.Method = REQToFile - // TODO: msg.Method = REQCopyFileTo + //msg.Method = REQToFile + msg.Method = REQCopyFileTo msg.Data = []string{string(out)} msg.Directory = dstDir msg.FileName = dstFile @@ -813,46 +814,92 @@ func (m methodREQCopyFileTo) getKind() CommandOrEvent { // exist. // Same as the REQToFile, but this requst type don't use the default data folder path // for where to store files or add information about node names. +// This method also sends a msgReply back to the publisher if the method was done +// successfully, where REQToFile do not. +// This method will truncate and overwrite any existing files. func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) { - // If it was a request type message we want to check what the initial messages - // method, so we can use that in creating the file name to store the data. - fileName, folderTree := message.FileName, message.Directory + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() - // Check if folder structure exist, if not create it. - if _, err := os.Stat(folderTree); os.IsNotExist(err) { - err := os.MkdirAll(folderTree, 0700) - if err != nil { - er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + defer cancel() + + // Put data that should be the result of the action done in the inner + // go routine on the outCh. + outCh := make(chan []byte) + // Put errors from the inner go routine on the errCh. + errCh := make(chan error) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + // --- + + fileName, folderTree := message.FileName, message.Directory + fileRealPath := path.Join(folderTree, fileName) + + // Check if folder structure exist, if not create it. + if _, err := os.Stat(folderTree); os.IsNotExist(err) { + err := os.MkdirAll(folderTree, 0700) + if err != nil { + er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + errCh <- er + return + } + + log.Printf("info: MethodREQCopyFileTo: Creating folders %v\n", folderTree) + } + + // Open file and write data. Truncate and overwrite any existing files. + file := filepath.Join(folderTree, fileName) + f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) + if err != nil { + er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) + errCh <- er + return + } + defer f.Close() + + for _, d := range message.Data { + _, err := f.Write([]byte(d)) + f.Sync() + if err != nil { + er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err) + errCh <- er + } + } + + // All went ok, send a signal to the outer select statement. + outCh <- []byte(fileRealPath) + + // --- + + }() + + // Wait for messages received from the inner go routine. + select { + case <-ctx.Done(): + fmt.Printf(" ** DEBUG: got ctx.Done\n") + + er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) + return - return nil, er + case err := <-errCh: + er := fmt.Errorf("error: methodREQCopyFileTo: %v", err) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + return + + case out := <-outCh: + replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out) + newReplyMessage(proc, message, []byte(replyData)) + return } - log.Printf("info: Creating subscribers data folder at %v\n", folderTree) - } - - // Open file and write data. - file := filepath.Join(folderTree, fileName) - f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) - if err != nil { - er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) - return nil, err - } - defer f.Close() - - for _, d := range message.Data { - _, err := f.Write([]byte(d)) - f.Sync() - if err != nil { - er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err) - sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) - } - } + }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil @@ -1586,3 +1633,53 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b } // ---- + +// ---- Template that can be used for creating request methods + +// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) { +// +// proc.processes.wg.Add(1) +// go func() { +// defer proc.processes.wg.Done() +// +// ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) +// defer cancel() +// +// // Put data that should be the result of the action done in the inner +// // go routine on the outCh. +// outCh := make(chan []byte) +// // Put errors from the inner go routine on the errCh. +// errCh := make(chan error) +// +// proc.processes.wg.Add(1) +// go func() { +// defer proc.processes.wg.Done() +// +// // Do some work here.... +// +// }() +// +// // Wait for messages received from the inner go routine. +// select { +// case <-ctx.Done(): +// fmt.Printf(" ** DEBUG: got ctx.Done\n") +// +// er := fmt.Errorf("error: methodREQ...: got <-ctx.Done(): %v", message.MethodArgs) +// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) +// return +// +// case er := <-errCh: +// sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) +// return +// +// case out := <-outCh: +// replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out) +// newReplyMessage(proc, message, []byte(replyData)) +// return +// } +// +// }() +// +// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) +// return ackMsg, nil +// }