1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 13:49:29 +00:00

added metrics for current messages in db

This commit is contained in:
postmannen 2021-09-15 07:26:36 +02:00
parent cb867ba7a2
commit b83749cbc0
2 changed files with 32 additions and 0 deletions

View file

@ -55,6 +55,8 @@ type metrics struct {
promErrorMessagesReceivedTotal prometheus.Counter promErrorMessagesReceivedTotal prometheus.Counter
// Metrics for sent error messages // Metrics for sent error messages
promErrorMessagesSentTotal prometheus.Counter promErrorMessagesSentTotal prometheus.Counter
// Metrics for the amount of messages currently in db.
promDBMessagesCurrent prometheus.Gauge
} }
// newMetrics will prepare and return a *metrics. // newMetrics will prepare and return a *metrics.
@ -155,6 +157,12 @@ func newMetrics(hostAndPort string) *metrics {
}) })
m.promRegistry.MustRegister(m.promErrorMessagesSentTotal) m.promRegistry.MustRegister(m.promErrorMessagesSentTotal)
m.promDBMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "steward_db_messages_current",
Help: "The current value messages in database",
})
m.promRegistry.MustRegister(m.promDBMessagesCurrent)
return &m return &m
} }

View file

@ -106,6 +106,17 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValueAnd
// Start the process that will handle messages present in the ringbuffer. // Start the process that will handle messages present in the ringbuffer.
go r.processBufferMessages(outCh) go r.processBufferMessages(outCh)
go func() {
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
r.dbUpdateMetrics(r.samValueBucket)
}
}
}()
} }
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel. // fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
@ -364,6 +375,19 @@ func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error {
return err return err
} }
// db update metrics.
func (r *ringBuffer) dbUpdateMetrics(bucket string) error {
err := r.db.Update(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
r.metrics.promDBMessagesCurrent.Set(float64(bu.Stats().KeyN))
return nil
})
return err
}
// getIndexValue will get the last index value stored in DB. // getIndexValue will get the last index value stored in DB.
func (r *ringBuffer) getIndexValue() int { func (r *ringBuffer) getIndexValue() int {
const indexKey string = "index" const indexKey string = "index"