diff --git a/requests_copy.go b/requests_copy.go index 102466c..e0728d6 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -370,9 +370,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel fh, err := os.Open(cia.SrcFilePath) if err != nil { - // errCh <- fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v, %v", SrcFilePath, err) - log.Fatalf("error: copySrcSubProcFunc: failed to open file: %v\n", err) - return nil + er := fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v", err) + proc.errorKernel.errSend(proc, Message{}, er) + return er } defer fh.Close() @@ -389,7 +389,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel var csa copySubData err := cbor.Unmarshal(message.Data, &csa) if err != nil { - log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } switch csa.CopyStatus { @@ -401,7 +403,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel b := make([]byte, cia.SplitChunkSize) n, err := fh.Read(b) if err != nil && err != io.EOF { - log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) + er := fmt.Errorf("error: copySrcSubHandler: failed to read chunk from file: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } if err == io.EOF { status = copySrcDone @@ -426,7 +430,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel csaSerialized, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } // We want to send a message back to src that we are ready to start. @@ -444,7 +450,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -483,7 +491,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel csaSerialized, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } // We want to send a message back to src that we are ready to start. @@ -501,7 +511,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -513,7 +525,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel default: // TODO: Any error logic here ? - log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus) + er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus) + proc.errorKernel.errSend(proc, message, er) + return er } } } @@ -535,7 +549,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSerialized, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } // We want to send a message back to src that we are ready to start. @@ -554,7 +570,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -574,7 +592,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc var csa copySubData err := cbor.Unmarshal(message.Data, &csa) if err != nil { - log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } // Check if the hash matches. If it fails we set the status so we can @@ -593,20 +613,29 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // Write the data chunk to disk ? // fmt.Printf("\n * Received data: %s\n\n", csa.CopyData) - func() { + err := func() error { filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID) fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err) + return er } defer fh.Close() _, err = fh.Write(csa.CopyData) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err) + return er } + + return nil }() + if err != nil { + proc.errorKernel.errSend(proc, message, err) + return err + } + // Prepare and send a ready message to src for the next chunk. csa := copySubData{ CopyStatus: copyReady, @@ -614,7 +643,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSer, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) } // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) @@ -631,7 +661,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -642,7 +674,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // resend of the last message. csaSer, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } msg := Message{ @@ -656,13 +690,15 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} case copySrcDone: - func() { + err := func() error { // Open the main file that chunks files will be written into. filePath := filepath.Join(cia.DstDir, cia.DstFile) @@ -673,7 +709,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } defer mainfh.Close() @@ -721,9 +759,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc fmt.Printf(" * sorted slice: %v\n", files) if err != nil { - log.Printf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v\n", err) - - return + er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } err = func() error { @@ -761,7 +799,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc os.Remove(backupOriginalFileName) err = os.RemoveAll(tmpFolder) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: remove temp dir failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: remove temp dir failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath) @@ -774,7 +814,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSerialized, err := cbor.Marshal(csa) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } // We want to send a message back to src that we are ready to start. @@ -792,14 +834,22 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) + proc.errorKernel.errSend(proc, message, er) + return er } proc.toRingbufferCh <- []subjectAndMessage{sam} } cancel() + + return nil }() + + if err != nil { + return err + } } } }