1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

implemented the use of the subject type to name the subject

This commit is contained in:
postmannen 2021-02-03 10:23:50 +01:00
parent 41ced11b59
commit 9fae25e19b
2 changed files with 11 additions and 6 deletions

View file

@ -159,7 +159,7 @@ type process struct {
// the subject used for the specific process. One process // the subject used for the specific process. One process
// can contain only one sender on a message bus, hence // can contain only one sender on a message bus, hence
// also one subject // also one subject
subjects subject subject subject
// Put a node here to be able know the node a process is at. // Put a node here to be able know the node a process is at.
// NB: Might not be needed later on. // NB: Might not be needed later on.
node node node node
@ -177,6 +177,7 @@ func (s *server) prepareNewProcess(subject subject) process {
s.lastProcessID++ s.lastProcessID++
proc := process{ proc := process{
messageID: 0, messageID: 0,
subject: subject,
node: node(subject.node), node: node(subject.node),
processID: s.lastProcessID, processID: s.lastProcessID,
errorCh: make(chan string), errorCh: make(chan string),
@ -211,7 +212,7 @@ func (s *server) spawnProcess(proc process) {
s.processes[proc.node] = proc s.processes[proc.node] = proc
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// simulate that we get an error, and that we can send that // NB: simulate that we get an error, and that we can send that
// out of the process and receive it in another thread. // out of the process and receive it in another thread.
s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
@ -223,7 +224,7 @@ func (s *server) spawnProcess(proc process) {
// TODO: read this from local file or rest or....? // TODO: read this from local file or rest or....?
func getMessageToDeliver() Message { func getMessageToDeliver() Message {
return Message{ return Message{
Data: []string{"uname", "-a"}, Data: []string{"bash", "-c", "uname -a"},
MessageType: eventReturnAck, MessageType: eventReturnAck,
} }
} }
@ -236,10 +237,11 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
} }
msg := &nats.Msg{ msg := &nats.Msg{
Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"), Subject: fmt.Sprintf("%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"),
// Structure of the reply message are: // Structure of the reply message are:
// reply.<nodename>.<message type>.<method> // reply.<nodename>.<message type>.<method>
Reply: "reply." + string(proc.node) + "command.shellcommand", Reply: fmt.Sprintf("repply.%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain),
Data: dataPayload, Data: dataPayload,
} }

View file

@ -21,7 +21,7 @@ func (s *server) RunSubscriber() {
// Subscribe will start up a Go routine under the hood calling the // Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received. // callback function specified when a new message is received.
subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell")
_, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName))
if err != nil { if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err) fmt.Printf("error: Subscribe failed: %v\n", err)
@ -34,6 +34,9 @@ func (s *server) RunSubscriber() {
fmt.Printf("%v\n", msg) fmt.Printf("%v\n", msg)
switch msg.MessageType { switch msg.MessageType {
case eventReturnAck: case eventReturnAck:
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.
c := msg.Data[0] c := msg.Data[0]
a := msg.Data[1:] a := msg.Data[1:]
cmd := exec.Command(c, a...) cmd := exec.Command(c, a...)