diff --git a/message_readers.go b/message_readers.go index f8f6be8..2806c8b 100644 --- a/message_readers.go +++ b/message_readers.go @@ -23,6 +23,9 @@ import ( // for this is that all replies normally pick up the host from the original // first message, but here we inject it on an end node so we need to specify // the fromNode to get the reply back to the node we want. +// +// Messages read from the startup folder will be directly called by the handler +// locally, and the message will not be sent via the nats-server. func (s *server) readStartupFolder() { // Get the names of all the files in the startup folder. @@ -84,7 +87,32 @@ func (s *server) readStartupFolder() { } // Send the SAM struct to be picked up by the ring buffer. - s.ringBufferBulkInCh <- sams + // s.ringBufferBulkInCh <- sams + + // --- + + // Range over all the sams, find the process, check if the method exists, and + // handle the message by starting the correct method handler. + for i := range sams { + processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) + + s.processes.active.mu.Lock() + p := s.processes.active.procNames[processName] + s.processes.active.mu.Unlock() + + mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) + if !ok { + er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) + p.processes.errorKernel.errSend(p, sams[i].Message, er) + } + + _, err = mh.handler(p, sams[i].Message, s.nodeName) + + if err != nil { + er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + p.processes.errorKernel.errSend(p, sams[i].Message, er) + } + } } }