From 92047c9851951ca322187d60ad74bd0a586974fa Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 8 Oct 2021 12:07:10 +0200 Subject: [PATCH] procsMap seems to work --- process.go | 14 ++++++++------ processes.go | 34 ++++++++++++++++++++-------------- requests.go | 34 ++++++++++++++++------------------ server.go | 8 ++++---- 4 files changed, 48 insertions(+), 42 deletions(-) diff --git a/process.go b/process.go index 847e451..16c9e4c 100644 --- a/process.go +++ b/process.go @@ -187,9 +187,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { idProcMap := make(map[int]process) idProcMap[p.processID] = p - procs.mu.Lock() - procs.active[pn] = idProcMap - procs.mu.Unlock() + procs.active.put(keyValue{k: pn, v: idProcMap}) } // messageDeliverNats will take care of the delivering the message @@ -423,10 +421,14 @@ func (p process) publishMessages(natsConn *nats.Conn) { m.done <- struct{}{} // Increment the counter for the next message to be sent. + // + // TODO: Check if it is possible, or makes sense to move the + // counter out of the map. p.messageID++ - p.processes.mu.Lock() - p.processes.active[pn][p.processID] = p - p.processes.mu.Unlock() + + p1 := p.processes.active.get(pn) + p1.v[p.processID] = p + p.processes.active.put(keyValue{k: pn, v: p1.v}) // Handle the error. // diff --git a/processes.go b/processes.go index 826f9a3..9c7c422 100644 --- a/processes.go +++ b/processes.go @@ -17,7 +17,7 @@ type processes struct { // cancel func to send cancel signal to the subscriber processes context. cancel context.CancelFunc // The active spawned processes - active map[processName]map[int]process + active procsMap // mutex to lock the map mu sync.RWMutex // The last processID created @@ -32,12 +32,17 @@ type processes struct { // is map containing all the currently running processes. func newProcesses(ctx context.Context, metrics *metrics) *processes { p := processes{ - active: make(map[processName]map[int]process), + active: *newProcsMap(), } // Prepare the parent context for the subscribers. ctx, cancel := context.WithCancel(ctx) + // Start the processes map. + go func() { + p.active.run(ctx) + }() + p.ctx = ctx p.cancel = cancel @@ -49,20 +54,20 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes { // ---------------------- type keyValue struct { - k int - v string + k processName + v map[int]process ok bool } type kvCh chan keyValue type getValue struct { - k int + k processName kvCh kvCh } type procsMap struct { - m map[int]string + m map[processName]map[int]process mInCh chan kvCh mGetCh chan getValue mDelCh chan kvCh @@ -71,7 +76,7 @@ type procsMap struct { func newProcsMap() *procsMap { cM := procsMap{ - m: map[int]string{}, + m: make(map[processName]map[int]process), mInCh: make(chan kvCh), mGetCh: make(chan getValue), mDelCh: make(chan kvCh), @@ -106,7 +111,7 @@ func (c *procsMap) run(ctx context.Context) { gaCh <- kvSlice case <-ctx.Done(): - // log.Printf("info: cMap: got ctx.Done\n") + log.Printf("info: processes active.Run: got ctx.Done\n") return } } @@ -118,7 +123,7 @@ func (c *procsMap) put(kv keyValue) { c.mInCh <- kvCh } -func (c *procsMap) get(key int) keyValue { +func (c *procsMap) get(key processName) keyValue { kvCh := make(chan keyValue, 1) gv := getValue{ @@ -443,14 +448,15 @@ func (s startup) subREQToSocket(p process) { // Print the content of the processes map. func (p *processes) printProcessesMap() { log.Printf("*** Output of processes map :\n") - p.mu.Lock() - for _, vSub := range p.active { - for _, vID := range vSub { + + activeProcs := p.active.getAll() + + for _, vSub := range activeProcs { + for _, vID := range vSub.v { log.Printf("* proc - : %v, id: %v, name: %v\n", vID.processKind, vID.processID, vID.subject.name()) } } - p.mu.Unlock() - p.metrics.promProcessesTotal.Set(float64(len(p.active))) + p.metrics.promProcessesTotal.Set(float64(len(activeProcs))) } diff --git a/requests.go b/requests.go index cc1073b..aa8ac5f 100644 --- a/requests.go +++ b/requests.go @@ -378,8 +378,11 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. - for _, idMap := range proc.processes.active { - for _, v := range idMap { + + activeProcs := proc.processes.active.getAll() + + for _, idMap := range activeProcs { + for _, v := range idMap.v { s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) sb := []byte(s) out = append(out, sb...) @@ -464,13 +467,13 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri return } - proc.processes.mu.Lock() - // Remove the process from the processes active map if found. - toStopProc, ok := proc.processes.active[processName][arg.ID] + p1 := proc.processes.active.get(processName) + toStopProc, ok := p1.v[arg.ID] + if ok { // Delete the process from the processes map - delete(proc.processes.active, processName) + proc.processes.active.del(keyValue{k: processName}) // Stop started go routines that belong to the process. toStopProc.ctxCancel() // Stop subscribing for messages on the process's subject. @@ -497,9 +500,6 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri newReplyMessage(proc, message, []byte(er.Error())) } - - proc.processes.mu.Unlock() - } newReplyMessage(proc, message, out) @@ -529,18 +529,17 @@ func (m methodREQOpProcessList) handler(proc process, message Message, node stri out := []byte{} - proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. - for _, idMap := range proc.processes.active { - for _, v := range idMap { + procsAll := proc.processes.active.getAll() + for _, idMap := range procsAll { + for _, v := range idMap.v { s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) sb := []byte(s) out = append(out, sb...) } } - proc.processes.mu.Unlock() newReplyMessage(proc, message, out) }() @@ -683,13 +682,13 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri sub := newSubject(method, string(node)) processName := processNameGet(sub.name(), processKind(kind)) - proc.processes.mu.Lock() - // Remove the process from the processes active map if found. - toStopProc, ok := proc.processes.active[processName][id] + p1 := proc.processes.active.get(processName) + toStopProc, ok := p1.v[id] + if ok { // Delete the process from the processes map - delete(proc.processes.active, processName) + proc.processes.active.del(keyValue{k: processName}) // Stop started go routines that belong to the process. toStopProc.ctxCancel() // Stop subscribing for messages on the process's subject. @@ -721,7 +720,6 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri newReplyMessage(proc, message, out) } - proc.processes.mu.Unlock() }() ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) diff --git a/server.go b/server.go index 00775e8..31c51d5 100644 --- a/server.go +++ b/server.go @@ -422,14 +422,14 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // Check if there is a map of type map[int]process registered // for the processName, and if it exists then return it. - s.processes.mu.Lock() - existingProcIDMap, ok := s.processes.active[pn] - s.processes.mu.Unlock() + kv := s.processes.active.get(pn) + existingProcIDMap := kv.v + valueOK := kv.ok // If found a map above, range it, and are there already a process // for that subject, put the message on that processes incomming // message channel. - if ok { + if valueOK { // fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap)) s.processes.mu.Lock() // var pid int