mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 06:46:48 +00:00
moved registering of metrics into newMetrics
This commit is contained in:
parent
1c1f56a007
commit
9846a9eb2f
5 changed files with 67 additions and 50 deletions
64
metrics.go
64
metrics.go
|
@ -37,8 +37,13 @@ type metrics struct {
|
|||
// Prometheus metrics for the total count of stalled
|
||||
// messages in the ringbuffer.
|
||||
promRingbufferStalledMessagesTotal prometheus.Counter
|
||||
// Prometheus metrics for current messages in memory buffer
|
||||
// Prometheus metrics for current messages in memory buffer.
|
||||
promInMemoryBufferMessagesCurrent prometheus.Gauge
|
||||
// Prometheus metrics for current messages delivered by a
|
||||
// user into the system.
|
||||
promUserMessagesTotal prometheus.Counter
|
||||
// Metrics for nats messages delivered total
|
||||
promNatsDeliveredTotal prometheus.Counter
|
||||
}
|
||||
|
||||
// newMetrics will prepare and return a *metrics.
|
||||
|
@ -52,6 +57,63 @@ func newMetrics(hostAndPort string) *metrics {
|
|||
hostAndPort: hostAndPort,
|
||||
}
|
||||
|
||||
m.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "processes_total",
|
||||
Help: "The current number of total running processes",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promProcessesTotal)
|
||||
|
||||
m.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "processes_all_running",
|
||||
Help: "Name of the running processes",
|
||||
}, []string{"processName"},
|
||||
)
|
||||
m.promRegistry.MustRegister(m.promProcessesAllRunning)
|
||||
|
||||
m.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "hello_nodes_total",
|
||||
Help: "The current number of total nodes who have said hello",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promHelloNodesTotal)
|
||||
|
||||
m.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "hello_node_contact_last",
|
||||
Help: "Name of the nodes who have said hello",
|
||||
}, []string{"nodeName"},
|
||||
)
|
||||
m.promRegistry.MustRegister(m.promHelloNodesContactLast)
|
||||
|
||||
m.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "messages_processed_total",
|
||||
Help: "The last processed db in key value/store",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promMessagesProcessedTotal)
|
||||
|
||||
m.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "ringbuffer_stalled_messages_total",
|
||||
Help: "Number of stalled messages in ringbuffer",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promRingbufferStalledMessagesTotal)
|
||||
|
||||
m.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "in_memory_buffer_messages_current",
|
||||
Help: "The current value of messages in memory buffer",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promInMemoryBufferMessagesCurrent)
|
||||
|
||||
// Register som metrics for messages delivered by users into the system.
|
||||
m.promUserMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "user_messages_total",
|
||||
Help: "Number of total messages delivered by users into the system",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promUserMessagesTotal)
|
||||
|
||||
m.promNatsDeliveredTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "nats_delivered_total",
|
||||
Help: "Number of total messages delivered by nats",
|
||||
})
|
||||
m.promRegistry.MustRegister(m.promNatsDeliveredTotal)
|
||||
|
||||
return &m
|
||||
}
|
||||
|
||||
|
|
|
@ -283,6 +283,9 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
}
|
||||
log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data)
|
||||
}
|
||||
|
||||
p.processes.metrics.promNatsDeliveredTotal.Inc()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
28
processes.go
28
processes.go
|
@ -43,21 +43,6 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes {
|
|||
|
||||
p.metrics = metrics
|
||||
|
||||
// Register the metrics for the process.
|
||||
|
||||
p.metrics.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "processes_total",
|
||||
Help: "The current number of total running processes",
|
||||
})
|
||||
metrics.promRegistry.MustRegister(p.metrics.promProcessesTotal)
|
||||
|
||||
p.metrics.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "processes_all_running",
|
||||
Help: "Name of the running processes",
|
||||
}, []string{"processName"},
|
||||
)
|
||||
metrics.promRegistry.MustRegister(metrics.promProcessesAllRunning)
|
||||
|
||||
return &p
|
||||
}
|
||||
|
||||
|
@ -275,19 +260,6 @@ func (s startup) subREQHello(p process) {
|
|||
proc.procFunc = func(ctx context.Context) error {
|
||||
sayHelloNodes := make(map[Node]struct{})
|
||||
|
||||
s.metrics.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "hello_nodes_total",
|
||||
Help: "The current number of total nodes who have said hello",
|
||||
})
|
||||
s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesTotal)
|
||||
|
||||
s.metrics.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "hello_node_contact_last",
|
||||
Help: "Name of the nodes who have said hello",
|
||||
}, []string{"nodeName"},
|
||||
)
|
||||
s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesContactLast)
|
||||
|
||||
for {
|
||||
// Receive a copy of the message sent from the method handler.
|
||||
var m Message
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
// It will take a channel of []byte as input, and it is in this
|
||||
// channel the content of a file that has changed is returned.
|
||||
func (s *server) readSocket() {
|
||||
|
||||
// Loop, and wait for new connections.
|
||||
for {
|
||||
conn, err := s.StewardSocket.Accept()
|
||||
|
@ -163,6 +162,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
|||
|
||||
// Check for toNode and toNodes field.
|
||||
MsgSlice = s.checkMessageToNodes(MsgSlice)
|
||||
s.metrics.promUserMessagesTotal.Add(float64(len(MsgSlice)))
|
||||
|
||||
sam := []subjectAndMessage{}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
|
@ -73,25 +72,6 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Set up the metrics to be used.
|
||||
metrics.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "messages_processed_total",
|
||||
Help: "The last processed db in key value/store",
|
||||
})
|
||||
metrics.promRegistry.MustRegister(metrics.promMessagesProcessedTotal)
|
||||
|
||||
metrics.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "ringbuffer_stalled_messages_total",
|
||||
Help: "Number of stalled messages in ringbuffer",
|
||||
})
|
||||
metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal)
|
||||
|
||||
metrics.promInMemoryBufferMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "in_memory_buffer_messages_current",
|
||||
Help: "The current value of messages in memory buffer",
|
||||
})
|
||||
metrics.promRegistry.MustRegister(metrics.promInMemoryBufferMessagesCurrent)
|
||||
|
||||
return &ringBuffer{
|
||||
bufData: make(chan samDBValue, size),
|
||||
db: db,
|
||||
|
|
Loading…
Add table
Reference in a new issue