1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

refactoring server injection

This commit is contained in:
postmannen 2022-04-01 09:21:50 +02:00
parent 86b3ef9e96
commit 5c55178e34
2 changed files with 17 additions and 13 deletions

View file

@ -162,7 +162,7 @@ func (p process) spawnWorker() {
processName := processNameGet(p.subject.name(), p.processKind)
// Add prometheus metrics for the process.
p.server.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)})
p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)})
// Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
@ -214,9 +214,9 @@ func (p process) spawnWorker() {
p.processName = pn
// Add information about the new process to the started processes map.
p.server.processes.active.mu.Lock()
p.server.processes.active.procNames[pn] = p
p.server.processes.active.mu.Unlock()
p.processes.active.mu.Lock()
p.processes.active.procNames[pn] = p
p.processes.active.mu.Unlock()
}
// messageDeliverNats will create the Nats message with headers and payload.
@ -251,7 +251,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
log.Printf("%v\n", er)
return
}
p.server.metrics.promNatsDeliveredTotal.Inc()
p.metrics.promNatsDeliveredTotal.Inc()
return
}
@ -318,7 +318,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
subReply.Unsubscribe()
p.server.metrics.promNatsMessagesFailedACKsTotal.Inc()
p.metrics.promNatsMessagesFailedACKsTotal.Inc()
return
default:
@ -326,7 +326,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
p.server.metrics.promNatsMessagesMissedACKsTotal.Inc()
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
subReply.Unsubscribe()
continue
@ -337,7 +337,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
subReply.Unsubscribe()
p.server.metrics.promNatsDeliveredTotal.Inc()
p.metrics.promNatsDeliveredTotal.Inc()
return
}

View file

@ -220,10 +220,14 @@ func (p *processes) Stop() {
// Startup holds all the startup methods for subscribers.
type startup struct {
server *server
metrics *metrics
}
func newStartup(server *server) *startup {
s := startup{server}
s := startup{
server: server,
metrics: server.metrics,
}
return &s
}
@ -371,8 +375,8 @@ func (s startup) subREQHello(p process) {
sayHelloNodes[m.FromNode] = struct{}{}
// update the prometheus metrics
s.server.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
s.server.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
}
}
@ -481,7 +485,7 @@ func (p *processes) printProcessesMap() {
log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name())
}
p.server.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
p.active.mu.Unlock()
}