From 39e29a079eb78e92bb9b9206148e550ebbffb928 Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 9 Feb 2021 11:16:02 +0100 Subject: [PATCH] added messageKind and handling of different types of message --- example-inmessage/orig-ship1.json | 5 ++-- example-inmessage/orig-ship2.json | 5 ++-- getmessagefromfile.go | 2 +- publisher.go | 40 ++++++++++++++++----------- subscriber.go | 45 +++++++++++++++---------------- 5 files changed, 50 insertions(+), 47 deletions(-) diff --git a/example-inmessage/orig-ship1.json b/example-inmessage/orig-ship1.json index 8ba7561..3b42c32 100644 --- a/example-inmessage/orig-ship1.json +++ b/example-inmessage/orig-ship1.json @@ -3,9 +3,8 @@ "subject": { "node":"ship1", - "messageType":"command", - "method":"shellcommand", - "domain":"shell" + "messageKind":"command", + "method":"shellcommand" }, "message": { diff --git a/example-inmessage/orig-ship2.json b/example-inmessage/orig-ship2.json index a576f45..4a0c452 100644 --- a/example-inmessage/orig-ship2.json +++ b/example-inmessage/orig-ship2.json @@ -3,9 +3,8 @@ "subject": { "node":"ship2", - "messageType":"command", - "method":"shellcommand", - "domain":"shell" + "messageKind":"command", + "method":"shellcommand" }, "message": { diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 59abba8..2568b2d 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -39,7 +39,7 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh } for i := range js { - fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) + fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind) } // Send the data back to be consumed diff --git a/publisher.go b/publisher.go index 90f4526..db4a088 100644 --- a/publisher.go +++ b/publisher.go @@ -12,7 +12,13 @@ import ( "github.com/nats-io/nats.go" ) -type MessageType string +// MessageKind describes on the message level if this is +// an event or command kind of message in the Subject name. +// This field is mainly used to be able to spawn up different +// worker processes based on the Subject name so we can have +// one process for handling event kind, and another for +// handling command kind of messages. +type MessageKind string // TODO: Figure it makes sense to have these types at all. // It might make more sense to implement these as two @@ -23,13 +29,13 @@ const ( // delivered back in the reply ack message. // The message should contain the unique ID of the // command. - Command MessageType = "command" + Command MessageKind = "command" // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - Event MessageType = "event" + Event MessageKind = "event" // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. @@ -43,7 +49,7 @@ type Message struct { // interface type here to handle several data types ? Data []string `json:"data" yaml:"data"` // The type of the message being sent - MessageType MessageType `json:"messageType" yaml:"messageType"` + MessageType MessageKind `json:"messageType" yaml:"messageType"` } // server is the structure that will hold the state about spawned @@ -97,7 +103,7 @@ func (s *server) PublisherStart() { // Prepare and start a single process { - sub := newSubject("ship1", "command", "shellcommand", "shell") + sub := newSubject("ship1", "command", "shellcommand") proc := s.processPrepareNew(sub, s.errorCh) // fmt.Printf("*** %#v\n", proc) go s.processSpawnWorker(proc) @@ -105,7 +111,7 @@ func (s *server) PublisherStart() { // Prepare and start a single process { - sub := newSubject("ship2", "command", "shellcommand", "shell") + sub := newSubject("ship2", "command", "shellcommand") proc := s.processPrepareNew(sub, s.errorCh) // fmt.Printf("*** %#v\n", proc) go s.processSpawnWorker(proc) @@ -159,14 +165,9 @@ type Subject struct { // node, the name of the node Node string `json:"node" yaml:"node"` // messageType, command/event - MessageType MessageType `json:"messageType" yaml:"messageType"` + MessageKind MessageKind `json:"messageKind" yaml:"messageKind"` // method, what is this message doing, etc. shellcommand, syslog, etc. Method string `json:"method" yaml:"method"` - // domain is used to differentiate services. Like there can be more - // logging services, but rarely more logging services for the same - // thing. Domain is here used to differentiate the the services and - // tell with one word what it is for. - Domain string `json:"domain" yaml:"domain"` // messageCh is the channel for receiving new content to be sent messageCh chan Message } @@ -174,22 +175,29 @@ type Subject struct { // newSubject will return a new variable of the type subject, and insert // all the values given as arguments. It will also create the channel // to receive new messages on the specific subject. -func newSubject(node string, messageType MessageType, method string, domain string) Subject { +func newSubject(node string, messageKind MessageKind, method string) Subject { return Subject{ Node: node, - MessageType: messageType, + MessageKind: messageKind, Method: method, - Domain: domain, messageCh: make(chan Message), } } +// subjectName is the complete representation of a subject type subjectName string func (s Subject) name() subjectName { - return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.Node, s.MessageType, s.Method, s.Domain)) + return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method)) } +type processKind string + +const ( + kindSubscriber processKind = "subscriber" + kindPublisher processKind = "publisher" +) + // process are represent the communication to one individual host type process struct { messageID int diff --git a/subscriber.go b/subscriber.go index b0da449..dc15b74 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,7 +5,6 @@ import ( "encoding/gob" "fmt" "log" - "os" "os/exec" "github.com/nats-io/nats.go" @@ -15,25 +14,32 @@ import ( // TODO: Right now the only thing a subscriber can do is ro receive commands, // check if there are more things a subscriber should be able to do. func (s *server) RunSubscriber() { - subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") + subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // We start one handler per message received by using go routines here. + // This is for being able to reply back the current publisher who sent + // the message. go handler(s.natsConn, s.nodeName, msg) }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) } - // Do some further processing of the actual data we received in the - // subscriber callback function. select {} } -// Listen for message will send an ACK message back to the sender, -// and put the received incomming message on the reqMsg channel -// for further processing. +// handler will deserialize the message when a new message is received, +// check the MessageType field in the message to decide what kind of +// message it and then it will check how to handle that message type, +// and handle it. +// This handler function should be started in it's own go routine,so +// one individual handler is started per message received so we can keep +// the state of the message being processed, and then reply back to the +// correct sending process's reply, meaning so we ACK back to the correct +// publisher. func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { message := Message{} @@ -47,11 +53,13 @@ func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { log.Printf("error: gob decoding failed: %v\n", err) } - // --------- - //fmt.Printf("%v\n", msg) - switch message.MessageType { - case "Command": + // TODO: Maybe the handling of the errors within the subscriber + // should also involve the error-kernel to report back centrally + // that there was a problem like missing method to handle a specific + // method etc. + switch { + case message.MessageType == "Command": // Since the command to execute is at the first position in the // slice we need to slice it out. The arguments are at the // remaining positions. @@ -67,23 +75,12 @@ func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) - case "Event": - // Since the command to execute is at the first position in the - // slice we need to slice it out. The arguments are at the - // remaining positions. - c := message.Data[0] - a := message.Data[1:] - cmd := exec.Command(c, a...) - cmd.Stdout = os.Stdout - err := cmd.Start() - if err != nil { - log.Printf("error: execution of command failed: %v\n", err) - } + case message.MessageType == "Event": + fmt.Printf("info: the event type is not implemented yet\n") // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) default: log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType) } - // --------- }