From c7eec5a196cc5509c5912af3822d43d9b359e5ac Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 14 Jun 2022 14:32:35 +0200 Subject: [PATCH] reading from file --- requests_copy.go | 74 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index ac30896..811a3cd 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "os" "path/filepath" "github.com/fxamacker/cbor/v2" @@ -98,10 +99,11 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ m := Method(subProcessName) cia := copyInitialData{ - UUID: uid.String(), - SrcMethod: m, - DstDir: dstDir, - DstFile: dstFile, + UUID: uid.String(), + SrcMethod: m, + SrcFilePath: SrcFilePath, + DstDir: dstDir, + DstFile: dstFile, } sub := newSubjectNoVerifyHandler(m, node) @@ -158,13 +160,14 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ } type copyInitialData struct { - UUID string - SrcMethod Method - SrcNode Node - DstMethod Method - DstNode Node - DstDir string - DstFile string + UUID string + SrcMethod Method + SrcNode Node + DstMethod Method + DstNode Node + SrcFilePath string + DstDir string + DstFile string } // ---- @@ -259,6 +262,8 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by select { case <-proc.ctx.Done(): log.Printf(" * copyDstHandler ended: %v\n", proc.processName) + case proc.procFuncCh <- message: + log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName) } return nil, nil @@ -270,6 +275,18 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { + const readChuncSize = 2 + + // Initiate the file copier. + fh, err := os.Open(cia.SrcFilePath) + if err != nil { + // errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) + log.Fatalf("error: copySrcSubProcFunc: failed to open file: %v\n", err) + return nil + } + defer fh.Close() + + // Do action based on copyStatus received. for { select { case <-ctx.Done(): @@ -287,9 +304,18 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, switch csa.CopyStatus { case copyReady: log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus) + b := make([]byte, readChuncSize) + _, err := fh.Read(b) + if err != nil { + log.Fatalf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) + } + + // Create message and send data to dst + fmt.Printf("**** DATA READ: %v\n", b) + default: // TODO: Any error logic here ? - log.Fatalf("error: copySrcSubHandler: not valid copyStatus, exiting: %v\n", csa.CopyStatus) + log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus) } } } @@ -304,6 +330,7 @@ type copyStatus int const ( copyReady copyStatus = 1 + copyData copyStatus = 2 ) // copySubData is the structure being used between the src and dst while copying data. @@ -329,6 +356,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: message.FromNode, + FromNode: message.ToNode, Method: cia.SrcMethod, ReplyMethod: REQNone, Data: csaSerialized, @@ -341,12 +369,26 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func proc.toRingbufferCh <- []subjectAndMessage{sam} - select { - case <-ctx.Done(): - log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName) + for { + select { + case <-ctx.Done(): + log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName) + return nil + case message := <-procFuncCh: + var csa copySubData + err := cbor.Unmarshal(message.Data, &csa) + if err != nil { + log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) + } + + switch csa.CopyStatus { + case copyData: + // Write the data chunk to disk ? + fmt.Printf("\n * Received data: %s\n\n", csa.CopyData) + } + } } - return nil } return pf