Mirror of https://github.com/postmannen/ctrl.git (synced 2024-12-14 12:37:31 +00:00)
Commit c9e095ff5b (parent a69a5aed19)

    more metrics

7 changed files with 52 additions and 29 deletions
metrics.go (23 changed lines)
@@ -13,18 +13,23 @@ import (

 // metrics are generally used to hold the structure around metrics
 // handling
 type metrics struct {
-    // The channel to pass metrics that should be processed
+    // The channel to pass metrics that should be processed.
     promRegistry *prometheus.Registry
-    // host and port where prometheus metrics will be exported
+    // host and port where prometheus metrics will be exported.
     hostAndPort string
-    //
-    promTotalProcesses prometheus.Gauge
-    //
-    promProcessesVec *prometheus.GaugeVec
-    //
-    promHelloNodes prometheus.Gauge
-    //
-    promHelloNodesNameVec *prometheus.GaugeVec
+    // Prometheus metrics for total processes.
+    promProcessesTotal prometheus.Gauge
+    // Prometheus metrics for vector of process names.
+    promProcessesAllRunning *prometheus.GaugeVec
+    // Prometheus metrics for number of hello nodes.
+    promHelloNodesTotal prometheus.Gauge
+    // Prometheus metrics for the vector of hello nodes.
+    promHelloNodesContactLast *prometheus.GaugeVec
+    // Prometheus metrics for the last processed DB id in key
+    // value store.
+    promMessagesProcessedTotal prometheus.Gauge
+    promRingbufferStalledMessagesTotal prometheus.Counter
 }

 // newMetrics will prepare and return a *metrics
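The struct above only holds the registry and the hostAndPort it should be exported on; the HTTP exporter itself is not part of this diff. As a rough, self-contained sketch of how a dedicated *prometheus.Registry is typically served (package layout, address, and the example gauge are illustrative, not taken from the repo):

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // A dedicated registry, like the promRegistry field in the struct above.
    reg := prometheus.NewRegistry()

    // Register an example gauge the same way the fields above get registered.
    g := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "processes_total",
        Help: "The current number of total running processes",
    })
    reg.MustRegister(g)
    g.Set(3)

    // HandlerFor serves only the metrics in this registry, not the global default one.
    http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    // "localhost:2112" stands in for the hostAndPort value; it is not from the repo.
    log.Fatal(http.ListenAndServe("localhost:2112", nil))
}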
@@ -146,7 +146,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
     processName := processNameGet(p.subject.name(), p.processKind)

     // Add prometheus metrics for the process.
-    p.processes.metrics.promProcessesVec.With(prometheus.Labels{"processName": string(processName)})
+    p.processes.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)})

     // Start a publisher worker, which will start a go routine (process)
     // That will take care of all the messages for the subject it owns.
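For context on the With call above (and the matching Delete further down in this commit): on a GaugeVec, With creates the labeled child series the first time it is used, and Delete removes it again. A minimal sketch with an illustrative process name:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "processes_all_running",
        Help: "Name of the running processes",
    }, []string{"processName"})

    // With creates the labeled child on first use, so the series shows up in
    // the exposition output even before any Set/Inc is called on it.
    vec.With(prometheus.Labels{"processName": "publisher.REQHello.somenode"})

    // Delete removes the child again; it reports whether a series was deleted.
    deleted := vec.Delete(prometheus.Labels{"processName": "publisher.REQHello.somenode"})
    fmt.Println("series deleted:", deleted)
}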
processes.go (28 changed lines)
@@ -45,18 +45,18 @@ func newProcesses(ctx context.Context, metrics *metrics) *processes {

     // Register the metrics for the process.
-    p.metrics.promTotalProcesses = prometheus.NewGauge(prometheus.GaugeOpts{
-        Name: "total_running_processes",
+    p.metrics.promProcessesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
+        Name: "processes_total",
         Help: "The current number of total running processes",
     })
-    metrics.promRegistry.MustRegister(p.metrics.promTotalProcesses)
+    metrics.promRegistry.MustRegister(p.metrics.promProcessesTotal)

-    p.metrics.promProcessesVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
-        Name: "running_process",
-        Help: "Name of the running process",
+    p.metrics.promProcessesAllRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+        Name: "processes_all_running",
+        Help: "Name of the running processes",
     }, []string{"processName"},
     )
-    metrics.promRegistry.MustRegister(metrics.promProcessesVec)
+    metrics.promRegistry.MustRegister(metrics.promProcessesAllRunning)

     return &p
 }
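A side note on the registrations above: MustRegister panics if a collector with the same fully-qualified name is already registered on the registry. That is fine while newProcesses runs exactly once per registry; if re-registration were ever possible, the usual alternative is Register plus a check for prometheus.AlreadyRegisteredError. A hedged sketch with illustrative names:

package main

import (
    "errors"
    "log"

    "github.com/prometheus/client_golang/prometheus"
)

// registerGauge registers g, or returns the already-registered collector if one
// with the same descriptor exists. Illustrative helper, not from the repo.
func registerGauge(reg *prometheus.Registry, g prometheus.Gauge) prometheus.Gauge {
    if err := reg.Register(g); err != nil {
        var are prometheus.AlreadyRegisteredError
        if errors.As(err, &are) {
            // Reuse the existing collector instead of panicking.
            return are.ExistingCollector.(prometheus.Gauge)
        }
        log.Fatalf("error: failed to register metric: %v", err)
    }
    return g
}

func main() {
    reg := prometheus.NewRegistry()
    g := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "processes_total",
        Help: "The current number of total running processes",
    })
    g = registerGauge(reg, g)
    g.Set(1)
}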
@@ -274,18 +274,18 @@ func (s startup) subREQHello(p process) {
     proc.procFunc = func(ctx context.Context) error {
         sayHelloNodes := make(map[Node]struct{})

-        s.metrics.promHelloNodes = prometheus.NewGauge(prometheus.GaugeOpts{
+        s.metrics.promHelloNodesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
             Name: "hello_nodes_total",
             Help: "The current number of total nodes who have said hello",
         })
-        s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodes)
+        s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesTotal)

-        s.metrics.promHelloNodesNameVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+        s.metrics.promHelloNodesContactLast = prometheus.NewGaugeVec(prometheus.GaugeOpts{
             Name: "hello_node_contact_last",
             Help: "Name of the nodes who have said hello",
         }, []string{"nodeName"},
         )
-        s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesNameVec)
+        s.metrics.promRegistry.MustRegister(s.metrics.promHelloNodesContactLast)

         for {
             // Receive a copy of the message sent from the method handler.
@@ -304,8 +304,8 @@ func (s startup) subREQHello(p process) {
             sayHelloNodes[m.FromNode] = struct{}{}

             // update the prometheus metrics
-            s.metrics.promHelloNodes.Set(float64(len(sayHelloNodes)))
-            s.metrics.promHelloNodesNameVec.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
+            s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
+            s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()

         }
     }
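The SetToCurrentTime call above is what turns hello_node_contact_last into a last-seen timestamp: it stores the current Unix time in seconds as the gauge value for that node's label. A minimal sketch (node name illustrative):

package main

import (
    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    reg := prometheus.NewRegistry()

    lastContact := prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "hello_node_contact_last",
        Help: "Name of the nodes who have said hello",
    }, []string{"nodeName"})
    reg.MustRegister(lastContact)

    // SetToCurrentTime stores the current Unix time in seconds, so a query
    // such as `time() - hello_node_contact_last` gives seconds since the
    // node's last hello.
    lastContact.With(prometheus.Labels{"nodeName": "node1"}).SetToCurrentTime()
}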
@@ -365,6 +365,6 @@ func (p *processes) printProcessesMap() {
     }
     p.mu.Unlock()

-    p.metrics.promTotalProcesses.Set(float64(len(p.active)))
+    p.metrics.promProcessesTotal.Set(float64(len(p.active)))

 }
@@ -19,6 +19,7 @@ import (
     "sync"
     "time"

+    "github.com/prometheus/client_golang/prometheus"
     bolt "go.etcd.io/bbolt"
 )
@@ -46,11 +47,11 @@ type ringBuffer struct {
     nodeName Node
     // New messages to the system to be put into the ring buffer.
     newMessagesCh chan []subjectAndMessage
+    metrics *metrics
 }

 // newringBuffer returns a push/pop storage for values.
-func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
+func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
     // Check if socket folder exists, if not create it
     if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
         err := os.MkdirAll(c.DatabaseFolder, 0700)
@@ -69,12 +70,27 @@ func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node,
         log.Printf("error: failed to open db: %v\n", err)
         os.Exit(1)
     }

+    // Set up the metrics to be used.
+    metrics.promMessagesProcessedTotal = prometheus.NewGauge(prometheus.GaugeOpts{
+        Name: "messages_processed_total",
+        Help: "The last processed db in key value/store",
+    })
+    metrics.promRegistry.MustRegister(metrics.promMessagesProcessedTotal)
+
+    metrics.promRingbufferStalledMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+        Name: "ringbuffer_stalled_messages_total",
+        Help: "Number of stalled messages in ringbuffer",
+    })
+    metrics.promRegistry.MustRegister(metrics.promRingbufferStalledMessagesTotal)
+
     return &ringBuffer{
         bufData:       make(chan samDBValue, size),
         db:            db,
         permStore:     make(chan string),
         nodeName:      nodeName,
         newMessagesCh: newMessagesCh,
+        metrics:       metrics,
     }
 }
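One design note on the two metrics registered above: messages_processed_total is a Gauge that is later set to the ID of the last processed key/value-store entry, while ringbuffer_stalled_messages_total is a Counter that only increments (the _total suffix is conventionally associated with counters, so the gauge's name is a slight deviation from Prometheus naming conventions). The sketch below contrasts the two; values are illustrative only:

package main

import (
    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // A Gauge can be set to an arbitrary value, which is how
    // messages_processed_total is used: it tracks the last processed DB id.
    lastID := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "messages_processed_total",
        Help: "The last processed db in key value/store",
    })
    lastID.Set(42)

    // A Counter can only go up, which fits ringbuffer_stalled_messages_total:
    // every stalled message adds one, and rate()/increase() work on it.
    stalled := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "ringbuffer_stalled_messages_total",
        Help: "Number of stalled messages in ringbuffer",
    })
    stalled.Inc()

    reg := prometheus.NewRegistry()
    reg.MustRegister(lastID, stalled)
}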
@@ -240,6 +256,8 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
                 // Testing with a timeout here to figure out if messages are stuck
                 // waiting for done signal.
                 log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID)
+
+                r.metrics.promRingbufferStalledMessagesTotal.Inc()
             }
             // Listen on the done channel here , so a go routine handling the
             // message will be able to signal back here that the message have
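The Inc call above sits in the branch that fires when a message never reports back as delivered. The surrounding select is not fully visible in this hunk, so the sketch below is a hedged reconstruction of the general pattern; the channel name and timeout value are assumptions, not taken from the commit:

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // In the real code this counter is registered on promRegistry in newringBuffer.
    stalled := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "ringbuffer_stalled_messages_total",
        Help: "Number of stalled messages in ringbuffer",
    })

    done := make(chan struct{}) // delivered-signal channel (assumed name)

    select {
    case <-done:
        fmt.Println("info: message delivered")
    case <-time.After(100 * time.Millisecond):
        // No delivered signal in time: log and count the message as stalled.
        fmt.Println("error: message seems to be stuck")
        stalled.Inc()
    }
}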
@@ -248,6 +266,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
             select {
             case <-v.Data.done:
                 log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
+                r.metrics.promMessagesProcessedTotal.Set(float64(v.ID))
             // case <-time.After(time.Second * 3):
             //     // Testing with a timeout here to figure out if messages are stuck
             //     // waiting for done signal.
@@ -318,7 +318,7 @@ type samDBValueAndDelivered struct {
 func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
     // Prepare and start a new ring buffer
     const bufferSize int = 1000
-    rb := newringBuffer(*s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh)
+    rb := newringBuffer(s.metrics, *s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh)
     inCh := make(chan subjectAndMessage)
     ringBufferOutCh := make(chan samDBValueAndDelivered)
     // start the ringbuffer.
@@ -448,7 +448,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
         }

         // Remove the prometheus label
-        proc.processes.metrics.promProcessesVec.Delete(prometheus.Labels{"processName": string(processName)})
+        proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})

         er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
         sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
@@ -1 +0,0 @@
-some file content