1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

sendInfo also now via error kernel

This commit is contained in:
postmannen 2022-01-21 06:15:26 +01:00
parent ccd57dad10
commit 48500aa1fb
4 changed files with 73 additions and 43 deletions

View file

@ -64,6 +64,34 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
return fmt.Errorf("info: stopping errorKernel")
}
sendErrorOrInfo := func(errEvent errorEvent) {
var er string
// Decide what extra information to add to the error message.
switch {
case errEvent.message.RelayFromNode != "":
er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err)
default:
er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
}
sam := subjectAndMessage{
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog",
ToNode: "errorCentral",
FromNode: errEvent.process.node,
FileName: "error.log",
Data: []string{er},
Method: REQErrorLog,
ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout,
Retries: errEvent.process.configuration.ErrorMessageRetries,
},
}
// Put the message on the channel to the ringbuffer.
ringBufferBulkInCh <- []subjectAndMessage{sam}
}
// Check the type of the error to decide what to do.
//
// We should be able to handle each error individually and
@ -75,40 +103,24 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
// that fails.
switch errEvent.errorType {
case errTypeSendToCentralErrorLogger:
case errTypeSendError:
// Just log the error by creating a message and send it
// to the errorCentral log server.
go func() {
var er string
// Decide what extra information to add to the error message.
switch {
case errEvent.message.RelayFromNode != "":
er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err)
default:
er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
}
sam := subjectAndMessage{
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog",
ToNode: "errorCentral",
FromNode: errEvent.process.node,
FileName: "error.log",
Data: []string{er},
Method: REQErrorLog,
ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout,
Retries: errEvent.process.configuration.ErrorMessageRetries,
},
}
// Put the message on the channel to the ringbuffer.
ringBufferBulkInCh <- []subjectAndMessage{sam}
sendErrorOrInfo(errEvent)
e.metrics.promErrorMessagesSentTotal.Inc()
}()
case errTypeSendInfo:
// Just log the error by creating a message and send it
// to the errorCentral log server.
go func() {
sendErrorOrInfo(errEvent)
e.metrics.promInfoMessagesSentTotal.Inc()
}()
case errTypeWithAction:
// Just print the error, and tell the process to continue. The
// process who sent the error should block and wait for receiving
@ -141,11 +153,26 @@ func (e *errorKernel) stop() {
e.cancel()
}
// sendError will just send an error to the errorCentral.
// errSend will just send an error message to the errorCentral.
func (e *errorKernel) errSend(proc process, msg Message, err error) {
ev := errorEvent{
err: err,
errorType: errTypeSendToCentralErrorLogger,
errorType: errTypeSendError,
process: proc,
message: msg,
// We don't want to create any actions when just
// sending errors.
// errorActionCh: make(chan errorAction),
}
e.errorCh <- ev
}
// infoSend will just send an info message to the errorCentral.
func (e *errorKernel) infoSend(proc process, msg Message, err error) {
ev := errorEvent{
err: err,
errorType: errTypeSendInfo,
process: proc,
message: msg,
// We don't want to create any actions when just
@ -205,8 +232,9 @@ type errorType int
const (
// errSend will just send the content of the error to the
// central error logger.
errTypeSendToCentralErrorLogger errorType = iota
errTypeWithAction errorType = iota
errTypeSendError errorType = iota
errTypeSendInfo errorType = iota
errTypeWithAction errorType = iota
)
type errorEvent struct {

View file

@ -308,7 +308,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// We do not want to send errorLogs for REQErrorLog type since
// it will just cause an endless loop.
if message.Method != REQErrorLog {
sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
p.processes.errorKernel.infoSend(p, message, er)
}
log.Printf("%v\n", er)
@ -537,7 +537,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
default:
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent)
sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
p.processes.errorKernel.infoSend(p, message, er)
}
}

View file

@ -1503,7 +1503,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
// go routine.
// close(t.Lines)
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
proc.processes.errorKernel.infoSend(proc, message, er)
return
case out := <-outCh:
@ -1629,7 +1629,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
case <-ctx.Done():
cancel()
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
proc.processes.errorKernel.infoSend(proc, message, er)
return
case out := <-outCh:
newReplyMessage(proc, message, out)

View file

@ -305,14 +305,16 @@ func (s *server) Stop() {
// sendInfoMessage will put the error message directly on the channel that is
// read by the nats publishing functions.
func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// NB: Adding log statement here for more visuality during development.
log.Printf("%v\n", theError)
sam := createErrorMsgContent(conf, FromNode, theError)
ringBufferBulkInCh <- []subjectAndMessage{sam}
metrics.promInfoMessagesSentTotal.Inc()
}
//
// DEPRECATED:
// func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) {
// // NB: Adding log statement here for more visuality during development.
// log.Printf("%v\n", theError)
// sam := createErrorMsgContent(conf, FromNode, theError)
// ringBufferBulkInCh <- []subjectAndMessage{sam}
//
// metrics.promInfoMessagesSentTotal.Inc()
// }
// createErrorMsgContent will prepare a subject and message with the content
// of the error