diff --git a/requests_copy.go b/requests_copy.go index c0ca3d8..0892a41 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -93,6 +93,15 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ uid := uuid.New() subProcessName = fmt.Sprintf("copySrc.%v", uid.String()) + dstDir := filepath.Dir(DstFilePath) + dstFile := filepath.Base(DstFilePath) + + cia := copyInitialData{ + UUID: uid.String(), + DstDir: dstDir, + DstFile: dstFile, + } + m := Method(subProcessName) sub := newSubjectNoVerifyHandler(m, node) @@ -101,23 +110,15 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc) + copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc, cia) // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() // Send a message over the the node where the destination file will be written, // to also start up a sub process on the destination node. - dstDir := filepath.Dir(DstFilePath) - dstFile := filepath.Base(DstFilePath) // Marshal the data payload to send the the dst. - cia := copyInitialData{ - UUID: uid.String(), - DstDir: dstDir, - DstFile: dstFile, - } - cb, err := cbor.Marshal(cia) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) @@ -190,8 +191,6 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return } - fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE: %+v\n\n", cia) - // Get a context with the timeout specified in message.MethodTimeout. // Since the subProc spawned will outlive this method here we do not // want to cancel this method. We care about the methodTimeout, but @@ -209,7 +208,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc) + copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc, cia) // The process will be killed when the context expires. go copyDstSubProc.spawnWorker() @@ -225,7 +224,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return ackMsg, nil } -func copySrcProcFunc(proc process) func(context.Context, chan Message) error { +func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { select { @@ -239,8 +238,9 @@ func copySrcProcFunc(proc process) func(context.Context, chan Message) error { return pf } -func copyDstProcFunc(proc process) func(context.Context, chan Message) error { +func copyDstProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { + fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia) select { case <-ctx.Done():