diff --git a/requests_copy.go b/requests_copy.go index 2c37f1a..1b06510 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -216,7 +216,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copyDstSubProc.procFunc = copyDstProcSubFunc(copyDstSubProc, cia, message) + copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message) // assign a handler to the sub process copyDstSubProc.handler = copyDstSubHandler(cia) @@ -237,14 +237,27 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { - // HERE! - // We should receive a ready message generated by the procFunc of Dst. - fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE IN copySrcSubHandler------------------\n\n") + // We should receive a ready message generated by the procFunc of Dst. + allDoneCh := make(chan struct{}) + + go func() { + var csa copySubData + err := cbor.Unmarshal(message.Data, &csa) + if err != nil { + log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) + } + + fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE IN copySrcSubHandler: %v------------------\n\n", csa.CopyStatus) + + allDoneCh <- struct{}{} + }() select { case <-proc.ctx.Done(): log.Printf(" * copySrcHandler ended: %v\n", proc.processName) + case <-allDoneCh: + log.Printf(" * copySrcHandler ended, all done copying file: %v\n", proc.processName) } return nil, nil @@ -284,32 +297,37 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, type copyStatus int const ( - copyReady copyStatus = iota + copyReady copyStatus = 1 ) -func copyDstProcSubFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { +// copySubData is the structure being used between the src and dst while copying data. +type copySubData struct { + CopyStatus copyStatus + CopyData []byte +} + +func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { - fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia) + fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) + + csa := copySubData{ + CopyStatus: copyReady, + } + + csaSerialized, err := cbor.Marshal(csa) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + } // We want to send a message back to src that we are ready to start. - fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending to:%v\n ", message.FromNode) + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ ToNode: message.FromNode, Method: cia.SrcMethod, ReplyMethod: REQNone, + Data: csaSerialized, } - // sub := Subject{ - // ToNode: string(message.FromNode), - // Event: EventACK, - // Method: cia.SrcMethod, - // } - // - // sam := subjectAndMessage{ - // Subject: sub, - // Message: msg, - // } - sam, err := newSubjectAndMessage(msg) if err != nil { log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)