From c9e095ff5b1aba7b6a8201d1bb1c08d877de2ff2 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 18 Aug 2021 15:41:53 +0200 Subject: [PATCH] more metrics --- metrics.go | 23 ++++++++++++++--------- process.go | 2 +- processes.go | 28 ++++++++++++++-------------- ringbuffer.go | 23 +++++++++++++++++++++-- server.go | 2 +- subscriber_method_types.go | 2 +- test.file | 1 - 7 files changed, 52 insertions(+), 29 deletions(-) delete mode 100644 test.file diff --git a/metrics.go b/metrics.go index 8b29286..bfc1e0d 100644 --- a/metrics.go +++ b/metrics.go @@ -13,18 +13,23 @@ import ( // metrics are generally used to hold the structure around metrics // handling type metrics struct { - // The channel to pass metrics that should be processed + // The channel to pass metrics that should be processed. promRegistry *prometheus.Registry - // host and port where prometheus metrics will be exported + // host and port where prometheus metrics will be exported. hostAndPort string + // Prometheus metrics for total processes. + promProcessesTotal prometheus.Gauge + // Prometheus metrics for vector of process names. + promProcessesAllRunning *prometheus.GaugeVec + // Prometheus metrics for number of hello nodes. + promHelloNodesTotal prometheus.Gauge + // Prometheus metrics for the vector of hello nodes. + promHelloNodesContactLast *prometheus.GaugeVec + // Prometheus metrics for the last processed DB id in key + // value store. + promMessagesProcessedTotal prometheus.Gauge // - promTotalProcesses prometheus.Gauge - // - promProcessesVec *prometheus.GaugeVec - // - promHelloNodes prometheus.Gauge - // - promHelloNodesNameVec *prometheus.GaugeVec + promRingbufferStalledMessagesTotal prometheus.Counter } // newMetrics will prepare and return a *metrics diff --git a/process.go b/process.go index 5199888..4595a09 100644 --- a/process.go +++ b/process.go @@ -146,7 +146,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { processName := processNameGet(p.subject.name(), p.processKind) // Add prometheus metrics for the process. - p.processes.metrics.promProcessesVec.With(prometheus.Labels{"processName": string(processName)}) + p.processes.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. diff --git a/processes.go b/processes.go index 4f4fde9..45afded 100644 --- a/processes.go +++ b/processes.go @@ -45,18 +45,18 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes { // Register the metrics for the process. - p.metrics.promTotalProcesses = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "total_running_processes", + p.metrics.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "processes_total", Help: "The current number of total running processes", }) - metrics.promRegistry.MustRegister(p.metrics.promTotalProcesses) + metrics.promRegistry.MustRegister(p.metrics.promProcessesTotal) - p.metrics.promProcessesVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "running_process", - Help: "Name of the running process", + p.metrics.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "processes_all_running", + Help: "Name of the running processes", }, []string{"processName"}, ) - metrics.promRegistry.MustRegister(metrics.promProcessesVec) + metrics.promRegistry.MustRegister(metrics.promProcessesAllRunning) return &p } @@ -274,18 +274,18 @@ func (s startup) subREQHello(p process) { proc.procFunc = func(ctx context.Context) error { sayHelloNodes := make(map[Node]struct{}) - s.metrics.promHelloNodes = prometheus.NewGauge(prometheus.GaugeOpts{ + s.metrics.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "hello_nodes_total", Help: "The current number of total nodes who have said hello", }) - s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodes) + s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesTotal) - s.metrics.promHelloNodesNameVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + s.metrics.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "hello_node_contact_last", Help: "Name of the nodes who have said hello", }, []string{"nodeName"}, ) - s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesNameVec) + s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesContactLast) for { // Receive a copy of the message sent from the method handler. @@ -304,8 +304,8 @@ func (s startup) subREQHello(p process) { sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics - s.metrics.promHelloNodes.Set(float64(len(sayHelloNodes))) - s.metrics.promHelloNodesNameVec.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() + s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) + s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() } } @@ -365,6 +365,6 @@ func (p *processes) printProcessesMap() { } p.mu.Unlock() - p.metrics.promTotalProcesses.Set(float64(len(p.active))) + p.metrics.promProcessesTotal.Set(float64(len(p.active))) } diff --git a/ringbuffer.go b/ringbuffer.go index 399d7bd..9af7b92 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" bolt "go.etcd.io/bbolt" ) @@ -46,11 +47,11 @@ type ringBuffer struct { nodeName Node // New messages to the system to be put into the ring buffer. newMessagesCh chan []subjectAndMessage + metrics *metrics } // newringBuffer returns a push/pop storage for values. -func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { - // --- +func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { // Check if socket folder exists, if not create it if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) { err := os.MkdirAll(c.DatabaseFolder, 0700) @@ -69,12 +70,27 @@ func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, log.Printf("error: failed to open db: %v\n", err) os.Exit(1) } + + // Set up the metrics to be used. + metrics.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "messages_processed_total", + Help: "The last processed db in key value/store", + }) + metrics.promRegistry.MustRegister(metrics.promMessagesProcessedTotal) + + metrics.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ringbuffer_stalled_messages_total", + Help: "Number of stalled messages in ringbuffer", + }) + metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal) + return &ringBuffer{ bufData: make(chan samDBValue, size), db: db, permStore: make(chan string), nodeName: nodeName, newMessagesCh: newMessagesCh, + metrics: metrics, } } @@ -240,6 +256,8 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam // Testing with a timeout here to figure out if messages are stuck // waiting for done signal. log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) + + r.metrics.promRingbufferStalledMessagesTotal.Inc() } // Listen on the done channel here , so a go routine handling the // message will be able to signal back here that the message have @@ -248,6 +266,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam select { case <-v.Data.done: log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) + r.metrics.promMessagesProcessedTotal.Set(float64(v.ID)) // case <-time.After(time.Second * 3): // // Testing with a timeout here to figure out if messages are stuck // // waiting for done signal. diff --git a/server.go b/server.go index 5c62184..47de052 100644 --- a/server.go +++ b/server.go @@ -318,7 +318,7 @@ type samDBValueAndDelivered struct { func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) { // Prepare and start a new ring buffer const bufferSize int = 1000 - rb := newringBuffer(*s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh) + rb := newringBuffer(s.metrics, *s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh) inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValueAndDelivered) // start the ringbuffer. diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 5422d05..5f665c9 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -448,7 +448,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri } // Remove the prometheus label - proc.processes.metrics.promProcessesVec.Delete(prometheus.Labels{"processName": string(processName)}) + proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) diff --git a/test.file b/test.file deleted file mode 100644 index c2e7a8d..0000000 --- a/test.file +++ /dev/null @@ -1 +0,0 @@ -some file content