1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed the need for specifying the kind of event/command within a message. This is now picked from the generated subject

This commit is contained in:
postmannen 2021-03-01 17:08:40 +01:00
parent 52a92af20e
commit 35e0c12109
14 changed files with 130 additions and 57 deletions

View file

@ -2,7 +2,6 @@
{ {
"toNode": "central", "toNode": "central",
"data": [""], "data": [""],
"commandOrEvent":"EventNACK",
"method":"SayHello" "method":"SayHello"
} }
] ]

View file

@ -3,7 +3,6 @@
"toNode": "ship1", "toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"], "data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"CLICommand", "method":"CLICommand",
"timeout":3, "timeout":3,
"retries":3 "retries":3

View file

@ -3,7 +3,6 @@
"toNode": "ship1", "toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"], "data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"CLICommand", "method":"CLICommand",
"timeout":3, "timeout":3,
"retries":3 "retries":3

View file

@ -2,7 +2,6 @@
{ {
"toNode": "*", "toNode": "*",
"data": ["bash","-c","tree ../"], "data": ["bash","-c","tree ../"],
"commandOrEvent":"CommandACK",
"method":"CLICommand" "method":"CLICommand"
} }

View file

@ -2,7 +2,6 @@
{ {
"toNode": "ship2", "toNode": "ship2",
"data": ["bash","-c","tree ../"], "data": ["bash","-c","tree ../"],
"commandOrEvent":"CommandACK",
"method":"CLICommand" "method":"CLICommand"
} }

View file

@ -3,7 +3,6 @@
"toNode": "ship2", "toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"], "data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"CLICommand", "method":"CLICommand",
"timeout":3, "timeout":3,
"retries":3 "retries":3

View file

@ -67,13 +67,16 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
} }
sam := []subjectAndMessage{} sam := []subjectAndMessage{}
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
// Range over all the messages parsed from json, and create a subject for // Range over all the messages parsed from json, and create a subject for
// each message. // each message.
for _, m := range MsgSlice { for _, m := range MsgSlice {
s := Subject{ s := Subject{
ToNode: string(m.ToNode), ToNode: string(m.ToNode),
CommandOrEvent: m.CommandOrEvent, CommandOrEvent: mt.getHandler(m.Method).getKind(),
Method: m.Method, Method: m.Method,
} }

Binary file not shown.

View file

@ -16,8 +16,6 @@ type Message struct {
// TODO: Change this to a slice instead...or maybe use an // TODO: Change this to a slice instead...or maybe use an
// interface type here to handle several data types ? // interface type here to handle several data types ?
Data []string `json:"data" yaml:"data"` Data []string `json:"data" yaml:"data"`
// The type of the message being sent
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. CLI, syslog, etc. // method, what is this message doing, etc. CLI, syslog, etc.
Method Method `json:"method" yaml:"method"` Method Method `json:"method" yaml:"method"`
FromNode node FromNode node

View file

@ -47,11 +47,10 @@ func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage {
Method: ErrorLog, Method: ErrorLog,
}, },
Message: Message{ Message: Message{
ToNode: "errorCentral", ToNode: "errorCentral",
FromNode: FromNode, FromNode: FromNode,
Data: []string{m}, Data: []string{m},
CommandOrEvent: EventNACK, Method: SayHello,
Method: SayHello,
}, },
} }

View file

@ -49,8 +49,8 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
log.Printf("error: the method do not exist: %v\n", sam.Message.Method) log.Printf("error: the method do not exist: %v\n", sam.Message.Method)
continue continue
} }
if !s.commandOrEventAvailable.CheckIfExists(sam.Message.CommandOrEvent) { if !s.commandOrEventAvailable.CheckIfExists(sam.Subject.CommandOrEvent) {
log.Printf("error: the command or evnt do not exist: %v\n", sam.Message.CommandOrEvent) log.Printf("error: the command or evnt do not exist: %v\n", sam.Subject.CommandOrEvent)
continue continue
} }

View file

@ -110,7 +110,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
for v := range inCh { for v := range inCh {
// Check if the command or event exists in commandOrEvent.go // Check if the command or event exists in commandOrEvent.go
if !coeAvailable.CheckIfExists(v.Message.CommandOrEvent) { if !coeAvailable.CheckIfExists(v.CommandOrEvent) {
log.Printf("error: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v\n", coeAvailableValues) log.Printf("error: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v\n", coeAvailableValues)
fmt.Println() fmt.Println()

View file

@ -282,7 +282,7 @@ func (s *server) messageDeliverNats(proc process, message Message) {
// If the message is an ACK type of message we must check that a // If the message is an ACK type of message we must check that a
// reply, and if it is not we don't wait here at all. // reply, and if it is not we don't wait here at all.
fmt.Printf("---- MESSAGE : %v\n", message) fmt.Printf("---- MESSAGE : %v\n", message)
if message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK { if proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK {
// Wait up until 10 seconds for a reply, // Wait up until 10 seconds for a reply,
// continue and resend if to reply received. // continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout))
@ -339,12 +339,12 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// that there was a problem like missing method to handle a specific // that there was a problem like missing method to handle a specific
// method etc. // method etc.
switch { switch {
case message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK: case proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK:
log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", message.CommandOrEvent) log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", proc.subject.CommandOrEvent)
mf, ok := s.methodsAvailable.CheckIfExists(message.Method) mf, ok := s.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
// TODO: Check how errors should be handled here!!! // TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent) log.Printf("error: subscriberHandler: method type not available: %v\n", proc.subject.CommandOrEvent)
} }
fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: ACK\n") fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: ACK\n")
@ -386,12 +386,12 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
err := fmt.Errorf("error: some testing error we want to send out") err := fmt.Errorf("error: some testing error we want to send out")
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
} }
case message.CommandOrEvent == CommandNACK || message.CommandOrEvent == EventNACK: case proc.subject.CommandOrEvent == CommandNACK || proc.subject.CommandOrEvent == EventNACK:
log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", message.CommandOrEvent) log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", proc.subject.CommandOrEvent)
mf, ok := s.methodsAvailable.CheckIfExists(message.Method) mf, ok := s.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
// TODO: Check how errors should be handled here!!! // TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent) log.Printf("error: subscriberHandler: method type not available: %v\n", proc.subject.CommandOrEvent)
} }
// since we don't send a reply for a NACK message, we don't care about the // since we don't send a reply for a NACK message, we don't care about the
// out return when calling mf.handler // out return when calling mf.handler
@ -403,7 +403,7 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
} }
default: default:
log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent) log.Printf("info: did not find that specific type of command: %#v\n", proc.subject.CommandOrEvent)
} }
} }
@ -427,11 +427,10 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
Method: ErrorLog, Method: ErrorLog,
}, },
Message: Message{ Message: Message{
ToNode: "errorCentral", ToNode: "errorCentral",
FromNode: FromNode, FromNode: FromNode,
Data: []string{theError.Error()}, Data: []string{theError.Error()},
CommandOrEvent: EventNACK, Method: ErrorLog,
Method: ErrorLog,
}, },
} }

View file

@ -1,8 +1,34 @@
// NB: // The structure of how to add new method types to the system.
// When adding new constants for the Method or CommandOrEvent // -----------------------------------------------------------
// types, make sure to also add them to the map // All methods need 3 things:
// <Method/CommandOrEvent>Available since the this will be used // - A type definition
// to check if the message values are valid later on. // - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
// type methodCommandCLICommand struct {
// commandOrEvent CommandOrEvent
// }
//
// func (m methodCommandCLICommand) getKind() CommandOrEvent {
// return m.commandOrEvent
// }
//
// func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) {
// ...
// ...
// outMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return outMsg, nil
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the map, where the value is
// the actual type you want to map it to with a handler method.
// You also specify if it is a Command or Event, and if it is
// ACK or NACK.
// Check out the existing code below for more examples.
package steward package steward
@ -15,24 +41,8 @@ import (
) )
// ------------------------------------------------------------ // ------------------------------------------------------------
// The constants that will be used throughout the system for
// Method is used to specify the actual function/method that // when specifying what kind of Method to send or work with.
// is represented in a typed manner.
type Method string
func (m Method) GetMethodsAvailable() MethodsAvailable {
ma := MethodsAvailable{
topics: map[Method]methodHandler{
CLICommand: methodCommandCLICommand{},
TextLogging: methodEventTextLogging{},
SayHello: methodEventSayHello{},
ErrorLog: methodEventErrorLog{},
},
}
return ma
}
const ( const (
// Shell command to be executed via f.ex. bash // Shell command to be executed via f.ex. bash
CLICommand Method = "CLICommand" CLICommand Method = "CLICommand"
@ -44,6 +54,49 @@ const (
ErrorLog Method = "ErrorLog" ErrorLog Method = "ErrorLog"
) )
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
// The mapping of all the method constants specified, what type
// it references, and the kind if it is an Event or Command, and
// if it is ACK or NACK.
// Allowed values for the commandOrEvent field are:
// - CommandACK
// - CommandNACK
// - EventACK
// - EventNack
func (m Method) GetMethodsAvailable() MethodsAvailable {
ma := MethodsAvailable{
topics: map[Method]methodHandler{
CLICommand: methodCommandCLICommand{
commandOrEvent: CommandACK,
},
TextLogging: methodEventTextLogging{
commandOrEvent: EventACK,
},
SayHello: methodEventSayHello{
commandOrEvent: EventNACK,
},
ErrorLog: methodEventErrorLog{
commandOrEvent: EventACK,
},
},
}
return ma
}
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func (m Method) getHandler(method Method) methodHandler {
ma := m.GetMethodsAvailable()
mh := ma.topics[method]
return mh
}
type MethodsAvailable struct { type MethodsAvailable struct {
topics map[Method]methodHandler topics map[Method]methodHandler
} }
@ -68,9 +121,18 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
type methodHandler interface { type methodHandler interface {
handler(server *server, message Message, node string) ([]byte, error) handler(server *server, message Message, node string) ([]byte, error)
getKind() CommandOrEvent
} }
type methodCommandCLICommand struct{} // -----
type methodCommandCLICommand struct {
commandOrEvent CommandOrEvent
}
func (m methodCommandCLICommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) { func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) {
// Since the command to execute is at the first position in the // Since the command to execute is at the first position in the
@ -91,7 +153,13 @@ func (m methodCommandCLICommand) handler(s *server, message Message, node string
// ----- // -----
type methodEventTextLogging struct{} type methodEventTextLogging struct {
commandOrEvent CommandOrEvent
}
func (m methodEventTextLogging) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodEventTextLogging) handler(s *server, message Message, node string) ([]byte, error) { func (m methodEventTextLogging) handler(s *server, message Message, node string) ([]byte, error) {
for _, d := range message.Data { for _, d := range message.Data {
@ -104,7 +172,13 @@ func (m methodEventTextLogging) handler(s *server, message Message, node string)
// ----- // -----
type methodEventSayHello struct{} type methodEventSayHello struct {
commandOrEvent CommandOrEvent
}
func (m methodEventSayHello) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) { func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) {
log.Printf("<--- Received hello from %v \n", message.FromNode) log.Printf("<--- Received hello from %v \n", message.FromNode)
@ -126,7 +200,13 @@ func (m methodEventSayHello) handler(s *server, message Message, node string) ([
// --- // ---
type methodEventErrorLog struct{} type methodEventErrorLog struct {
commandOrEvent CommandOrEvent
}
func (m methodEventErrorLog) getKind() CommandOrEvent {
return m.commandOrEvent
}
func (m methodEventErrorLog) handler(s *server, message Message, node string) ([]byte, error) { func (m methodEventErrorLog) handler(s *server, message Message, node string) ([]byte, error) {
log.Printf("----------------------------------------------------------------------------..\n") log.Printf("----------------------------------------------------------------------------..\n")