1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

copySrcSubProcFunc receiving rdy from dst

This commit is contained in:
postmannen 2022-06-14 11:31:19 +02:00
parent 7c4fd58b04
commit f2fe8606ec

View file

@ -239,31 +239,12 @@ func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]by
h := func(proc process, message Message, node string) ([]byte, error) {
// We should receive a ready message generated by the procFunc of Dst.
allDoneCh := make(chan struct{})
go func() {
var csa copySubData
err := cbor.Unmarshal(message.Data, &csa)
if err != nil {
log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err)
}
switch csa.CopyStatus {
case copyReady:
log.Printf(" * RECEIVED * copyStatus=copyReady copySrcSubHandler: %v\n\n", csa.CopyStatus)
default:
log.Fatalf("error: copySrcSubHandler: not valid copyStatus, exiting: %v\n", csa.CopyStatus)
}
// TODO: Clean up eventual tmp folders here!
allDoneCh <- struct{}{}
}()
select {
case <-proc.ctx.Done():
log.Printf(" * copySrcHandler ended: %v\n", proc.processName)
case <-allDoneCh:
log.Printf(" * copySrcHandler ended, all done copying file: %v\n", proc.processName)
case proc.procFuncCh <- message:
log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName)
}
return nil, nil
@ -289,12 +270,31 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by
func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
for {
select {
case <-ctx.Done():
log.Printf(" * copySrcProcFunc ended: %v\n", proc.processName)
log.Printf(" * copySrcProcFunc ENDED: %v\n", proc.processName)
return nil
// Pick up the message recived by the copySrcSubHandler.
case message := <-procFuncCh:
var csa copySubData
err := cbor.Unmarshal(message.Data, &csa)
if err != nil {
log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err)
}
return nil
switch csa.CopyStatus {
case copyReady:
log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus)
default:
// TODO: Any error logic here ?
log.Fatalf("error: copySrcSubHandler: not valid copyStatus, exiting: %v\n", csa.CopyStatus)
}
}
}
//return nil
}
return pf