1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

ending both src and dst sub procs when done copy

This commit is contained in:
postmannen 2022-06-16 06:42:34 +02:00
parent fab3fa38dd
commit 1bdee3872b

View file

@ -169,7 +169,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia) copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel)
// assign a handler to the sub process // assign a handler to the sub process
copySrcSubProc.handler = copySrcSubHandler(cia) copySrcSubProc.handler = copySrcSubHandler(cia)
@ -249,7 +249,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
// Since the subProc spawned will outlive this method here we do not // Since the subProc spawned will outlive this method here we do not
// want to cancel this method. We care about the methodTimeout, but // want to cancel this method. We care about the methodTimeout, but
// we ignore the CancelFunc. // we ignore the CancelFunc.
ctx, _ := getContextForMethodTimeout(proc.ctx, message) ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
// Create a subject for one copy request // Create a subject for one copy request
sub := newSubjectNoVerifyHandler(cia.DstMethod, node) sub := newSubjectNoVerifyHandler(cia.DstMethod, node)
@ -259,7 +259,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message) copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel)
// assign a handler to the sub process // assign a handler to the sub process
copyDstSubProc.handler = copyDstSubHandler(cia) copyDstSubProc.handler = copyDstSubHandler(cia)
@ -317,8 +317,9 @@ type copyStatus int
const ( const (
copyReady copyStatus = 1 copyReady copyStatus = 1
copyData copyStatus = 2 copyData copyStatus = 2
copyDone copyStatus = 3 copySrcDone copyStatus = 3
copyResendLast copyStatus = 4 copyResendLast copyStatus = 4
copyDstDone copyStatus = 5
) )
// copySubData is the structure being used between the src and dst while copying data. // copySubData is the structure being used between the src and dst while copying data.
@ -329,7 +330,7 @@ type copySubData struct {
Hash [32]byte Hash [32]byte
} }
func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
var chunkNumber = 0 var chunkNumber = 0
@ -374,7 +375,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err)
} }
if err == io.EOF { if err == io.EOF {
status = copyDone status = copySrcDone
} }
lastReadChunk = b[:n] lastReadChunk = b[:n]
@ -476,6 +477,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
resendRetries++ resendRetries++
case copyDstDone:
cancel()
default: default:
// TODO: Any error logic here ? // TODO: Any error logic here ?
log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus) log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus)
@ -489,7 +493,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
return pf return pf
} }
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia)
@ -623,7 +627,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
case copyDone: case copySrcDone:
func() { func() {
// Open the main file that chunks files will be written into. // Open the main file that chunks files will be written into.
@ -689,6 +693,38 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath) log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath)
// Signal back to src that we are done, so it can cancel the process.
{
csa := copySubData{
CopyStatus: copyDstDone,
}
csaSerialized, err := cbor.Marshal(csa)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}
cancel()
}() }()
} }
} }