diff --git a/process.go b/process.go index 1bf8c99..51f7b1f 100644 --- a/process.go +++ b/process.go @@ -96,6 +96,13 @@ type process struct { // Process name processName processName + // handler is used to directly attach a handler to a process upon + // creation of the process, like when a process is spawning a sub + // process like REQCopySrc do. If we're not spawning a sub process + // and it is a regular process the handler to use is found with the + // getHandler method + handler func(proc process, message Message, node string) ([]byte, error) + // startup holds the startup functions for starting up publisher // or subscriber processes startup *startup @@ -514,16 +521,22 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // Check for ACK type Event. case p.subject.Event == EventACK: - // Look up the method handler for the specified method. - mh, ok := p.methodsAvailable.CheckIfExists(message.Method) - if !ok { - er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) - p.errorKernel.errSend(p, message, er) + // When spawning sub processes we can directly assign handlers to the process upon + // creation. We here check if a handler is already assigned, and if it is nil, we + // lookup and find the correct handler to use if available. + if p.handler == nil { + // Look up the method handler for the specified method. + mh, ok := p.methodsAvailable.CheckIfExists(message.Method) + p.handler = mh.handler + if !ok { + er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) + p.errorKernel.errSend(p, message, er) + } } //var err error - out := p.callHandler(message, mh, thisNode) + out := p.callHandler(message, thisNode) // Send a confirmation message back to the publisher to ACK that the // message was received by the subscriber. The reply should be sent @@ -531,14 +544,21 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, natsConn.Publish(msg.Reply, out) case p.subject.Event == EventNACK: - mh, ok := p.methodsAvailable.CheckIfExists(message.Method) - if !ok { - er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - p.errorKernel.errSend(p, message, er) + // When spawning sub processes we can directly assign handlers to the process upon + // creation. We here check if a handler is already assigned, and if it is nil, we + // lookup and find the correct handler to use if available. + if p.handler == nil { + // Look up the method handler for the specified method. + mh, ok := p.methodsAvailable.CheckIfExists(message.Method) + p.handler = mh.handler + if !ok { + er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) + p.errorKernel.errSend(p, message, er) + } } // We do not send reply messages for EventNACL, so we can discard the output. - _ = p.callHandler(message, mh, thisNode) + _ = p.callHandler(message, thisNode) default: er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event) @@ -550,14 +570,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // callHandler will call the handler for the Request type defined in the message. // If checking signatures and/or acl's are enabled the signatures they will be // verified, and if OK the handler is called. -func (p process) callHandler(message Message, mh methodHandler, thisNode string) []byte { +func (p process) callHandler(message Message, thisNode string) []byte { out := []byte{} var err error switch p.verifySigOrAclFlag(message) { case true: log.Printf("info: subscriberHandler: doHandler=true: %v\n", true) - out, err = mh.handler(p, message, thisNode) + out, err = p.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) p.errorKernel.errSend(p, message, er) diff --git a/requests_copy.go b/requests_copy.go index 0892a41..556a70b 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -112,6 +112,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // and not directly within the handler. copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc, cia) + // assign a handler to the sub process + copySrcSubProc.handler = copySrcHandler(cia) + // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() @@ -210,6 +213,9 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // and not directly within the handler. copyDstSubProc.procFunc = copyDstProcFunc(copyDstSubProc, cia) + // assign a handler to the sub process + copyDstSubProc.handler = copyDstHandler(cia) + // The process will be killed when the context expires. go copyDstSubProc.spawnWorker() @@ -224,6 +230,35 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return ackMsg, nil } +func copySrcHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { + h := func(proc process, message Message, node string) ([]byte, error) { + // HERE! + // We should receive a ready message generated by the procFunc of Dst. + + select { + case <-proc.ctx.Done(): + log.Printf(" * copySrcHandler ended: %v\n", proc.processName) + } + + return nil, nil + } + + return h +} + +func copyDstHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) { + h := func(proc process, message Message, node string) ([]byte, error) { + select { + case <-proc.ctx.Done(): + log.Printf(" * copyDstHandler ended: %v\n", proc.processName) + } + + return nil, nil + } + + return h +} + func copySrcProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {