From f2fe8606ec7bef777aa6d4633965e3d0f13362d2 Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 14 Jun 2022 11:31:19 +0200 Subject: [PATCH] copySrcSubProcFunc receiving rdy from dst --- requests_copy.go | 50 ++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/requests_copy.go b/requests_copy.go index c6e7133..ac30896 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -239,31 +239,12 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by h := func(proc process, message Message, node string) ([]byte, error) { // We should receive a ready message generated by the procFunc of Dst. - allDoneCh := make(chan struct{}) - - go func() { - var csa copySubData - err := cbor.Unmarshal(message.Data, &csa) - if err != nil { - log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) - } - - switch csa.CopyStatus { - case copyReady: - log.Printf(" * RECEIVED * copyStatus=copyReady copySrcSubHandler: %v\n\n", csa.CopyStatus) - default: - log.Fatalf("error: copySrcSubHandler: not valid copyStatus, exiting: %v\n", csa.CopyStatus) - } - - // TODO: Clean up eventual tmp folders here! - allDoneCh <- struct{}{} - }() select { case <-proc.ctx.Done(): log.Printf(" * copySrcHandler ended: %v\n", proc.processName) - case <-allDoneCh: - log.Printf(" * copySrcHandler ended, all done copying file: %v\n", proc.processName) + case proc.procFuncCh <- message: + log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName) } return nil, nil @@ -289,12 +270,31 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { - select { - case <-ctx.Done(): - log.Printf(" * copySrcProcFunc ended: %v\n", proc.processName) + for { + select { + case <-ctx.Done(): + log.Printf(" * copySrcProcFunc ENDED: %v\n", proc.processName) + return nil + + // Pick up the message recived by the copySrcSubHandler. + case message := <-procFuncCh: + var csa copySubData + err := cbor.Unmarshal(message.Data, &csa) + if err != nil { + log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err) + } + + switch csa.CopyStatus { + case copyReady: + log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus) + default: + // TODO: Any error logic here ? + log.Fatalf("error: copySrcSubHandler: not valid copyStatus, exiting: %v\n", csa.CopyStatus) + } + } } - return nil + //return nil } return pf