1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 14:56:49 +00:00

checking for malformed or non existent method in input

This commit is contained in:
postmannen 2021-03-12 12:08:11 +01:00
parent 78a2b2bc56
commit b836adc110
4 changed files with 49 additions and 14 deletions

View file

@ -77,7 +77,11 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
// Range over all the messages parsed from json, and create a subject for // Range over all the messages parsed from json, and create a subject for
// each message. // each message.
for _, m := range MsgSlice { for _, m := range MsgSlice {
sm := newSAM(m) sm, err := newSAM(m)
if err != nil {
log.Printf("error: jsonFromFileData: %v\n", err)
continue
}
sam = append(sam, sm) sam = append(sam, sm)
} }
@ -87,14 +91,21 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
// newSAM will look up the correct values and value types to // newSAM will look up the correct values and value types to
// be used in a subject for a Message, and return the a combined structure // be used in a subject for a Message, and return the a combined structure
// of type subjectAndMessage. // of type subjectAndMessage.
func newSAM(m Message) subjectAndMessage { func newSAM(m Message) (subjectAndMessage, error) {
// We need to create a tempory method type to look up the kind for the // We need to create a tempory method type to look up the kind for the
// real method for the message. // real method for the message.
var mt Method var mt Method
//fmt.Printf("-- \n getKind contains: %v\n\n", mt.getHandler(m.Method).getKind())
tmpH := mt.getHandler(m.Method)
if tmpH == nil {
return subjectAndMessage{}, fmt.Errorf("error: method value did not exist in map")
}
sub := Subject{ sub := Subject{
ToNode: string(m.ToNode), ToNode: string(m.ToNode),
CommandOrEvent: mt.getHandler(m.Method).getKind(), CommandOrEvent: tmpH.getKind(),
Method: m.Method, Method: m.Method,
} }
@ -103,7 +114,7 @@ func newSAM(m Message) subjectAndMessage {
Message: m, Message: m,
} }
return sm return sm, nil
} }
// readTruncateMessageFile, will read all the messages in the given // readTruncateMessageFile, will read all the messages in the given

View file

@ -2,6 +2,7 @@ package steward
import ( import (
"fmt" "fmt"
"log"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -72,7 +73,6 @@ func (s *server) ProcessesStart() {
if s.configuration.PublisherServiceSayhello != 0 { if s.configuration.PublisherServiceSayhello != 0 {
fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName) fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName)
// TODO: Replace "central" name with variable below.
sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName) sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
@ -91,7 +91,11 @@ func (s *server) ProcessesStart() {
Method: SayHello, Method: SayHello,
} }
sam := newSAM(m) sam, err := newSAM(m)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: ProcessesStart: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{sam} proc.newMessagesCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello)) time.Sleep(time.Second * time.Duration(s.configuration.PublisherServiceSayhello))
} }

View file

@ -54,7 +54,6 @@ type server struct {
newMessagesCh chan []subjectAndMessage newMessagesCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if // errorKernel is doing all the error handling like what to do if
// an error occurs. // an error occurs.
// TODO: Will also send error messages to cental error subscriber.
errorKernel *errorKernel errorKernel *errorKernel
// metric exporter // metric exporter
metrics *metrics metrics *metrics
@ -132,8 +131,6 @@ func (s *server) Start() {
// } // }
// Start up the predefined subscribers. // Start up the predefined subscribers.
// TODO: What to subscribe on should be handled via flags, or config
// files.
s.ProcessesStart() s.ProcessesStart()
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
@ -244,11 +241,16 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
// it was unable to process the message with the reason // it was unable to process the message with the reason
// why ? // why ?
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method) er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er)
continue continue
} }
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent) er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er)
continue continue
} }

View file

@ -306,7 +306,13 @@ func (m methodEchoRequest) handler(proc process, message Message, node string) (
Timeout: 3, Timeout: 3,
Retries: 3, Retries: 3,
} }
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodEchoRequest: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{nSAM}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -363,7 +369,13 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str
Retries: 3, Retries: 3,
} }
fmt.Printf("** %#v\n", newMsg) fmt.Printf("** %#v\n", newMsg)
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{nSAM}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -408,7 +420,13 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod
Retries: 3, Retries: 3,
} }
fmt.Printf("** %#v\n", newMsg) fmt.Printf("** %#v\n", newMsg)
proc.newMessagesCh <- []subjectAndMessage{newSAM(newMsg)}
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequestNOSEQ: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{nSAM}
}() }()