From 8ec63725657336f2067ab67433942e3ef515b716 Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 21 Jun 2022 07:45:36 +0200 Subject: [PATCH] fixed returns and error msgs for copy handlers --- requests_copy.go | 72 +++++++++++++++++------------------------------- 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index 4fec244..0400e9e 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "io/fs" - "log" "os" "path/filepath" "sort" @@ -160,7 +159,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ fileInfo, err := os.Stat(SrcFilePath) if err != nil { // errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err) - log.Printf("error: copySrcSubProcFunc: failed to stat file: %v\n", err) + er := fmt.Errorf("error: copySrcSubProcFunc: failed to stat file: %v", err) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) return } @@ -310,13 +310,16 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { - // We should receive a ready message generated by the procFunc of Dst. + // We should receive a ready message generated by the procFunc of Dst, + // and any messages received we directly route into the procFunc. select { case <-proc.ctx.Done(): - log.Printf(" * copySrcHandler ended: %v\n", proc.processName) + er := fmt.Errorf(" * copySrcHandler ended: %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) case proc.procFuncCh <- message: - log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName) + er := fmt.Errorf(" * copySrcHandler passing message over to procFunc: %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } return nil, nil @@ -330,9 +333,11 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by select { case <-proc.ctx.Done(): - log.Printf(" * copyDstHandler ended: %v\n", proc.processName) + er := fmt.Errorf(" * copyDstHandler ended: %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) case proc.procFuncCh <- message: - log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName) + er := fmt.Errorf(" * copySrcHandler passing message over to procFunc: %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } return nil, nil @@ -378,10 +383,10 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel // Do action based on copyStatus received. for { - fmt.Printf("\n * DEBUG: copySrcSubProcFunc: cia contains: %+v\n\n", cia) select { case <-ctx.Done(): - log.Printf(" info: canceling copySrcProcFunc : %v\n", proc.processName) + er := fmt.Errorf(" info: canceling copySrcProcFunc : %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) return nil // Pick up the message recived by the copySrcSubHandler. @@ -400,7 +405,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel // We set the default status to copyData. If we get an io.EOF we change it to copyDone later. status := copyData - log.Printf(" * RECEIVED in copySrcSubProcFunc from dst * copyStatus=copyReady: %v\n\n", csa.CopyStatus) b := make([]byte, cia.SplitChunkSize) n, err := fh.Read(b) if err != nil && err != io.EOF { @@ -414,13 +418,12 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel lastReadChunk = b[:n] - // Create a hash of the bytes + // Create a hash of the bytes. hash := sha256.Sum256(b[:n]) chunkNumber++ - // Create message and send data to dst - // fmt.Printf("**** DATA READ: %v\n", b) + // Create message and send data to dst. csa := copySubData{ CopyStatus: status, @@ -437,7 +440,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel } // We want to send a message back to src that we are ready to start. - // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.DstNode, FromNode: cia.SrcNode, @@ -447,8 +449,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel IsSubPublishedMsg: true, } - // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) @@ -488,7 +488,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel chunkNumber++ // Create message and send data to dst - // fmt.Printf("**** DATA READ: %v\n", b) csa := copySubData{ CopyStatus: status, @@ -505,7 +504,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel } // We want to send a message back to src that we are ready to start. - // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.DstNode, FromNode: cia.SrcNode, @@ -515,8 +513,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel IsSubPublishedMsg: true, } - // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) @@ -530,9 +526,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel case copyDstDone: cancel() + return nil default: - // TODO: Any error logic here ? er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus) proc.errorKernel.errSend(proc, message, er) return er @@ -549,7 +545,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { - fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) csa := copySubData{ CopyStatus: copyReady, @@ -564,7 +559,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // We want to send a message back to src that we are ready to start. { - fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.SrcNode, FromNode: cia.DstNode, @@ -574,8 +568,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc IsSubPublishedMsg: true, } - fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) @@ -591,10 +583,10 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc os.Mkdir(tmpFolder, 0700) for { - fmt.Printf("\n * DEBUG: copyDstSubProcFunc: cia contains: %+v\n\n", cia) select { case <-ctx.Done(): - log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName) + er := fmt.Errorf(" * copyDstProcFunc ended: %v", proc.processName) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) return nil case message := <-procFuncCh: var csa copySubData @@ -609,18 +601,14 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // trigger the resend of the last message in the switch below. hash := sha256.Sum256(csa.CopyData) if hash != csa.Hash { - log.Printf("error: copyDstSubProcFunc: hash of received message is not correct for: %v\n", cia.DstMethod) + er := fmt.Errorf("error: copyDstSubProcFunc: hash of received message is not correct for: %v", cia.DstMethod) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) csa.CopyStatus = copyResendLast } - // fmt.Printf(" * DEBUG: Hash was verified OK\n") - switch csa.CopyStatus { case copyData: - // Write the data chunk to disk ? - // fmt.Printf("\n * Received data: %s\n\n", csa.CopyData) - err := func() error { filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID) fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) @@ -653,9 +641,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) proc.errorKernel.errSend(proc, message, er) + return er } - // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.SrcNode, FromNode: cia.DstNode, @@ -665,8 +653,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc IsSubPublishedMsg: true, } - // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) @@ -751,7 +737,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc fi.nr = nr files = append(files, fi) - log.Printf("info: copy: appending path for chunk file=%v into=%v, size=%v\n", path, filePath, info.Size()) } @@ -760,12 +745,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // Sort all the source nodes. sort.SliceStable(files, func(i, j int) bool { - fmt.Printf("files[i].nr=%v < files[j].nr=%v\n", files[i].nr, files[j].nr) return files[i].nr < files[j].nr }) - fmt.Printf(" * sorted slice: %v\n", files) - if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err) proc.errorKernel.errSend(proc, message, er) @@ -788,7 +770,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return err } - log.Printf("info: copy: writing content of split chunk file=%v into=%v\n", fp, filePath) _, err = mainfh.Write(b[:n]) if err != nil { return err @@ -800,7 +781,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc }() if err != nil { - log.Printf("error: copyDstSubProcFunc: remove temp dir failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: write to final destination file failed: %v", err) + proc.errorKernel.errSend(proc, message, er) } // Remove the backup file, and tmp folder. @@ -812,7 +794,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath) + er := fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) // Signal back to src that we are done, so it can cancel the process. { @@ -828,7 +811,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc } // We want to send a message back to src that we are ready to start. - // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.SrcNode, FromNode: cia.DstNode, @@ -838,8 +820,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc IsSubPublishedMsg: true, } - // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)