diff --git a/process.go b/process.go index 71a6359..9edece8 100644 --- a/process.go +++ b/process.go @@ -23,8 +23,10 @@ import ( type processKind string const ( - processKindSubscriber processKind = "subscriber" - processKindPublisher processKind = "publisher" + processKindSubscriberNats processKind = "subscriberNats" + processKindPublisherNats processKind = "publisherNats" + processKindConsumerJetstream processKind = "consumerJetstream" + processKindPublisherJetstream processKind = "publisherJetstream" ) // process holds all the logic to handle a message type and it's @@ -165,11 +167,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin // process. We can do that since a process can only handle // one message queue. - if proc.processKind == processKindPublisher { - proc.processName = processNameGet(proc.subject.name(), processKindPublisher) + if proc.processKind == processKindPublisherNats { + proc.processName = processNameGet(proc.subject.name(), processKindPublisherNats) } - if proc.processKind == processKindSubscriber { - proc.processName = processNameGet(proc.subject.name(), processKindSubscriber) + if proc.processKind == processKindSubscriberNats { + proc.processName = processNameGet(proc.subject.name(), processKindSubscriberNats) } return proc @@ -191,13 +193,13 @@ func (p process) Start() { // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. - if p.processKind == processKindPublisher { - p.startPublisher() + if p.processKind == processKindPublisherNats { + p.startPublisherNats() } // Start a subscriber worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. - if p.processKind == processKindSubscriber { + if p.processKind == processKindSubscriberNats { p.startSubscriberNats() } @@ -210,7 +212,7 @@ func (p process) Start() { p.errorKernel.logDebug(er) } -func (p process) startPublisher() { +func (p process) startPublisherNats() { // If there is a procFunc for the process, start it. if p.procFunc != nil { // Initialize the channel for communication between the proc and @@ -228,7 +230,7 @@ func (p process) startPublisher() { }() } - go p.publishMessages(p.natsConn) + go p.publishMessagesNats(p.natsConn) } func (p process) startSubscriberNats() { @@ -741,7 +743,7 @@ func (p process) subscribeMessagesNats() *nats.Subscription { // publishMessages will do the publishing of messages for one single // process. The function should be run as a goroutine, and will run // as long as the process it belongs to is running. -func (p process) publishMessages(natsConn *nats.Conn) { +func (p process) publishMessagesNats(natsConn *nats.Conn) { var zEnc *zstd.Encoder // Prepare a zstd encoder so we can reuse the zstd encoder for all messages. @@ -797,7 +799,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { m.ArgSignature = p.addMethodArgSignature(m) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) - go p.publishAMessage(m, zEnc, natsConn) + go p.publishAMessageNats(m, zEnc, natsConn) case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) @@ -814,7 +816,7 @@ func (p process) addMethodArgSignature(m Message) []byte { return sign } -func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) { +func (p process) publishAMessageNats(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) { // Create the initial header, and set values below depending on the // various configuration options chosen. natsMsgHeader := make(nats.Header) @@ -835,7 +837,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.C // Get the process name so we can look up the process in the // processes map, and increment the message counter. - pn := processNameGet(p.subject.name(), processKindPublisher) + pn := processNameGet(p.subject.name(), processKindPublisherNats) // Compress the data payload if selected with configuration flag. // The compression chosen is later set in the nats msg header when diff --git a/processes.go b/processes.go index c386b6e..ed3ffa5 100644 --- a/processes.go +++ b/processes.go @@ -382,7 +382,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p } fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) - proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) + proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriberNats) proc.procFunc = pf go proc.Start() @@ -394,7 +394,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr er := fmt.Errorf("starting %v publisher: %#v", m, p.node) p.errorKernel.logDebug(er) sub := newSubject(m, string(p.node)) - proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher) + proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisherNats) proc.procFunc = pf proc.isLongRunningPublisher = true diff --git a/requests_copy.go b/requests_copy.go index c7c93a5..ff71636 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -225,7 +225,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { // Create a new sub process that will do the actual file copying. - copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) + copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. @@ -333,7 +333,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { // previous message is then fully up and running, so we just discard // that second message in those cases. - pn := processNameGet(sub.name(), processKindSubscriber) + pn := processNameGet(sub.name(), processKindSubscriberNats) // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) proc.processes.active.mu.Lock() @@ -352,7 +352,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { } // Create a new sub process that will do the actual file copying. - copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber) + copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. diff --git a/requests_operator.go b/requests_operator.go index 378d715..70cc190 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -68,7 +68,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) + procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriberNats) go procNew.Start() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) diff --git a/server.go b/server.go index 9e24e5b..70aa666 100644 --- a/server.go +++ b/server.go @@ -415,7 +415,7 @@ func (s *server) directSAMSChRead() { // Range over all the sams, find the process, check if the method exists, and // handle the message by starting the correct method handler. for i := range sams { - processName := processNameGet(sams[i].Subject.name(), processKindSubscriber) + processName := processNameGet(sams[i].Subject.name(), processKindSubscriberNats) s.processes.active.mu.Lock() p := s.processes.active.procNames[processName] @@ -516,7 +516,7 @@ func (s *server) routeMessagesToProcess() { m := sam.Message subjName := sam.Subject.name() - pn := processNameGet(subjName, processKindPublisher) + pn := processNameGet(subjName, processKindPublisherNats) sendOK := func() bool { var ctxCanceled bool @@ -572,9 +572,9 @@ func (s *server) routeMessagesToProcess() { var proc process switch { case m.IsSubPublishedMsg: - proc = newSubProcess(s.ctx, s, sub, processKindPublisher) + proc = newSubProcess(s.ctx, s, sub, processKindPublisherNats) default: - proc = newProcess(s.ctx, s, sub, processKindPublisher) + proc = newProcess(s.ctx, s, sub, processKindPublisherNats) } proc.Start()