1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Unique process naming with id's used in active map to be able to control all individual processes

This commit is contained in:
postmannen 2021-06-08 13:56:31 +02:00
parent c57c2ffc6f
commit 63c043e0ff
3 changed files with 48 additions and 16 deletions

View file

@ -189,8 +189,12 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
p.processName = pn
// Add information about the new process to the started processes map.
idProcMap := make(map[int]process)
idProcMap[p.processID] = p
procs.mu.Lock()
procs.active[pn] = p
procs.active[pn] = idProcMap
procs.mu.Unlock()
}
@ -426,7 +430,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// Increment the counter for the next message to be sent.
p.messageID++
p.processes.mu.Lock()
p.processes.active[pn] = p
p.processes.active[pn][p.processID] = p
p.processes.mu.Unlock()
// Handle the error.

View file

@ -27,7 +27,7 @@ func processNameGet(sn subjectName, pk processKind) processName {
// processes holds all the information about running processes
type processes struct {
// The active spawned processes
active map[processName]process
active map[processName]map[int]process
// mutex to lock the map
mu sync.RWMutex
// The last processID created
@ -41,7 +41,7 @@ type processes struct {
// newProcesses will prepare and return a *processes
func newProcesses(promRegistry *prometheus.Registry) *processes {
p := processes{
active: make(map[processName]process),
active: make(map[processName]map[int]process),
}
p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{
@ -220,8 +220,10 @@ func (p *processes) printProcessesMap() {
fmt.Println("--------------------------------------------------------------------------------------------")
log.Printf("*** Output of processes map :\n")
p.mu.Lock()
for _, v := range p.active {
log.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
for _, vSub := range p.active {
for _, vID := range vSub {
log.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", vID.processKind, vID.processID, vID.subject.name(), vID.allowedReceivers)
}
}
p.mu.Unlock()
@ -327,15 +329,22 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
pn := processNameGet(subjName, processKindPublisher)
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
s.processes.mu.Lock()
existingProc, ok := s.processes.active[pn]
existingProcIDMap, ok := s.processes.active[pn]
s.processes.mu.Unlock()
// Are there already a process for that subject, put the
// message on that processes incomming message channel.
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
if ok {
s.processes.mu.Lock()
for _, existingProc := range existingProcIDMap {
log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName)
existingProc.subject.messageCh <- m
}
s.processes.mu.Unlock()
// If no process to handle the specific subject exist,
// the we create and spawn one.

View file

@ -253,6 +253,7 @@ type OpCmdStopProc struct {
RecevingNode node `json:"receivingNode"`
Method Method `json:"method"`
Kind processKind `json:"kind"`
ID int `json:"id"`
}
// handler to run a CLI command with timeout context. The handler will
@ -273,11 +274,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
for _, v := range proc.processes.active {
for _, idMap := range proc.processes.active {
for _, v := range idMap {
s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v, allowed from: %s\n", time.Now().UTC(), v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
sb := []byte(s)
out = append(out, sb...)
}
}
proc.processes.mu.Unlock()
case "startProc":
@ -341,13 +345,28 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
processName := processNameGet(sub.name(), arg.Kind)
// fmt.Printf(" ** DEBUG1: processName: %v\n", processName)
// Check if the message contains an id.
err = func() error {
if arg.ID == 0 {
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
return er
}
return nil
}()
if err != nil {
sendErrorLogMessage(proc.toRingbufferCh, proc.node, err)
log.Printf("%v\n", err)
return
}
proc.processes.mu.Lock()
// for k, v := range proc.processes.active {
// fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind)
// }
toStopProc, ok := proc.processes.active[processName]
toStopProc, ok := proc.processes.active[processName][arg.ID]
if ok {
// Delete the process from the processes map
delete(proc.processes.active, processName)