diff --git a/process.go b/process.go index 8ea3ccf..d23ff89 100644 --- a/process.go +++ b/process.go @@ -162,7 +162,7 @@ func (p process) spawnWorker() { processName := processNameGet(p.subject.name(), p.processKind) // Add prometheus metrics for the process. - p.server.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) + p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. @@ -214,9 +214,9 @@ func (p process) spawnWorker() { p.processName = pn // Add information about the new process to the started processes map. - p.server.processes.active.mu.Lock() - p.server.processes.active.procNames[pn] = p - p.server.processes.active.mu.Unlock() + p.processes.active.mu.Lock() + p.processes.active.procNames[pn] = p + p.processes.active.mu.Unlock() } // messageDeliverNats will create the Nats message with headers and payload. @@ -251,7 +251,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He log.Printf("%v\n", er) return } - p.server.metrics.promNatsDeliveredTotal.Inc() + p.metrics.promNatsDeliveredTotal.Inc() return } @@ -318,7 +318,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He subReply.Unsubscribe() - p.server.metrics.promNatsMessagesFailedACKsTotal.Inc() + p.metrics.promNatsMessagesFailedACKsTotal.Inc() return default: @@ -326,7 +326,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - p.server.metrics.promNatsMessagesMissedACKsTotal.Inc() + p.metrics.promNatsMessagesMissedACKsTotal.Inc() subReply.Unsubscribe() continue @@ -337,7 +337,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He subReply.Unsubscribe() - p.server.metrics.promNatsDeliveredTotal.Inc() + p.metrics.promNatsDeliveredTotal.Inc() return } diff --git a/processes.go b/processes.go index 44914ed..2403d3d 100644 --- a/processes.go +++ b/processes.go @@ -219,11 +219,15 @@ func (p *processes) Stop() { // Startup holds all the startup methods for subscribers. type startup struct { - server *server + server *server + metrics *metrics } func newStartup(server *server) *startup { - s := startup{server} + s := startup{ + server: server, + metrics: server.metrics, + } return &s } @@ -371,8 +375,8 @@ func (s startup) subREQHello(p process) { sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics - s.server.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) - s.server.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() + s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) + s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() } } @@ -481,7 +485,7 @@ func (p *processes) printProcessesMap() { log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name()) } - p.server.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) + p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) p.active.mu.Unlock() }