mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
added sendInfoLogMessage function
This commit is contained in:
parent
de7a6c0dda
commit
8239c76bda
4 changed files with 25 additions and 6 deletions
|
@ -55,6 +55,8 @@ type metrics struct {
|
||||||
promErrorMessagesReceivedTotal prometheus.Counter
|
promErrorMessagesReceivedTotal prometheus.Counter
|
||||||
// Metrics for sent error messages
|
// Metrics for sent error messages
|
||||||
promErrorMessagesSentTotal prometheus.Counter
|
promErrorMessagesSentTotal prometheus.Counter
|
||||||
|
// Metrics for sent info messages
|
||||||
|
promInfoMessagesSentTotal prometheus.Counter
|
||||||
// Metrics for the amount of messages currently in db.
|
// Metrics for the amount of messages currently in db.
|
||||||
promDBMessagesCurrent prometheus.Gauge
|
promDBMessagesCurrent prometheus.Gauge
|
||||||
}
|
}
|
||||||
|
@ -157,6 +159,12 @@ func newMetrics(hostAndPort string) *metrics {
|
||||||
})
|
})
|
||||||
m.promRegistry.MustRegister(m.promErrorMessagesSentTotal)
|
m.promRegistry.MustRegister(m.promErrorMessagesSentTotal)
|
||||||
|
|
||||||
|
m.promInfoMessagesSentTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "steward_info_messages_sent_total",
|
||||||
|
Help: "Number of info messages sent total",
|
||||||
|
})
|
||||||
|
m.promRegistry.MustRegister(m.promInfoMessagesSentTotal)
|
||||||
|
|
||||||
m.promDBMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
|
m.promDBMessagesCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "steward_db_messages_current",
|
Name: "steward_db_messages_current",
|
||||||
Help: "The current value messages in database",
|
Help: "The current value messages in database",
|
||||||
|
|
|
@ -273,7 +273,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
// We do not want to send errorLogs for REQErrorLog type since
|
// We do not want to send errorLogs for REQErrorLog type since
|
||||||
// it will just cause an endless loop.
|
// it will just cause an endless loop.
|
||||||
if message.Method != REQErrorLog {
|
if message.Method != REQErrorLog {
|
||||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
@ -364,7 +364,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
default:
|
default:
|
||||||
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -424,7 +424,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||||
|
|
||||||
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
|
|
||||||
case "stopProc":
|
case "stopProc":
|
||||||
// Set the interface type dst to &OpStart.
|
// Set the interface type dst to &OpStart.
|
||||||
|
@ -485,7 +485,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
||||||
|
|
||||||
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
|
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
newReplyMessage(proc, message, []byte(er.Error()))
|
newReplyMessage(proc, message, []byte(er.Error()))
|
||||||
|
@ -1325,7 +1325,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
||||||
// go routine.
|
// go routine.
|
||||||
// close(t.Lines)
|
// close(t.Lines)
|
||||||
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
|
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
|
|
||||||
return
|
return
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
|
@ -1444,7 +1444,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
|
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
|
||||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return
|
return
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
// Prepare and queue for sending a new message with the output
|
// Prepare and queue for sending a new message with the output
|
||||||
|
|
11
server.go
11
server.go
|
@ -301,6 +301,17 @@ func sendErrorLogMessage(conf *Configuration, metrics *metrics, newMessagesCh ch
|
||||||
metrics.promErrorMessagesSentTotal.Inc()
|
metrics.promErrorMessagesSentTotal.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sendInfoMessage will put the error message directly on the channel that is
|
||||||
|
// read by the nats publishing functions.
|
||||||
|
func sendInfoLogMessage(conf *Configuration, metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
|
||||||
|
// NB: Adding log statement here for more visuality during development.
|
||||||
|
log.Printf("%v\n", theError)
|
||||||
|
sam := createErrorMsgContent(conf, FromNode, theError)
|
||||||
|
newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
|
metrics.promInfoMessagesSentTotal.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
// createErrorMsgContent will prepare a subject and message with the content
|
// createErrorMsgContent will prepare a subject and message with the content
|
||||||
// of the error
|
// of the error
|
||||||
func createErrorMsgContent(conf *Configuration, FromNode Node, theError error) subjectAndMessage {
|
func createErrorMsgContent(conf *Configuration, FromNode Node, theError error) subjectAndMessage {
|
||||||
|
|
Loading…
Add table
Reference in a new issue