diff --git a/process.go b/process.go index 6538f87..a0ae99f 100644 --- a/process.go +++ b/process.go @@ -432,10 +432,13 @@ func (p process) publishMessages(natsConn *nats.Conn) { for { var err error var m Message + fmt.Printf(" * DEBUG2.2 : publishMessages, process nr: %v\n", p.processID) // Wait and read the next message on the message channel, or // exit this function if Cancel are received via ctx. + fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v\n", p.subject.messageCh, p.messageID) select { + // * DEBUG2 NOTE: Can it be that it have chosen the wrong process earler, and are waiting on the wrong channel here ? case m = <-p.subject.messageCh: case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) @@ -443,6 +446,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { log.Printf("%v\n", er) return } + fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v\n", p.subject.messageCh, p.messageID) // Get the process name so we can look up the process in the // processes map, and increment the message counter. pn := processNameGet(p.subject.name(), processKindPublisher) diff --git a/server.go b/server.go index cc5b8c1..5fd1479 100644 --- a/server.go +++ b/server.go @@ -258,6 +258,9 @@ func (s *server) Start() { // Will stop all processes started during startup. func (s *server) Stop() { + fmt.Printf(" * DEBUG100 * processMap \n") + s.processes.printProcessesMap() + // Stop the started pub/sub message processes. s.processes.Stop() log.Printf("info: stopped all subscribers\n") @@ -367,7 +370,13 @@ func (s *server) routeMessagesToProcess(dbFileName string) { go func() { for samTmp := range ringBufferOutCh { + fmt.Printf(" * DEBUG1.1 * before signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID) samTmp.delivered() + fmt.Printf(" * DEBUG1.2 * after signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID) + + // * DEBUG1 NOTE: It seems the problem are after here, since this loop seems to get stuck + // because the none of the two println's above are getting printed when many messages are + // being pushed on the system. Meaning this for loop gets stuck below. sam := samTmp.samDBValue.Data // Check if the format of the message is correct. @@ -404,12 +413,20 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // for that subject, put the message on that processes incomming // message channel. if ok { + fmt.Printf(" * DEBUG1.3 * before range existingProcIDMap: %#v\n", samTmp.samDBValue.ID) s.processes.mu.Lock() for _, existingProc := range existingProcIDMap { log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) + + fmt.Printf(" * DEBUG1.4 * before putting on channel existingProc.subject.messageCh: %#v, proc.id: %#v\n", samTmp.samDBValue.ID, existingProc.processID) + + // * DEBUG5 NOTE: It seems to get stuck when writing to the messageCh below. + fmt.Printf(" * DEBUG1.5 * before putting on channel to found process: %#v\n", &existingProc.subject.messageCh) existingProc.subject.messageCh <- m + fmt.Printf(" *** DEBUG1.6 * after putting on channel to found process: %#v\n", samTmp.samDBValue.ID) } s.processes.mu.Unlock() + fmt.Printf(" *** DEBUG1.7 * after range existing Proc ID Map: %#v\n", samTmp.samDBValue.ID) // If no process to handle the specific subject exist, // the we create and spawn one.