1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

serializing data in sub req

This commit is contained in:
postmannen 2022-06-14 10:17:09 +02:00
parent fe78d6c070
commit be181ec572

View file

@ -216,7 +216,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copyDstSubProc.procFunc = copyDstProcSubFunc(copyDstSubProc, cia, message)
copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message)
// assign a handler to the sub process
copyDstSubProc.handler = copyDstSubHandler(cia)
@ -237,14 +237,27 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
// HERE!
// We should receive a ready message generated by the procFunc of Dst.
fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE IN copySrcSubHandler------------------\n\n")
// We should receive a ready message generated by the procFunc of Dst.
allDoneCh := make(chan struct{})
go func() {
var csa copySubData
err := cbor.Unmarshal(message.Data, &csa)
if err != nil {
log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err)
}
fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE IN copySrcSubHandler: %v------------------\n\n", csa.CopyStatus)
allDoneCh <- struct{}{}
}()
select {
case <-proc.ctx.Done():
log.Printf(" * copySrcHandler ended: %v\n", proc.processName)
case <-allDoneCh:
log.Printf(" * copySrcHandler ended, all done copying file: %v\n", proc.processName)
}
return nil, nil
@ -284,32 +297,37 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
type copyStatus int
const (
copyReady copyStatus = iota
copyReady copyStatus = 1
)
func copyDstProcSubFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error {
// copySubData is the structure being used between the src and dst while copying data.
type copySubData struct {
CopyStatus copyStatus
CopyData []byte
}
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia)
fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia)
csa := copySubData{
CopyStatus: copyReady,
}
csaSerialized, err := cbor.Marshal(csa)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending to:%v\n ", message.FromNode)
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: message.FromNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
}
// sub := Subject{
// ToNode: string(message.FromNode),
// Event: EventACK,
// Method: cia.SrcMethod,
// }
//
// sam := subjectAndMessage{
// Subject: sub,
// Message: msg,
// }
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)