diff --git a/message_and_subject.go b/message_and_subject.go index ed27026..3eb7781 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -124,7 +124,8 @@ type Subject struct { func newSubject(method Method, node string) Subject { // Get the Event type for the Method. ma := method.GetMethodsAvailable() - mh, ok := ma.Methodhandlers[method] + mh, ok := ma.CheckIfExists(method) + //mh, ok := ma.Methodhandlers[method] if !ok { log.Printf("error: no Event type specified for the method: %v\n", method) os.Exit(1) diff --git a/process.go b/process.go index 51f7b1f..a17fbac 100644 --- a/process.go +++ b/process.go @@ -696,7 +696,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { go p.publishAMessage(m, zEnc, once, natsConn) case <-p.ctx.Done(): - er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) + er := fmt.Errorf("info: canceling publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) log.Printf("%v\n", er) return diff --git a/requests.go b/requests.go index 5943532..6808573 100644 --- a/requests.go +++ b/requests.go @@ -24,10 +24,17 @@ // // --- // You also need to make a constant for the Method, and add -// that constant as the key in the map, where the value is -// the actual type you want to map it to with a handler method. -// You also specify if it is a Command or Event, and if it is -// ACK or NACK. +// that constant as the key in the MethodsAvailable map, where +// the value is the actual type you want to map it to with a +// handler method. You also specify if it is a Command or Event, +// and if it is ACK or NACK. +// +// Requests used in sub processes should always start with the +// naming REQSUB. Since the method of a sub process are defined +// within the method handler of the owning reqest type we should +// use the methodREQSUB for these types. The methodREQSUB handler +// does nothing. +// // Check out the existing code below for more examples. package steward @@ -36,6 +43,7 @@ import ( "context" "fmt" "path/filepath" + "strings" "time" ) @@ -95,11 +103,16 @@ const ( REQCopyFileFrom Method = "REQCopyFileFrom" // Write the destination copied to some node. REQCopyFileTo Method = "REQCopyFileTo" - // Send Hello I'm here message. - // Read the source file to be copied to some node. + // Initial request for file copying. + // Initiated by the user. REQCopySrc Method = "REQCopySrc" - // Write the destination copied to some node. + // Initial request for file copying. + // Generated by the source to send initial information to the destination. REQCopyDst Method = "REQCopyDst" + // Read the source file to be copied to some node. + REQSubCopySrc Method = "REQSubCopySrc" + // Write the destination copied to some node. + REQSubCopyDst Method = "REQSubCopyDst" // Send Hello I'm here message. REQHello Method = "REQHello" // Error log methods to centralError node. @@ -227,6 +240,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQCopyDst: methodREQCopyDst{ event: EventACK, }, + REQSubCopySrc: methodREQSub{ + event: EventACK, + }, + REQSubCopyDst: methodREQSub{ + event: EventACK, + }, REQHello: methodREQHello{ event: EventNACK, }, @@ -370,6 +389,26 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([ // ---- +// place holder method used for sub processes. +// Methods used in sub processes are defined within the the requests +// they are spawned in, so this type is primarily for us to use the +// same logic with sub process requests as we do with normal requests. +type methodREQSub struct { + event Event +} + +func (m methodREQSub) getKind() Event { + return m.event +} + +func (m methodREQSub) handler(proc process, message Message, node string) ([]byte, error) { + // proc.procFuncCh <- message + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ---- + // MethodsAvailable holds a map of all the different method types and the // associated handler to that method type. type MethodsAvailable struct { @@ -380,6 +419,13 @@ type MethodsAvailable struct { // value will be set to true, and the methodHandler function for that type // will be returned. func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { + // First check if it is a sub process. + if strings.HasPrefix(string(m), "REQSub") { + // Strip of the uuid after the method name. + sp := strings.Split(string(m), ".") + m = Method(sp[0]) + } + mFunc, ok := ma.Methodhandlers[m] if ok { return mFunc, true diff --git a/requests_copy.go b/requests_copy.go index 556a70b..6cfc2a6 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -91,18 +91,19 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Create a subject for one copy request uid := uuid.New() - subProcessName = fmt.Sprintf("copySrc.%v", uid.String()) + subProcessName = fmt.Sprintf("REQSubCopySrc.%v", uid.String()) dstDir := filepath.Dir(DstFilePath) dstFile := filepath.Base(DstFilePath) + m := Method(subProcessName) cia := copyInitialData{ - UUID: uid.String(), - DstDir: dstDir, - DstFile: dstFile, + UUID: uid.String(), + SrcMethod: m, + DstDir: dstDir, + DstFile: dstFile, } - m := Method(subProcessName) sub := newSubjectNoVerifyHandler(m, node) // Create a new sub process that will do the actual file copying. @@ -110,10 +111,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc, cia) + copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia) // assign a handler to the sub process - copySrcSubProc.handler = copySrcHandler(cia) + copySrcSubProc.handler = copySrcSubHandler(cia) // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() @@ -157,9 +158,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ } type copyInitialData struct { - UUID string - DstDir string - DstFile string + UUID string + SrcMethod Method + SrcNode Node + DstMethod Method + DstNode Node + DstDir string + DstFile string } // ---- @@ -201,7 +206,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ ctx, _ := getContextForMethodTimeout(proc.ctx, message) // Create a subject for one copy request - subProcessName = fmt.Sprintf("copyDst.%v", cia.UUID) + subProcessName = fmt.Sprintf("REQSubCopyDst.%v", cia.UUID) m := Method(subProcessName) sub := newSubjectNoVerifyHandler(m, node) @@ -211,10 +216,10 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc, cia) + copyDstSubProc.procFunc = copyDstProcSubFunc(copyDstSubProc, cia, message) // assign a handler to the sub process - copyDstSubProc.handler = copyDstHandler(cia) + copyDstSubProc.handler = copyDstSubHandler(cia) // The process will be killed when the context expires. go copyDstSubProc.spawnWorker() @@ -230,11 +235,13 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return ackMsg, nil } -func copySrcHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { +func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { // HERE! // We should receive a ready message generated by the procFunc of Dst. + fmt.Printf("\n-----------------RECEIVED COPY READY MESSAGE------------------\n\n") + select { case <-proc.ctx.Done(): log.Printf(" * copySrcHandler ended: %v\n", proc.processName) @@ -246,8 +253,9 @@ func copySrcHandler(cia copyInitialData) func(process, Message, string) ([]byte, return h } -func copyDstHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { +func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { h := func(proc process, message Message, node string) ([]byte, error) { + select { case <-proc.ctx.Done(): log.Printf(" * copyDstHandler ended: %v\n", proc.processName) @@ -259,7 +267,7 @@ func copyDstHandler(cia copyInitialData) func(process, Message, string) ([]byte, return h } -func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { +func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { select { @@ -273,10 +281,37 @@ func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, ch return pf } -func copyDstProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { +type copyStatus int + +const ( + copyReady copyStatus = iota +) + +func copyDstProcSubFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { fmt.Printf("\n ******* WE RECEIVED COPY MESSAGE, AND ARE WORKING IN PROCFUNC: %+v\n\n", cia) + // We want to send a message back to src that we are ready to start. + fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending to:%v\n ", message.FromNode) + msg := Message{ + ToNode: message.FromNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + } + + sub := Subject{ + ToNode: string(message.FromNode), + Event: EventACK, + Method: cia.SrcMethod, + } + + sam := subjectAndMessage{ + Subject: sub, + Message: msg, + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + select { case <-ctx.Done(): log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName) diff --git a/server.go b/server.go index a1383a9..3ce5b9f 100644 --- a/server.go +++ b/server.go @@ -437,6 +437,8 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // Signal back to the ringbuffer that message have been picked up. samDBVal.delivered() + // TODO HERE!: The message will be dropped here since the method for copy uid does not exist + sam := samDBVal.samDBValue.Data // Check if the format of the message is correct. if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {