diff --git a/metrics.go b/metrics.go index bfc1e0d..70ea85b 100644 --- a/metrics.go +++ b/metrics.go @@ -17,27 +17,36 @@ type metrics struct { promRegistry *prometheus.Registry // host and port where prometheus metrics will be exported. hostAndPort string + + // --- Processes // Prometheus metrics for total processes. promProcessesTotal prometheus.Gauge // Prometheus metrics for vector of process names. promProcessesAllRunning *prometheus.GaugeVec + + // --- Methods // Prometheus metrics for number of hello nodes. promHelloNodesTotal prometheus.Gauge // Prometheus metrics for the vector of hello nodes. promHelloNodesContactLast *prometheus.GaugeVec + + // --- Ringbuffer // Prometheus metrics for the last processed DB id in key // value store. promMessagesProcessedTotal prometheus.Gauge - // + // Prometheus metrics for the total count of stalled + // messages in the ringbuffer. promRingbufferStalledMessagesTotal prometheus.Counter + // Prometheus metrics for current messages in memory buffer + promInMemoryBufferMessagesCurrent prometheus.Gauge } -// newMetrics will prepare and return a *metrics +// newMetrics will prepare and return a *metrics. func newMetrics(hostAndPort string) *metrics { reg := prometheus.NewRegistry() - //prometheus.Unregister(prometheus.NewGoCollector()) + //prometheus.Unregister(prometheus.NewGoCollector()). reg.MustRegister(collectors.NewGoCollector()) - // prometheus.MustRegister(collectors.NewGoCollector()) + // prometheus.MustRegister(collectors.NewGoCollector()). m := metrics{ promRegistry: reg, hostAndPort: hostAndPort, diff --git a/ringbuffer.go b/ringbuffer.go index 9af7b92..47c6fc7 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -84,6 +84,12 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin }) metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal) + metrics.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "in_memory_buffer_messages_current", + Help: "The current value of messages in memory buffer", + }) + metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent) + return &ringBuffer{ bufData: make(chan samDBValue, size), db: db, @@ -221,6 +227,8 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan samDBValueAndDelivered) { // Range over the buffer of messages to pass on to processes. for v := range r.bufData { + r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) + // Create a done channel per message. A process started by the // spawnProcess function will handle incomming messages sequentaly. // So in the spawnProcess function we put a struct{} value when a