diff --git a/process.go b/process.go index 71428ed..e216005 100644 --- a/process.go +++ b/process.go @@ -125,9 +125,10 @@ func (p process) spawnWorker(s *server) { // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindPublisher { + // If there is a procFunc for the process, start it. if p.procFunc != nil { - // REMOVED: p.procFuncCh = make(chan Message) + // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { @@ -148,7 +149,7 @@ func (p process) spawnWorker(s *server) { if p.processKind == processKindSubscriber { // If there is a procFunc for the process, start it. if p.procFunc != nil { - // REMOVED: p.procFuncCh = make(chan Message) + // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { @@ -161,7 +162,6 @@ func (p process) spawnWorker(s *server) { }() } - //fmt.Printf("-- DEBUG 1.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) p.subscribeMessages(s) } } @@ -214,7 +214,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // If the message is an ACK type of message we must check that a // reply, and if it is not we don't wait here at all. - fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message) + // fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message) if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK { // Wait up until timeout specified for a reply, // continue and resend if noo reply received, @@ -308,7 +308,6 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na natsConn.Publish(msg.Reply, out) case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: - // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) @@ -316,12 +315,12 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) } - // --- // Check if we are allowed to receive from that host _, arOK1 := p.allowedReceivers[message.FromNode] _, arOK2 := p.allowedReceivers["*"] if arOK1 || arOK2 { + // Start the method handler for that specific subject type. // The handler started here is what actually doing the action // that executed a CLI command, or writes to a log file on @@ -329,7 +328,6 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // // since we don't send a reply for a NACK message, we don't care about the // out return when calling mf.handler - //fmt.Printf("-- DEBUG 2.2.1: %#v\n\n", p.subject) _, err := mf.handler(p, message, thisNode) if err != nil { @@ -354,13 +352,12 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. func (p process) subscribeMessages(s *server) { - //fmt.Printf("-- DEBUG 2.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) subject := string(p.subject.name()) _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent // the message. - //fmt.Printf("-- DEBUG 2.2: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) go p.subscriberHandler(s.natsConn, s.nodeName, msg, s) }) if err != nil {