diff --git a/process.go b/process.go index 2464e1e..b7219eb 100644 --- a/process.go +++ b/process.go @@ -189,8 +189,12 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { p.processName = pn // Add information about the new process to the started processes map. + + idProcMap := make(map[int]process) + idProcMap[p.processID] = p + procs.mu.Lock() - procs.active[pn] = p + procs.active[pn] = idProcMap procs.mu.Unlock() } @@ -426,7 +430,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { // Increment the counter for the next message to be sent. p.messageID++ p.processes.mu.Lock() - p.processes.active[pn] = p + p.processes.active[pn][p.processID] = p p.processes.mu.Unlock() // Handle the error. diff --git a/server.go b/server.go index 587dd9f..41b711e 100644 --- a/server.go +++ b/server.go @@ -27,7 +27,7 @@ func processNameGet(sn subjectName, pk processKind) processName { // processes holds all the information about running processes type processes struct { // The active spawned processes - active map[processName]process + active map[processName]map[int]process // mutex to lock the map mu sync.RWMutex // The last processID created @@ -41,7 +41,7 @@ type processes struct { // newProcesses will prepare and return a *processes func newProcesses(promRegistry *prometheus.Registry) *processes { p := processes{ - active: make(map[processName]process), + active: make(map[processName]map[int]process), } p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ @@ -220,8 +220,10 @@ func (p *processes) printProcessesMap() { fmt.Println("--------------------------------------------------------------------------------------------") log.Printf("*** Output of processes map :\n") p.mu.Lock() - for _, v := range p.active { - log.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers) + for _, vSub := range p.active { + for _, vID := range vSub { + log.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", vID.processKind, vID.processID, vID.subject.name(), vID.allowedReceivers) + } } p.mu.Unlock() @@ -327,15 +329,22 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) pn := processNameGet(subjName, processKindPublisher) + // Check if there is a map of type map[int]process registered + // for the processName, and if it exists then return it. s.processes.mu.Lock() - existingProc, ok := s.processes.active[pn] + existingProcIDMap, ok := s.processes.active[pn] s.processes.mu.Unlock() - // Are there already a process for that subject, put the - // message on that processes incomming message channel. + // If found a map above, range it, and are there already a process + // for that subject, put the message on that processes incomming + // message channel. if ok { - log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) - existingProc.subject.messageCh <- m + s.processes.mu.Lock() + for _, existingProc := range existingProcIDMap { + log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) + existingProc.subject.messageCh <- m + } + s.processes.mu.Unlock() // If no process to handle the specific subject exist, // the we create and spawn one. diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 309edf6..8661593 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -253,6 +253,7 @@ type OpCmdStopProc struct { RecevingNode node `json:"receivingNode"` Method Method `json:"method"` Kind processKind `json:"kind"` + ID int `json:"id"` } // handler to run a CLI command with timeout context. The handler will @@ -273,10 +274,13 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. - for _, v := range proc.processes.active { - s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v, allowed from: %s\n", time.Now().UTC(), v.processKind, v.processID, v.subject.name(), v.allowedReceivers) - sb := []byte(s) - out = append(out, sb...) + for _, idMap := range proc.processes.active { + for _, v := range idMap { + s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v, allowed from: %s\n", time.Now().UTC(), v.processKind, v.processID, v.subject.name(), v.allowedReceivers) + sb := []byte(s) + out = append(out, sb...) + } + } proc.processes.mu.Unlock() @@ -341,13 +345,28 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri processName := processNameGet(sub.name(), arg.Kind) // fmt.Printf(" ** DEBUG1: processName: %v\n", processName) + // Check if the message contains an id. + err = func() error { + if arg.ID == 0 { + er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode) + return er + } + return nil + }() + + if err != nil { + sendErrorLogMessage(proc.toRingbufferCh, proc.node, err) + log.Printf("%v\n", err) + return + } + proc.processes.mu.Lock() // for k, v := range proc.processes.active { // fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind) // } - toStopProc, ok := proc.processes.active[processName] + toStopProc, ok := proc.processes.active[processName][arg.ID] if ok { // Delete the process from the processes map delete(proc.processes.active, processName)