diff --git a/prometheus.go b/prometheus.go index d73188a..ba91be0 100644 --- a/prometheus.go +++ b/prometheus.go @@ -14,7 +14,7 @@ import ( // handling type metrics struct { // The channel to pass metrics that should be processed - metricsCh chan metricType + promRegistry *prometheus.Registry // host and port where prometheus metrics will be exported hostAndPort string } @@ -22,56 +22,15 @@ type metrics struct { // newMetrics will prepare and return a *metrics func newMetrics(hostAndPort string) *metrics { m := metrics{ - metricsCh: make(chan metricType), - hostAndPort: hostAndPort, + promRegistry: prometheus.NewRegistry(), + hostAndPort: hostAndPort, } return &m } -type Metricer interface { - Set(float64) -} - -type metricType struct { - metric prometheus.Collector - value float64 -} - func (s *server) startMetrics() { - // Receive and process all metrics - go func() { - for { - for f := range s.metrics.metricsCh { - // // Try to register the metric of the interface type prometheus.Collector - // prometheus.Register(f.metric) - - // Check the real type of the interface type - switch ff := f.metric.(type) { - case prometheus.Gauge: - - // Try to register. If it is already registered we need to check the error - // to get the previously registered collector, and update it's value. If it - // is not registered we register the new collector, and sets it's value. - err := prometheus.Register(ff) - if err != nil { - are, ok := err.(prometheus.AlreadyRegisteredError) - if ok { - // already registered, use the one we have and put it into ff - ff = are.ExistingCollector.(prometheus.Gauge) - - } - } - - ff.Set(f.value) - } - - } - - } - }() - //http.Handle("/metrics", promhttp.Handler()) //http.ListenAndServe(":2112", nil) n, err := net.Listen("tcp", s.metrics.hostAndPort) diff --git a/server.go b/server.go index 4be7236..c92e13f 100644 --- a/server.go +++ b/server.go @@ -11,6 +11,7 @@ import ( "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type processName string @@ -29,17 +30,21 @@ type processes struct { mu sync.RWMutex // The last processID created lastProcessID int - // metrics channel - metricsCh chan metricType + // + promTotalProcesses prometheus.Gauge } // newProcesses will prepare and return a *processes -func newProcesses(metricsCh chan metricType) *processes { +func newProcesses(promRegistry *prometheus.Registry) *processes { p := processes{ - active: make(map[processName]process), - metricsCh: metricsCh, + active: make(map[processName]process), } + p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "total_running_processes", + Help: "The current number of total running processes", + }) + return &p } @@ -91,7 +96,7 @@ func NewServer(c *Configuration) (*server, error) { nodeName: c.NodeName, natsConn: conn, netListener: nl, - processes: newProcesses(metrics.metricsCh), + processes: newProcesses(metrics.promRegistry), toRingbufferCh: make(chan []subjectAndMessage), metrics: metrics, } @@ -156,13 +161,7 @@ func (p *processes) printProcessesMap() { } p.mu.Unlock() - p.metricsCh <- metricType{ - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "total_running_processes", - Help: "The current number of total running processes", - }), - value: float64(len(p.active)), - } + p.promTotalProcesses.Set(float64(len(p.active))) fmt.Println("--------------------------------------------------------------------------------------------") } diff --git a/startup_processes.go b/startup_processes.go index 17da955..a7977f2 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) func (p process) ProcessesStart() { @@ -186,6 +187,12 @@ func (s startup) subREQHello(p process) { // which is the information we pass along to generate metrics. proc.procFunc = func(ctx context.Context) error { sayHelloNodes := make(map[node]struct{}) + + promHelloNodes := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes", + Help: "The current number of total nodes who have said hello", + }) + for { // Receive a copy of the message sent from the method handler. var m Message @@ -203,13 +210,7 @@ func (s startup) subREQHello(p process) { sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics - proc.processes.metricsCh <- metricType{ - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes", - Help: "The current number of total nodes who have said hello", - }), - value: float64(len(sayHelloNodes)), - } + promHelloNodes.Set(float64(len(sayHelloNodes))) } } go proc.spawnWorker(p.processes, p.natsConn)