1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

rewrote so all processes receive the messages via a channel

This commit is contained in:
postmannen 2021-02-03 14:53:25 +01:00
parent 6ec1ce54f4
commit efb781ccf0

View file

@ -107,30 +107,70 @@ func (s *server) PublisherStart() {
}
}()
// Prepare and start a single process
{
sub := subject{
node: "btship1",
messageType: "command",
method: "shellcommand",
domain: "shell",
messageCh: make(chan Message),
}
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
}
// Prepare and start a single process
{
sub := subject{
node: "btship2",
messageType: "command",
method: "shellcommand",
domain: "shell",
messageCh: make(chan Message),
}
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
}
// Simulate generating some commands to be sent as messages to nodes.
go func() {
for {
m := Message{
Data: []string{"bash", "-c", "uname -a"},
MessageType: eventReturnAck,
}
subjName := subjectName("btship1.command.shellcommand.shell")
_, ok := s.processes[subjName]
if ok {
s.processes[subjName].subject.messageCh <- m
} else {
time.Sleep(time.Millisecond * 500)
continue
}
}
}()
// // Simulate generating some commands to be sent as messages to nodes.
// go func() {
// for {
// m := Message{
// Data: []string{"bash", "-c", "uname -a"},
// MessageType: eventReturnAck,
// }
// subjName := subjectName("btship2.command.shellcommand.shell")
// _, ok := s.processes[subjName]
// if ok {
// s.processes[subjName].subject.messageCh <- m
// } else {
// time.Sleep(time.Millisecond * 500)
// continue
// }
// }
// }()
select {}
}
@ -151,6 +191,8 @@ type subject struct {
// thing. Domain is here used to differentiate the the services and
// tell with one word what it is for.
domain string
// messageCh is the channel for receiving new content to be sent
messageCh chan Message
}
type subjectName string
@ -176,7 +218,7 @@ type process struct {
errorCh chan string
// messageCh are the channel where we put the message we want
// a process to send
messageCh chan Message
//messageCh chan Message
}
// prepareNewProcess will set the the provided values and the default
@ -190,7 +232,7 @@ func (s *server) processPrepareNew(subject subject) process {
node: node(subject.node),
processID: s.lastProcessID,
errorCh: make(chan string),
messageCh: make(chan Message),
//messageCh: make(chan Message),
}
return proc
@ -201,13 +243,12 @@ func (s *server) processPrepareNew(subject subject) process {
// map.
func (s *server) processSpawn(proc process) {
mu.Lock()
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
s.processes[proc.subject.name()] = proc
mu.Unlock()
for k, v := range s.processes {
fmt.Printf("DEBUG: k=%v, v=%v \n", k, v)
}
// Loop creating one new message every second to simulate getting new
// messages to deliver.
//
@ -217,7 +258,9 @@ func (s *server) processSpawn(proc process) {
// is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process.
for {
m := getMessageToDeliver()
// m := getMessageToDeliver()
// Wait and read the next message on the message channel
m := <-proc.subject.messageCh
m.ID = s.processes[proc.subject.name()].messageID
messageDeliver(proc, m, s.natsConn)