From 0ad2763ad1f7b0da3e96368570ff71cac6c5ba4b Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 8 Sep 2022 07:37:44 +0200 Subject: [PATCH] fixed block when reading startup folder --- message_readers.go | 53 ++++++++++++---------------------------------- server.go | 52 +++++++++++++++++++++------------------------ 2 files changed, 37 insertions(+), 68 deletions(-) diff --git a/message_readers.go b/message_readers.go index af1ffec..f012082 100644 --- a/message_readers.go +++ b/message_readers.go @@ -36,8 +36,12 @@ func (s *server) readStartupFolder() { return } + for _, fp := range filePaths { + fmt.Printf("info: ranging filepaths, current filePath contains: %v\n", fp) + } + for _, filePath := range filePaths { - fmt.Printf("DEBUGDEBUGDEBUGDEBUGDEBUGDEBUGDEBUG: %v\n", filePath) + fmt.Printf("info: reading and working on file from startup folder %v\n", filePath) // Read the content of each file. readBytes, err := func(filePath string) ([]byte, error) { @@ -76,52 +80,22 @@ func (s *server) readStartupFolder() { for i := range sams { if sams[i].Message.FromNode == "" { sams = append(sams[:i], sams[i+1:]...) - er := fmt.Errorf(" error: missing from field in startup message") + er := fmt.Errorf(" error: missing fromNode field in startup message, discaring message") s.errorKernel.errSend(s.processInitial, Message{}, er) } - // Bounds check. - if i == len(sams)-1 { - break - } + // NB: REMOVED CODE! + // // Bounds check. + // if i == len(sams)-1 { + // fmt.Printf(" *** DEBUG: HIT BOUNDS CHECK, breaking out\n") + // break + // } } - // Send the SAM struct to be picked up by the ring buffer. - // s.ringBufferBulkInCh <- sams - - // --- - - //// Range over all the sams, find the process, check if the method exists, and - //// handle the message by starting the correct method handler. - //for i := range sams { - // processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) - // - // s.processes.active.mu.Lock() - // p := s.processes.active.procNames[processName] - // s.processes.active.mu.Unlock() - // - // mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) - // if !ok { - // er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - // p.errorKernel.errSend(p, sams[i].Message, er) - // continue - // } - // - // p.handler = mh.handler - // - // //_, err = mh.handler(p, sams[i].Message, s.nodeName) - // //if err != nil { - // // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - // // p.errorKernel.errSend(p, sams[i].Message, er) - // // continue - // //} - // - // executeHandler(p, sams[i].Message, s.nodeName) - //} - s.directSAMSCh <- sams } + } // getFilePaths will get the names of all the messages in @@ -129,7 +103,6 @@ func (s *server) readStartupFolder() { func (s *server) getFilePaths(dirName string) ([]string, error) { dirPath, err := os.Executable() dirPath = filepath.Dir(dirPath) - fmt.Printf(" * DEBUG: dirPath=%v\n", dirPath) if err != nil { return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err) } diff --git a/server.go b/server.go index 3a7dbbf..47aefdf 100644 --- a/server.go +++ b/server.go @@ -326,37 +326,33 @@ func (s *server) Start() { // without sending them via the message broker. func (s *server) directSAMSChRead() { go func() { - select { - case <-s.ctx.Done(): - log.Printf("info: stopped the directSAMSCh reader\n\n") - return - case sams := <-s.directSAMSCh: - // Range over all the sams, find the process, check if the method exists, and - // handle the message by starting the correct method handler. - for i := range sams { - processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) + for { + select { + case <-s.ctx.Done(): + log.Printf("info: stopped the directSAMSCh reader\n\n") + return + case sams := <-s.directSAMSCh: + fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams) + // Range over all the sams, find the process, check if the method exists, and + // handle the message by starting the correct method handler. + for i := range sams { + processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) - s.processes.active.mu.Lock() - p := s.processes.active.procNames[processName] - s.processes.active.mu.Unlock() + s.processes.active.mu.Lock() + p := s.processes.active.procNames[processName] + s.processes.active.mu.Unlock() - mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) - if !ok { - er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - p.errorKernel.errSend(p, sams[i].Message, er) - continue + mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) + if !ok { + er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) + p.errorKernel.errSend(p, sams[i].Message, er) + continue + } + + p.handler = mh.handler + + go executeHandler(p, sams[i].Message, s.nodeName) } - - p.handler = mh.handler - - //_, err = mh.handler(p, sams[i].Message, s.nodeName) - //if err != nil { - // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - // p.errorKernel.errSend(p, sams[i].Message, er) - // continue - //} - - executeHandler(p, sams[i].Message, s.nodeName) } } }()