diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 26b6ed9..6e0e7e4 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/prometheus.go b/prometheus.go index 57bd096..2e26641 100644 --- a/prometheus.go +++ b/prometheus.go @@ -1,43 +1,102 @@ package steward import ( + "fmt" "net/http" - "time" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) +// metrics are generally used to hold the structure around metrics +// handling type metrics struct { - helloNodes map[node]struct{} - HelloNodes prometheus.Gauge - TotalRunningProcesses prometheus.Gauge + // sayHelloNodes are the register where the register where nodes + // who have sent an sayHello are stored. Since the sayHello + // subscriber is a handler that will be just be called when a + // hello message is received we need to store the metrics somewhere + // else, that is why we store it here....at least for now. + sayHelloNodes map[node]struct{} + // The channel to pass metrics that should be processed + metricsCh chan metricType } +// HERE: func newMetrics() *metrics { m := metrics{ - helloNodes: make(map[node]struct{}), - TotalRunningProcesses: promauto.NewGauge(prometheus.GaugeOpts{ - Name: "total_running_processes", - Help: "The current number of total running processes", - }), - HelloNodes: promauto.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes", - Help: "The current number of total nodes who have said hello", - }), + sayHelloNodes: make(map[node]struct{}), } + m.metricsCh = make(chan metricType) return &m } -func (s *server) startMetrics() { +type Metricer interface { + Set(float64) +} +type metricType struct { + metric prometheus.Collector + value float64 +} + +func (s *server) startMetrics() { + // go func(ch chan metricType) { + // for { + // s.metrics.metricsCh <- metricType{ + // metric: prometheus.NewGauge(prometheus.GaugeOpts{ + // Name: "total_running_processes", + // Help: "The current number of total running processes", + // }), + // value: float64(len(s.processes)), + // } + // time.Sleep(time.Second * 2) + // } + // }(s.metrics.metricsCh) + + // go func(ch chan metricType) { + // for { + // s.metrics.metricsCh <- metricType{ + // metric: prometheus.NewGauge(prometheus.GaugeOpts{ + // Name: "hello_nodes", + // Help: "The current number of total nodes who have said hello", + // }), + // value: float64(len(s.metrics.sayHelloNodes)), + // } + // time.Sleep(time.Second * 2) + // } + // }(s.metrics.metricsCh) + + // Receive and process all metrics go func() { for { - s.metrics.TotalRunningProcesses.Set(float64(len(s.processes))) - s.metrics.HelloNodes.Set(float64(len(s.metrics.helloNodes))) - time.Sleep(2 * time.Second) + for f := range s.metrics.metricsCh { + fmt.Printf("********** RANGED A METRIC = %v, %v\n", f.metric, f.value) + // // Try to register the metric of the interface type prometheus.Collector + // prometheus.Register(f.metric) + + // Check the real type of the interface type + switch ff := f.metric.(type) { + case prometheus.Gauge: + + // Try to register. If it is already registered we need to check the error + // to get the previously registered collector, and update it's value. If it + // is not registered we register the new collector, and sets it's value. + err := prometheus.Register(ff) + if err != nil { + are, ok := err.(prometheus.AlreadyRegisteredError) + if ok { + // already registered, use the one we have and put it into ff + ff = are.ExistingCollector.(prometheus.Gauge) + + } + } + + ff.Set(f.value) + } + + } + } }() diff --git a/publisher.go b/publisher.go index 9f1a599..70a689f 100644 --- a/publisher.go +++ b/publisher.go @@ -11,6 +11,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus" ) type Message struct { @@ -150,6 +151,15 @@ func (s *server) printProcessesMap() { for _, v := range s.processes { fmt.Printf("*** - : %v\n", v) } + + s.metrics.metricsCh <- metricType{ + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "total_running_processes", + Help: "The current number of total running processes", + }), + value: float64(len(s.processes)), + } + fmt.Println("--------------------------------------------------------------------------------------------") } diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index fcc22af..3592b74 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -10,6 +10,8 @@ import ( "fmt" "log" "os/exec" + + "github.com/prometheus/client_golang/prometheus" ) // ------------------------------------------------------------ @@ -103,8 +105,18 @@ type methodEventSayHello struct{} func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) { log.Printf("################## Received hello from %v ##################\n", message.FromNode) - s.metrics.helloNodes[message.FromNode] = struct{}{} + // Since the handler is only called to handle a specific type of message we need + // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes + s.metrics.sayHelloNodes[message.FromNode] = struct{}{} + // update the prometheus metrics + s.metrics.metricsCh <- metricType{ + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes", + Help: "The current number of total nodes who have said hello", + }), + value: float64(len(s.metrics.sayHelloNodes)), + } outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return outMsg, nil }