1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

changed MessageKind to CommandOrEvent

This commit is contained in:
postmannen 2021-02-10 07:25:44 +01:00
parent 271447d42d
commit 1a3b19071e
8 changed files with 106 additions and 144 deletions

View file

@ -37,9 +37,5 @@ func main() {
// Start the messaging server // Start the messaging server
go s.Start() go s.Start()
//if *modeSubscriber {
// go s.RunSubscriber()
//}
select {} select {}
} }

View file

@ -3,13 +3,13 @@
"subject": "subject":
{ {
"node":"central", "node":"central",
"messageKind":"event", "commandOrEvent":"event",
"method":"textlogging" "method":"textlogging"
}, },
"message": "message":
{ {
"data": ["some message sent from a ship"], "data": ["some message sent from a ship"],
"messageType":"Event" "commandOrEvent":"Event"
} }
} }
] ]

View file

@ -3,13 +3,13 @@
"subject": "subject":
{ {
"node":"ship1", "node":"ship1",
"messageKind":"command", "commandOrEvent":"command",
"method":"shellcommand" "method":"shellcommand"
}, },
"message": "message":
{ {
"data": ["bash","-c","ls -l ../"], "data": ["bash","-c","ls -l ../"],
"messageType":"Command" "commandOrEvent":"Command"
} }
} }
] ]

View file

@ -3,13 +3,13 @@
"subject": "subject":
{ {
"node":"ship2", "node":"ship2",
"messageKind":"command", "commandOrEvent":"command",
"method":"shellcommand" "method":"shellcommand"
}, },
"message": "message":
{ {
"data": ["bash","-c","tree ../"], "data": ["bash","-c","tree ../"],
"messageType":"Command" "commandOrEvent":"Command"
} }
} }
] ]

View file

@ -39,7 +39,7 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, f
} }
for i := range js { for i := range js {
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind) fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent)
js[i].Message.FromNode = node(s.nodeName) js[i].Message.FromNode = node(s.nodeName)
} }

View file

@ -6,19 +6,23 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
"os/exec"
"sync" "sync"
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
// MessageKind describes on the message level if this is // CommandOrEvent describes on the message level if this is
// an event or command kind of message in the Subject name. // an event or command kind of message in the Subject name.
// This field is mainly used to be able to spawn up different // This field is mainly used to be able to spawn up different
// worker processes based on the Subject name so we can have // worker processes based on the Subject name so we can have
// one process for handling event kind, and another for // one process for handling event kind, and another for
// handling command kind of messages. // handling command kind of messages.
type MessageKind string // This type is used in both building the subject name, and
// also inside the Message type to describe if it is a Command
// or Event.
type CommandOrEvent string
// TODO: Figure it makes sense to have these types at all. // TODO: Figure it makes sense to have these types at all.
// It might make more sense to implement these as two // It might make more sense to implement these as two
@ -29,13 +33,13 @@ const (
// delivered back in the reply ack message. // delivered back in the reply ack message.
// The message should contain the unique ID of the // The message should contain the unique ID of the
// command. // command.
Command MessageKind = "command" Command CommandOrEvent = "command"
// shellCommand, wait for and return the output // shellCommand, wait for and return the output
// of the command in the ACK message. This means // of the command in the ACK message. This means
// that the command should be executed immediately // that the command should be executed immediately
// and that we should get the confirmation that it // and that we should get the confirmation that it
// was successful or not. // was successful or not.
Event MessageKind = "event" Event CommandOrEvent = "event"
// eventCommand, just wait for the ACK that the // eventCommand, just wait for the ACK that the
// message is received. What action happens on the // message is received. What action happens on the
// receiving side is up to the received to decide. // receiving side is up to the received to decide.
@ -49,8 +53,8 @@ type Message struct {
// interface type here to handle several data types ? // interface type here to handle several data types ?
Data []string `json:"data" yaml:"data"` Data []string `json:"data" yaml:"data"`
// The type of the message being sent // The type of the message being sent
MessageType MessageKind `json:"messageType" yaml:"messageType"` CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
FromNode node FromNode node
} }
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
@ -101,13 +105,16 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
} }
// Start will spawn up all the defined subscriber processes.
// Spawning of publisher processes is done on the fly by checking
// if there is publisher process for a given message subject. This
// checking is also started here in Start by calling handleMessagesToPublish.
func (s *server) Start() { func (s *server) Start() {
// Start the checking the input file for new messages from operator. // Start the checking the input file for new messages from operator.
go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
// Start the textlogging service that will run on the subscribers // Start the textlogging service that will run on the subscribers
// TODO: Figure out how to structure event services like these // TODO: Figure out how to structure event services like these
go s.startTextLogging(s.logCh) go s.startTextLogging(s.logCh)
// Start a subscriber for shellCommand messages // Start a subscriber for shellCommand messages
@ -129,33 +136,26 @@ func (s *server) Start() {
} }
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
fmt.Printf("*** Output of processes map: %#v\n", s.processes) s.printProcessesMap()
// Prepare and start a single process s.handleMessagesToPublish()
//{
// sub := newSubject("ship1", "command", "shellcommand")
// proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// // fmt.Printf("*** %#v\n", proc)
// go s.processSpawnWorker(proc)
//}
// Prepare and start a single process
// {
// sub := newSubject("ship2", "command", "shellcommand")
// proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// // fmt.Printf("*** %#v\n", proc)
// go s.processSpawnWorker(proc)
// }
s.handleNewOperatorMessages()
select {} select {}
} }
func (s *server) printProcessesMap() {
fmt.Println("--------------------------------------------------------------------------------------------")
fmt.Printf("*** Output of processes map :\n")
for _, v := range s.processes {
fmt.Printf("*** - : %v\n", v)
}
fmt.Println("--------------------------------------------------------------------------------------------")
}
// handleNewOperatorMessages will handle all the new operator messages // handleNewOperatorMessages will handle all the new operator messages
// given to the system, and route them to the correct subject queue. // given to the system, and route them to the correct subject queue.
func (s *server) handleNewOperatorMessages() { func (s *server) handleMessagesToPublish() {
// Process the messages that have been received on the incomming // Process the messages that have been received on the incomming
// message pipe. Check and send if there are a specific subject // message pipe. Check and send if there are a specific subject
// for it, and no subject exist throw an error. // for it, and no subject exist throw an error.
@ -185,12 +185,13 @@ func (s *server) handleNewOperatorMessages() {
// If a publisher do not exist for the given subject, create it. // If a publisher do not exist for the given subject, create it.
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName) log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(v[i].Subject.Node, v[i].Subject.MessageKind, v[i].Subject.Method) sub := newSubject(v[i].Subject.Node, v[i].Subject.CommandOrEvent, v[i].Subject.Method)
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
s.printProcessesMap()
goto redo goto redo
} }
} }
@ -206,7 +207,7 @@ type Subject struct {
// node, the name of the node // node, the name of the node
Node string `json:"node" yaml:"node"` Node string `json:"node" yaml:"node"`
// messageType, command/event // messageType, command/event
MessageKind MessageKind `json:"messageKind" yaml:"messageKind"` CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. shellcommand, syslog, etc. // method, what is this message doing, etc. shellcommand, syslog, etc.
Method string `json:"method" yaml:"method"` Method string `json:"method" yaml:"method"`
// messageCh is the channel for receiving new content to be sent // messageCh is the channel for receiving new content to be sent
@ -216,12 +217,12 @@ type Subject struct {
// newSubject will return a new variable of the type subject, and insert // newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel // all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject. // to receive new messages on the specific subject.
func newSubject(node string, messageKind MessageKind, method string) Subject { func newSubject(node string, commandOrEvent CommandOrEvent, method string) Subject {
return Subject{ return Subject{
Node: node, Node: node,
MessageKind: messageKind, CommandOrEvent: commandOrEvent,
Method: method, Method: method,
messageCh: make(chan Message), messageCh: make(chan Message),
} }
} }
@ -229,7 +230,7 @@ func newSubject(node string, messageKind MessageKind, method string) Subject {
type subjectName string type subjectName string
func (s Subject) name() subjectName { func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method)) return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.CommandOrEvent, s.Method))
} }
// processKind are either kindSubscriber or kindPublisher, and are // processKind are either kindSubscriber or kindPublisher, and are
@ -294,6 +295,8 @@ func (s *server) processSpawnWorker(proc process) {
// give the message to the correct publisher process. A channel that // give the message to the correct publisher process. A channel that
// is listened on in the for loop below could be used to receive the // is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process. // messages from the message-pickup-process.
//
// Handle publisher workers
if proc.processKind == processKindPublisher { if proc.processKind == processKindPublisher {
for { for {
// Wait and read the next message on the message channel // Wait and read the next message on the message channel
@ -325,6 +328,7 @@ func (s *server) processSpawnWorker(proc process) {
} }
} }
// handle subscriber workers
if proc.processKind == processKindSubscriber { if proc.processKind == processKindSubscriber {
//subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") //subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
subject := string(proc.subject.name()) subject := string(proc.subject.name())
@ -335,7 +339,7 @@ func (s *server) processSpawnWorker(proc process) {
// We start one handler per message received by using go routines here. // We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent // This is for being able to reply back the current publisher who sent
// the message. // the message.
go s.handler(s.natsConn, s.nodeName, msg) go s.subscriberHandler(s.natsConn, s.nodeName, msg)
}) })
if err != nil { if err != nil {
log.Printf("error: Subscribe failed: %v\n", err) log.Printf("error: Subscribe failed: %v\n", err)
@ -403,3 +407,62 @@ func gobEncodePayload(m Message) ([]byte, error) {
return buf.Bytes(), nil return buf.Bytes(), nil
} }
// handler will deserialize the message when a new message is received,
// check the MessageType field in the message to decide what kind of
// message it is and then it will check how to handle that message type,
// and handle it.
// This handler function should be started in it's own go routine,so
// one individual handler is started per message received so we can keep
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.Msg) {
message := Message{}
// Create a buffer to decode the gob encoded binary data back
// to it's original structure.
buf := bytes.NewBuffer(msg.Data)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
log.Printf("error: gob decoding failed: %v\n", err)
}
//fmt.Printf("%v\n", msg)
// TODO: Maybe the handling of the errors within the subscriber
// should also involve the error-kernel to report back centrally
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case message.CommandOrEvent == "Command":
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.
c := message.Data[0]
a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
fmt.Printf("%s", out)
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out)))
case message.CommandOrEvent == "Event":
fmt.Printf("info: sending over the message %#v\n", message)
for _, d := range message.Data {
s.logCh <- []byte(d)
}
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID)))
default:
log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent)
}
}

View file

@ -1,98 +1 @@
package steward package steward
import (
"bytes"
"encoding/gob"
"fmt"
"log"
"os/exec"
"github.com/nats-io/nats.go"
)
// RunSubscriber will start a subscribing process.
// TODO: Right now the only thing a subscriber can do is ro receive commands,
// check if there are more things a subscriber should be able to do.
func (s *server) RunSubscriber() {
{
fmt.Printf("nodeName: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
}
// subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
//
// // Subscribe will start up a Go routine under the hood calling the
// // callback function specified when a new message is received.
// _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// // We start one handler per message received by using go routines here.
// // This is for being able to reply back the current publisher who sent
// // the message.
// go handler(s.natsConn, s.nodeName, msg)
// })
// if err != nil {
// log.Printf("error: Subscribe failed: %v\n", err)
// }
select {}
}
// handler will deserialize the message when a new message is received,
// check the MessageType field in the message to decide what kind of
// message it and then it will check how to handle that message type,
// and handle it.
// This handler function should be started in it's own go routine,so
// one individual handler is started per message received so we can keep
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
func (s *server) handler(natsConn *nats.Conn, node string, msg *nats.Msg) {
message := Message{}
// Create a buffer to decode the gob encoded binary data back
// to it's original structure.
buf := bytes.NewBuffer(msg.Data)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
log.Printf("error: gob decoding failed: %v\n", err)
}
//fmt.Printf("%v\n", msg)
// TODO: Maybe the handling of the errors within the subscriber
// should also involve the error-kernel to report back centrally
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case message.MessageType == "Command":
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.
c := message.Data[0]
a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
fmt.Printf("%s", out)
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out)))
case message.MessageType == "Event":
fmt.Printf("info: sending over the message %#v\n", message)
for _, d := range message.Data {
s.logCh <- []byte(d)
}
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID)))
default:
log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType)
}
}

View file

@ -1 +1 @@
some message sent from a shipsome message sent from a ship some message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a ship