From 1bdee3872b831e25042da9e8977350d850675cfb Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 16 Jun 2022 06:42:34 +0200 Subject: [PATCH] ending both src and dst sub procs when done copy --- requests_copy.go | 52 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index 7cea244..e2c0ac3 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -169,7 +169,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia) + copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel) // assign a handler to the sub process copySrcSubProc.handler = copySrcSubHandler(cia) @@ -249,7 +249,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Since the subProc spawned will outlive this method here we do not // want to cancel this method. We care about the methodTimeout, but // we ignore the CancelFunc. - ctx, _ := getContextForMethodTimeout(proc.ctx, message) + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) // Create a subject for one copy request sub := newSubjectNoVerifyHandler(cia.DstMethod, node) @@ -259,7 +259,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message) + copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel) // assign a handler to the sub process copyDstSubProc.handler = copyDstSubHandler(cia) @@ -317,8 +317,9 @@ type copyStatus int const ( copyReady copyStatus = 1 copyData copyStatus = 2 - copyDone copyStatus = 3 + copySrcDone copyStatus = 3 copyResendLast copyStatus = 4 + copyDstDone copyStatus = 5 ) // copySubData is the structure being used between the src and dst while copying data. @@ -329,7 +330,7 @@ type copySubData struct { Hash [32]byte } -func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { +func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { var chunkNumber = 0 @@ -374,7 +375,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) } if err == io.EOF { - status = copyDone + status = copySrcDone } lastReadChunk = b[:n] @@ -476,6 +477,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, resendRetries++ + case copyDstDone: + cancel() + default: // TODO: Any error logic here ? log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus) @@ -489,7 +493,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, return pf } -func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { +func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) @@ -623,7 +627,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func proc.toRingbufferCh <- []subjectAndMessage{sam} - case copyDone: + case copySrcDone: func() { // Open the main file that chunks files will be written into. @@ -689,6 +693,38 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath) + // Signal back to src that we are done, so it can cancel the process. + { + csa := copySubData{ + CopyStatus: copyDstDone, + } + + csaSerialized, err := cbor.Marshal(csa) + if err != nil { + log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err) + } + + // We want to send a message back to src that we are ready to start. + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) + msg := Message{ + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + } + + fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) + + sam, err := newSubjectAndMessage(msg) + if err != nil { + log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + } + + cancel() }() } }