From 809ea472a42a6db6802720a2392d019ba4147516 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 8 Feb 2021 21:58:31 +0100 Subject: [PATCH] Rewrote publisher to handle incomming messages concurrently --- subscriber.go | 103 +++++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/subscriber.go b/subscriber.go index 650a348..b0da449 100644 --- a/subscriber.go +++ b/subscriber.go @@ -15,76 +15,75 @@ import ( // TODO: Right now the only thing a subscriber can do is ro receive commands, // check if there are more things a subscriber should be able to do. func (s *server) RunSubscriber() { - - // Create a channel to put the data received in the subscriber callback - // function - reqMsgCh := make(chan Message) + subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. - subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") - _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) + _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + go handler(s.natsConn, s.nodeName, msg) + }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) } // Do some further processing of the actual data we received in the // subscriber callback function. - for { - msg := <-reqMsgCh - //fmt.Printf("%v\n", msg) - switch msg.MessageType { - case "Command": - // Since the command to execute is at the first position in the - // slice we need to slice it out. The arguments are at the - // remaining positions. - c := msg.Data[0] - a := msg.Data[1:] - cmd := exec.Command(c, a...) - cmd.Stdout = os.Stdout - err := cmd.Start() - if err != nil { - log.Printf("error: execution of command failed: %v\n", err) - } - case "Event": - // Since the command to execute is at the first position in the - // slice we need to slice it out. The arguments are at the - // remaining positions. - c := msg.Data[0] - a := msg.Data[1:] - cmd := exec.Command(c, a...) - cmd.Stdout = os.Stdout - err := cmd.Start() - if err != nil { - log.Printf("error: execution of command failed: %v\n", err) - } - default: - log.Printf("info: did not find that specific type of command: %#v\n", msg.MessageType) - } - - } + select {} } // Listen for message will send an ACK message back to the sender, // and put the received incomming message on the reqMsg channel // for further processing. -func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) func(req *nats.Msg) { - return func(req *nats.Msg) { - message := Message{} +func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { - // Create a buffer to decode the gob encoded binary data back - // to it's original structure. - buf := bytes.NewBuffer(req.Data) - gobDec := gob.NewDecoder(buf) - err := gobDec.Decode(&message) + message := Message{} + + // Create a buffer to decode the gob encoded binary data back + // to it's original structure. + buf := bytes.NewBuffer(msg.Data) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + log.Printf("error: gob decoding failed: %v\n", err) + } + + // --------- + + //fmt.Printf("%v\n", msg) + switch message.MessageType { + case "Command": + // Since the command to execute is at the first position in the + // slice we need to slice it out. The arguments are at the + // remaining positions. + c := message.Data[0] + a := message.Data[1:] + cmd := exec.Command(c, a...) + //cmd.Stdout = os.Stdout + out, err := cmd.CombinedOutput() if err != nil { - log.Printf("error: gob decoding failed: %v\n", err) + log.Printf("error: execution of command failed: %v\n", err) } - - // Put the data recived on the channel for further processing - reqMsgCh <- message + fmt.Printf("%s", out) // Send a confirmation message back to the publisher - natsConn.Publish(req.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) + natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) + case "Event": + // Since the command to execute is at the first position in the + // slice we need to slice it out. The arguments are at the + // remaining positions. + c := message.Data[0] + a := message.Data[1:] + cmd := exec.Command(c, a...) + cmd.Stdout = os.Stdout + err := cmd.Start() + if err != nil { + log.Printf("error: execution of command failed: %v\n", err) + } + + // Send a confirmation message back to the publisher + natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) + default: + log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType) } + // --------- }