mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
implemented idea for handling metrics from processes
This commit is contained in:
parent
41d8091506
commit
b608eb01ef
4 changed files with 100 additions and 19 deletions
Binary file not shown.
|
@ -1,43 +1,102 @@
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// metrics are generally used to hold the structure around metrics
|
||||||
|
// handling
|
||||||
type metrics struct {
|
type metrics struct {
|
||||||
helloNodes map[node]struct{}
|
// sayHelloNodes are the register where the register where nodes
|
||||||
HelloNodes prometheus.Gauge
|
// who have sent an sayHello are stored. Since the sayHello
|
||||||
TotalRunningProcesses prometheus.Gauge
|
// subscriber is a handler that will be just be called when a
|
||||||
|
// hello message is received we need to store the metrics somewhere
|
||||||
|
// else, that is why we store it here....at least for now.
|
||||||
|
sayHelloNodes map[node]struct{}
|
||||||
|
// The channel to pass metrics that should be processed
|
||||||
|
metricsCh chan metricType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HERE:
|
||||||
func newMetrics() *metrics {
|
func newMetrics() *metrics {
|
||||||
m := metrics{
|
m := metrics{
|
||||||
helloNodes: make(map[node]struct{}),
|
sayHelloNodes: make(map[node]struct{}),
|
||||||
TotalRunningProcesses: promauto.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "total_running_processes",
|
|
||||||
Help: "The current number of total running processes",
|
|
||||||
}),
|
|
||||||
HelloNodes: promauto.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "hello_nodes",
|
|
||||||
Help: "The current number of total nodes who have said hello",
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
|
m.metricsCh = make(chan metricType)
|
||||||
|
|
||||||
return &m
|
return &m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) startMetrics() {
|
type Metricer interface {
|
||||||
|
Set(float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricType struct {
|
||||||
|
metric prometheus.Collector
|
||||||
|
value float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *server) startMetrics() {
|
||||||
|
// go func(ch chan metricType) {
|
||||||
|
// for {
|
||||||
|
// s.metrics.metricsCh <- metricType{
|
||||||
|
// metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
// Name: "total_running_processes",
|
||||||
|
// Help: "The current number of total running processes",
|
||||||
|
// }),
|
||||||
|
// value: float64(len(s.processes)),
|
||||||
|
// }
|
||||||
|
// time.Sleep(time.Second * 2)
|
||||||
|
// }
|
||||||
|
// }(s.metrics.metricsCh)
|
||||||
|
|
||||||
|
// go func(ch chan metricType) {
|
||||||
|
// for {
|
||||||
|
// s.metrics.metricsCh <- metricType{
|
||||||
|
// metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
// Name: "hello_nodes",
|
||||||
|
// Help: "The current number of total nodes who have said hello",
|
||||||
|
// }),
|
||||||
|
// value: float64(len(s.metrics.sayHelloNodes)),
|
||||||
|
// }
|
||||||
|
// time.Sleep(time.Second * 2)
|
||||||
|
// }
|
||||||
|
// }(s.metrics.metricsCh)
|
||||||
|
|
||||||
|
// Receive and process all metrics
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
s.metrics.TotalRunningProcesses.Set(float64(len(s.processes)))
|
for f := range s.metrics.metricsCh {
|
||||||
s.metrics.HelloNodes.Set(float64(len(s.metrics.helloNodes)))
|
fmt.Printf("********** RANGED A METRIC = %v, %v\n", f.metric, f.value)
|
||||||
time.Sleep(2 * time.Second)
|
// // Try to register the metric of the interface type prometheus.Collector
|
||||||
|
// prometheus.Register(f.metric)
|
||||||
|
|
||||||
|
// Check the real type of the interface type
|
||||||
|
switch ff := f.metric.(type) {
|
||||||
|
case prometheus.Gauge:
|
||||||
|
|
||||||
|
// Try to register. If it is already registered we need to check the error
|
||||||
|
// to get the previously registered collector, and update it's value. If it
|
||||||
|
// is not registered we register the new collector, and sets it's value.
|
||||||
|
err := prometheus.Register(ff)
|
||||||
|
if err != nil {
|
||||||
|
are, ok := err.(prometheus.AlreadyRegisteredError)
|
||||||
|
if ok {
|
||||||
|
// already registered, use the one we have and put it into ff
|
||||||
|
ff = are.ExistingCollector.(prometheus.Gauge)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ff.Set(f.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
10
publisher.go
10
publisher.go
|
@ -11,6 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
|
@ -150,6 +151,15 @@ func (s *server) printProcessesMap() {
|
||||||
for _, v := range s.processes {
|
for _, v := range s.processes {
|
||||||
fmt.Printf("*** - : %v\n", v)
|
fmt.Printf("*** - : %v\n", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.metrics.metricsCh <- metricType{
|
||||||
|
metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "total_running_processes",
|
||||||
|
Help: "The current number of total running processes",
|
||||||
|
}),
|
||||||
|
value: float64(len(s.processes)),
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Println("--------------------------------------------------------------------------------------------")
|
fmt.Println("--------------------------------------------------------------------------------------------")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
@ -103,8 +105,18 @@ type methodEventSayHello struct{}
|
||||||
|
|
||||||
func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) {
|
func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) {
|
||||||
log.Printf("################## Received hello from %v ##################\n", message.FromNode)
|
log.Printf("################## Received hello from %v ##################\n", message.FromNode)
|
||||||
s.metrics.helloNodes[message.FromNode] = struct{}{}
|
// Since the handler is only called to handle a specific type of message we need
|
||||||
|
// to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes
|
||||||
|
s.metrics.sayHelloNodes[message.FromNode] = struct{}{}
|
||||||
|
|
||||||
|
// update the prometheus metrics
|
||||||
|
s.metrics.metricsCh <- metricType{
|
||||||
|
metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "hello_nodes",
|
||||||
|
Help: "The current number of total nodes who have said hello",
|
||||||
|
}),
|
||||||
|
value: float64(len(s.metrics.sayHelloNodes)),
|
||||||
|
}
|
||||||
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
return outMsg, nil
|
return outMsg, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue