From cd3618c5eacaee1c866a86c02679f043018b3a07 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 9 Jun 2022 11:25:59 +0200 Subject: [PATCH] cbor marshaling data in initial copy message --- requests_copy.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index e7dbd69..d74178a 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "log" + "path/filepath" + "github.com/fxamacker/cbor/v2" "github.com/google/uuid" ) @@ -85,7 +87,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Since the subProc spawned will outlive this method here we do not // want to cancel this method. We care about the methodTimeout, but // we ignore the CancelFunc. - ctx, _ := getContextForMethodTimeout(proc.ctx, message) + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) // Create a subject for one copy request uid := uuid.New() @@ -96,14 +98,51 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Create a new sub process that will do the actual file copying. copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) + // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc) + // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() + // Send a message over the the node where the destination file will be written, + // to also start up a sub process on the destination node. + dstDir := filepath.Dir(DstFilePath) + dstFile := filepath.Base(DstFilePath) + + // Marshal the data payload to send the the dst. + cia := copyInitialData{ + UUID: uid.String(), + } + + cb, err := cbor.Marshal(cia) + if err != nil { + er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) + proc.errorKernel.errSend(proc, message, er) + cancel() + } + + msg := message + msg.ToNode = Node(DstNode) + //msg.Method = REQToFile + msg.Method = REQCopyDst + // NB: We should send the uuid to the src. + // TODO: Use a struct for the data values, and marshal with cbor. + msg.Data = cb + msg.Directory = dstDir + msg.FileName = dstFile + + sam, err := newSubjectAndMessage(msg) + if err != nil { + er := fmt.Errorf("error: methodREQCopySrc failed to cbor Marshal data: %v, message=%v", err, message) + proc.errorKernel.errSend(proc, message, er) + cancel() + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + replyData := fmt.Sprintf("info: succesfully initiated copy process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName) - // newReplyMessage(proc, message, []byte(replyData)) @@ -113,6 +152,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ return ackMsg, nil } +type copyInitialData struct { + UUID string +} + func copySrcProcFunc(proc process) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { @@ -150,6 +193,16 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ go func() { defer proc.processes.wg.Done() + var cia copyInitialData + err := cbor.Unmarshal(message.Data, &cia) + if err != nil { + er := fmt.Errorf("error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v", err, message) + proc.errorKernel.errSend(proc, message, er) + return + } + + fmt.Printf("\n * WE RECEIVED UID: %+v\n\n", cia) + }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))