1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-15 17:51:15 +00:00

added debug output

This commit is contained in:
postmannen 2021-09-13 07:02:14 +02:00
parent 6bbd01bd05
commit 0b61e510b4
2 changed files with 21 additions and 0 deletions

View file

@ -432,10 +432,13 @@ func (p process) publishMessages(natsConn *nats.Conn) {
for { for {
var err error var err error
var m Message var m Message
fmt.Printf(" * DEBUG2.2 : publishMessages, process nr: %v\n", p.processID)
// Wait and read the next message on the message channel, or // Wait and read the next message on the message channel, or
// exit this function if Cancel are received via ctx. // exit this function if Cancel are received via ctx.
fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v\n", p.subject.messageCh, p.messageID)
select { select {
// * DEBUG2 NOTE: Can it be that it have chosen the wrong process earler, and are waiting on the wrong channel here ?
case m = <-p.subject.messageCh: case m = <-p.subject.messageCh:
case <-p.ctx.Done(): case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
@ -443,6 +446,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
log.Printf("%v\n", er) log.Printf("%v\n", er)
return return
} }
fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v\n", p.subject.messageCh, p.messageID)
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name(), processKindPublisher)

View file

@ -258,6 +258,9 @@ func (s *server) Start() {
// Will stop all processes started during startup. // Will stop all processes started during startup.
func (s *server) Stop() { func (s *server) Stop() {
fmt.Printf(" * DEBUG100 * processMap \n")
s.processes.printProcessesMap()
// Stop the started pub/sub message processes. // Stop the started pub/sub message processes.
s.processes.Stop() s.processes.Stop()
log.Printf("info: stopped all subscribers\n") log.Printf("info: stopped all subscribers\n")
@ -367,7 +370,13 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
go func() { go func() {
for samTmp := range ringBufferOutCh { for samTmp := range ringBufferOutCh {
fmt.Printf(" * DEBUG1.1 * before signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
samTmp.delivered() samTmp.delivered()
fmt.Printf(" * DEBUG1.2 * after signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
// * DEBUG1 NOTE: It seems the problem are after here, since this loop seems to get stuck
// because the none of the two println's above are getting printed when many messages are
// being pushed on the system. Meaning this for loop gets stuck below.
sam := samTmp.samDBValue.Data sam := samTmp.samDBValue.Data
// Check if the format of the message is correct. // Check if the format of the message is correct.
@ -404,12 +413,20 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// for that subject, put the message on that processes incomming // for that subject, put the message on that processes incomming
// message channel. // message channel.
if ok { if ok {
fmt.Printf(" * DEBUG1.3 * before range existingProcIDMap: %#v\n", samTmp.samDBValue.ID)
s.processes.mu.Lock() s.processes.mu.Lock()
for _, existingProc := range existingProcIDMap { for _, existingProc := range existingProcIDMap {
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
fmt.Printf(" * DEBUG1.4 * before putting on channel existingProc.subject.messageCh: %#v, proc.id: %#v\n", samTmp.samDBValue.ID, existingProc.processID)
// * DEBUG5 NOTE: It seems to get stuck when writing to the messageCh below.
fmt.Printf(" * DEBUG1.5 * before putting on channel to found process: %#v\n", &existingProc.subject.messageCh)
existingProc.subject.messageCh <- m existingProc.subject.messageCh <- m
fmt.Printf(" *** DEBUG1.6 * after putting on channel to found process: %#v\n", samTmp.samDBValue.ID)
} }
s.processes.mu.Unlock() s.processes.mu.Unlock()
fmt.Printf(" *** DEBUG1.7 * after range existing Proc ID Map: %#v\n", samTmp.samDBValue.ID)
// If no process to handle the specific subject exist, // If no process to handle the specific subject exist,
// the we create and spawn one. // the we create and spawn one.