From db3d9077490b41814ea0b105ca0c9e88dfef257a Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 29 Jan 2021 14:22:36 +0100 Subject: [PATCH] added type naming standard for subject and moved subscriber into the same package --- README.md | 35 ++++++++++++++++++++++ central/main.go | 28 ++++++++++++++---- central/subscriber.go | 69 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 central/subscriber.go diff --git a/README.md b/README.md index 602bc6f..9a1fbc5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,37 @@ # steward + Async management of Edge units. + +## Concepts/Ideas + +### Terminology + +- Node: An installation of an operating system with an ip address +- Process: One message handler running in it's own thread with 1 subject for sending and 1 for reply. +- Message: + - Command: Something to be executed on the message received. An example can be a shell command. + - Event: Something that have happened. An example can be transfer of syslog data from a host. + +### Naming + +#### Subject + +Subject naming are case sensitive, and can not contain the space are the tab character. + +`..` + +Nodename: Are the hostname of the device. This do not have to be resolvable via DNS, it is just a unique name for the host to receive the message. + +Command/Event: Are type of message sent. `command` or `event`. Description of the differences are mentioned earlier. + +Method: Are the functionality the message provide. Example could be `shellcommand` or `syslogforwarding` + +##### Complete subject example + +For syslog of type event to a host named "ship1" + +`ship1.event.syslogforwarding` + +and for a shell command of type command to a host named "ship2" + +`ship2.command.shellcommand` diff --git a/central/main.go b/central/main.go index c4a6219..18cebaa 100644 --- a/central/main.go +++ b/central/main.go @@ -4,6 +4,7 @@ package main import ( "bytes" "encoding/gob" + "flag" "fmt" "log" "os" @@ -57,6 +58,7 @@ type server struct { processes map[node]process // The last processID created lastProcessID int + thisNodeName string } // newServer will prepare and return a server type @@ -66,7 +68,7 @@ func newServer(brokerAddress string) (*server, error) { }, nil } -func (s *server) Run() { +func (s *server) RunPublisher() { proc := s.prepareNewProcess("btship1") // fmt.Printf("*** %#v\n", proc) go s.spawnProcess(proc) @@ -167,10 +169,10 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) { } msg := &nats.Msg{ - Subject: string(proc.node), + Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"), // Structure of the reply message are: - // reply--pid - Reply: "reply-" + string(proc.node) + "-pid" + fmt.Sprint(proc.processID), + // reply... + Reply: "reply." + string(proc.node) + "command.shellcommand", Data: dataPayload, } @@ -220,11 +222,19 @@ func gobEncodePayload(m Message) ([]byte, error) { } func main() { + node := flag.String("node", "0", "some unique string to identify this Edge unit") + modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish") + modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe") + flag.Parse() + s, err := newServer("localhost") if err != nil { log.Printf("error: failed to connect to broker: %v\n", err) os.Exit(1) } + + s.thisNodeName = *node + // Create a connection to nats server s.natsConn, err = nats.Connect("localhost", nil) if err != nil { @@ -232,5 +242,13 @@ func main() { } defer s.natsConn.Close() - s.Run() + if *modePublisher { + go s.RunPublisher() + } + + if *modeSubscriber { + go s.RunSubscriber() + } + + select {} } diff --git a/central/subscriber.go b/central/subscriber.go new file mode 100644 index 0000000..fac915d --- /dev/null +++ b/central/subscriber.go @@ -0,0 +1,69 @@ +package main + +import ( + "bytes" + "encoding/gob" + "fmt" + "os" + "os/exec" + + "github.com/nats-io/nats.go" +) + +func (s *server) RunSubscriber() { + + // Create a channel to put the data received in the subscriber callback + // function + reqMsgCh := make(chan Message) + + // Subscribe will start up a Go routine under the hood calling the + // callback function specified when a new message is received. + subject := fmt.Sprintf("%s.%s.%s", s.thisNodeName, "command", "shellcommand") + _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.thisNodeName)) + if err != nil { + fmt.Printf("error: Subscribe failed: %v\n", err) + } + + // Do some further processing of the actual data we received in the + // subscriber callback function. + for { + msg := <-reqMsgCh + fmt.Printf("%v\n", msg) + switch msg.MessageType { + case eventReturnAck: + c := msg.Data[0] + a := msg.Data[1:] + cmd := exec.Command(c, a...) + cmd.Stdout = os.Stdout + err := cmd.Start() + if err != nil { + fmt.Printf("error: execution of command failed: %v\n", err) + } + } + + } +} + +// Listen for message will send an ACK message back to the sender, +// and put the received incomming message on the reqMsg channel +// for further processing. +func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) func(req *nats.Msg) { + return func(req *nats.Msg) { + message := Message{} + + // Create a buffer to decode the gob encoded binary data back + // to it's original structure. + buf := bytes.NewBuffer(req.Data) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + fmt.Printf("error: gob decoding failed: %v\n", err) + } + + // Put the data recived on the channel for further processing + reqMsgCh <- message + + // Send a confirmation message back to the publisher + natsConn.Publish(req.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) + } +}