1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

metric for in memory buffered messages current

This commit is contained in:
postmannen 2021-08-18 15:57:33 +02:00
parent c9e095ff5b
commit 4c895c656e
2 changed files with 21 additions and 4 deletions

View file

@ -17,27 +17,36 @@ type metrics struct {
promRegistry *prometheus.Registry
// host and port where prometheus metrics will be exported.
hostAndPort string
// --- Processes
// Prometheus metrics for total processes.
promProcessesTotal prometheus.Gauge
// Prometheus metrics for vector of process names.
promProcessesAllRunning *prometheus.GaugeVec
// --- Methods
// Prometheus metrics for number of hello nodes.
promHelloNodesTotal prometheus.Gauge
// Prometheus metrics for the vector of hello nodes.
promHelloNodesContactLast *prometheus.GaugeVec
// --- Ringbuffer
// Prometheus metrics for the last processed DB id in key
// value store.
promMessagesProcessedTotal prometheus.Gauge
//
// Prometheus metrics for the total count of stalled
// messages in the ringbuffer.
promRingbufferStalledMessagesTotal prometheus.Counter
// Prometheus metrics for current messages in memory buffer
promInMemoryBufferMessagesCurrent prometheus.Gauge
}
// newMetrics will prepare and return a *metrics
// newMetrics will prepare and return a *metrics.
func newMetrics(hostAndPort string) *metrics {
reg := prometheus.NewRegistry()
//prometheus.Unregister(prometheus.NewGoCollector())
//prometheus.Unregister(prometheus.NewGoCollector()).
reg.MustRegister(collectors.NewGoCollector())
// prometheus.MustRegister(collectors.NewGoCollector())
// prometheus.MustRegister(collectors.NewGoCollector()).
m := metrics{
promRegistry: reg,
hostAndPort: hostAndPort,

View file

@ -84,6 +84,12 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin
})
metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal)
metrics.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "in_memory_buffer_messages_current",
Help: "The current value of messages in memory buffer",
})
metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent)
return &ringBuffer{
bufData: make(chan samDBValue, size),
db: db,
@ -221,6 +227,8 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan samDBValueAndDelivered) {
// Range over the buffer of messages to pass on to processes.
for v := range r.bufData {
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
// Create a done channel per message. A process started by the
// spawnProcess function will handle incomming messages sequentaly.
// So in the spawnProcess function we put a struct{} value when a