From 7c991ee3d520334e4da96689c9bfd9149bff1851 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 17 Nov 2021 13:02:48 +0100 Subject: [PATCH] Transer of files initially works --- configuration_flags.go | 10 ++ processes.go | 13 +++ requests.go | 212 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) diff --git a/configuration_flags.go b/configuration_flags.go index 64c1e8e..bc39c2a 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -70,6 +70,8 @@ type Configuration struct { StartSubREQToFileAppend bool // Subscriber for writing to file StartSubREQToFile bool + // Subscriber for reading files to copy + StartSubREQCopyFileFrom bool // Subscriber for Echo Request StartSubREQPing bool // Subscriber for Echo Reply @@ -117,6 +119,7 @@ type ConfigurationFromFile struct { StartSubREQHello *bool StartSubREQToFileAppend *bool StartSubREQToFile *bool + StartSubREQCopyFileFrom *bool StartSubREQPing *bool StartSubREQPong *bool StartSubREQCliCommand *bool @@ -160,6 +163,7 @@ func newConfigurationDefaults() Configuration { StartSubREQHello: true, StartSubREQToFileAppend: true, StartSubREQToFile: true, + StartSubREQCopyFileFrom: true, StartSubREQPing: true, StartSubREQPong: true, StartSubREQCliCommand: true, @@ -298,6 +302,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQToFile = *cf.StartSubREQToFile } + if cf.StartSubREQCopyFileFrom == nil { + conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom + } else { + conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom + } if cf.StartSubREQPing == nil { conf.StartSubREQPing = cd.StartSubREQPing } else { @@ -396,6 +405,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false") + flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false") flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false") flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false") flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false") diff --git a/processes.go b/processes.go index 1a61e97..5ad0950 100644 --- a/processes.go +++ b/processes.go @@ -107,6 +107,11 @@ func (p *processes) Start(proc process) { proc.startup.subREQToFile(proc) } + // Start a subscriber for reading file to copy + if proc.configuration.StartSubREQCopyFileFrom { + proc.startup.subREQCopyFileFrom(proc) + } + // Start a subscriber for Hello messages if proc.configuration.StartSubREQHello { proc.startup.subREQHello(proc) @@ -320,6 +325,14 @@ func (s startup) subREQToFile(p process) { go proc.spawnWorker(p.processes, p.natsConn) } +func (s startup) subREQCopyFileFrom(p process) { + log.Printf("Starting copy file from subscriber: %#v\n", p.node) + sub := newSubject(REQCopyFileFrom, string(p.node)) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + + go proc.spawnWorker(p.processes, p.natsConn) +} + func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) diff --git a/requests.go b/requests.go index d95549d..463bad1 100644 --- a/requests.go +++ b/requests.go @@ -98,6 +98,10 @@ const ( // The data field is a slice of strings where the values of the // slice will be written to the file. REQToFile Method = "REQToFile" + // Read the source file to be copied to some node. + REQCopyFileFrom Method = "REQCopyFileFrom" + // Write the destination copied to some node. + REQCopyFileTo Method = "REQCopyFileTo" // Send Hello I'm here message. REQHello Method = "REQHello" // Error log methods to centralError node. @@ -159,6 +163,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQToFile: methodREQToFile{ commandOrEvent: EventACK, }, + REQCopyFileFrom: methodREQCopyFileFrom{ + commandOrEvent: EventACK, + }, + REQCopyFileTo: methodREQCopyFileTo{ + commandOrEvent: EventACK, + }, REQHello: methodREQHello{ commandOrEvent: EventNACK, }, @@ -642,6 +652,208 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] return ackMsg, nil } +// ---- + +type methodREQCopyFileFrom struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQCopyFileFrom) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// Handle writing to a file. Will truncate any existing data if the file did already +// exist. +func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) { + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + switch { + case len(message.MethodArgs) < 3: + er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return + } + + SrcFilePath := message.MethodArgs[0] + DstNode := message.MethodArgs[1] + DstFilePath := message.MethodArgs[2] + + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*2) + + outCh := make(chan []byte) + errCh := make(chan error) + + // Read the file, and put the result on the out channel to be sent when done reading. + proc.processes.wg.Add(1) + go func() { + fmt.Printf(" * DEBUG: beginning of read file go routine\n") + defer proc.processes.wg.Done() + + const natsMaxMsgSize = 1000000 + + fi, err := os.Stat(SrcFilePath) + + // Check if the src file exists, and that it is not bigger than + // the default limit used by nats which is 1MB. + switch { + case os.IsNotExist(err): + errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath) + return + case fi.Size() > natsMaxMsgSize: + errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize) + return + } + + fh, err := os.Open(SrcFilePath) + if err != nil { + errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) + return + } + + fmt.Printf(" * DEBUG: before io.ReadAll\n") + + b, err := io.ReadAll(fh) + if err != nil { + errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err) + return + } + + fmt.Printf(" * DEBUG: after io.ReadAll: b contains: %v\n", string(b)) + + select { + case outCh <- b: + fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n") + case <-ctx.Done(): + fmt.Printf(" * DEBUG: after io.ReadAll: ctx.Done\n") + return + } + }() + + // Wait here until we got the data to send, then create a new message + // and send it. + // Also checking the ctx.Done which calls Cancel will allow us to + // kill all started go routines started by this message. + select { + case <-ctx.Done(): + fmt.Printf(" ** DEBUG: got ctx.Done\n") + cancel() + er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + + return + case er := <-errCh: + fmt.Printf(" ** DEBUG: received on errCh: <-errCh\n") + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + cancel() + + return + case out := <-outCh: + fmt.Printf(" ** DEBUG: got data on out channel: case out:=<-outCh\n") + cancel() + + dstDir := filepath.Dir(DstFilePath) + dstFile := filepath.Base(DstFilePath) + + // Prepare for sending a new message with the output + // TODO: Maybe changing the type of Message.Data to []byte ? + + // Copy the original message to get the defaults for timeouts etc, + // and set new values for fields to change. + msg := message + msg.ToNode = Node(DstNode) + msg.Method = REQToFile + // TODO: msg.Method = REQCopyFileTo + msg.Data = []string{string(out)} + msg.Directory = dstDir + msg.FileName = dstFile + + // Create SAM and put the message on the send new message channel. + + sam, err := newSubjectAndMessage(msg) + if err != nil { + er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + } + + fmt.Printf(" * DEBUG: sending SAM: %#v\n", sam) + + proc.toRingbufferCh <- []subjectAndMessage{sam} + + // TODO: Should we also send a reply message with the result back + // to where the message originated ? + + } + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ---- + +type methodREQCopyFileTo struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQCopyFileTo) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// Handle writing to a file. Will truncate any existing data if the file did already +// exist. +// Same as the REQToFile, but this requst type don't use the default data folder path +// for where to store files or add information about node names. +func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) { + + // If it was a request type message we want to check what the initial messages + // method, so we can use that in creating the file name to store the data. + fileName, folderTree := message.FileName, message.Directory + + // Check if folder structure exist, if not create it. + if _, err := os.Stat(folderTree); os.IsNotExist(err) { + err := os.MkdirAll(folderTree, 0700) + if err != nil { + er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + + return nil, er + } + + log.Printf("info: Creating subscribers data folder at %v\n", folderTree) + } + + // Open file and write data. + file := filepath.Join(folderTree, fileName) + f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) + if err != nil { + er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return nil, err + } + defer f.Close() + + for _, d := range message.Data { + _, err := f.Write([]byte(d)) + f.Sync() + if err != nil { + er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + } + } + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + // ---- type methodREQHello struct { commandOrEvent CommandOrEvent