1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

Added error handling for correctness of message fields

This commit is contained in:
postmannen 2021-02-10 14:29:17 +01:00
parent dce6152496
commit 4071650f74
4 changed files with 167 additions and 68 deletions

View file

@ -1,7 +1,7 @@
[
{
"toNode": "central",
"data": ["some message sent from a ship\n"],
"data": ["some message sent from a ship for testing\n"],
"commandOrEvent":"event",
"method":"textLogging"
}

123
message.go Normal file
View file

@ -0,0 +1,123 @@
// NB:
// When adding new constants for the Method or CommandOrEvent
// types, make sure to also add them to the map
// <Method/CommandOrEvent>Available since the this will be used
// to check if the message values are valid later on.
package steward
import "fmt"
// CommandOrEvent describes on the message level if this is
// an event or command kind of message in the Subject name.
// This field is mainly used to be able to spawn up different
// worker processes based on the Subject name so we can have
// one process for handling event kind, and another for
// handling command kind of messages.
// This type is used in both building the subject name, and
// also inside the Message type to describe if it is a Command
// or Event.
type CommandOrEvent string
func (c CommandOrEvent) GetCommandOrEventAvailable() CommandOrEventAvailable {
ma := CommandOrEventAvailable{
topics: map[CommandOrEvent]struct{}{
Command: struct{}{},
Event: struct{}{},
},
}
return ma
}
const (
// Command, command that will just wait for an
// ack, and nothing of the output of the command are
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
Command CommandOrEvent = "command"
// Event, wait for and return the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation if it
// was successful or not.
Event CommandOrEvent = "event"
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
)
type CommandOrEventAvailable struct {
topics map[CommandOrEvent]struct{}
}
func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent) bool {
_, ok := co.topics[c]
if ok {
fmt.Printf("******THE TOPIC EXISTS: %v******\n", c)
return true
} else {
fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", c)
return false
}
}
// ------------------------------------------------------------
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
func (m Method) GetMethodsAvailable() MethodsAvailable {
// var ma MethodsAvailable
// ma.topics = make(map[Method]struct{})
//
// ma.topics[ShellCommand] = struct{}{}
// ma.topics[TextLogging] = struct{}{}
ma := MethodsAvailable{
topics: map[Method]struct{}{
ShellCommand: struct{}{},
TextLogging: struct{}{},
},
}
return ma
}
const (
// Shell command to be executed via f.ex. bash
ShellCommand Method = "shellCommand"
// Send text logging to some host
TextLogging Method = "textLogging"
)
type MethodsAvailable struct {
topics map[Method]struct{}
}
func (ma MethodsAvailable) CheckIfExists(m Method) bool {
_, ok := ma.topics[m]
if ok {
fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
return true
} else {
fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m)
return false
}
}
// ------------------------------------------------------------
type Message struct {
ToNode node `json:"toNode" yaml:"toNode"`
// The Unique ID of the message
ID int `json:"id" yaml:"id"`
// The actual data in the message
// TODO: Change this to a slice instead...or maybe use an
// interface type here to handle several data types ?
Data []string `json:"data" yaml:"data"`
// The type of the message being sent
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. shellCommand, syslog, etc.
Method Method `json:"method" yaml:"method"`
FromNode node
}

View file

@ -14,52 +14,6 @@ import (
"github.com/nats-io/nats.go"
)
// CommandOrEvent describes on the message level if this is
// an event or command kind of message in the Subject name.
// This field is mainly used to be able to spawn up different
// worker processes based on the Subject name so we can have
// one process for handling event kind, and another for
// handling command kind of messages.
// This type is used in both building the subject name, and
// also inside the Message type to describe if it is a Command
// or Event.
type CommandOrEvent string
// TODO: Figure it makes sense to have these types at all.
// It might make more sense to implement these as two
// individual subjects.
const (
// Command, command that will just wait for an
// ack, and nothing of the output of the command are
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
Command CommandOrEvent = "command"
// Event, wait for and return the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation if it
// was successful or not.
Event CommandOrEvent = "event"
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
)
type Message struct {
ToNode node `json:"toNode" yaml:"toNode"`
// The Unique ID of the message
ID int `json:"id" yaml:"id"`
// The actual data in the message
// TODO: Change this to a slice instead...or maybe use an
// interface type here to handle several data types ?
Data []string `json:"data" yaml:"data"`
// The type of the message being sent
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. shellCommand, syslog, etc.
Method string `json:"method" yaml:"method"`
FromNode node
}
// server is the structure that will hold the state about spawned
// processes on a local instance.
type server struct {
@ -81,6 +35,10 @@ type server struct {
errorKernel *errorKernel
// TODO: replace this with some structure to hold the logCh value
logCh chan []byte
// used to check if the methods specified in message is valid
methodsAvailable MethodsAvailable
// used to check if the commandOrEvent specified in message is valid
commandOrEventAvailable CommandOrEventAvailable
}
// newServer will prepare and return a server type
@ -90,13 +48,18 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
log.Printf("error: nats.Connect failed: %v\n", err)
}
var m Method
var co CommandOrEvent
s := &server{
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []subjectAndMessage),
errorCh: make(chan errProcess, 2),
logCh: make(chan []byte),
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []subjectAndMessage),
errorCh: make(chan errProcess, 2),
logCh: make(chan []byte),
methodsAvailable: m.GetMethodsAvailable(),
commandOrEventAvailable: co.GetCommandOrEventAvailable(),
}
// Start the error kernel that will do all the error handling
@ -162,33 +125,39 @@ func (s *server) handleMessagesToPublish() {
// Process the messages that have been received on the incomming
// message pipe. Check and send if there are a specific subject
// for it, and no subject exist throw an error.
//
// TODO: Later on the only thing that should be checked here is
// that there is a node for the specific message, and the super-
// visor should create the process with the wanted subject on both
// the publishing and the receiving node. If there is no such node
// an error should be generated and processed by the error-kernel.
go func() {
for v := range s.newMessagesCh {
for i, vv := range v {
for samSlice := range s.newMessagesCh {
for i, sam := range samSlice {
// Check if the format of the message is correct.
// TODO: Send a message to the error kernel here that
// it was unable to process the message with the reason
// why ?
if !s.methodsAvailable.CheckIfExists(sam.Message.Method) {
continue
}
if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) {
continue
}
// Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject
// is not present.
redo:
m := vv.Message
subjName := vv.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, vv.Subject)
m := sam.Message
subjName := sam.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName]
if ok {
log.Printf("info: found the specific subject: %v\n", subjName)
// Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m
} else {
// If a publisher do not exist for the given subject, create it.
// If a publisher do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(v[i].Subject.Node, v[i].Subject.CommandOrEvent, v[i].Subject.Method)
sub := newSubject(samSlice[i].Subject.Node, samSlice[i].Subject.CommandOrEvent, samSlice[i].Subject.Method)
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
@ -212,7 +181,7 @@ type Subject struct {
// messageType, command/event
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. shellCommand, syslog, etc.
Method string `json:"method" yaml:"method"`
Method Method `json:"method" yaml:"method"`
// messageCh is the channel for receiving new content to be sent
messageCh chan Message
}
@ -220,7 +189,7 @@ type Subject struct {
// newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject.
func newSubject(node string, commandOrEvent CommandOrEvent, method string) Subject {
func newSubject(node string, commandOrEvent CommandOrEvent, method Method) Subject {
return Subject{
Node: node,
CommandOrEvent: commandOrEvent,

View file

@ -7,3 +7,10 @@ some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing