diff --git a/requests_copy.go b/requests_copy.go index cc20931..95af0c2 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -3,6 +3,7 @@ package steward import ( "context" "fmt" + "io" "log" "os" "path/filepath" @@ -312,11 +313,17 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, switch csa.CopyStatus { case copyReady: + // We set the default status to copyData. If we get an io.EOF we change it to copyDone later. + status := copyData + log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus) b := make([]byte, readChuncSize) _, err := fh.Read(b) - if err != nil { - log.Fatalf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) + if err != nil && err != io.EOF { + log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) + } + if err == io.EOF { + status = copyDone } // Create message and send data to dst @@ -325,7 +332,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, // TESTING HERE: { csa := copySubData{ - CopyStatus: copyData, + CopyStatus: status, CopyData: b, } @@ -372,6 +379,7 @@ type copyStatus int const ( copyReady copyStatus = 1 copyData copyStatus = 2 + copyDone copyStatus = 3 ) // copySubData is the structure being used between the src and dst while copying data. @@ -429,6 +437,38 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func case copyData: // Write the data chunk to disk ? fmt.Printf("\n * Received data: %s\n\n", csa.CopyData) + + // Send a ready message for the next chunk. + csa := copySubData{ + CopyStatus: copyReady, + } + + csaSerialized, err := cbor.Marshal(csa) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + } + + // We want to send a message back to src that we are ready to start. + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) + msg := Message{ + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + } + + fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) + + sam, err := newSubjectAndMessage(msg) + if err != nil { + log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + + case copyDone: + fmt.Printf("\n\n\n ************** DEBUG: Done \n\n\n") } } }