From a669472c0348fd8a1505023c9615db928fd229c7 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 26 Aug 2021 11:41:46 +0200 Subject: [PATCH] more metrics --- metrics.go | 60 +++++++++++++++++++++++++++++--------- process.go | 3 ++ ringbuffer.go | 2 +- subscriber_method_types.go | 2 +- 4 files changed, 51 insertions(+), 16 deletions(-) diff --git a/metrics.go b/metrics.go index 1bfee3e..bea8b4a 100644 --- a/metrics.go +++ b/metrics.go @@ -33,7 +33,7 @@ type metrics struct { // --- Ringbuffer // Prometheus metrics for the last processed DB id in key // value store. - promMessagesProcessedTotal prometheus.Gauge + promMessagesProcessedIDLast prometheus.Gauge // Prometheus metrics for the total count of stalled // messages in the ringbuffer. promRingbufferStalledMessagesTotal prometheus.Counter @@ -42,8 +42,16 @@ type metrics struct { // Prometheus metrics for current messages delivered by a // user into the system. promUserMessagesTotal prometheus.Counter - // Metrics for nats messages delivered total + // Metrics for nats messages delivered total. promNatsDeliveredTotal prometheus.Counter + // Metrics for messages that failed to get ack replies. + promNatsMessagesFailedACKsTotal prometheus.Counter + // Metrics for messages that missed to get ack replies. + promNatsMessagesMissedACKsTotal prometheus.Counter + // Metrics for received error messages + promErrorMessagesReceivedTotal prometheus.Counter + // Metrics for sent error messages + promErrorMessagesSentTotal prometheus.Counter } // newMetrics will prepare and return a *metrics. @@ -58,62 +66,86 @@ func newMetrics(hostAndPort string) *metrics { } m.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "processes_total", + Name: "steward_processes_total", Help: "The current number of total running processes", }) m.promRegistry.MustRegister(m.promProcessesTotal) m.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "processes_all_running", + Name: "steward_processes_all_running", Help: "Name of the running processes", }, []string{"processName"}, ) m.promRegistry.MustRegister(m.promProcessesAllRunning) m.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes_total", + Name: "steward_hello_nodes_total", Help: "The current number of total nodes who have said hello", }) m.promRegistry.MustRegister(m.promHelloNodesTotal) m.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "hello_node_contact_last", + Name: "steward_hello_node_contact_last", Help: "Name of the nodes who have said hello", }, []string{"nodeName"}, ) m.promRegistry.MustRegister(m.promHelloNodesContactLast) - m.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "messages_processed_total", - Help: "The last processed db in key value/store", + m.promMessagesProcessedIDLast = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "steward_messages_processed_id_last", + Help: "The last processed id in key value/store db", }) - m.promRegistry.MustRegister(m.promMessagesProcessedTotal) + m.promRegistry.MustRegister(m.promMessagesProcessedIDLast) m.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "ringbuffer_stalled_messages_total", + Name: "steward_ringbuffer_stalled_messages_total", Help: "Number of stalled messages in ringbuffer", }) m.promRegistry.MustRegister(m.promRingbufferStalledMessagesTotal) m.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "in_memory_buffer_messages_current", + Name: "steward_in_memory_buffer_messages_current", Help: "The current value of messages in memory buffer", }) m.promRegistry.MustRegister(m.promInMemoryBufferMessagesCurrent) // Register som metrics for messages delivered by users into the system. m.promUserMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "user_messages_total", + Name: "steward_user_messages_total", Help: "Number of total messages delivered by users into the system", }) m.promRegistry.MustRegister(m.promUserMessagesTotal) m.promNatsDeliveredTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "nats_delivered_total", + Name: "steward_nats_delivered_total", Help: "Number of total messages delivered by nats", }) m.promRegistry.MustRegister(m.promNatsDeliveredTotal) + m.promNatsMessagesFailedACKsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "steward_nats_messages_failed_acks_total", + Help: "Number of messages that never received an ack total", + }) + m.promRegistry.MustRegister(m.promNatsMessagesFailedACKsTotal) + + m.promNatsMessagesMissedACKsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "steward_nats_messages_missed_acks_total", + Help: "Number of messages missed receiving an ack total", + }) + m.promRegistry.MustRegister(m.promNatsMessagesMissedACKsTotal) + + m.promErrorMessagesReceivedTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "steward_error_messages_received_total", + Help: "Number of error messages received total", + }) + m.promRegistry.MustRegister(m.promNatsMessagesMissedACKsTotal) + + m.promErrorMessagesSentTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "steward_error_messages_sent_total", + Help: "Number of error messages sent total", + }) + m.promRegistry.MustRegister(m.promErrorMessagesReceivedTotal) + return &m } diff --git a/process.go b/process.go index 71f596e..9a55126 100644 --- a/process.go +++ b/process.go @@ -274,10 +274,13 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // max retries reached er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method) sendErrorLogMessage(p.toRingbufferCh, p.node, er) + + p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc() return default: // none of the above matched, so we've not reached max retries yet + p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc() continue } } diff --git a/ringbuffer.go b/ringbuffer.go index 5a79eed..f4743de 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -256,7 +256,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam select { case <-v.Data.done: log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) - r.metrics.promMessagesProcessedTotal.Set(float64(v.ID)) + r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID)) // case <-time.After(time.Second * 3): // // Testing with a timeout here to figure out if messages are stuck // // waiting for done signal. diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 3bc457b..ac90aff 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -678,7 +678,7 @@ func (m methodREQErrorLog) getKind() CommandOrEvent { // Handle the writing of error logs. func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { - log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) + proc.processes.metrics.promErrorMessagesReceivedTotal.Inc() // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data.