1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 04:49:17 +00:00

more metrics

This commit is contained in:
postmannen 2021-08-26 11:41:46 +02:00
parent 9846a9eb2f
commit a669472c03
4 changed files with 51 additions and 16 deletions

View file

@ -33,7 +33,7 @@ type metrics struct {
// --- Ringbuffer // --- Ringbuffer
// Prometheus metrics for the last processed DB id in key // Prometheus metrics for the last processed DB id in key
// value store. // value store.
promMessagesProcessedTotal prometheus.Gauge promMessagesProcessedIDLast prometheus.Gauge
// Prometheus metrics for the total count of stalled // Prometheus metrics for the total count of stalled
// messages in the ringbuffer. // messages in the ringbuffer.
promRingbufferStalledMessagesTotal prometheus.Counter promRingbufferStalledMessagesTotal prometheus.Counter
@ -42,8 +42,16 @@ type metrics struct {
// Prometheus metrics for current messages delivered by a // Prometheus metrics for current messages delivered by a
// user into the system. // user into the system.
promUserMessagesTotal prometheus.Counter promUserMessagesTotal prometheus.Counter
// Metrics for nats messages delivered total // Metrics for nats messages delivered total.
promNatsDeliveredTotal prometheus.Counter promNatsDeliveredTotal prometheus.Counter
// Metrics for messages that failed to get ack replies.
promNatsMessagesFailedACKsTotal prometheus.Counter
// Metrics for messages that missed to get ack replies.
promNatsMessagesMissedACKsTotal prometheus.Counter
// Metrics for received error messages
promErrorMessagesReceivedTotal prometheus.Counter
// Metrics for sent error messages
promErrorMessagesSentTotal prometheus.Counter
} }
// newMetrics will prepare and return a *metrics. // newMetrics will prepare and return a *metrics.
@ -58,62 +66,86 @@ func newMetrics(hostAndPort string) *metrics {
} }
m.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ m.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "processes_total", Name: "steward_processes_total",
Help: "The current number of total running processes", Help: "The current number of total running processes",
}) })
m.promRegistry.MustRegister(m.promProcessesTotal) m.promRegistry.MustRegister(m.promProcessesTotal)
m.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{ m.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "processes_all_running", Name: "steward_processes_all_running",
Help: "Name of the running processes", Help: "Name of the running processes",
}, []string{"processName"}, }, []string{"processName"},
) )
m.promRegistry.MustRegister(m.promProcessesAllRunning) m.promRegistry.MustRegister(m.promProcessesAllRunning)
m.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{ m.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes_total", Name: "steward_hello_nodes_total",
Help: "The current number of total nodes who have said hello", Help: "The current number of total nodes who have said hello",
}) })
m.promRegistry.MustRegister(m.promHelloNodesTotal) m.promRegistry.MustRegister(m.promHelloNodesTotal)
m.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{ m.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "hello_node_contact_last", Name: "steward_hello_node_contact_last",
Help: "Name of the nodes who have said hello", Help: "Name of the nodes who have said hello",
}, []string{"nodeName"}, }, []string{"nodeName"},
) )
m.promRegistry.MustRegister(m.promHelloNodesContactLast) m.promRegistry.MustRegister(m.promHelloNodesContactLast)
m.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{ m.promMessagesProcessedIDLast = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "messages_processed_total", Name: "steward_messages_processed_id_last",
Help: "The last processed db in key value/store", Help: "The last processed id in key value/store db",
}) })
m.promRegistry.MustRegister(m.promMessagesProcessedTotal) m.promRegistry.MustRegister(m.promMessagesProcessedIDLast)
m.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "ringbuffer_stalled_messages_total", Name: "steward_ringbuffer_stalled_messages_total",
Help: "Number of stalled messages in ringbuffer", Help: "Number of stalled messages in ringbuffer",
}) })
m.promRegistry.MustRegister(m.promRingbufferStalledMessagesTotal) m.promRegistry.MustRegister(m.promRingbufferStalledMessagesTotal)
m.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ m.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "in_memory_buffer_messages_current", Name: "steward_in_memory_buffer_messages_current",
Help: "The current value of messages in memory buffer", Help: "The current value of messages in memory buffer",
}) })
m.promRegistry.MustRegister(m.promInMemoryBufferMessagesCurrent) m.promRegistry.MustRegister(m.promInMemoryBufferMessagesCurrent)
// Register some metrics for messages delivered by users into the system. // Register some metrics for messages delivered by users into the system.
m.promUserMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.promUserMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "user_messages_total", Name: "steward_user_messages_total",
Help: "Number of total messages delivered by users into the system", Help: "Number of total messages delivered by users into the system",
}) })
m.promRegistry.MustRegister(m.promUserMessagesTotal) m.promRegistry.MustRegister(m.promUserMessagesTotal)
m.promNatsDeliveredTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.promNatsDeliveredTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "nats_delivered_total", Name: "steward_nats_delivered_total",
Help: "Number of total messages delivered by nats", Help: "Number of total messages delivered by nats",
}) })
m.promRegistry.MustRegister(m.promNatsDeliveredTotal) m.promRegistry.MustRegister(m.promNatsDeliveredTotal)
m.promNatsMessagesFailedACKsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "steward_nats_messages_failed_acks_total",
Help: "Number of messages that never received an ack total",
})
m.promRegistry.MustRegister(m.promNatsMessagesFailedACKsTotal)
m.promNatsMessagesMissedACKsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "steward_nats_messages_missed_acks_total",
Help: "Number of messages missed receiving an ack total",
})
m.promRegistry.MustRegister(m.promNatsMessagesMissedACKsTotal)
// Counter for the number of error messages received.
m.promErrorMessagesReceivedTotal = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "steward_error_messages_received_total",
	Help: "Number of error messages received total",
})
// Register the collector created directly above. Each collector must be
// handed to the registry exactly once: a Prometheus registry's MustRegister
// panics when given a collector it already holds, and a collector that is
// never registered is never exported.
m.promRegistry.MustRegister(m.promErrorMessagesReceivedTotal)
// Counter for the number of error messages sent.
m.promErrorMessagesSentTotal = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "steward_error_messages_sent_total",
	Help: "Number of error messages sent total",
})
m.promRegistry.MustRegister(m.promErrorMessagesSentTotal)
return &m return &m
} }

View file

@ -274,10 +274,13 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// max retries reached // max retries reached
er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method) er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method)
sendErrorLogMessage(p.toRingbufferCh, p.node, er) sendErrorLogMessage(p.toRingbufferCh, p.node, er)
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
return return
default: default:
// none of the above matched, so we've not reached max retries yet // none of the above matched, so we've not reached max retries yet
p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc()
continue continue
} }
} }

View file

@ -256,7 +256,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
select { select {
case <-v.Data.done: case <-v.Data.done:
log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
r.metrics.promMessagesProcessedTotal.Set(float64(v.ID)) r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))
// case <-time.After(time.Second * 3): // case <-time.After(time.Second * 3):
// // Testing with a timeout here to figure out if messages are stuck // // Testing with a timeout here to figure out if messages are stuck
// // waiting for done signal. // // waiting for done signal.

View file

@ -678,7 +678,7 @@ func (m methodREQErrorLog) getKind() CommandOrEvent {
// Handle the writing of error logs. // Handle the writing of error logs.
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- Received error from: %v, containing: %v", message.FromNode, message.Data) proc.processes.metrics.promErrorMessagesReceivedTotal.Inc()
// If it was a request type message we want to check what the initial messages // If it was a request type message we want to check what the initial messages
// method, so we can use that in creating the file name to store the data. // method, so we can use that in creating the file name to store the data.