1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
This commit is contained in:
postmannen 2022-06-15 05:31:46 +02:00
parent df0c2ddc90
commit 23c22210a5

View file

@ -20,7 +20,10 @@ func (m methodREQCopySrc) getKind() Event {
return m.event
}
// Idea:
// methodREQCopySrc are handles the initial and first part of
// the message flow for a copy to destination request.
// It's main role is to start up a sub process for the destination
// in which all the actual file copying is done.
//
// Initialization, Source:
// - Use the REQCopySrc method to handle the initial request from the user.
@ -40,7 +43,7 @@ func (m methodREQCopySrc) getKind() Event {
// - src->dst, send chunk when read.
// - dst->src, wait for status "ready" indicating the chuck was transfered.
// - Loop and read new chunc.
// - src->dst, when last chunch is sent send status "last"
// - src->dst, when last chunch is sent send status back that we are ready for the next message.
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
//
// - dst, read and store each chunch to tmp folder and verify hash.
@ -190,13 +193,10 @@ func (m methodREQCopyDst) getKind() Event {
return m.event
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
// methodREQCopyDst are handles the initial and first part of
// the message flow for a copy to destination request.
// It's main role is to start up a sub process for the destination
// in which all the actual file copying is done.
func (m methodREQCopyDst) handler(proc process, message Message, node string) ([]byte, error) {
var subProcessName string
@ -204,6 +204,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
go func() {
defer proc.processes.wg.Done()
// Get the status message sent from source.
var cia copyInitialData
err := cbor.Unmarshal(message.Data, &cia)
if err != nil {
@ -219,8 +220,6 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
// Create a subject for one copy request
// subProcessName = fmt.Sprintf("REQSUBCopyDst.%v", cia.UUID)
// m := Method(subProcessName)
sub := newSubjectNoVerifyHandler(cia.DstMethod, node)
// Create a new sub process that will do the actual file copying.
@ -329,38 +328,35 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
// Create message and send data to dst
fmt.Printf("**** DATA READ: %v\n", b)
// TESTING HERE:
{
csa := copySubData{
CopyStatus: status,
CopyData: b,
}
csaSerialized, err := cbor.Marshal(csa)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.DstNode,
FromNode: cia.SrcNode,
Method: cia.DstMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
csa := copySubData{
CopyStatus: status,
CopyData: b,
}
csaSerialized, err := cbor.Marshal(csa)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.DstNode,
FromNode: cia.SrcNode,
Method: cia.DstMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
default:
// TODO: Any error logic here ?
log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus)
@ -404,8 +400,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: message.FromNode,
FromNode: message.ToNode,
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
Data: csaSerialized,