mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
removed message from error messages
This commit is contained in:
parent
e33fba19e1
commit
de7a6c0dda
4 changed files with 41 additions and 84 deletions
61
process.go
61
process.go
|
@ -152,7 +152,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
|||
go func() {
|
||||
err := p.procFunc(p.ctx)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
|
||||
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||
}
|
||||
}()
|
||||
|
@ -172,7 +172,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
|||
go func() {
|
||||
err := p.procFunc(p.ctx)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
|
||||
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||
}
|
||||
}()
|
||||
|
@ -207,7 +207,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
for {
|
||||
dataPayload, err := gobEncodeMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: createDataPayload: %v", err)
|
||||
er := fmt.Errorf("error: messageDeliverNats: createDataPayload failed: %v", err)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||
continue
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
// Create a subscriber for the reply message.
|
||||
subReply, err := natsConn.SubscribeSync(msg.Reply)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
|
||||
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
|
||||
time.Sleep(time.Second * subscribeSyncTimer)
|
||||
|
@ -238,7 +238,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
// Publish message
|
||||
err = natsConn.PublishMsg(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: publish failed: %v", err)
|
||||
er := fmt.Errorf("error: nats publish failed: %v", err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||
log.Printf("%v, waiting %ds before retrying\n", er, publishTimer)
|
||||
time.Sleep(time.Second * publishTimer)
|
||||
|
@ -254,7 +254,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
// or exit if max retries for the message reached.
|
||||
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subReply.NextMsg failed for node=%v, subject=%v: %v", p.node, p.subject.name(), err)
|
||||
er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err)
|
||||
// sendErrorLogMessage(p.toRingbufferCh, p.node, er)
|
||||
log.Printf(" ** %v\n", er)
|
||||
|
||||
|
@ -268,7 +268,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
|||
// continue
|
||||
case retryAttempts >= message.Retries:
|
||||
// max retries reached
|
||||
er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method)
|
||||
er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject)
|
||||
|
||||
// We do not want to send errorLogs for REQErrorLog type since
|
||||
// it will just cause an endless loop.
|
||||
|
@ -331,38 +331,18 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
|
||||
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.CommandOrEvent)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
}
|
||||
|
||||
var out []byte
|
||||
|
||||
// NB:
|
||||
// The commented out code below is for the allowed recievers functionality
|
||||
// for the REQ type specified at startup. Keeping the code for now, but it
|
||||
// is probably redundant since the same information can be specified in
|
||||
// the broker config
|
||||
//
|
||||
// // out := []byte("not allowed from " + message.FromNode)
|
||||
// // Check if we are allowed to receive from that host
|
||||
// _, arOK1 := p.allowedReceivers[message.FromNode]
|
||||
// _, arOK2 := p.allowedReceivers["*"]
|
||||
//
|
||||
// if arOK1 || arOK2 {
|
||||
// // Start the method handler for that specific subject type.
|
||||
// // The handler started here is what actually doing the action
|
||||
// // that executed a CLI command, or writes to a log file on
|
||||
// // the node who received the message.
|
||||
out, err = mh.handler(p, message, thisNode)
|
||||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
}
|
||||
// } else {
|
||||
// er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||
// sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
// }
|
||||
|
||||
// Send a confirmation message back to the publisher
|
||||
natsConn.Publish(msg.Reply, out)
|
||||
|
@ -375,38 +355,15 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
}
|
||||
|
||||
// NB:
|
||||
// The commented out code below is for the allowed recievers functionality
|
||||
// for the REQ type specified at startup. Keeping the code for now, but it
|
||||
// is probably redundant since the same information can be specified in
|
||||
// the broker config
|
||||
//
|
||||
// // Check if we are allowed to receive from that host
|
||||
// _, arOK1 := p.allowedReceivers[message.FromNode]
|
||||
// _, arOK2 := p.allowedReceivers["*"]
|
||||
//
|
||||
// if arOK1 || arOK2 {
|
||||
//
|
||||
// // Start the method handler for that specific subject type.
|
||||
// // The handler started here is what actually doing the action
|
||||
// // that executed a CLI command, or writes to a log file on
|
||||
// // the node who received the message.
|
||||
// //
|
||||
// // since we don't send a reply for a NACK message, we don't care about the
|
||||
// // out return when calling mf.handler
|
||||
_, err := mf.handler(p, message, thisNode)
|
||||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
}
|
||||
// } else {
|
||||
// er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||
// sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
// }
|
||||
|
||||
default:
|
||||
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
|
||||
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ func (s *server) readSocket() {
|
|||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json: %v", err)
|
||||
er := fmt.Errorf("error: malformed json received on socket: %v", err)
|
||||
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||
return
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ func (s *server) readTCPListener() {
|
|||
// unmarshal the JSON into a struct
|
||||
sam, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json: %v", err)
|
||||
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
|
||||
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||
return
|
||||
}
|
||||
|
@ -163,7 +163,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
|||
// unmarshal the JSON into a struct
|
||||
sam, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json: %v", err)
|
||||
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
|
||||
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||
return
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
|
|||
// No toNode or toNodes specified. Drop the message by not appending it to
|
||||
// the slice since it is not valid.
|
||||
default:
|
||||
er := fmt.Errorf("error: no toNode or toNodes where specified in the message got'n, dropping message: %v", v)
|
||||
er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v)
|
||||
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||
continue
|
||||
}
|
||||
|
|
54
requests.go
54
requests.go
|
@ -293,7 +293,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
|||
sam, err := newSubjectAndMessage(newMsg)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
|
||||
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, OpArg: %v", err, message.Operation.OpArg)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
// Stop subscribing for messages on the process's subject.
|
||||
err := toStopProc.natsSubscription.Unsubscribe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -733,7 +733,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message)
|
||||
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -745,7 +745,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
|
@ -756,7 +756,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
_, err := f.Write([]byte(d))
|
||||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -788,7 +788,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -802,7 +802,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
|
@ -813,7 +813,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
_, err := f.Write([]byte(d))
|
||||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message)
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -946,7 +946,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -960,7 +960,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
|
@ -972,7 +972,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
_, err = f.Write([]byte(d))
|
||||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -1010,7 +1010,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
if _, err := os.Stat(folderTree); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -1024,7 +1024,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
file := filepath.Join(folderTree, fileName)
|
||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
|
@ -1036,7 +1036,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
_, err = f.Write([]byte(d))
|
||||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -1114,7 +1114,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
log.Printf("error: failed cmd.Run: %v\n", err)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String())
|
||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -1128,7 +1128,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message)
|
||||
er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
@ -1198,7 +1198,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
cancel()
|
||||
return
|
||||
|
@ -1212,7 +1212,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
|
@ -1227,7 +1227,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
@ -1244,7 +1244,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message)
|
||||
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
@ -1324,7 +1324,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
// Close the lines channel so we exit the reading lines
|
||||
// go routine.
|
||||
// close(t.Lines)
|
||||
er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
|
||||
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
return
|
||||
|
@ -1383,7 +1383,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
// able to read the out put of the command.
|
||||
outReader, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -1392,7 +1392,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
|
||||
ErrorReader, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -1400,7 +1400,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v", err, message)
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
|
@ -1417,7 +1417,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
for scanner.Scan() {
|
||||
select {
|
||||
case errCh <- scanner.Text():
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, message: %v, error_output: %v", err, message, <-errCh)
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, <-errCh)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
case <-ctx.Done():
|
||||
|
@ -1443,7 +1443,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: %v", message)
|
||||
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
case out := <-outCh:
|
||||
|
|
|
@ -305,7 +305,7 @@ func sendErrorLogMessage(conf *Configuration, metrics *metrics, newMessagesCh ch
|
|||
// of the error
|
||||
func createErrorMsgContent(conf *Configuration, FromNode Node, theError error) subjectAndMessage {
|
||||
// Add time stamp
|
||||
er := fmt.Sprintf("%v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), theError.Error())
|
||||
er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), FromNode, theError.Error())
|
||||
|
||||
sam := subjectAndMessage{
|
||||
Subject: newSubject(REQErrorLog, "errorCentral"),
|
||||
|
|
Loading…
Reference in a new issue