1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

fixed block when reading startup folder

This commit is contained in:
postmannen 2022-09-08 07:37:44 +02:00
parent 3cf8bd2ce8
commit 0ad2763ad1
2 changed files with 37 additions and 68 deletions

View file

@ -36,8 +36,12 @@ func (s *server) readStartupFolder() {
return return
} }
for _, fp := range filePaths {
fmt.Printf("info: ranging filepaths, current filePath contains: %v\n", fp)
}
for _, filePath := range filePaths { for _, filePath := range filePaths {
fmt.Printf("DEBUGDEBUGDEBUGDEBUGDEBUGDEBUGDEBUG: %v\n", filePath) fmt.Printf("info: reading and working on file from startup folder %v\n", filePath)
// Read the content of each file. // Read the content of each file.
readBytes, err := func(filePath string) ([]byte, error) { readBytes, err := func(filePath string) ([]byte, error) {
@ -76,52 +80,22 @@ func (s *server) readStartupFolder() {
for i := range sams { for i := range sams {
if sams[i].Message.FromNode == "" { if sams[i].Message.FromNode == "" {
sams = append(sams[:i], sams[i+1:]...) sams = append(sams[:i], sams[i+1:]...)
er := fmt.Errorf(" error: missing from field in startup message") er := fmt.Errorf(" error: missing fromNode field in startup message, discaring message")
s.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
} }
// Bounds check. // NB: REMOVED CODE!
if i == len(sams)-1 { // // Bounds check.
break // if i == len(sams)-1 {
} // fmt.Printf(" *** DEBUG: HIT BOUNDS CHECK, breaking out\n")
// break
// }
} }
// Send the SAM struct to be picked up by the ring buffer.
// s.ringBufferBulkInCh <- sams
// ---
//// Range over all the sams, find the process, check if the method exists, and
//// handle the message by starting the correct method handler.
//for i := range sams {
// processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
//
// s.processes.active.mu.Lock()
// p := s.processes.active.procNames[processName]
// s.processes.active.mu.Unlock()
//
// mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
// if !ok {
// er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
// p.errorKernel.errSend(p, sams[i].Message, er)
// continue
// }
//
// p.handler = mh.handler
//
// //_, err = mh.handler(p, sams[i].Message, s.nodeName)
// //if err != nil {
// // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
// // p.errorKernel.errSend(p, sams[i].Message, er)
// // continue
// //}
//
// executeHandler(p, sams[i].Message, s.nodeName)
//}
s.directSAMSCh <- sams s.directSAMSCh <- sams
} }
} }
// getFilePaths will get the names of all the messages in // getFilePaths will get the names of all the messages in
@ -129,7 +103,6 @@ func (s *server) readStartupFolder() {
func (s *server) getFilePaths(dirName string) ([]string, error) { func (s *server) getFilePaths(dirName string) ([]string, error) {
dirPath, err := os.Executable() dirPath, err := os.Executable()
dirPath = filepath.Dir(dirPath) dirPath = filepath.Dir(dirPath)
fmt.Printf(" * DEBUG: dirPath=%v\n", dirPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err) return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err)
} }

View file

@ -326,37 +326,33 @@ func (s *server) Start() {
// without sending them via the message broker. // without sending them via the message broker.
func (s *server) directSAMSChRead() { func (s *server) directSAMSChRead() {
go func() { go func() {
select { for {
case <-s.ctx.Done(): select {
log.Printf("info: stopped the directSAMSCh reader\n\n") case <-s.ctx.Done():
return log.Printf("info: stopped the directSAMSCh reader\n\n")
case sams := <-s.directSAMSCh: return
// Range over all the sams, find the process, check if the method exists, and case sams := <-s.directSAMSCh:
// handle the message by starting the correct method handler. fmt.Printf(" * DEBUG: directSAMSChRead: <- sams = %v\n", sams)
for i := range sams { // Range over all the sams, find the process, check if the method exists, and
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) // handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
s.processes.active.mu.Lock() s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName] p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock() s.processes.active.mu.Unlock()
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
p.errorKernel.errSend(p, sams[i].Message, er) p.errorKernel.errSend(p, sams[i].Message, er)
continue continue
}
p.handler = mh.handler
go executeHandler(p, sams[i].Message, s.nodeName)
} }
p.handler = mh.handler
//_, err = mh.handler(p, sams[i].Message, s.nodeName)
//if err != nil {
// er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
// p.errorKernel.errSend(p, sams[i].Message, er)
// continue
//}
executeHandler(p, sams[i].Message, s.nodeName)
} }
} }
}() }()