1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

sending errors from ringbuffer

This commit is contained in:
postmannen 2021-03-12 09:49:27 +01:00
parent ca295d97d8
commit 8355f4154a
2 changed files with 21 additions and 15 deletions

View file

@ -135,7 +135,7 @@ func (p process) spawnWorker(s *server) {
if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er)
}
}()
}
@ -156,7 +156,7 @@ func (p process) spawnWorker(s *server) {
if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er)
}
}()
}
@ -177,7 +177,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
if err != nil {
er := fmt.Errorf("error: createDataPayload: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er)
continue
}
@ -199,7 +199,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
if err != nil {
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er)
continue
}
@ -208,7 +208,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
if err != nil {
er := fmt.Errorf("error: publish failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er)
continue
}
@ -267,7 +267,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
switch {
@ -276,7 +276,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
out := []byte("not allowed from " + message.FromNode)
@ -296,7 +296,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
} else {
log.Printf("info: we don't allow receiving from: %v, %v\n", message.FromNode, p.subject)
@ -311,7 +311,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
// Start the method handler for that specific subject type.
@ -327,12 +327,12 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
default:
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er)
}
}

View file

@ -90,7 +90,9 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
func() {
s, err := r.dumpBucket(samValueBucket)
if err != nil {
log.Printf("error: retreival of values from k/v store failed: %v\n", err)
er := fmt.Errorf("error: fillBuffer: retreival of values from k/v store failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
}
for _, v := range s {
@ -113,7 +115,9 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
// Check if the command or event exists in commandOrEvent.go
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
log.Printf("error: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v\n", coeAvailableValues)
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v", coeAvailableValues)
log.Printf("%v\n", er)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
fmt.Println()
// if it was not a valid value, we jump back up, and
@ -145,7 +149,9 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
js, err := json.Marshal(samV)
if err != nil {
log.Printf("error: gob encoding samValue: %v\n", err)
er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
}
// Store the incomming message in key/value store
@ -153,7 +159,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
if err != nil {
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), err)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
}