1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

implemented ch for handling messages locally

This commit is contained in:
postmannen 2022-06-20 14:22:42 +02:00
parent c8e5dd6904
commit 2688f2ba2c
2 changed files with 75 additions and 26 deletions

View file

@ -92,33 +92,35 @@ func (s *server) readStartupFolder() {
// ---
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
//// Range over all the sams, find the process, check if the method exists, and
//// handle the message by starting the correct method handler.
//for i := range sams {
// processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
//
// s.processes.active.mu.Lock()
// p := s.processes.active.procNames[processName]
// s.processes.active.mu.Unlock()
//
// mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
// if !ok {
// er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
// p.errorKernel.errSend(p, sams[i].Message, er)
// continue
// }
//
// p.handler = mh.handler
//
// //_, err = mh.handler(p, sams[i].Message, s.nodeName)
// //if err != nil {
// // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
// // p.errorKernel.errSend(p, sams[i].Message, er)
// // continue
// //}
//
// executeHandler(p, sams[i].Message, s.nodeName)
//}
s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock()
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
p.errorKernel.errSend(p, sams[i].Message, er)
continue
}
p.handler = mh.handler
//_, err = mh.handler(p, sams[i].Message, s.nodeName)
//if err != nil {
// er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
// p.errorKernel.errSend(p, sams[i].Message, er)
// continue
//}
executeHandler(p, sams[i].Message, s.nodeName)
}
s.directSAMSCh <- sams
}
}

View file

@ -46,6 +46,8 @@ type server struct {
// In general the ringbuffer will read this
// channel, unfold each slice, and put single messages on the buffer.
toRingBufferCh chan []subjectAndMessage
// directSAMSCh
directSAMSCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
@ -164,6 +166,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
natsConn: conn,
StewardSocket: stewardSocket,
toRingBufferCh: make(chan []subjectAndMessage),
directSAMSCh: make(chan []subjectAndMessage),
metrics: metrics,
version: version,
tui: tuiClient,
@ -309,11 +312,55 @@ func (s *server) Start() {
// so we can cancel this context last, and not use the server.
s.routeMessagesToProcess("./incomingBuffer.db")
// Start reading the channel for injecting direct messages that should
// not be sent via the message broker.
s.directSAMSChRead()
// Check and enable read the messages specified in the startup folder.
s.readStartupFolder()
}
// directSAMSChRead for injecting messages directly in to the local system
// without sending them via the message broker.
func (s *server) directSAMSChRead() {
go func() {
select {
case <-s.ctx.Done():
log.Printf("info: stopped the directSAMSCh reader\n\n")
return
case sams := <-s.directSAMSCh:
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
s.processes.active.mu.Unlock()
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
p.errorKernel.errSend(p, sams[i].Message, er)
continue
}
p.handler = mh.handler
//_, err = mh.handler(p, sams[i].Message, s.nodeName)
//if err != nil {
// er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
// p.errorKernel.errSend(p, sams[i].Message, er)
// continue
//}
executeHandler(p, sams[i].Message, s.nodeName)
}
}
}()
}
// Will stop all processes started during startup.
func (s *server) Stop() {
// Stop the started pub/sub message processes.