From 9fae25e19ba8f7c4ecb30d093afabf982e6568d0 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 3 Feb 2021 10:23:50 +0100 Subject: [PATCH] implemented the use of the subject type to name the subject --- publisher.go | 12 +++++++----- subscriber.go | 5 ++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/publisher.go b/publisher.go index 259909c..2514043 100644 --- a/publisher.go +++ b/publisher.go @@ -159,7 +159,7 @@ type process struct { // the subject used for the specific process. One process // can contain only one sender on a message bus, hence // also one subject - subjects subject + subject subject // Put a node here to be able know the node a process is at. // NB: Might not be needed later on. node node @@ -177,6 +177,7 @@ func (s *server) prepareNewProcess(subject subject) process { s.lastProcessID++ proc := process{ messageID: 0, + subject: subject, node: node(subject.node), processID: s.lastProcessID, errorCh: make(chan string), @@ -211,7 +212,7 @@ func (s *server) spawnProcess(proc process) { s.processes[proc.node] = proc time.Sleep(time.Second * 1) - // simulate that we get an error, and that we can send that + // NB: simulate that we get an error, and that we can send that // out of the process and receive it in another thread. s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) @@ -223,7 +224,7 @@ func (s *server) spawnProcess(proc process) { // TODO: read this from local file or rest or....? func getMessageToDeliver() Message { return Message{ - Data: []string{"uname", "-a"}, + Data: []string{"bash", "-c", "uname -a"}, MessageType: eventReturnAck, } } @@ -236,10 +237,11 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) { } msg := &nats.Msg{ - Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"), + Subject: fmt.Sprintf("%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain), + // Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"), // Structure of the reply message are: // reply... - Reply: "reply." + string(proc.node) + "command.shellcommand", + Reply: fmt.Sprintf("repply.%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain), Data: dataPayload, } diff --git a/subscriber.go b/subscriber.go index 3054d38..6400084 100644 --- a/subscriber.go +++ b/subscriber.go @@ -21,7 +21,7 @@ func (s *server) RunSubscriber() { // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. - subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") + subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) if err != nil { fmt.Printf("error: Subscribe failed: %v\n", err) @@ -34,6 +34,9 @@ func (s *server) RunSubscriber() { fmt.Printf("%v\n", msg) switch msg.MessageType { case eventReturnAck: + // Since the command to execute is at the first position in the + // slice we need to slice it out. The arguments are at the + // remaining positions. c := msg.Data[0] a := msg.Data[1:] cmd := exec.Command(c, a...)