1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
This commit is contained in:
postmannen 2021-08-12 09:21:56 +02:00
parent a0304e31e7
commit 3fb9aefd78
2 changed files with 7 additions and 17 deletions

View file

@ -27,6 +27,8 @@ type processes struct {
promTotalProcesses prometheus.Gauge
//
promProcessesVec *prometheus.GaugeVec
// Waitgroup to keep track of all the processes started
wg sync.WaitGroup
}
// newProcesses will prepare and return a *processes which
@ -69,7 +71,7 @@ func (p *processes) Start(proc process) {
{
log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node)
sub := newSubject(REQOpCommand, string(proc.node))
proc := newProcess(proc.ctx, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil)
proc := newProcess(proc.ctx, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil)
go proc.spawnWorker(proc.processes, proc.natsConn)
}

View file

@ -358,20 +358,12 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
// Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStopProc)
// Data layout: OPCommand, Method, publisher/subscriber, receivingNode
//
// The processes can be either publishers or subscribers. The subject name
// are used within naming a process. Since the subject structure contains
// the node name of the node that will receive this message we also need
// specify it so we are able to delete the publisher processes, since a
// publisher process will have the name of the node to receive the message,
// and not just the local node name as with subscriber processes.
// receive the message we need to specify
// Process name example: ship2.REQToFileAppend.EventACK_subscriber
// Based on the arg values received in the message we create can
// create a processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(arg.Method, string(arg.RecevingNode))
processName := processNameGet(sub.name(), arg.Kind)
// fmt.Printf(" ** DEBUG1: processName: %v\n", processName)
// Check if the message contains an id.
err = func() error {
@ -392,10 +384,6 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.mu.Lock()
// for k, v := range proc.processes.active {
// fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind)
// }
toStopProc, ok := proc.processes.active[processName][arg.ID]
if ok {
// Delete the process from the processes map