1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

auto spawning of new publisher proc for new subjects.

This commit is contained in:
postmannen 2021-02-09 14:29:19 +01:00
parent 6d955b5fe0
commit 2594495668

View file

@ -102,20 +102,20 @@ func (s *server) PublisherStart() {
go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
// Prepare and start a single process // Prepare and start a single process
{ //{
sub := newSubject("ship1", "command", "shellcommand") // sub := newSubject("ship1", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) // proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc) // // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) // go s.processSpawnWorker(proc)
} //}
// Prepare and start a single process // Prepare and start a single process
{ // {
sub := newSubject("ship2", "command", "shellcommand") // sub := newSubject("ship2", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) // proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc) // // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) // go s.processSpawnWorker(proc)
} // }
s.handleNewOperatorMessages() s.handleNewOperatorMessages()
@ -137,8 +137,8 @@ func (s *server) handleNewOperatorMessages() {
// an error should be generated and processed by the error-kernel. // an error should be generated and processed by the error-kernel.
go func() { go func() {
for v := range s.newMessagesCh { for v := range s.newMessagesCh {
for _, vv := range v { for i, vv := range v {
redo:
m := vv.Message m := vv.Message
subjName := vv.Subject.name() subjName := vv.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, vv.Subject) fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, vv.Subject)
@ -148,9 +148,16 @@ func (s *server) handleNewOperatorMessages() {
// Put the message on the correct process's messageCh // Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m s.processes[subjName].subject.messageCh <- m
} else { } else {
log.Printf("info: did not find that specific subject: %v\n", subjName) // If a publisher do not exist for the given subject, create it.
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(v[i].Subject.Node, v[i].Subject.MessageKind, v[i].Subject.Method)
proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
continue goto redo
} }
} }
} }