From 3fb9aefd785bd482d5d4b37fdbba3ed390d4c12a Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 12 Aug 2021 09:21:56 +0200 Subject: [PATCH] comments --- processes.go | 4 +++- subscriber_method_types.go | 20 ++++---------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/processes.go b/processes.go index 1580f9c..4f25c45 100644 --- a/processes.go +++ b/processes.go @@ -27,6 +27,8 @@ type processes struct { promTotalProcesses prometheus.Gauge // promProcessesVec *prometheus.GaugeVec + // Waitgroup to keep track of all the processes started + wg sync.WaitGroup } // newProcesses will prepare and return a *processes which @@ -69,7 +71,7 @@ func (p *processes) Start(proc process) { { log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node) sub := newSubject(REQOpCommand, string(proc.node)) - proc := newProcess(proc.ctx, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil) + proc := newProcess(proc.ctx, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil) go proc.spawnWorker(proc.processes, proc.natsConn) } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 62cbd64..6623803 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -358,20 +358,12 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri // Assert it into the correct non pointer value. arg := *dst.(*OpCmdStopProc) - // Data layout: OPCommand, Method, publisher/subscriber, receivingNode - // - // The processes can be either publishers or subscribers. The subject name - // are used within naming a process. Since the subject structure contains - // the node name of the node that will receive this message we also need - // specify it so we are able to delete the publisher processes, since a - // publisher process will have the name of the node to receive the message, - // and not just the local node name as with subscriber processes. - // receive the message we need to specify - // Process name example: ship2.REQToFileAppend.EventACK_subscriber - + // Based on the arg values received in the message we create can + // create a processName structure as used in naming the real processes. + // We can then use this processName to get the real values for the + // actual process we want to stop. sub := newSubject(arg.Method, string(arg.RecevingNode)) processName := processNameGet(sub.name(), arg.Kind) - // fmt.Printf(" ** DEBUG1: processName: %v\n", processName) // Check if the message contains an id. err = func() error { @@ -392,10 +384,6 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri proc.processes.mu.Lock() - // for k, v := range proc.processes.active { - // fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind) - // } - toStopProc, ok := proc.processes.active[processName][arg.ID] if ok { // Delete the process from the processes map