From 9846a9eb2fc9ba561f1083e34b2f89443d0a92c2 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 26 Aug 2021 10:50:40 +0200 Subject: [PATCH] moved registering of metrics into newMetrics --- metrics.go | 64 +++++++++++++++++++++++++++++++++- process.go | 3 ++ processes.go | 28 --------------- read_socket_or_tcp_listener.go | 2 +- ringbuffer.go | 20 ----------- 5 files changed, 67 insertions(+), 50 deletions(-) diff --git a/metrics.go b/metrics.go index 70ea85b..1bfee3e 100644 --- a/metrics.go +++ b/metrics.go @@ -37,8 +37,13 @@ type metrics struct { // Prometheus metrics for the total count of stalled // messages in the ringbuffer. promRingbufferStalledMessagesTotal prometheus.Counter - // Prometheus metrics for current messages in memory buffer + // Prometheus metrics for current messages in memory buffer. promInMemoryBufferMessagesCurrent prometheus.Gauge + // Prometheus metrics for current messages delivered by a + // user into the system. + promUserMessagesTotal prometheus.Counter + // Metrics for nats messages delivered total + promNatsDeliveredTotal prometheus.Counter } // newMetrics will prepare and return a *metrics. @@ -52,6 +57,63 @@ func newMetrics(hostAndPort string) *metrics { hostAndPort: hostAndPort, } + m.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "processes_total", + Help: "The current number of total running processes", + }) + m.promRegistry.MustRegister(m.promProcessesTotal) + + m.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "processes_all_running", + Help: "Name of the running processes", + }, []string{"processName"}, + ) + m.promRegistry.MustRegister(m.promProcessesAllRunning) + + m.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes_total", + Help: "The current number of total nodes who have said hello", + }) + m.promRegistry.MustRegister(m.promHelloNodesTotal) + + m.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "hello_node_contact_last", + Help: "Name of the nodes who have said hello", + }, []string{"nodeName"}, + ) + m.promRegistry.MustRegister(m.promHelloNodesContactLast) + + m.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "messages_processed_total", + Help: "The last processed db in key value/store", + }) + m.promRegistry.MustRegister(m.promMessagesProcessedTotal) + + m.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ringbuffer_stalled_messages_total", + Help: "Number of stalled messages in ringbuffer", + }) + m.promRegistry.MustRegister(m.promRingbufferStalledMessagesTotal) + + m.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "in_memory_buffer_messages_current", + Help: "The current value of messages in memory buffer", + }) + m.promRegistry.MustRegister(m.promInMemoryBufferMessagesCurrent) + + // Register som metrics for messages delivered by users into the system. + m.promUserMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "user_messages_total", + Help: "Number of total messages delivered by users into the system", + }) + m.promRegistry.MustRegister(m.promUserMessagesTotal) + + m.promNatsDeliveredTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "nats_delivered_total", + Help: "Number of total messages delivered by nats", + }) + m.promRegistry.MustRegister(m.promNatsDeliveredTotal) + return &m } diff --git a/process.go b/process.go index 4595a09..71f596e 100644 --- a/process.go +++ b/process.go @@ -283,6 +283,9 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { } log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data) } + + p.processes.metrics.promNatsDeliveredTotal.Inc() + return } } diff --git a/processes.go b/processes.go index bfce627..08f5c78 100644 --- a/processes.go +++ b/processes.go @@ -43,21 +43,6 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes { p.metrics = metrics - // Register the metrics for the process. - - p.metrics.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "processes_total", - Help: "The current number of total running processes", - }) - metrics.promRegistry.MustRegister(p.metrics.promProcessesTotal) - - p.metrics.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "processes_all_running", - Help: "Name of the running processes", - }, []string{"processName"}, - ) - metrics.promRegistry.MustRegister(metrics.promProcessesAllRunning) - return &p } @@ -275,19 +260,6 @@ func (s startup) subREQHello(p process) { proc.procFunc = func(ctx context.Context) error { sayHelloNodes := make(map[Node]struct{}) - s.metrics.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes_total", - Help: "The current number of total nodes who have said hello", - }) - s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesTotal) - - s.metrics.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "hello_node_contact_last", - Help: "Name of the nodes who have said hello", - }, []string{"nodeName"}, - ) - s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesContactLast) - for { // Receive a copy of the message sent from the method handler. var m Message diff --git a/read_socket_or_tcp_listener.go b/read_socket_or_tcp_listener.go index bf903bc..3f54e30 100644 --- a/read_socket_or_tcp_listener.go +++ b/read_socket_or_tcp_listener.go @@ -14,7 +14,6 @@ import ( // It will take a channel of []byte as input, and it is in this // channel the content of a file that has changed is returned. func (s *server) readSocket() { - // Loop, and wait for new connections. for { conn, err := s.StewardSocket.Accept() @@ -163,6 +162,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { // Check for toNode and toNodes field. MsgSlice = s.checkMessageToNodes(MsgSlice) + s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice))) sam := []subjectAndMessage{} diff --git a/ringbuffer.go b/ringbuffer.go index 7fb88a2..5a79eed 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -19,7 +19,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" bolt "go.etcd.io/bbolt" ) @@ -73,25 +72,6 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin os.Exit(1) } - // Set up the metrics to be used. - metrics.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "messages_processed_total", - Help: "The last processed db in key value/store", - }) - metrics.promRegistry.MustRegister(metrics.promMessagesProcessedTotal) - - metrics.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "ringbuffer_stalled_messages_total", - Help: "Number of stalled messages in ringbuffer", - }) - metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal) - - metrics.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "in_memory_buffer_messages_current", - Help: "The current value of messages in memory buffer", - }) - metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent) - return &ringBuffer{ bufData: make(chan samDBValue, size), db: db,