1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-15 17:51:15 +00:00

procsMap seems to work

This commit is contained in:
postmannen 2021-10-08 12:07:10 +02:00
parent ada5fdaa59
commit 92047c9851
4 changed files with 48 additions and 42 deletions

View file

@ -187,9 +187,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
idProcMap := make(map[int]process) idProcMap := make(map[int]process)
idProcMap[p.processID] = p idProcMap[p.processID] = p
procs.mu.Lock() procs.active.put(keyValue{k: pn, v: idProcMap})
procs.active[pn] = idProcMap
procs.mu.Unlock()
} }
// messageDeliverNats will take care of the delivering the message // messageDeliverNats will take care of the delivering the message
@ -423,10 +421,14 @@ func (p process) publishMessages(natsConn *nats.Conn) {
m.done <- struct{}{} m.done <- struct{}{}
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
//
// TODO: Check if it is possible, or makes sense to move the
// counter out of the map.
p.messageID++ p.messageID++
p.processes.mu.Lock()
p.processes.active[pn][p.processID] = p p1 := p.processes.active.get(pn)
p.processes.mu.Unlock() p1.v[p.processID] = p
p.processes.active.put(keyValue{k: pn, v: p1.v})
// Handle the error. // Handle the error.
// //

View file

@ -17,7 +17,7 @@ type processes struct {
// cancel func to send cancel signal to the subscriber processes context. // cancel func to send cancel signal to the subscriber processes context.
cancel context.CancelFunc cancel context.CancelFunc
// The active spawned processes // The active spawned processes
active map[processName]map[int]process active procsMap
// mutex to lock the map // mutex to lock the map
mu sync.RWMutex mu sync.RWMutex
// The last processID created // The last processID created
@ -32,12 +32,17 @@ type processes struct {
// is map containing all the currently running processes. // is map containing all the currently running processes.
func newProcesses(ctx context.Context, metrics *metrics) *processes { func newProcesses(ctx context.Context, metrics *metrics) *processes {
p := processes{ p := processes{
active: make(map[processName]map[int]process), active: *newProcsMap(),
} }
// Prepare the parent context for the subscribers. // Prepare the parent context for the subscribers.
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
// Start the processes map.
go func() {
p.active.run(ctx)
}()
p.ctx = ctx p.ctx = ctx
p.cancel = cancel p.cancel = cancel
@ -49,20 +54,20 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes {
// ---------------------- // ----------------------
type keyValue struct { type keyValue struct {
k int k processName
v string v map[int]process
ok bool ok bool
} }
type kvCh chan keyValue type kvCh chan keyValue
type getValue struct { type getValue struct {
k int k processName
kvCh kvCh kvCh kvCh
} }
type procsMap struct { type procsMap struct {
m map[int]string m map[processName]map[int]process
mInCh chan kvCh mInCh chan kvCh
mGetCh chan getValue mGetCh chan getValue
mDelCh chan kvCh mDelCh chan kvCh
@ -71,7 +76,7 @@ type procsMap struct {
func newProcsMap() *procsMap { func newProcsMap() *procsMap {
cM := procsMap{ cM := procsMap{
m: map[int]string{}, m: make(map[processName]map[int]process),
mInCh: make(chan kvCh), mInCh: make(chan kvCh),
mGetCh: make(chan getValue), mGetCh: make(chan getValue),
mDelCh: make(chan kvCh), mDelCh: make(chan kvCh),
@ -106,7 +111,7 @@ func (c *procsMap) run(ctx context.Context) {
gaCh <- kvSlice gaCh <- kvSlice
case <-ctx.Done(): case <-ctx.Done():
// log.Printf("info: cMap: got ctx.Done\n") log.Printf("info: processes active.Run: got ctx.Done\n")
return return
} }
} }
@ -118,7 +123,7 @@ func (c *procsMap) put(kv keyValue) {
c.mInCh <- kvCh c.mInCh <- kvCh
} }
func (c *procsMap) get(key int) keyValue { func (c *procsMap) get(key processName) keyValue {
kvCh := make(chan keyValue, 1) kvCh := make(chan keyValue, 1)
gv := getValue{ gv := getValue{
@ -443,14 +448,15 @@ func (s startup) subREQToSocket(p process) {
// Print the content of the processes map. // Print the content of the processes map.
func (p *processes) printProcessesMap() { func (p *processes) printProcessesMap() {
log.Printf("*** Output of processes map :\n") log.Printf("*** Output of processes map :\n")
p.mu.Lock()
for _, vSub := range p.active { activeProcs := p.active.getAll()
for _, vID := range vSub {
for _, vSub := range activeProcs {
for _, vID := range vSub.v {
log.Printf("* proc - : %v, id: %v, name: %v\n", vID.processKind, vID.processID, vID.subject.name()) log.Printf("* proc - : %v, id: %v, name: %v\n", vID.processKind, vID.processID, vID.subject.name())
} }
} }
p.mu.Unlock()
p.metrics.promProcessesTotal.Set(float64(len(p.active))) p.metrics.promProcessesTotal.Set(float64(len(activeProcs)))
} }

View file

@ -378,8 +378,11 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.mu.Lock() proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to // Loop the the processes map, and find all that is active to
// be returned in the reply message. // be returned in the reply message.
for _, idMap := range proc.processes.active {
for _, v := range idMap { activeProcs := proc.processes.active.getAll()
for _, idMap := range activeProcs {
for _, v := range idMap.v {
s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name())
sb := []byte(s) sb := []byte(s)
out = append(out, sb...) out = append(out, sb...)
@ -464,13 +467,13 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
return return
} }
proc.processes.mu.Lock()
// Remove the process from the processes active map if found. // Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][arg.ID] p1 := proc.processes.active.get(processName)
toStopProc, ok := p1.v[arg.ID]
if ok { if ok {
// Delete the process from the processes map // Delete the process from the processes map
delete(proc.processes.active, processName) proc.processes.active.del(keyValue{k: processName})
// Stop started go routines that belong to the process. // Stop started go routines that belong to the process.
toStopProc.ctxCancel() toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject. // Stop subscribing for messages on the process's subject.
@ -497,9 +500,6 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
newReplyMessage(proc, message, []byte(er.Error())) newReplyMessage(proc, message, []byte(er.Error()))
} }
proc.processes.mu.Unlock()
} }
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
@ -529,18 +529,17 @@ func (m methodREQOpProcessList) handler(proc process, message Message, node stri
out := []byte{} out := []byte{}
proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to // Loop the the processes map, and find all that is active to
// be returned in the reply message. // be returned in the reply message.
for _, idMap := range proc.processes.active { procsAll := proc.processes.active.getAll()
for _, v := range idMap { for _, idMap := range procsAll {
for _, v := range idMap.v {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name()) s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name())
sb := []byte(s) sb := []byte(s)
out = append(out, sb...) out = append(out, sb...)
} }
} }
proc.processes.mu.Unlock()
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
}() }()
@ -683,13 +682,13 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
sub := newSubject(method, string(node)) sub := newSubject(method, string(node))
processName := processNameGet(sub.name(), processKind(kind)) processName := processNameGet(sub.name(), processKind(kind))
proc.processes.mu.Lock()
// Remove the process from the processes active map if found. // Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][id] p1 := proc.processes.active.get(processName)
toStopProc, ok := p1.v[id]
if ok { if ok {
// Delete the process from the processes map // Delete the process from the processes map
delete(proc.processes.active, processName) proc.processes.active.del(keyValue{k: processName})
// Stop started go routines that belong to the process. // Stop started go routines that belong to the process.
toStopProc.ctxCancel() toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject. // Stop subscribing for messages on the process's subject.
@ -721,7 +720,6 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
} }
proc.processes.mu.Unlock()
}() }()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))

View file

@ -422,14 +422,14 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// Check if there is a map of type map[int]process registered // Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it. // for the processName, and if it exists then return it.
s.processes.mu.Lock() kv := s.processes.active.get(pn)
existingProcIDMap, ok := s.processes.active[pn] existingProcIDMap := kv.v
s.processes.mu.Unlock() valueOK := kv.ok
// If found a map above, range it, and are there already a process // If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming // for that subject, put the message on that processes incomming
// message channel. // message channel.
if ok { if valueOK {
// fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap)) // fmt.Printf(" * DEBUG1.3 * MUTEX.LOCK before range existingProcIDMap, samDBValue.id: %#v, existingProcIDMap length: %v\n", samTmp.samDBValue.ID, len(existingProcIDMap))
s.processes.mu.Lock() s.processes.mu.Lock()
// var pid int // var pid int