1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 12:59:15 +00:00

naming, and comments

This commit is contained in:
postmannen 2021-02-17 16:58:51 +01:00
parent 71059ba97d
commit 529cc48440
2 changed files with 17 additions and 9 deletions

Binary file not shown.

View file

@ -139,13 +139,15 @@ func (s *server) printProcessesMap() {
// handleNewOperatorMessages will handle all the new operator messages // handleNewOperatorMessages will handle all the new operator messages
// given to the system, and route them to the correct subject queue. // given to the system, and route them to the correct subject queue.
// It will also handle the process of spawning more worker processes
// for publisher subjects if it does not exist.
func (s *server) handleMessagesInRingbuffer() { func (s *server) handleMessagesInRingbuffer() {
// Prepare and start a new ring buffer // Prepare and start a new ring buffer
const bufferSize int = 1000 const bufferSize int = 1000
rb := newringBuffer(bufferSize) rb := newringBuffer(bufferSize)
inCh := make(chan subjectAndMessage) inCh := make(chan subjectAndMessage)
outCh := make(chan samDBValue) ringBufferOutCh := make(chan samDBValue)
rb.start(inCh, outCh) rb.start(inCh, ringBufferOutCh)
// Start reading new messages received on the incomming message // Start reading new messages received on the incomming message
// pipe requested by operator, and fill them into the buffer. // pipe requested by operator, and fill them into the buffer.
@ -162,7 +164,7 @@ func (s *server) handleMessagesInRingbuffer() {
// send if there are a specific subject for it, and no subject // send if there are a specific subject for it, and no subject
// exist throw an error. // exist throw an error.
go func() { go func() {
for samTmp := range outCh { for samTmp := range ringBufferOutCh {
sam := samTmp.Data sam := samTmp.Data
// Check if the format of the message is correct. // Check if the format of the message is correct.
// TODO: Send a message to the error kernel here that // TODO: Send a message to the error kernel here that
@ -175,20 +177,25 @@ func (s *server) handleMessagesInRingbuffer() {
continue continue
} }
redo:
// Adding a label here so we are able to redo the sending // Adding a label here so we are able to redo the sending
// of the last message if a process with specified subject // of the last message if a process with specified subject
// is not present. The process will then be created, and // is not present. The process will then be created, and
// the code will loop back to the redo: label. // the code will loop back to the redo: label.
redo:
m := sam.Message m := sam.Message
subjName := sam.Subject.name() subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName] _, ok := s.processes[subjName]
// Are there already a process for that subject, put the
// message on that processes incomming message channel.
if ok { if ok {
log.Printf("info: found the specific subject: %v\n", subjName) log.Printf("info: found the specific subject: %v\n", subjName)
// Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m s.processes[subjName].subject.messageCh <- m
// If no process to handle the specific subject exist,
// the we create and spawn one.
} else { } else {
// If a publisher do not exist for the given subject, create it, and // If a publisher do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message. // by using the goto at the end redo the process for this specific message.
@ -201,7 +208,8 @@ func (s *server) handleMessagesInRingbuffer() {
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
s.printProcessesMap() s.printProcessesMap()
// Now when the process is spawned we jump back to the redo: label. // Now when the process is spawned we jump back to the redo: label,
// and send the message to that new process.
goto redo goto redo
} }
} }
@ -288,9 +296,9 @@ func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, proce
return proc return proc
} }
// spawnProcess will spawn a new process. It will give the process // processSpawnWorker will spawn a new process. It will give the
// the next available ID, and also add the process to the processes // process the next available ID, and also add the process to the
// map. // processes map.
func (s *server) processSpawnWorker(proc process) { func (s *server) processSpawnWorker(proc process) {
s.mu.Lock() s.mu.Lock()
// We use the full name of the subject to identify a unique // We use the full name of the subject to identify a unique