From 48500aa1fb156c0ad9d2b5c2ea751e810e3b0983 Mon Sep 17 00:00:00 2001 From: postmannen <postmannen@gmail.com> Date: Fri, 21 Jan 2022 06:15:26 +0100 Subject: [PATCH] sendInfo also now via error kernel --- errorkernel.go | 90 +++++++++++++++++++++++++++++++++----------------- process.go | 4 +-- requests.go | 4 +-- server.go | 18 +++++----- 4 files changed, 73 insertions(+), 43 deletions(-) diff --git a/errorkernel.go b/errorkernel.go index 9356aac..88221b4 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -64,6 +64,34 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error return fmt.Errorf("info: stopping errorKernel") } + sendErrorOrInfo := func(errEvent errorEvent) { + var er string + // Decide what extra information to add to the error message. + switch { + case errEvent.message.RelayFromNode != "": + er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err) + default: + er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) + } + + sam := subjectAndMessage{ + Subject: newSubject(REQErrorLog, "errorCentral"), + Message: Message{ + Directory: "errorLog", + ToNode: "errorCentral", + FromNode: errEvent.process.node, + FileName: "error.log", + Data: []string{er}, + Method: REQErrorLog, + ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout, + Retries: errEvent.process.configuration.ErrorMessageRetries, + }, + } + + // Put the message on the channel to the ringbuffer. + ringBufferBulkInCh <- []subjectAndMessage{sam} + } + // Check the type of the error to decide what to do. // // We should be able to handle each error individually and @@ -75,40 +103,24 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error // that fails. switch errEvent.errorType { - case errTypeSendToCentralErrorLogger: + case errTypeSendError: // Just log the error by creating a message and send it // to the errorCentral log server. go func() { - var er string - // Decide what extra information to add to the error message. - switch { - case errEvent.message.RelayFromNode != "": - er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err) - default: - er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) - } - - sam := subjectAndMessage{ - Subject: newSubject(REQErrorLog, "errorCentral"), - Message: Message{ - Directory: "errorLog", - ToNode: "errorCentral", - FromNode: errEvent.process.node, - FileName: "error.log", - Data: []string{er}, - Method: REQErrorLog, - ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout, - Retries: errEvent.process.configuration.ErrorMessageRetries, - }, - } - - // Put the message on the channel to the ringbuffer. - ringBufferBulkInCh <- []subjectAndMessage{sam} - + sendErrorOrInfo(errEvent) e.metrics.promErrorMessagesSentTotal.Inc() }() + case errTypeSendInfo: + // Just log the error by creating a message and send it + // to the errorCentral log server. + + go func() { + sendErrorOrInfo(errEvent) + e.metrics.promInfoMessagesSentTotal.Inc() + }() + case errTypeWithAction: // Just print the error, and tell the process to continue. The // process who sent the error should block and wait for receiving @@ -141,11 +153,26 @@ func (e *errorKernel) stop() { e.cancel() } -// sendError will just send an error to the errorCentral. +// errSend will just send an error message to the errorCentral. func (e *errorKernel) errSend(proc process, msg Message, err error) { ev := errorEvent{ err: err, - errorType: errTypeSendToCentralErrorLogger, + errorType: errTypeSendError, + process: proc, + message: msg, + // We don't want to create any actions when just + // sending errors. + // errorActionCh: make(chan errorAction), + } + + e.errorCh <- ev +} + +// infoSend will just send an info message to the errorCentral. +func (e *errorKernel) infoSend(proc process, msg Message, err error) { + ev := errorEvent{ + err: err, + errorType: errTypeSendInfo, process: proc, message: msg, // We don't want to create any actions when just @@ -205,8 +232,9 @@ type errorType int const ( // errSend will just send the content of the error to the // central error logger. - errTypeSendToCentralErrorLogger errorType = iota - errTypeWithAction errorType = iota + errTypeSendError errorType = iota + errTypeSendInfo errorType = iota + errTypeWithAction errorType = iota ) type errorEvent struct { diff --git a/process.go b/process.go index d1b79f7..0e9a70e 100644 --- a/process.go +++ b/process.go @@ -308,7 +308,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // We do not want to send errorLogs for REQErrorLog type since // it will just cause an endless loop. if message.Method != REQErrorLog { - sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er) + p.processes.errorKernel.infoSend(p, message, er) } log.Printf("%v\n", er) @@ -537,7 +537,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, default: er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent) - sendInfoLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) + p.processes.errorKernel.infoSend(p, message, er) } } diff --git a/requests.go b/requests.go index 4bc1217..3aab673 100644 --- a/requests.go +++ b/requests.go @@ -1503,7 +1503,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( // go routine. // close(t.Lines) er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs) - sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + proc.processes.errorKernel.infoSend(proc, message, er) return case out := <-outCh: @@ -1629,7 +1629,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str case <-ctx.Done(): cancel() er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs) - sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + proc.processes.errorKernel.infoSend(proc, message, er) return case out := <-outCh: newReplyMessage(proc, message, out) diff --git a/server.go b/server.go index 5d8822c..839700d 100644 --- a/server.go +++ b/server.go @@ -305,14 +305,16 @@ func (s *server) Stop() { // sendInfoMessage will put the error message directly on the channel that is // read by the nats publishing functions. -func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) { - // NB: Adding log statement here for more visuality during development. - log.Printf("%v\n", theError) - sam := createErrorMsgContent(conf, FromNode, theError) - ringBufferBulkInCh <- []subjectAndMessage{sam} - - metrics.promInfoMessagesSentTotal.Inc() -} +// +// DEPRECATED: +// func sendInfoLogMessage(conf *Configuration, metrics *metrics, ringBufferBulkInCh chan<- []subjectAndMessage, FromNode Node, theError error) { +// // NB: Adding log statement here for more visuality during development. +// log.Printf("%v\n", theError) +// sam := createErrorMsgContent(conf, FromNode, theError) +// ringBufferBulkInCh <- []subjectAndMessage{sam} +// +// metrics.promInfoMessagesSentTotal.Inc() +// } // createErrorMsgContent will prepare a subject and message with the content // of the error