1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 14:56:49 +00:00

removed the inner map of procsmap

This commit is contained in:
postmannen 2021-11-16 19:07:24 +01:00
parent 37d120950e
commit 307695842f
4 changed files with 17 additions and 39 deletions

View file

@ -184,11 +184,8 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
p.processName = pn p.processName = pn
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
idProcMap := make(map[int]process)
idProcMap[p.processID] = p
procs.active.mu.Lock() procs.active.mu.Lock()
procs.active.procNames[pn] = idProcMap procs.active.procNames[pn] = p
procs.active.mu.Unlock() procs.active.mu.Unlock()
} }
@ -461,12 +458,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
{ {
p.processes.active.mu.Lock() p.processes.active.mu.Lock()
procToUpdate, ok := p.processes.active.procNames[pn] p.processes.active.procNames[pn] = p
if !ok {
log.Printf(" * debugError: found no proc to update by that name: %v\n", pn)
}
procToUpdate[p.processID] = p
p.processes.active.mu.Unlock() p.processes.active.mu.Unlock()
} }

View file

@ -54,13 +54,13 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes {
// ---------------------- // ----------------------
type procsMap struct { type procsMap struct {
procNames map[processName]map[int]process procNames map[processName]process
mu sync.Mutex mu sync.Mutex
} }
func newProcsMap() *procsMap { func newProcsMap() *procsMap {
cM := procsMap{ cM := procsMap{
procNames: make(map[processName]map[int]process), procNames: make(map[processName]process),
} }
return &cM return &cM
} }
@ -377,10 +377,8 @@ func (p *processes) printProcessesMap() {
{ {
p.active.mu.Lock() p.active.mu.Lock()
for pName, pidMap := range p.active.procNames { for pName, proc := range p.active.procNames {
for pid, proc := range pidMap { log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name())
log.Printf("* proc - pub/sub: %v, procName in map: %v ,pid in map: %v,id: %v, subject: %v\n", proc.processKind, pName, pid, proc.processID, proc.subject.name())
}
} }
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))

View file

@ -381,12 +381,10 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
// be returned in the reply message. // be returned in the reply message.
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
for _, idMap := range proc.processes.active.procNames { for _, pTmp := range proc.processes.active.procNames {
for _, idMapValue := range idMap { s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), idMapValue.processKind, idMapValue.processID, idMapValue.subject.name()) sb := []byte(s)
sb := []byte(s) out = append(out, sb...)
out = append(out, sb...)
}
} }
proc.processes.active.mu.Unlock() proc.processes.active.mu.Unlock()
@ -461,7 +459,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
// Remove the process from the processes active map if found. // Remove the process from the processes active map if found.
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
toStopProc, ok := proc.processes.active.procNames[processName][arg.ID] toStopProc, ok := proc.processes.active.procNames[processName]
if ok { if ok {
// Delete the process from the processes map // Delete the process from the processes map
@ -530,12 +528,10 @@ func (m methodREQOpProcessList) handler(proc process, message Message, node stri
// be returned in the reply message. // be returned in the reply message.
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
for _, pidMap := range proc.processes.active.procNames { for _, pTmp := range proc.processes.active.procNames {
for _, pid := range pidMap { s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pid.processKind, pid.processID, pid.subject.name()) sb := []byte(s)
sb := []byte(s) out = append(out, sb...)
out = append(out, sb...)
}
} }
proc.processes.active.mu.Unlock() proc.processes.active.mu.Unlock()
@ -683,7 +679,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
// Remove the process from the processes active map if found. // Remove the process from the processes active map if found.
proc.processes.active.mu.Lock() proc.processes.active.mu.Lock()
toStopProc, ok := proc.processes.active.procNames[processName][id] toStopProc, ok := proc.processes.active.procNames[processName]
if ok { if ok {
// Delete the process from the processes map // Delete the process from the processes map

View file

@ -453,21 +453,13 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// Check if there is a map of type map[int]process registered // Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it. // for the processName, and if it exists then return it.
s.processes.active.mu.Lock() s.processes.active.mu.Lock()
existingProcIDMap, ok := s.processes.active.procNames[pn] proc, ok := s.processes.active.procNames[pn]
s.processes.active.mu.Unlock() s.processes.active.mu.Unlock()
// If found a map above, range it, and are there already a process // If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming // for that subject, put the message on that processes incomming
// message channel. // message channel.
if ok { if ok {
var proc process
s.processes.active.mu.Lock()
for _, existingProc := range existingProcIDMap {
proc = existingProc
}
s.processes.active.mu.Unlock()
// We have found the process to route the message to, deliver it. // We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m proc.subject.messageCh <- m