diff --git a/process.go b/process.go index f53f63b..980e72c 100644 --- a/process.go +++ b/process.go @@ -52,11 +52,13 @@ type process struct { procFuncCh chan Message // copy of the configuration from server configuration *Configuration + // The new messages channel copied from *Server + newMessagesCh chan<- []subjectAndMessage } // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(processes *processes, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { +func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -77,6 +79,8 @@ func newProcess(processes *processes, subject Subject, errCh chan errProcess, pr processKind: processKind, allowedReceivers: m, methodsAvailable: method.GetMethodsAvailable(), + newMessagesCh: newMessagesCh, + configuration: configuration, } return proc @@ -90,8 +94,6 @@ func newProcess(processes *processes, subject Subject, errCh chan errProcess, pr // It will give the process the next available ID, and also add the // process to the processes map in the server structure. func (p process) spawnWorker(s *server) { - // Copy the configuration from Server - p.configuration = s.configuration // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one message queue. diff --git a/publisher.go b/publisher.go index 2e88a87..9474579 100644 --- a/publisher.go +++ b/publisher.go @@ -86,7 +86,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) // fmt.Printf("*** %#v\n", proc) proc.spawnWorker(s) diff --git a/server.go b/server.go index 8c54a1a..9a13f70 100644 --- a/server.go +++ b/server.go @@ -14,6 +14,7 @@ import ( type processName string +// Will return a process name made up of subjectName+processKind func processNameGet(sn subjectName, pk processKind) processName { pn := fmt.Sprintf("%s_%s", sn, pk) return processName(pn) diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index 442ea8e..67f23f5 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -201,12 +201,8 @@ func (m methodSubscriberSayHello) getKind() CommandOrEvent { } func (m methodSubscriberSayHello) handler(proc process, message Message, node string) ([]byte, error) { - //fmt.Printf("-- DEBUG 3.1: %#v, %#v, %#v\n\n", proc.subject.name(), proc.procFunc, proc.procFuncCh) - //pn := processNameGet(proc.subject.name(), processKindSubscriber) - //fmt.Printf("-- DEBUG 3.2: pn = %#v\n\n", pn) + log.Printf("<--- Received hello from %#v\n", message.FromNode) - // Since the handler is only called to handle a specific type of message we need - // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes // send the message to the procFuncCh which is running alongside the process // and can hold registries and handle special things for an individual process. diff --git a/subscribers.go b/subscribers.go index 52df85e..d9fdd83 100644 --- a/subscribers.go +++ b/subscribers.go @@ -11,7 +11,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommand, CommandACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -20,7 +20,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName) sub := newSubject(TextLogging, EventACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -29,7 +29,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) proc.procFuncCh = make(chan Message) proc.procFunc = func() error { sayHelloNodes := make(map[node]struct{}) @@ -57,7 +57,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) go proc.spawnWorker(s) } } diff --git a/textlogging.log b/textlogging.log deleted file mode 100644 index 3cf44bc..0000000 --- a/textlogging.log +++ /dev/null @@ -1,4 +0,0 @@ - -some message sent from a ship for testing -some message sent from a ship for testing -some message sent from a ship for testing