diff --git a/publisher.go b/publisher.go index db4a088..7ee602d 100644 --- a/publisher.go +++ b/publisher.go @@ -104,7 +104,7 @@ func (s *server) PublisherStart() { // Prepare and start a single process { sub := newSubject("ship1", "command", "shellcommand") - proc := s.processPrepareNew(sub, s.errorCh) + proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) // fmt.Printf("*** %#v\n", proc) go s.processSpawnWorker(proc) } @@ -112,7 +112,7 @@ func (s *server) PublisherStart() { // Prepare and start a single process { sub := newSubject("ship2", "command", "shellcommand") - proc := s.processPrepareNew(sub, s.errorCh) + proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) // fmt.Printf("*** %#v\n", proc) go s.processSpawnWorker(proc) } @@ -191,11 +191,14 @@ func (s Subject) name() subjectName { return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method)) } +// processKind are either kindSubscriber or kindPublisher, and are +// used to distinguish the kind of process to spawn and to know +// the process kind put in the process map. type processKind string const ( - kindSubscriber processKind = "subscriber" - kindPublisher processKind = "publisher" + processKindSubscriber processKind = "subscriber" + processKindPublisher processKind = "publisher" ) // process are represent the communication to one individual host @@ -212,20 +215,22 @@ type process struct { processID int // errorCh is used to report errors from a process // NB: Implementing this as an int to report for testing - errorCh chan errProcess + errorCh chan errProcess + processKind processKind } // prepareNewProcess will set the the provided values and the default // values for a process. -func (s *server) processPrepareNew(subject Subject, errCh chan errProcess) process { +func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind) process { // create the initial configuration for a sessions communicating with 1 host process. s.lastProcessID++ proc := process{ - messageID: 0, - subject: subject, - node: node(subject.Node), - processID: s.lastProcessID, - errorCh: errCh, + messageID: 0, + subject: subject, + node: node(subject.Node), + processID: s.lastProcessID, + errorCh: errCh, + processKind: processKind, //messageCh: make(chan Message), } @@ -248,32 +253,34 @@ func (s *server) processSpawnWorker(proc process) { // give the message to the correct publisher process. A channel that // is listened on in the for loop below could be used to receive the // messages from the message-pickup-process. - for { - // Wait and read the next message on the message channel - m := <-proc.subject.messageCh - m.ID = s.processes[proc.subject.name()].messageID - messageDeliver(proc, m, s.natsConn) + if proc.processKind == processKindPublisher { + for { + // Wait and read the next message on the message channel + m := <-proc.subject.messageCh + m.ID = s.processes[proc.subject.name()].messageID + messageDeliver(proc, m, s.natsConn) - // Increment the counter for the next message to be sent. - proc.messageID++ - s.processes[proc.subject.name()] = proc - time.Sleep(time.Second * 1) + // Increment the counter for the next message to be sent. + proc.messageID++ + s.processes[proc.subject.name()] = proc + time.Sleep(time.Second * 1) - // NB: simulate that we get an error, and that we can send that - // out of the process and receive it in another thread. - ep := errProcess{ - infoText: "process failed", - process: proc, - message: m, - errorActionCh: make(chan errorAction), - } - s.errorCh <- ep + // NB: simulate that we get an error, and that we can send that + // out of the process and receive it in another thread. + ep := errProcess{ + infoText: "process failed", + process: proc, + message: m, + errorActionCh: make(chan errorAction), + } + s.errorCh <- ep - // Wait for the response action back from the error kernel, and - // decide what to do. Should we continue, quit, or .... ? - switch <-ep.errorActionCh { - case errActionContinue: - log.Printf("The errAction was continue...so we're continuing\n") + // Wait for the response action back from the error kernel, and + // decide what to do. Should we continue, quit, or .... ? + switch <-ep.errorActionCh { + case errActionContinue: + log.Printf("The errAction was continue...so we're continuing\n") + } } } }