1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-04-09 10:24:17 +00:00

replaced goto in routeMessageToProcess

This commit is contained in:
postmannen 2021-09-21 23:29:42 +02:00
parent dcbe51a191
commit 21f532c128

111
server.go
View file

@ -398,68 +398,71 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
continue
}
redo:
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present. The process will then be created, and
// the code will loop back to the redo: label.
for {
// looping here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present. The process will then be created, and
// the code will loop back here.
m := sam.Message
subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet(subjName, processKindPublisher)
m := sam.Message
subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet(subjName, processKindPublisher)
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
s.processes.mu.Lock()
existingProcIDMap, ok := s.processes.active[pn]
s.processes.mu.Unlock()
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
if ok {
// fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap))
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
s.processes.mu.Lock()
// var pid int
var proc process
// forLoopCounter := 1
for _, existingProc := range existingProcIDMap {
// fmt.Printf(" * DEBUG1.3 * forLoopCounter: %v\n", forLoopCounter)
// pid = existingProc.processID
// log.Printf("info: processNewMessages: found the specific subject: %v, proc.ID: %v\n", subjName, existingProc.processID)
// fmt.Printf(" * DEBUG1.4 * before putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
proc = existingProc
// fmt.Printf(" * DEBUG1.5 * after putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
// forLoopCounter++
}
existingProcIDMap, ok := s.processes.active[pn]
s.processes.mu.Unlock()
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
// fmt.Printf(" *** DEBUG1.6 * MUTEX.UNLOCK after range existing Proc ID Map, samDBValue.id: %#v, proc.id: %v\n", samTmp.samDBValue.ID, pid)
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
if ok {
// fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap))
s.processes.mu.Lock()
// var pid int
var proc process
// forLoopCounter := 1
for _, existingProc := range existingProcIDMap {
// fmt.Printf(" * DEBUG1.3 * forLoopCounter: %v\n", forLoopCounter)
// If no process to handle the specific subject exist,
// the we create and spawn one.
} else {
// If a publisher process do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
// pid = existingProc.processID
// log.Printf("info: processNewMessages: found the specific subject: %v, proc.ID: %v\n", subjName, existingProc.processID)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
// fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s.processes, s.natsConn)
log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
// fmt.Printf(" * DEBUG1.4 * before putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
// Now when the process is spawned we jump back to the redo: label,
// and send the message to that new process.
goto redo
proc = existingProc
// fmt.Printf(" * DEBUG1.5 * after putting on channel to found process, process ch: %#v,existingproc.id: %v\n", &existingProc.subject.messageCh, existingProc.processID)
// forLoopCounter++
}
s.processes.mu.Unlock()
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
// fmt.Printf(" *** DEBUG1.6 * MUTEX.UNLOCK after range existing Proc ID Map, samDBValue.id: %#v, proc.id: %v\n", samTmp.samDBValue.ID, pid)
// If no process to handle the specific subject exist,
// the we create and spawn one.
break
} else {
// If a publisher process do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
// fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s.processes, s.natsConn)
log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
// Now when the process is spawned we jump back to the redo: label,
// and send the message to that new process.
continue
}
}
}
}()