1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added messageKind and handling of different types of message

This commit is contained in:
postmannen 2021-02-09 11:16:02 +01:00
parent 809ea472a4
commit 39e29a079e
5 changed files with 50 additions and 47 deletions

View file

@ -3,9 +3,8 @@
"subject": "subject":
{ {
"node":"ship1", "node":"ship1",
"messageType":"command", "messageKind":"command",
"method":"shellcommand", "method":"shellcommand"
"domain":"shell"
}, },
"message": "message":
{ {

View file

@ -3,9 +3,8 @@
"subject": "subject":
{ {
"node":"ship2", "node":"ship2",
"messageType":"command", "messageKind":"command",
"method":"shellcommand", "method":"shellcommand"
"domain":"shell"
}, },
"message": "message":
{ {

View file

@ -39,7 +39,7 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh
} }
for i := range js { for i := range js {
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind)
} }
// Send the data back to be consumed // Send the data back to be consumed

View file

@ -12,7 +12,13 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
type MessageType string // MessageKind describes on the message level if this is
// an event or command kind of message in the Subject name.
// This field is mainly used to be able to spawn up different
// worker processes based on the Subject name so we can have
// one process for handling event kind, and another for
// handling command kind of messages.
type MessageKind string
// TODO: Figure it makes sense to have these types at all. // TODO: Figure it makes sense to have these types at all.
// It might make more sense to implement these as two // It might make more sense to implement these as two
@ -23,13 +29,13 @@ const (
// delivered back in the reply ack message. // delivered back in the reply ack message.
// The message should contain the unique ID of the // The message should contain the unique ID of the
// command. // command.
Command MessageType = "command" Command MessageKind = "command"
// shellCommand, wait for and return the output // shellCommand, wait for and return the output
// of the command in the ACK message. This means // of the command in the ACK message. This means
// that the command should be executed immediately // that the command should be executed immediately
// and that we should get the confirmation that it // and that we should get the confirmation that it
// was successful or not. // was successful or not.
Event MessageType = "event" Event MessageKind = "event"
// eventCommand, just wait for the ACK that the // eventCommand, just wait for the ACK that the
// message is received. What action happens on the // message is received. What action happens on the
// receiving side is up to the received to decide. // receiving side is up to the received to decide.
@ -43,7 +49,7 @@ type Message struct {
// interface type here to handle several data types ? // interface type here to handle several data types ?
Data []string `json:"data" yaml:"data"` Data []string `json:"data" yaml:"data"`
// The type of the message being sent // The type of the message being sent
MessageType MessageType `json:"messageType" yaml:"messageType"` MessageType MessageKind `json:"messageType" yaml:"messageType"`
} }
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
@ -97,7 +103,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship1", "command", "shellcommand", "shell") sub := newSubject("ship1", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh) proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
@ -105,7 +111,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship2", "command", "shellcommand", "shell") sub := newSubject("ship2", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh) proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
@ -159,14 +165,9 @@ type Subject struct {
// node, the name of the node // node, the name of the node
Node string `json:"node" yaml:"node"` Node string `json:"node" yaml:"node"`
// messageType, command/event // messageType, command/event
MessageType MessageType `json:"messageType" yaml:"messageType"` MessageKind MessageKind `json:"messageKind" yaml:"messageKind"`
// method, what is this message doing, etc. shellcommand, syslog, etc. // method, what is this message doing, etc. shellcommand, syslog, etc.
Method string `json:"method" yaml:"method"` Method string `json:"method" yaml:"method"`
// domain is used to differentiate services. Like there can be more
// logging services, but rarely more logging services for the same
// thing. Domain is here used to differentiate the the services and
// tell with one word what it is for.
Domain string `json:"domain" yaml:"domain"`
// messageCh is the channel for receiving new content to be sent // messageCh is the channel for receiving new content to be sent
messageCh chan Message messageCh chan Message
} }
@ -174,22 +175,29 @@ type Subject struct {
// newSubject will return a new variable of the type subject, and insert // newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel // all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject. // to receive new messages on the specific subject.
func newSubject(node string, messageType MessageType, method string, domain string) Subject { func newSubject(node string, messageKind MessageKind, method string) Subject {
return Subject{ return Subject{
Node: node, Node: node,
MessageType: messageType, MessageKind: messageKind,
Method: method, Method: method,
Domain: domain,
messageCh: make(chan Message), messageCh: make(chan Message),
} }
} }
// subjectName is the complete representation of a subject
type subjectName string type subjectName string
func (s Subject) name() subjectName { func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.Node, s.MessageType, s.Method, s.Domain)) return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method))
} }
type processKind string
const (
kindSubscriber processKind = "subscriber"
kindPublisher processKind = "publisher"
)
// process are represent the communication to one individual host // process are represent the communication to one individual host
type process struct { type process struct {
messageID int messageID int

View file

@ -5,7 +5,6 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
"os"
"os/exec" "os/exec"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -15,25 +14,32 @@ import (
// TODO: Right now the only thing a subscriber can do is ro receive commands, // TODO: Right now the only thing a subscriber can do is ro receive commands,
// check if there are more things a subscriber should be able to do. // check if there are more things a subscriber should be able to do.
func (s *server) RunSubscriber() { func (s *server) RunSubscriber() {
subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
// Subscribe will start up a Go routine under the hood calling the // Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received. // callback function specified when a new message is received.
_, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent
// the message.
go handler(s.natsConn, s.nodeName, msg) go handler(s.natsConn, s.nodeName, msg)
}) })
if err != nil { if err != nil {
log.Printf("error: Subscribe failed: %v\n", err) log.Printf("error: Subscribe failed: %v\n", err)
} }
// Do some further processing of the actual data we received in the
// subscriber callback function.
select {} select {}
} }
// Listen for message will send an ACK message back to the sender, // handler will deserialize the message when a new message is received,
// and put the received incomming message on the reqMsg channel // check the MessageType field in the message to decide what kind of
// for further processing. // message it and then it will check how to handle that message type,
// and handle it.
// This handler function should be started in it's own go routine,so
// one individual handler is started per message received so we can keep
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { func handler(natsConn *nats.Conn, node string, msg *nats.Msg) {
message := Message{} message := Message{}
@ -47,11 +53,13 @@ func handler(natsConn *nats.Conn, node string, msg *nats.Msg) {
log.Printf("error: gob decoding failed: %v\n", err) log.Printf("error: gob decoding failed: %v\n", err)
} }
// ---------
//fmt.Printf("%v\n", msg) //fmt.Printf("%v\n", msg)
switch message.MessageType { // TODO: Maybe the handling of the errors within the subscriber
case "Command": // should also involve the error-kernel to report back centrally
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case message.MessageType == "Command":
// Since the command to execute is at the first position in the // Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the // slice we need to slice it out. The arguments are at the
// remaining positions. // remaining positions.
@ -67,23 +75,12 @@ func handler(natsConn *nats.Conn, node string, msg *nats.Msg) {
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out)))
case "Event": case message.MessageType == "Event":
// Since the command to execute is at the first position in the fmt.Printf("info: the event type is not implemented yet\n")
// slice we need to slice it out. The arguments are at the
// remaining positions.
c := message.Data[0]
a := message.Data[1:]
cmd := exec.Command(c, a...)
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID)))
default: default:
log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType) log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType)
} }
// ---------
} }