mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
cbor marshaling data in initial copy message
This commit is contained in:
parent
f1e1577a03
commit
cd3618c5ea
1 changed files with 55 additions and 2 deletions
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
|
@ -85,7 +87,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
// Since the subProc spawned will outlive this method here we do not
|
||||
// want to cancel this method. We care about the methodTimeout, but
|
||||
// we ignore the CancelFunc.
|
||||
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||||
|
||||
// Create a subject for one copy request
|
||||
uid := uuid.New()
|
||||
|
@ -96,14 +98,51 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
|
||||
// Create a new sub process that will do the actual file copying.
|
||||
copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
|
||||
// Give the sub process a procFunc so we do the actual copying within a procFunc,
|
||||
// and not directly within the handler.
|
||||
copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc)
|
||||
|
||||
// The process will be killed when the context expires.
|
||||
go copySrcSubProc.spawnWorker()
|
||||
|
||||
// Send a message over the the node where the destination file will be written,
|
||||
// to also start up a sub process on the destination node.
|
||||
dstDir := filepath.Dir(DstFilePath)
|
||||
dstFile := filepath.Base(DstFilePath)
|
||||
|
||||
// Marshal the data payload to send the the dst.
|
||||
cia := copyInitialData{
|
||||
UUID: uid.String(),
|
||||
}
|
||||
|
||||
cb, err := cbor.Marshal(cia)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
cancel()
|
||||
}
|
||||
|
||||
msg := message
|
||||
msg.ToNode = Node(DstNode)
|
||||
//msg.Method = REQToFile
|
||||
msg.Method = REQCopyDst
|
||||
// NB: We should send the uuid to the src.
|
||||
// TODO: Use a struct for the data values, and marshal with cbor.
|
||||
msg.Data = cb
|
||||
msg.Directory = dstDir
|
||||
msg.FileName = dstFile
|
||||
|
||||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopySrc failed to cbor Marshal data: %v, message=%v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
cancel()
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
replyData := fmt.Sprintf("info: succesfully initiated copy process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)
|
||||
//
|
||||
|
||||
newReplyMessage(proc, message, []byte(replyData))
|
||||
|
||||
|
@ -113,6 +152,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
return ackMsg, nil
|
||||
}
|
||||
|
||||
type copyInitialData struct {
|
||||
UUID string
|
||||
}
|
||||
|
||||
func copySrcProcFunc(proc process) func(context.Context, chan Message) error {
|
||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
|
||||
|
@ -150,6 +193,16 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
|
|||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
var cia copyInitialData
|
||||
err := cbor.Unmarshal(message.Data, &cia)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("\n * WE RECEIVED UID: %+v\n\n", cia)
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
|
Loading…
Reference in a new issue