1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

metrics on error messages

This commit is contained in:
postmannen 2021-08-26 12:26:08 +02:00
parent a669472c03
commit 18272d73ab
6 changed files with 66 additions and 64 deletions

View file

@ -138,13 +138,13 @@ func newMetrics(hostAndPort string) *metrics {
Name: "steward_error_messages_received_total", Name: "steward_error_messages_received_total",
Help: "Number of error messages received total", Help: "Number of error messages received total",
}) })
m.promRegistry.MustRegister(m.promNatsMessagesMissedACKsTotal) m.promRegistry.MustRegister(m.promErrorMessagesReceivedTotal)
m.promErrorMessagesSentTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.promErrorMessagesSentTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "steward_error_messages_sent_total", Name: "steward_error_messages_sent_total",
Help: "Number of error messages sent total", Help: "Number of error messages sent total",
}) })
m.promRegistry.MustRegister(m.promErrorMessagesReceivedTotal) m.promRegistry.MustRegister(m.promErrorMessagesSentTotal)
return &m return &m
} }

View file

@ -160,7 +160,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
err := p.procFunc(p.ctx) err := p.procFunc(p.ctx)
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) sendErrorLogMessage(procs.metrics, p.toRingbufferCh, Node(p.node), er)
} }
}() }()
} }
@ -180,7 +180,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
err := p.procFunc(p.ctx) err := p.procFunc(p.ctx)
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) sendErrorLogMessage(procs.metrics, p.toRingbufferCh, Node(p.node), er)
} }
}() }()
} }
@ -213,7 +213,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
dataPayload, err := gobEncodeMessage(message) dataPayload, err := gobEncodeMessage(message)
if err != nil { if err != nil {
er := fmt.Errorf("error: createDataPayload: %v", err) er := fmt.Errorf("error: createDataPayload: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
continue continue
} }
@ -273,7 +273,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
case retryAttempts >= message.Retries: case retryAttempts >= message.Retries:
// max retries reached // max retries reached
er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method) er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method)
sendErrorLogMessage(p.toRingbufferCh, p.node, er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, p.node, er)
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc() p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
return return
@ -314,7 +314,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err) er := fmt.Errorf("error: gob decoding failed: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
// Check if it is an ACK or NACK message, and do the appropriate action accordingly. // Check if it is an ACK or NACK message, and do the appropriate action accordingly.
@ -324,7 +324,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
out := []byte("not allowed from " + message.FromNode) out := []byte("not allowed from " + message.FromNode)
@ -343,11 +343,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
} else { } else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
@ -358,7 +358,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mf, ok := p.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
// Check if we are allowed to receive from that host // Check if we are allowed to receive from that host
@ -378,16 +378,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
} else { } else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
default: default:
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
} }
} }

View file

@ -19,7 +19,7 @@ func (s *server) readSocket() {
conn, err := s.StewardSocket.Accept() conn, err := s.StewardSocket.Accept()
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err) er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
} }
go func(conn net.Conn) { go func(conn net.Conn) {
@ -32,7 +32,7 @@ func (s *server) readSocket() {
_, err = conn.Read(b) _, err = conn.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
return return
} }
@ -49,7 +49,7 @@ func (s *server) readSocket() {
sams, err := s.convertBytesToSAMs(readBytes) sams, err := s.convertBytesToSAMs(readBytes)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json: %v", err) er := fmt.Errorf("error: malformed json: %v", err)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
return return
} }
@ -83,7 +83,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
conn, err := ln.Accept() conn, err := ln.Accept()
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err) er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
continue continue
} }
@ -97,7 +97,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
_, err = conn.Read(b) _, err = conn.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
return return
} }
@ -114,7 +114,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
sam, err := s.convertBytesToSAMs(readBytes) sam, err := s.convertBytesToSAMs(readBytes)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json: %v", err) er := fmt.Errorf("error: malformed json: %v", err)
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
return return
} }
@ -215,7 +215,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
// the slice since it is not valid. // the slice since it is not valid.
default: default:
er := fmt.Errorf("error: no toNode or toNodes where specified in the message got'n, dropping message: %v", v) er := fmt.Errorf("error: no toNode or toNodes where specified in the message got'n, dropping message: %v", v)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
continue continue
} }
} }

View file

@ -144,7 +144,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
// Check if the command or event exists in commandOrEvent.go // Check if the command or event exists in commandOrEvent.go
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) { if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject) er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
fmt.Println() fmt.Println()
// if it was not a valid value, we jump back up, and // if it was not a valid value, we jump back up, and
@ -177,14 +177,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
js, err := json.Marshal(samV) js, err := json.Marshal(samV)
if err != nil { if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
} }
// Store the incomming message in key/value store // Store the incomming message in key/value store
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js) err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
if err != nil { if err != nil {
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err) er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
} }

View file

@ -282,11 +282,13 @@ func (s *server) Stop() {
// sendErrorMessage will put the error message directly on the channel that is // sendErrorMessage will put the error message directly on the channel that is
// read by the nats publishing functions. // read by the nats publishing functions.
func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) { func sendErrorLogMessage(metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// NB: Adding log statement here for more visuality during development. // NB: Adding log statement here for more visuality during development.
log.Printf("%v\n", theError) log.Printf("%v\n", theError)
sam := createErrorMsgContent(FromNode, theError) sam := createErrorMsgContent(FromNode, theError)
newMessagesCh <- []subjectAndMessage{sam} newMessagesCh <- []subjectAndMessage{sam}
metrics.promErrorMessagesSentTotal.Inc()
} }
// createErrorMsgContent will prepare a subject and message with the content // createErrorMsgContent will prepare a subject and message with the content
@ -366,12 +368,12 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// Check if the format of the message is correct. // Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
continue continue
} }
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method) er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
sendErrorLogMessage(s.newMessagesCh, Node(s.nodeName), er) sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
continue continue
} }

View file

@ -285,7 +285,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
if err != nil { if err != nil {
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
@ -361,7 +361,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
err := json.Unmarshal(message.Operation.OpArg, &dst) err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -372,14 +372,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
if len(arg.AllowedNodes) == 0 { if len(arg.AllowedNodes) == 0 {
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message)) er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return return
} }
if arg.Method == "" { if arg.Method == "" {
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message)) er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return return
} }
@ -390,7 +390,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
go procNew.spawnWorker(proc.processes, proc.natsConn) go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case "stopProc": case "stopProc":
// Set the interface type dst to &OpStart. // Set the interface type dst to &OpStart.
@ -399,7 +399,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
err := json.Unmarshal(message.Operation.OpArg, &dst) err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -417,7 +417,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
err = func() error { err = func() error {
if arg.ID == 0 { if arg.ID == 0 {
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode) er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return er return er
} }
return nil return nil
@ -425,7 +425,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
if err != nil { if err != nil {
er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode) er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return return
} }
@ -443,7 +443,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
err := toStopProc.natsSubscription.Unsubscribe() err := toStopProc.natsSubscription.Unsubscribe()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message) er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -451,14 +451,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode) er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error())) newReplyMessage(proc, message, []byte(er.Error()))
} else { } else {
er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode) er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error())) newReplyMessage(proc, message, []byte(er.Error()))
@ -511,7 +511,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message) er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -523,7 +523,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, err return nil, err
} }
@ -534,7 +534,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
} }
@ -580,7 +580,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message) er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, er return nil, er
@ -594,7 +594,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, err return nil, err
} }
@ -605,7 +605,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
} }
@ -766,7 +766,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message) er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, er return nil, er
@ -780,7 +780,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, err return nil, err
} }
@ -792,7 +792,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message) er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -844,7 +844,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message) er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, er return nil, er
@ -858,7 +858,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err) er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
return nil, err return nil, err
} }
@ -870,7 +870,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message) er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -917,7 +917,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
out, err := cmd.Output() out, err := cmd.Output()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
select { select {
@ -931,7 +931,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message) er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -988,7 +988,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
out, err := cmd.Output() out, err := cmd.Output()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQnCliCommand: cmd.Output : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQnCliCommand: cmd.Output : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -1003,7 +1003,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: methodREQnCliCommand: method timed out %v", message) er := fmt.Errorf("error: methodREQnCliCommand: method timed out %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -1065,7 +1065,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message) er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
cancel() cancel()
return return
} }
@ -1079,7 +1079,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message) er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return return
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -1087,14 +1087,14 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message) er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return return
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -1111,7 +1111,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message) er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -1165,7 +1165,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err) er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
log.Printf("%v\n", er) log.Printf("%v\n", er)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
} }
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
@ -1191,7 +1191,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
// go routine. // go routine.
// close(t.Lines) // close(t.Lines)
er := fmt.Errorf("info: method timeout reached, canceling: %v", message) er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return return
case out := <-outCh: case out := <-outCh:
@ -1250,7 +1250,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
outReader, err := cmd.StdoutPipe() outReader, err := cmd.StdoutPipe()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
log.Printf("error: %v\n", err) log.Printf("error: %v\n", err)
@ -1258,7 +1258,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.Start failed : %v, message: %v", err, message) er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.Start failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
@ -1281,7 +1281,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("info: methodREQnCliCommandCont: method timeout reached, canceling: %v", message) er := fmt.Errorf("info: methodREQnCliCommandCont: method timeout reached, canceling: %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return return
case out := <-outCh: case out := <-outCh:
// Prepare and queue for sending a new message with the output // Prepare and queue for sending a new message with the output