1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

fixed bug with hang caused by the of mutex

This commit is contained in:
postmannen 2021-09-13 13:15:21 +02:00
parent 254e522092
commit aba7324ba6
2 changed files with 29 additions and 20 deletions

View file

@ -432,11 +432,12 @@ func (p process) publishMessages(natsConn *nats.Conn) {
for {
var err error
var m Message
fmt.Printf(" * DEBUG2.1 : publishMessages, proc.id: %v\n", p.processID)
// fmt.Printf(" * DEBUG2.1 : publishMessages, proc.id: %v\n", p.processID)
// fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v, proc.id: %v\n", &p.subject.messageCh, p.messageID, p.processID)
// Wait and read the next message on the message channel, or
// exit this function if Cancel are received via ctx.
fmt.Printf(" * DEBUG2.2 * before selecting read p.subject.messageCh: %#v, message.id: %#v, proc.id: %v\n", &p.subject.messageCh, p.messageID, p.processID)
select {
// * DEBUG2 NOTE: Can it be that it have chosen the wrong process earler, and are waiting on the wrong channel here ?
case m = <-p.subject.messageCh:
@ -446,7 +447,8 @@ func (p process) publishMessages(natsConn *nats.Conn) {
log.Printf("%v\n", er)
return
}
fmt.Printf(" * DEBUG2.3 * after selecting read p.subject.messageCh: %#v, message.id: %#v, proc.id: %v\n", p.subject.messageCh, p.messageID, p.processID)
// fmt.Printf(" * DEBUG2.3 * after selecting read p.subject.messageCh: %#v, message.id: %#v, proc.id: %v\n", p.subject.messageCh, p.messageID, p.processID)
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)

View file

@ -258,8 +258,8 @@ func (s *server) Start() {
// Will stop all processes started during startup.
func (s *server) Stop() {
fmt.Printf(" * DEBUG100 * processMap \n")
s.processes.printProcessesMap()
// fmt.Printf(" * DEBUG100 * processMap \n")
// s.processes.printProcessesMap()
// Stop the started pub/sub message processes.
s.processes.Stop()
@ -370,13 +370,12 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
go func() {
for samTmp := range ringBufferOutCh {
fmt.Printf(" * DEBUG1.1 * before signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
samTmp.delivered()
fmt.Printf(" * DEBUG1.2 * after signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
// fmt.Printf(" * DEBUG1.1 * before signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
// * DEBUG1 NOTE: It seems the problem are after here, since this loop seems to get stuck
// because the none of the two println's above are getting printed when many messages are
// being pushed on the system. Meaning this for loop gets stuck below.
// Signal back to the ringbuffer that message have been picked up.
samTmp.delivered()
// fmt.Printf(" * DEBUG1.2 * after signaling back to the ringbuffer that message was picked from ring buffer, samTmp.delivered: %#v\n", samTmp.samDBValue.ID)
sam := samTmp.samDBValue.Data
// Check if the format of the message is correct.
@ -413,22 +412,30 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// for that subject, put the message on that processes incomming
// message channel.
if ok {
fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v\n", samTmp.samDBValue.ID)
// fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap))
s.processes.mu.Lock()
var pid int
// var pid int
var proc process
// forLoopCounter := 1
for _, existingProc := range existingProcIDMap {
pid = existingProc.processID
log.Printf("info: processNewMessages: found the specific subject: %v, proc.ID: %v\n", subjName, existingProc.processID)
// fmt.Printf(" * DEBUG1.3 * forLoopCounter: %v\n", forLoopCounter)
// * DEBUG5 NOTE: It seems to get stuck when writing to the messageCh below.
fmt.Printf(" * DEBUG1.4 * before putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
// pid = existingProc.processID
// log.Printf("info: processNewMessages: found the specific subject: %v, proc.ID: %v\n", subjName, existingProc.processID)
existingProc.subject.messageCh <- m
// fmt.Printf(" * DEBUG1.4 * before putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
fmt.Printf(" * DEBUG1.5 * after putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
proc = existingProc
// fmt.Printf(" * DEBUG1.5 * after putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
// forLoopCounter++
}
s.processes.mu.Unlock()
fmt.Printf(" *** DEBUG1.6 * MUTEX.UNLOCK after range existing Proc ID Map, samDBValue.id: %#v, proc.id: %v\n", samTmp.samDBValue.ID, pid)
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
// fmt.Printf(" *** DEBUG1.6 * MUTEX.UNLOCK after range existing Proc ID Map, samDBValue.id: %#v, proc.id: %v\n", samTmp.samDBValue.ID, pid)
// If no process to handle the specific subject exist,
// the we create and spawn one.