1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

added initial procFunc for methodREQCopyDst

This commit is contained in:
postmannen 2022-06-09 11:40:53 +02:00
parent cd3618c5ea
commit f78647196d

View file

@ -114,6 +114,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Marshal the data payload to send the the dst. // Marshal the data payload to send the the dst.
cia := copyInitialData{ cia := copyInitialData{
UUID: uid.String(), UUID: uid.String(),
DstDir: dstDir,
DstFile: dstFile,
} }
cb, err := cbor.Marshal(cia) cb, err := cbor.Marshal(cia)
@ -127,11 +129,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
msg.ToNode = Node(DstNode) msg.ToNode = Node(DstNode)
//msg.Method = REQToFile //msg.Method = REQToFile
msg.Method = REQCopyDst msg.Method = REQCopyDst
// NB: We should send the uuid to the src.
// TODO: Use a struct for the data values, and marshal with cbor.
msg.Data = cb msg.Data = cb
msg.Directory = dstDir // msg.Directory = dstDir
msg.FileName = dstFile // msg.FileName = dstFile
sam, err := newSubjectAndMessage(msg) sam, err := newSubjectAndMessage(msg)
if err != nil { if err != nil {
@ -154,6 +154,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
type copyInitialData struct { type copyInitialData struct {
UUID string UUID string
DstDir string
DstFile string
} }
func copySrcProcFunc(proc process) func(context.Context, chan Message) error { func copySrcProcFunc(proc process) func(context.Context, chan Message) error {
@ -188,6 +190,7 @@ func (m methodREQCopyDst) getKind() Event {
// successfully, where REQToFile do not. // successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files. // This method will truncate and overwrite any existing files.
func (m methodREQCopyDst) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQCopyDst) handler(proc process, message Message, node string) ([]byte, error) {
var subProcessName string
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
go func() { go func() {
@ -201,10 +204,46 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
return return
} }
fmt.Printf("\n * WE RECEIVED UID: %+v\n\n", cia) fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE: %+v\n\n", cia)
// Get a context with the timeout specified in message.MethodTimeout.
// Since the subProc spawned will outlive this method here we do not
// want to cancel this method. We care about the methodTimeout, but
// we ignore the CancelFunc.
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
// Create a subject for one copy request
subProcessName = fmt.Sprintf("copyDst_%v", cia.UUID)
m := Method(subProcessName)
sub := newSubjectNoVerifyHandler(m, node)
// Create a new sub process that will do the actual file copying.
copyDstSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc)
// The process will be killed when the context expires.
go copyDstSubProc.spawnWorker()
}() }()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
} }
func copyDstProcFunc(proc process) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
select {
case <-ctx.Done():
log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName)
}
return nil
}
return pf
}