mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 18:58:14 +00:00
added error message timeouts and retries
This commit is contained in:
parent
4dcedfbe14
commit
0f93e9a439
7 changed files with 120 additions and 93 deletions
|
@ -648,6 +648,10 @@ For CliCommand message to a node named "ship1" of type Command and it wants an A
|
||||||
|
|
||||||
`ship1.REQCliCommand.CommandACK`
|
`ship1.REQCliCommand.CommandACK`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
Dont send error messages when max retries reached for error messages.
|
||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
Put in a limit on error messages if the central for some reason have not started errorLog, since you will be spammed with this..
|
Put in a limit on error messages if the central for some reason have not started errorLog, since you will be spammed with this..
|
||||||
|
|
|
@ -109,6 +109,10 @@ type Configuration struct {
|
||||||
NkeySeedFile string
|
NkeySeedFile string
|
||||||
// The host and port to expose the data folder
|
// The host and port to expose the data folder
|
||||||
ExposeDataFolder string
|
ExposeDataFolder string
|
||||||
|
// Timeout for error messages
|
||||||
|
ErrorMessageTimeout int
|
||||||
|
// Retries for error messages.
|
||||||
|
ErrorMessageRetries int
|
||||||
// Make the current node send hello messages to central at given interval in seconds
|
// Make the current node send hello messages to central at given interval in seconds
|
||||||
StartPubREQHello int
|
StartPubREQHello int
|
||||||
// Start the central error logger.
|
// Start the central error logger.
|
||||||
|
@ -164,6 +168,9 @@ func newConfigurationDefaults() Configuration {
|
||||||
RootCAPath: "",
|
RootCAPath: "",
|
||||||
NkeySeedFile: "",
|
NkeySeedFile: "",
|
||||||
ExposeDataFolder: "",
|
ExposeDataFolder: "",
|
||||||
|
ErrorMessageTimeout: 60,
|
||||||
|
ErrorMessageRetries: 10,
|
||||||
|
|
||||||
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}},
|
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}},
|
||||||
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
@ -224,6 +231,8 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.StringVar(&c.RootCAPath, "rootCAPath", fc.RootCAPath, "If TLS, enter the path for where to find the root CA certificate")
|
flag.StringVar(&c.RootCAPath, "rootCAPath", fc.RootCAPath, "If TLS, enter the path for where to find the root CA certificate")
|
||||||
flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", fc.NkeySeedFile, "The full path of the nkeys seed file")
|
flag.StringVar(&c.NkeySeedFile, "nkeySeedFile", fc.NkeySeedFile, "The full path of the nkeys seed file")
|
||||||
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
|
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
|
||||||
|
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out")
|
||||||
|
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it")
|
||||||
|
|
||||||
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
|
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
|
||||||
|
|
||||||
|
|
36
process.go
36
process.go
|
@ -160,7 +160,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
err := p.procFunc(p.ctx)
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
err := p.procFunc(p.ctx)
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -209,11 +209,13 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
const publishTimer time.Duration = 5
|
const publishTimer time.Duration = 5
|
||||||
const subscribeSyncTimer time.Duration = 5
|
const subscribeSyncTimer time.Duration = 5
|
||||||
|
|
||||||
|
// The for loop will run until the message is delivered successfully,
|
||||||
|
// or that retries are reached.
|
||||||
for {
|
for {
|
||||||
dataPayload, err := gobEncodeMessage(message)
|
dataPayload, err := gobEncodeMessage(message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: createDataPayload: %v", err)
|
er := fmt.Errorf("error: createDataPayload: %v", err)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,7 +256,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
// fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message)
|
// fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message)
|
||||||
if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK {
|
if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK {
|
||||||
// Wait up until ACKTimeout specified for a reply,
|
// Wait up until ACKTimeout specified for a reply,
|
||||||
// continue and resend if noo reply received,
|
// continue and resend if no reply received,
|
||||||
// or exit if max retries for the message reached.
|
// or exit if max retries for the message reached.
|
||||||
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -273,13 +275,21 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
case retryAttempts >= message.Retries:
|
case retryAttempts >= message.Retries:
|
||||||
// max retries reached
|
// max retries reached
|
||||||
er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method)
|
er := fmt.Errorf("info: toNode: %v, fromNode: %v, method: %v: max retries reached, check if node is up and running and if it got a subscriber for the given REQ type", message.ToNode, message.FromNode, message.Method)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, p.node, er)
|
|
||||||
|
// We do not want to send errorLogs for REQErrorLog type since
|
||||||
|
// it will just cause an endless loop.
|
||||||
|
if message.Method != REQErrorLog {
|
||||||
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// none of the above matched, so we've not reached max retries yet
|
// none of the above matched, so we've not reached max retries yet
|
||||||
|
log.Printf("max retries for message not reached, retrying sending of message with ID %v\n", message.ID)
|
||||||
p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -314,7 +324,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
err := gobDec.Decode(&message)
|
err := gobDec.Decode(&message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: gob decoding failed: %v", err)
|
er := fmt.Errorf("error: gob decoding failed: %v", err)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
|
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
|
||||||
|
@ -324,7 +334,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
if !ok {
|
if !ok {
|
||||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
out := []byte("not allowed from " + message.FromNode)
|
out := []byte("not allowed from " + message.FromNode)
|
||||||
|
@ -343,11 +353,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a confirmation message back to the publisher
|
// Send a confirmation message back to the publisher
|
||||||
|
@ -358,7 +368,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
if !ok {
|
if !ok {
|
||||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we are allowed to receive from that host
|
// Check if we are allowed to receive from that host
|
||||||
|
@ -378,16 +388,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ func (s *server) readSocket() {
|
||||||
conn, err := s.StewardSocket.Accept()
|
conn, err := s.StewardSocket.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(conn net.Conn) {
|
go func(conn net.Conn) {
|
||||||
|
@ -32,7 +32,7 @@ func (s *server) readSocket() {
|
||||||
_, err = conn.Read(b)
|
_, err = conn.Read(b)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ func (s *server) readSocket() {
|
||||||
sams, err := s.convertBytesToSAMs(readBytes)
|
sams, err := s.convertBytesToSAMs(readBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
|
||||||
conn, err := ln.Accept()
|
conn, err := ln.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
|
||||||
_, err = conn.Read(b)
|
_, err = conn.Read(b)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ func (s *server) readTCPListener(toRingbufferCh chan []subjectAndMessage) {
|
||||||
sam, err := s.convertBytesToSAMs(readBytes)
|
sam, err := s.convertBytesToSAMs(readBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
sendErrorLogMessage(s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, toRingbufferCh, Node(s.nodeName), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
|
||||||
// the slice since it is not valid.
|
// the slice since it is not valid.
|
||||||
default:
|
default:
|
||||||
er := fmt.Errorf("error: no toNode or toNodes where specified in the message got'n, dropping message: %v", v)
|
er := fmt.Errorf("error: no toNode or toNodes where specified in the message got'n, dropping message: %v", v)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,20 +49,21 @@ type ringBuffer struct {
|
||||||
// way as all messages are handled.
|
// way as all messages are handled.
|
||||||
newMessagesCh chan []subjectAndMessage
|
newMessagesCh chan []subjectAndMessage
|
||||||
metrics *metrics
|
metrics *metrics
|
||||||
|
configuration *Configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
// newringBuffer returns a push/pop storage for values.
|
// newringBuffer returns a push/pop storage for values.
|
||||||
func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
func newringBuffer(metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
||||||
// Check if socket folder exists, if not create it
|
// Check if socket folder exists, if not create it
|
||||||
if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
|
if _, err := os.Stat(configuration.DatabaseFolder); os.IsNotExist(err) {
|
||||||
err := os.MkdirAll(c.DatabaseFolder, 0700)
|
err := os.MkdirAll(configuration.DatabaseFolder, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: failed to create database directory %v: %v\n", c.DatabaseFolder, err)
|
log.Printf("error: failed to create database directory %v: %v\n", configuration.DatabaseFolder, err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DatabaseFilepath := filepath.Join(c.DatabaseFolder, dbFileName)
|
DatabaseFilepath := filepath.Join(configuration.DatabaseFolder, dbFileName)
|
||||||
|
|
||||||
// ---
|
// ---
|
||||||
|
|
||||||
|
@ -79,6 +80,7 @@ func newringBuffer(metrics *metrics, c Configuration, size int, dbFileName strin
|
||||||
nodeName: nodeName,
|
nodeName: nodeName,
|
||||||
newMessagesCh: newMessagesCh,
|
newMessagesCh: newMessagesCh,
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
|
configuration: configuration,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +146,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
// Check if the command or event exists in commandOrEvent.go
|
// Check if the command or event exists in commandOrEvent.go
|
||||||
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
||||||
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
|
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
|
||||||
sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
sendErrorLogMessage(r.configuration, r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
||||||
|
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
// if it was not a valid value, we jump back up, and
|
// if it was not a valid value, we jump back up, and
|
||||||
|
@ -177,14 +179,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
js, err := json.Marshal(samV)
|
js, err := json.Marshal(samV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
|
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
|
||||||
sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
sendErrorLogMessage(r.configuration, r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the incomming message in key/value store
|
// Store the incomming message in key/value store
|
||||||
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
||||||
sendErrorLogMessage(r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
sendErrorLogMessage(r.configuration, r.metrics, r.newMessagesCh, Node(r.nodeName), er)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
14
server.go
14
server.go
|
@ -273,10 +273,10 @@ func (s *server) Stop() {
|
||||||
|
|
||||||
// sendErrorMessage will put the error message directly on the channel that is
|
// sendErrorMessage will put the error message directly on the channel that is
|
||||||
// read by the nats publishing functions.
|
// read by the nats publishing functions.
|
||||||
func sendErrorLogMessage(metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
|
func sendErrorLogMessage(conf *Configuration, metrics *metrics, newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
|
||||||
// NB: Adding log statement here for more visuality during development.
|
// NB: Adding log statement here for more visuality during development.
|
||||||
log.Printf("%v\n", theError)
|
log.Printf("%v\n", theError)
|
||||||
sam := createErrorMsgContent(FromNode, theError)
|
sam := createErrorMsgContent(conf, FromNode, theError)
|
||||||
newMessagesCh <- []subjectAndMessage{sam}
|
newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
metrics.promErrorMessagesSentTotal.Inc()
|
metrics.promErrorMessagesSentTotal.Inc()
|
||||||
|
@ -284,7 +284,7 @@ func sendErrorLogMessage(metrics *metrics, newMessagesCh chan<- []subjectAndMess
|
||||||
|
|
||||||
// createErrorMsgContent will prepare a subject and message with the content
|
// createErrorMsgContent will prepare a subject and message with the content
|
||||||
// of the error
|
// of the error
|
||||||
func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
|
func createErrorMsgContent(conf *Configuration, FromNode Node, theError error) subjectAndMessage {
|
||||||
// Add time stamp
|
// Add time stamp
|
||||||
er := fmt.Sprintf("%v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), theError.Error())
|
er := fmt.Sprintf("%v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), theError.Error())
|
||||||
|
|
||||||
|
@ -297,6 +297,8 @@ func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
|
||||||
FileName: "error.log",
|
FileName: "error.log",
|
||||||
Data: []string{er},
|
Data: []string{er},
|
||||||
Method: REQErrorLog,
|
Method: REQErrorLog,
|
||||||
|
ACKTimeout: conf.ErrorMessageTimeout,
|
||||||
|
Retries: conf.ErrorMessageRetries,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,7 +325,7 @@ type samDBValueAndDelivered struct {
|
||||||
func (s *server) routeMessagesToProcess(dbFileName string) {
|
func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// Prepare and start a new ring buffer
|
// Prepare and start a new ring buffer
|
||||||
const bufferSize int = 1000
|
const bufferSize int = 1000
|
||||||
rb := newringBuffer(s.metrics, *s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.newMessagesCh)
|
rb := newringBuffer(s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.newMessagesCh)
|
||||||
|
|
||||||
ringBufferInCh := make(chan subjectAndMessage)
|
ringBufferInCh := make(chan subjectAndMessage)
|
||||||
ringBufferOutCh := make(chan samDBValueAndDelivered)
|
ringBufferOutCh := make(chan samDBValueAndDelivered)
|
||||||
|
@ -359,12 +361,12 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// Check if the format of the message is correct.
|
// Check if the format of the message is correct.
|
||||||
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
||||||
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
||||||
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
|
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
|
||||||
sendErrorLogMessage(s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
sendErrorLogMessage(s.configuration, s.metrics, s.newMessagesCh, Node(s.nodeName), er)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,7 +285,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// In theory the system should drop the message before it reaches here.
|
// In theory the system should drop the message before it reaches here.
|
||||||
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
|
er := fmt.Errorf("error: newReplyMessage : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
@ -386,7 +386,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,14 +397,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
|
|
||||||
if len(arg.AllowedNodes) == 0 {
|
if len(arg.AllowedNodes) == 0 {
|
||||||
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
|
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if arg.Method == "" {
|
if arg.Method == "" {
|
||||||
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
|
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -415,7 +415,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||||
|
|
||||||
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
|
|
||||||
case "stopProc":
|
case "stopProc":
|
||||||
// Set the interface type dst to &OpStart.
|
// Set the interface type dst to &OpStart.
|
||||||
|
@ -424,7 +424,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -442,7 +442,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
err = func() error {
|
err = func() error {
|
||||||
if arg.ID == 0 {
|
if arg.ID == 0 {
|
||||||
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
|
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -450,7 +450,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode)
|
er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -468,7 +468,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
err := toStopProc.natsSubscription.Unsubscribe()
|
err := toStopProc.natsSubscription.Unsubscribe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -476,14 +476,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
||||||
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
||||||
|
|
||||||
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
|
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
newReplyMessage(proc, message, []byte(er.Error()))
|
newReplyMessage(proc, message, []byte(er.Error()))
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode)
|
er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
newReplyMessage(proc, message, []byte(er.Error()))
|
newReplyMessage(proc, message, []byte(er.Error()))
|
||||||
|
@ -522,7 +522,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
||||||
err := os.MkdirAll(folderTree, 0700)
|
err := os.MkdirAll(folderTree, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message)
|
er := fmt.Errorf("error: methodREQToFileAppend failed to create toFileAppend directory tree:%v, %v, message: %v", folderTree, err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -534,7 +534,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
||||||
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -545,7 +545,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
||||||
f.Sync()
|
f.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -577,7 +577,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
||||||
err := os.MkdirAll(folderTree, 0700)
|
err := os.MkdirAll(folderTree, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
return nil, er
|
return nil, er
|
||||||
|
@ -591,7 +591,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
||||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -602,7 +602,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
||||||
f.Sync()
|
f.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message)
|
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v, %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -735,7 +735,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
||||||
err := os.MkdirAll(folderTree, 0700)
|
err := os.MkdirAll(folderTree, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
return nil, er
|
return nil, er
|
||||||
|
@ -749,7 +749,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
||||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -761,7 +761,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
||||||
f.Sync()
|
f.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message)
|
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: %v, %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -799,7 +799,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
||||||
err := os.MkdirAll(folderTree, 0700)
|
err := os.MkdirAll(folderTree, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v, %v", folderTree, err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
return nil, er
|
return nil, er
|
||||||
|
@ -813,7 +813,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
||||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: %v", err)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -825,7 +825,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
||||||
f.Sync()
|
f.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message)
|
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: %v, %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,7 +872,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
||||||
out, err := cmd.Output()
|
out, err := cmd.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
|
@ -886,7 +886,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message)
|
er := fmt.Errorf("error: methodREQCliCommand: method timed out %v", message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -943,7 +943,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
|
||||||
out, err := cmd.Output()
|
out, err := cmd.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQnCliCommand: cmd.Output : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQnCliCommand: cmd.Output : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -958,7 +958,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("error: methodREQnCliCommand: method timed out %v", message)
|
er := fmt.Errorf("error: methodREQnCliCommand: method timed out %v", message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -1020,7 +1020,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message)
|
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1034,7 +1034,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
|
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
@ -1042,14 +1042,14 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
||||||
if resp.StatusCode != 200 {
|
if resp.StatusCode != 200 {
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
|
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1066,7 +1066,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message)
|
er := fmt.Errorf("error: methodREQHttpGet: method timed out %v", message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -1120,7 +1120,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
|
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.processes.wg.Add(1)
|
proc.processes.wg.Add(1)
|
||||||
|
@ -1146,7 +1146,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
||||||
// go routine.
|
// go routine.
|
||||||
// close(t.Lines)
|
// close(t.Lines)
|
||||||
er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
|
er := fmt.Errorf("info: method timeout reached, canceling: %v", message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
|
|
||||||
return
|
return
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
|
@ -1205,7 +1205,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
|
||||||
outReader, err := cmd.StdoutPipe()
|
outReader, err := cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.StdoutPipe failed : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
log.Printf("error: %v\n", err)
|
log.Printf("error: %v\n", err)
|
||||||
|
@ -1213,7 +1213,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.Start failed : %v, message: %v", err, message)
|
er := fmt.Errorf("error: methodREQnCliCommandCont: cmd.Start failed : %v, message: %v", err, message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1236,7 +1236,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
er := fmt.Errorf("info: methodREQnCliCommandCont: method timeout reached, canceling: %v", message)
|
er := fmt.Errorf("info: methodREQnCliCommandCont: method timeout reached, canceling: %v", message)
|
||||||
sendErrorLogMessage(proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||||
return
|
return
|
||||||
case out := <-outCh:
|
case out := <-outCh:
|
||||||
// Prepare and queue for sending a new message with the output
|
// Prepare and queue for sending a new message with the output
|
||||||
|
|
Loading…
Add table
Reference in a new issue