diff --git a/requests_copy.go b/requests_copy.go index da647a0..0aff1ae 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/fs" "log" "os" "path/filepath" @@ -24,6 +25,7 @@ type copyInitialData struct { DstDir string DstFile string SplitChunkSize int + FileMode fs.FileMode } type methodREQCopySrc struct { @@ -137,6 +139,16 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ dstSubProcessName := fmt.Sprintf("REQSUBCopyDst.%v", uid.String()) dstM := Method(dstSubProcessName) + // Get the file permissions + fileInfo, err := os.Stat(SrcFilePath) + if err != nil { + // errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) + log.Printf("error: copySrcSubProcFunc: failed to stat file: %v\n", err) + return + } + + fileMode := fileInfo.Mode() + cia := copyInitialData{ UUID: uid.String(), SrcNode: proc.node, @@ -147,6 +159,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ DstDir: dstDir, DstFile: dstFile, SplitChunkSize: splitChunkSize, + FileMode: fileMode, } sub := newSubjectNoVerifyHandler(m, node) @@ -302,9 +315,10 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by type copyStatus int const ( - copyReady copyStatus = 1 - copyData copyStatus = 2 - copyDone copyStatus = 3 + copyReady copyStatus = 1 + copyData copyStatus = 2 + copyDone copyStatus = 3 + copyResendLast copyStatus = 4 ) // copySubData is the structure being used between the src and dst while copying data. @@ -319,8 +333,11 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, pf := func(ctx context.Context, procFuncCh chan Message) error { var chunkNumber = 0 + var lastReadChunk []byte + var resendRetries int // Initiate the file copier. + fh, err := os.Open(cia.SrcFilePath) if err != nil { // errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) @@ -360,6 +377,8 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, status = copyDone } + lastReadChunk = b + // Create a hash of the bytes hash := sha256.Sum256(b) @@ -399,6 +418,64 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, proc.toRingbufferCh <- []subjectAndMessage{sam} + resendRetries = 0 + + // Testing with contect canceling here. + // proc.ctxCancel() + + case copyResendLast: + if resendRetries > message.Retries { + er := fmt.Errorf("error: %v: failed to resend the chunk for the %v time, giving up", cia.DstMethod, resendRetries) + proc.errorKernel.errSend(proc, message, er) + // NB: Should we call cancel here, or wait for the timeout ? + proc.ctxCancel() + } + + // HERE! + b := lastReadChunk + status := copyData + + // Create a hash of the bytes + hash := sha256.Sum256(b) + + chunkNumber++ + + // Create message and send data to dst + fmt.Printf("**** DATA READ: %v\n", b) + + csa := copySubData{ + CopyStatus: status, + CopyData: b, + ChunkNumber: chunkNumber, + Hash: hash, + } + + csaSerialized, err := cbor.Marshal(csa) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + } + + // We want to send a message back to src that we are ready to start. + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) + msg := Message{ + ToNode: cia.DstNode, + FromNode: cia.SrcNode, + Method: cia.DstMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + } + + fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) + + sam, err := newSubjectAndMessage(msg) + if err != nil { + log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + + resendRetries++ + default: // TODO: Any error logic here ? log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus) @@ -464,10 +541,13 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) } - // Check if the hash matches. + // Check if the hash matches. If it fails we set the status so we can + // trigger the resend of the last message in the switch below. hash := sha256.Sum256(csa.CopyData) if hash != csa.Hash { - log.Fatalf("error: hash of received message is not correct\n") + log.Printf("error: copyDstSubProcFunc: hash of received message is not correct for: %v\n", cia.DstMethod) + + csa.CopyStatus = copyResendLast } fmt.Printf(" * DEBUG: Hash was verified OK\n") @@ -496,7 +576,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func CopyStatus: copyReady, } - csaSerialized, err := cbor.Marshal(csa) + csaSer, err := cbor.Marshal(csa) if err != nil { log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) } @@ -507,7 +587,33 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func FromNode: cia.DstNode, Method: cia.SrcMethod, ReplyMethod: REQNone, - Data: csaSerialized, + Data: csaSer, + } + + fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) + + sam, err := newSubjectAndMessage(msg) + if err != nil { + log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + + case copyResendLast: + // The csa already contains copyStatus copyResendLast when reached here, + // so we can just serialize csa, and send a message back to sourcde for + // resend of the last message. + csaSer, err := cbor.Marshal(csa) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + } + + msg := Message{ + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSer, } fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) @@ -525,9 +631,15 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func // var mainFileData []byte func() { + // Open the main file that chunks files will be written into. filePath := filepath.Join(cia.DstDir, cia.DstFile) - mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) + + // Rename the file so we got a backup. + backupOriginalFileName := filePath + ".bck" + os.Rename(filePath, backupOriginalFileName) + + mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode) if err != nil { log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) } @@ -562,15 +674,25 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func return err } - // mainFileData = append(mainFileData, b...) + // TODO: delete tmp files } return nil }) if err != nil { - log.Fatalf("error: copyDstSubProcFunc: filewalk failed: %v\n", err) + log.Printf("error: copyDstSubProcFunc: combining the file chunks back to original file failed: %v\n", err) + + // Delete the file we've been trying to write to. + os.Remove(filePath) + // Rename the old file back to it's original name. + os.Rename(backupOriginalFileName, filePath) + + return } + // All ok, remove the backup file + os.Remove(backupOriginalFileName) + // fmt.Printf("main file contains: %v\n", mainFileData) }() }