1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

implemented the concept of processKind

This commit is contained in:
postmannen 2021-02-09 11:52:08 +01:00
parent 39e29a079e
commit 29254c04d0

View file

@ -104,7 +104,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship1", "command", "shellcommand") sub := newSubject("ship1", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh) proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
} }
@ -112,7 +112,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship2", "command", "shellcommand") sub := newSubject("ship2", "command", "shellcommand")
proc := s.processPrepareNew(sub, s.errorCh) proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
} }
@ -191,11 +191,14 @@ func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method)) return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method))
} }
// processKind are either kindSubscriber or kindPublisher, and are
// used to distinguish the kind of process to spawn and to know
// the process kind put in the process map.
type processKind string type processKind string
const ( const (
kindSubscriber processKind = "subscriber" processKindSubscriber processKind = "subscriber"
kindPublisher processKind = "publisher" processKindPublisher processKind = "publisher"
) )
// process are represent the communication to one individual host // process are represent the communication to one individual host
@ -212,20 +215,22 @@ type process struct {
processID int processID int
// errorCh is used to report errors from a process // errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing // NB: Implementing this as an int to report for testing
errorCh chan errProcess errorCh chan errProcess
processKind processKind
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func (s *server) processPrepareNew(subject Subject, errCh chan errProcess) process { func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++ s.lastProcessID++
proc := process{ proc := process{
messageID: 0, messageID: 0,
subject: subject, subject: subject,
node: node(subject.Node), node: node(subject.Node),
processID: s.lastProcessID, processID: s.lastProcessID,
errorCh: errCh, errorCh: errCh,
processKind: processKind,
//messageCh: make(chan Message), //messageCh: make(chan Message),
} }
@ -248,32 +253,34 @@ func (s *server) processSpawnWorker(proc process) {
// give the message to the correct publisher process. A channel that // give the message to the correct publisher process. A channel that
// is listened on in the for loop below could be used to receive the // is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process. // messages from the message-pickup-process.
for { if proc.processKind == processKindPublisher {
// Wait and read the next message on the message channel for {
m := <-proc.subject.messageCh // Wait and read the next message on the message channel
m.ID = s.processes[proc.subject.name()].messageID m := <-proc.subject.messageCh
messageDeliver(proc, m, s.natsConn) m.ID = s.processes[proc.subject.name()].messageID
messageDeliver(proc, m, s.natsConn)
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
proc.messageID++ proc.messageID++
s.processes[proc.subject.name()] = proc s.processes[proc.subject.name()] = proc
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// NB: simulate that we get an error, and that we can send that // NB: simulate that we get an error, and that we can send that
// out of the process and receive it in another thread. // out of the process and receive it in another thread.
ep := errProcess{ ep := errProcess{
infoText: "process failed", infoText: "process failed",
process: proc, process: proc,
message: m, message: m,
errorActionCh: make(chan errorAction), errorActionCh: make(chan errorAction),
} }
s.errorCh <- ep s.errorCh <- ep
// Wait for the response action back from the error kernel, and // Wait for the response action back from the error kernel, and
// decide what to do. Should we continue, quit, or .... ? // decide what to do. Should we continue, quit, or .... ?
switch <-ep.errorActionCh { switch <-ep.errorActionCh {
case errActionContinue: case errActionContinue:
log.Printf("The errAction was continue...so we're continuing\n") log.Printf("The errAction was continue...so we're continuing\n")
}
} }
} }
} }