1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added copyInitialData to copy procFunc

This commit is contained in:
postmannen 2022-06-10 10:38:11 +02:00
parent 95ce5fed3b
commit 237b946cf0

View file

@ -93,6 +93,15 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
uid := uuid.New() uid := uuid.New()
subProcessName = fmt.Sprintf("copySrc.%v", uid.String()) subProcessName = fmt.Sprintf("copySrc.%v", uid.String())
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
cia := copyInitialData{
UUID: uid.String(),
DstDir: dstDir,
DstFile: dstFile,
}
m := Method(subProcessName) m := Method(subProcessName)
sub := newSubjectNoVerifyHandler(m, node) sub := newSubjectNoVerifyHandler(m, node)
@ -101,23 +110,15 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc) copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc, cia)
// The process will be killed when the context expires. // The process will be killed when the context expires.
go copySrcSubProc.spawnWorker() go copySrcSubProc.spawnWorker()
// Send a message over the the node where the destination file will be written, // Send a message over the the node where the destination file will be written,
// to also start up a sub process on the destination node. // to also start up a sub process on the destination node.
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
// Marshal the data payload to send the the dst. // Marshal the data payload to send the the dst.
cia := copyInitialData{
UUID: uid.String(),
DstDir: dstDir,
DstFile: dstFile,
}
cb, err := cbor.Marshal(cia) cb, err := cbor.Marshal(cia)
if err != nil { if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
@ -190,8 +191,6 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
return return
} }
fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE: %+v\n\n", cia)
// Get a context with the timeout specified in message.MethodTimeout. // Get a context with the timeout specified in message.MethodTimeout.
// Since the subProc spawned will outlive this method here we do not // Since the subProc spawned will outlive this method here we do not
// want to cancel this method. We care about the methodTimeout, but // want to cancel this method. We care about the methodTimeout, but
@ -209,7 +208,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc) copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc, cia)
// The process will be killed when the context expires. // The process will be killed when the context expires.
go copyDstSubProc.spawnWorker() go copyDstSubProc.spawnWorker()
@ -225,7 +224,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
return ackMsg, nil return ackMsg, nil
} }
func copySrcProcFunc(proc process) func(context.Context, chan Message) error { func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
select { select {
@ -239,8 +238,9 @@ func copySrcProcFunc(proc process) func(context.Context, chan Message) error {
return pf return pf
} }
func copyDstProcFunc(proc process) func(context.Context, chan Message) error { func copyDstProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia)
select { select {
case <-ctx.Done(): case <-ctx.Done():