From f78647196d05019743abb1a7ff0366a6ac9c1413 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 9 Jun 2022 11:40:53 +0200 Subject: [PATCH] added initial procFunc for methodREQCopyDst --- requests_copy.go | 53 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index d74178a..eb0207b 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -113,7 +113,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Marshal the data payload to send the the dst. cia := copyInitialData{ - UUID: uid.String(), + UUID: uid.String(), + DstDir: dstDir, + DstFile: dstFile, } cb, err := cbor.Marshal(cia) @@ -127,11 +129,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ msg.ToNode = Node(DstNode) //msg.Method = REQToFile msg.Method = REQCopyDst - // NB: We should send the uuid to the src. - // TODO: Use a struct for the data values, and marshal with cbor. msg.Data = cb - msg.Directory = dstDir - msg.FileName = dstFile + // msg.Directory = dstDir + // msg.FileName = dstFile sam, err := newSubjectAndMessage(msg) if err != nil { @@ -153,7 +153,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ } type copyInitialData struct { - UUID string + UUID string + DstDir string + DstFile string } func copySrcProcFunc(proc process) func(context.Context, chan Message) error { @@ -188,6 +190,7 @@ func (m methodREQCopyDst) getKind() Event { // successfully, where REQToFile do not. // This method will truncate and overwrite any existing files. func (m methodREQCopyDst) handler(proc process, message Message, node string) ([]byte, error) { + var subProcessName string proc.processes.wg.Add(1) go func() { @@ -201,10 +204,46 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return } - fmt.Printf("\n * WE RECEIVED UID: %+v\n\n", cia) + fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE: %+v\n\n", cia) + + // Get a context with the timeout specified in message.MethodTimeout. + // Since the subProc spawned will outlive this method here we do not + // want to cancel this method. We care about the methodTimeout, but + // we ignore the CancelFunc. + ctx, _ := getContextForMethodTimeout(proc.ctx, message) + + // Create a subject for one copy request + subProcessName = fmt.Sprintf("copyDst_%v", cia.UUID) + + m := Method(subProcessName) + sub := newSubjectNoVerifyHandler(m, node) + + // Create a new sub process that will do the actual file copying. + copyDstSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) + + // Give the sub process a procFunc so we do the actual copying within a procFunc, + // and not directly within the handler. + copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc) + + // The process will be killed when the context expires. + go copyDstSubProc.spawnWorker() }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +func copyDstProcFunc(proc process) func(context.Context, chan Message) error { + pf := func(ctx context.Context, procFuncCh chan Message) error { + + select { + case <-ctx.Done(): + log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName) + } + + return nil + } + + return pf +}