diff --git a/publisher.go b/publisher.go index 67091d5..3db5aa1 100644 --- a/publisher.go +++ b/publisher.go @@ -107,30 +107,70 @@ func (s *server) PublisherStart() { } }() + // Prepare and start a single process { sub := subject{ node: "btship1", messageType: "command", method: "shellcommand", domain: "shell", + messageCh: make(chan Message), } proc := s.processPrepareNew(sub) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) } + // Prepare and start a single process { sub := subject{ node: "btship2", messageType: "command", method: "shellcommand", domain: "shell", + messageCh: make(chan Message), } proc := s.processPrepareNew(sub) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) } + // Simulate generating some commands to be sent as messages to nodes. + go func() { + for { + m := Message{ + Data: []string{"bash", "-c", "uname -a"}, + MessageType: eventReturnAck, + } + subjName := subjectName("btship1.command.shellcommand.shell") + _, ok := s.processes[subjName] + if ok { + s.processes[subjName].subject.messageCh <- m + } else { + time.Sleep(time.Millisecond * 500) + continue + } + } + }() + + // // Simulate generating some commands to be sent as messages to nodes. + // go func() { + // for { + // m := Message{ + // Data: []string{"bash", "-c", "uname -a"}, + // MessageType: eventReturnAck, + // } + // subjName := subjectName("btship2.command.shellcommand.shell") + // _, ok := s.processes[subjName] + // if ok { + // s.processes[subjName].subject.messageCh <- m + // } else { + // time.Sleep(time.Millisecond * 500) + // continue + // } + // } + // }() + select {} } @@ -151,6 +191,8 @@ type subject struct { // thing. Domain is here used to differentiate the the services and // tell with one word what it is for. domain string + // messageCh is the channel for receiving new content to be sent + messageCh chan Message } type subjectName string @@ -176,7 +218,7 @@ type process struct { errorCh chan string // messageCh are the channel where we put the message we want // a process to send - messageCh chan Message + //messageCh chan Message } // prepareNewProcess will set the the provided values and the default @@ -190,7 +232,7 @@ func (s *server) processPrepareNew(subject subject) process { node: node(subject.node), processID: s.lastProcessID, errorCh: make(chan string), - messageCh: make(chan Message), + //messageCh: make(chan Message), } return proc @@ -201,13 +243,12 @@ func (s *server) processPrepareNew(subject subject) process { // map. func (s *server) processSpawn(proc process) { mu.Lock() + // We use the full name of the subject to identify a unique + // process. We can do that since a process can only handle + // one message queue. s.processes[proc.subject.name()] = proc mu.Unlock() - for k, v := range s.processes { - fmt.Printf("DEBUG: k=%v, v=%v \n", k, v) - } - // Loop creating one new message every second to simulate getting new // messages to deliver. // @@ -217,7 +258,9 @@ func (s *server) processSpawn(proc process) { // is listened on in the for loop below could be used to receive the // messages from the message-pickup-process. for { - m := getMessageToDeliver() + // m := getMessageToDeliver() + // Wait and read the next message on the message channel + m := <-proc.subject.messageCh m.ID = s.processes[proc.subject.name()].messageID messageDeliver(proc, m, s.natsConn)