diff --git a/requests_copy.go b/requests_copy.go index 50b177d..f20f017 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -7,6 +7,7 @@ import ( "log" "os" "path/filepath" + "strconv" "github.com/fxamacker/cbor/v2" "github.com/google/uuid" @@ -280,10 +281,26 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by return h } +type copyStatus int + +const ( + copyReady copyStatus = 1 + copyData copyStatus = 2 + copyDone copyStatus = 3 +) + +// copySubData is the structure being used between the src and dst while copying data. +type copySubData struct { + CopyStatus copyStatus + CopyData []byte + ChunkNumber int +} + func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { const readChuncSize = 2 + var chunkNumber = 0 // Initiate the file copier. fh, err := os.Open(cia.SrcFilePath) @@ -325,12 +342,15 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, status = copyDone } + chunkNumber++ + // Create message and send data to dst fmt.Printf("**** DATA READ: %v\n", b) csa := copySubData{ - CopyStatus: status, - CopyData: b, + CopyStatus: status, + CopyData: b, + ChunkNumber: chunkNumber, } csaSerialized, err := cbor.Marshal(csa) @@ -370,21 +390,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, return pf } -type copyStatus int - -const ( - copyReady copyStatus = 1 - copyData copyStatus = 2 - copyDone copyStatus = 3 -) - -// copySubData is the structure being used between the src and dst while copying data. -type copySubData struct { - CopyStatus copyStatus - CopyData []byte -} - func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { + const readChuncSize = 2 + pf := func(ctx context.Context, procFuncCh chan Message) error { fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) @@ -398,23 +406,29 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func } // We want to send a message back to src that we are ready to start. - fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) - msg := Message{ - ToNode: cia.SrcNode, - FromNode: cia.DstNode, - Method: cia.SrcMethod, - ReplyMethod: REQNone, - Data: csaSerialized, + { + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) + msg := Message{ + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + } + + fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) + + sam, err := newSubjectAndMessage(msg) + if err != nil { + log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} } - fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) - - sam, err := newSubjectAndMessage(msg) - if err != nil { - log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) - } - - proc.toRingbufferCh <- []subjectAndMessage{sam} + // Open a tmp folder for where to write the received chunks + tmpFolder := filepath.Join(proc.configuration.SocketFolder, cia.DstFile+"-"+cia.UUID) + os.Mkdir(tmpFolder, 0700) for { fmt.Printf("\n * DEBUG: copyDstSubProcFunc: cia contains: %+v\n\n", cia) @@ -434,7 +448,21 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func // Write the data chunk to disk ? fmt.Printf("\n * Received data: %s\n\n", csa.CopyData) - // Send a ready message for the next chunk. + func() { + filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID) + fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + } + defer fh.Close() + + _, err = fh.Write(csa.CopyData) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + } + }() + + // Prepare and send a ready message to src for the next chunk. csa := copySubData{ CopyStatus: copyReady, } @@ -444,7 +472,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) } - // We want to send a message back to src that we are ready to start. fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: cia.SrcNode, @@ -464,7 +491,58 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func proc.toRingbufferCh <- []subjectAndMessage{sam} case copyDone: - fmt.Printf("\n\n\n ************** DEBUG: Done \n\n\n") + fmt.Printf("\n\n\n ************** DEBUG: copyDone \n\n\n") + + // var mainFileData []byte + + func() { + // Open the main file that chunks files will be written into. + filePath := filepath.Join(cia.DstDir, cia.DstFile) + mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err) + } + defer mainfh.Close() + + // Walk the tmp transfer directory and combine all the chunks into one file. + err = filepath.Walk(tmpFolder, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + fmt.Println(path, info.Size()) + + fh, err := os.Open(path) + if err != nil { + return err + } + defer fh.Close() + + b := make([]byte, readChuncSize) + _, err = fh.Read(b) + if err != nil { + return err + } + + fmt.Printf(" * DEBUG: read: %v\n", b) + + _, err = mainfh.Write(b) + if err != nil { + return err + } + + // mainFileData = append(mainFileData, b...) + } + + return nil + }) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: filewalk failed: %v\n", err) + } + + // fmt.Printf("main file contains: %v\n", mainFileData) + }() } } }