1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

rewrote prometheus metrics handling

This commit is contained in:
postmannen 2021-04-12 10:51:26 +02:00
parent 393c9b63b2
commit 1acddfa8b4
3 changed files with 23 additions and 64 deletions

View file

@ -14,7 +14,7 @@ import (
// handling // handling
type metrics struct { type metrics struct {
// The channel to pass metrics that should be processed // The channel to pass metrics that should be processed
metricsCh chan metricType promRegistry *prometheus.Registry
// host and port where prometheus metrics will be exported // host and port where prometheus metrics will be exported
hostAndPort string hostAndPort string
} }
@ -22,56 +22,15 @@ type metrics struct {
// newMetrics will prepare and return a *metrics // newMetrics will prepare and return a *metrics
func newMetrics(hostAndPort string) *metrics { func newMetrics(hostAndPort string) *metrics {
m := metrics{ m := metrics{
metricsCh: make(chan metricType), promRegistry: prometheus.NewRegistry(),
hostAndPort: hostAndPort, hostAndPort: hostAndPort,
} }
return &m return &m
} }
type Metricer interface {
Set(float64)
}
type metricType struct {
metric prometheus.Collector
value float64
}
func (s *server) startMetrics() { func (s *server) startMetrics() {
// Receive and process all metrics
go func() {
for {
for f := range s.metrics.metricsCh {
// // Try to register the metric of the interface type prometheus.Collector
// prometheus.Register(f.metric)
// Check the real type of the interface type
switch ff := f.metric.(type) {
case prometheus.Gauge:
// Try to register. If it is already registered we need to check the error
// to get the previously registered collector, and update it's value. If it
// is not registered we register the new collector, and sets it's value.
err := prometheus.Register(ff)
if err != nil {
are, ok := err.(prometheus.AlreadyRegisteredError)
if ok {
// already registered, use the one we have and put it into ff
ff = are.ExistingCollector.(prometheus.Gauge)
}
}
ff.Set(f.value)
}
}
}
}()
//http.Handle("/metrics", promhttp.Handler()) //http.Handle("/metrics", promhttp.Handler())
//http.ListenAndServe(":2112", nil) //http.ListenAndServe(":2112", nil)
n, err := net.Listen("tcp", s.metrics.hostAndPort) n, err := net.Listen("tcp", s.metrics.hostAndPort)

View file

@ -11,6 +11,7 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
type processName string type processName string
@ -29,17 +30,21 @@ type processes struct {
mu sync.RWMutex mu sync.RWMutex
// The last processID created // The last processID created
lastProcessID int lastProcessID int
// metrics channel //
metricsCh chan metricType promTotalProcesses prometheus.Gauge
} }
// newProcesses will prepare and return a *processes // newProcesses will prepare and return a *processes
func newProcesses(metricsCh chan metricType) *processes { func newProcesses(promRegistry *prometheus.Registry) *processes {
p := processes{ p := processes{
active: make(map[processName]process), active: make(map[processName]process),
metricsCh: metricsCh,
} }
p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{
Name: "total_running_processes",
Help: "The current number of total running processes",
})
return &p return &p
} }
@ -91,7 +96,7 @@ func NewServer(c *Configuration) (*server, error) {
nodeName: c.NodeName, nodeName: c.NodeName,
natsConn: conn, natsConn: conn,
netListener: nl, netListener: nl,
processes: newProcesses(metrics.metricsCh), processes: newProcesses(metrics.promRegistry),
toRingbufferCh: make(chan []subjectAndMessage), toRingbufferCh: make(chan []subjectAndMessage),
metrics: metrics, metrics: metrics,
} }
@ -156,13 +161,7 @@ func (p *processes) printProcessesMap() {
} }
p.mu.Unlock() p.mu.Unlock()
p.metricsCh <- metricType{ p.promTotalProcesses.Set(float64(len(p.active)))
metric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "total_running_processes",
Help: "The current number of total running processes",
}),
value: float64(len(p.active)),
}
fmt.Println("--------------------------------------------------------------------------------------------") fmt.Println("--------------------------------------------------------------------------------------------")
} }

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
func (p process) ProcessesStart() { func (p process) ProcessesStart() {
@ -186,6 +187,12 @@ func (s startup) subREQHello(p process) {
// which is the information we pass along to generate metrics. // which is the information we pass along to generate metrics.
proc.procFunc = func(ctx context.Context) error { proc.procFunc = func(ctx context.Context) error {
sayHelloNodes := make(map[node]struct{}) sayHelloNodes := make(map[node]struct{})
promHelloNodes := promauto.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes",
Help: "The current number of total nodes who have said hello",
})
for { for {
// Receive a copy of the message sent from the method handler. // Receive a copy of the message sent from the method handler.
var m Message var m Message
@ -203,13 +210,7 @@ func (s startup) subREQHello(p process) {
sayHelloNodes[m.FromNode] = struct{}{} sayHelloNodes[m.FromNode] = struct{}{}
// update the prometheus metrics // update the prometheus metrics
proc.processes.metricsCh <- metricType{ promHelloNodes.Set(float64(len(sayHelloNodes)))
metric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes",
Help: "The current number of total nodes who have said hello",
}),
value: float64(len(sayHelloNodes)),
}
} }
} }
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)