diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 0c1eac2..1eba085 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/publisher.go b/publisher.go index 7c451a1..0cf6433 100644 --- a/publisher.go +++ b/publisher.go @@ -139,13 +139,15 @@ func (s *server) printProcessesMap() { // handleNewOperatorMessages will handle all the new operator messages // given to the system, and route them to the correct subject queue. +// It will also handle the process of spawning more worker processes +// for publisher subjects if it does not exist. func (s *server) handleMessagesInRingbuffer() { // Prepare and start a new ring buffer const bufferSize int = 1000 rb := newringBuffer(bufferSize) inCh := make(chan subjectAndMessage) - outCh := make(chan samDBValue) - rb.start(inCh, outCh) + ringBufferOutCh := make(chan samDBValue) + rb.start(inCh, ringBufferOutCh) // Start reading new messages received on the incomming message // pipe requested by operator, and fill them into the buffer. @@ -162,7 +164,7 @@ func (s *server) handleMessagesInRingbuffer() { // send if there are a specific subject for it, and no subject // exist throw an error. go func() { - for samTmp := range outCh { + for samTmp := range ringBufferOutCh { sam := samTmp.Data // Check if the format of the message is correct. // TODO: Send a message to the error kernel here that @@ -175,20 +177,25 @@ func (s *server) handleMessagesInRingbuffer() { continue } + redo: // Adding a label here so we are able to redo the sending // of the last message if a process with specified subject // is not present. The process will then be created, and // the code will loop back to the redo: label. - redo: m := sam.Message subjName := sam.Subject.name() // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) _, ok := s.processes[subjName] + + // Are there already a process for that subject, put the + // message on that processes incomming message channel. if ok { log.Printf("info: found the specific subject: %v\n", subjName) - // Put the message on the correct process's messageCh s.processes[subjName].subject.messageCh <- m + + // If no process to handle the specific subject exist, + // the we create and spawn one. } else { // If a publisher do not exist for the given subject, create it, and // by using the goto at the end redo the process for this specific message. @@ -201,7 +208,8 @@ func (s *server) handleMessagesInRingbuffer() { time.Sleep(time.Millisecond * 500) s.printProcessesMap() - // Now when the process is spawned we jump back to the redo: label. + // Now when the process is spawned we jump back to the redo: label, + // and send the message to that new process. goto redo } } @@ -288,9 +296,9 @@ func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, proce return proc } -// spawnProcess will spawn a new process. It will give the process -// the next available ID, and also add the process to the processes -// map. +// processSpawnWorker will spawn a new process. It will give the +// process the next available ID, and also add the process to the +// processes map. func (s *server) processSpawnWorker(proc process) { s.mu.Lock() // We use the full name of the subject to identify a unique