From 9fca6d0b7f1e4b24e0230a701b5f65f68b22aba5 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sun, 1 Dec 2024 02:15:53 +0100 Subject: [PATCH] removed publisher channel on subject, and messages to publish are now directly published from the newMessagesCh --- message_and_subject.go | 15 ++---- message_readers.go | 5 +- process.go | 94 +++--------------------------------- server.go | 105 +++++++---------------------------------- 4 files changed, 29 insertions(+), 190 deletions(-) diff --git a/message_and_subject.go b/message_and_subject.go index ecc827b..9949a2b 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -101,11 +101,6 @@ type Subject struct { ToNode string `json:"node" yaml:"toNode"` // method, what is this message doing, etc. CLICommand, Syslog, etc. Method Method `json:"method" yaml:"method"` - // messageCh is used by publisher kind processes to read new messages - // to be published. The content on this channel have been routed here - // from routeMessagesToPublish in *server. - // This channel is only used for publishing processes. - publishMessageCh chan Message } // newSubject will return a new variable of the type subject, and insert @@ -124,9 +119,8 @@ func newSubject(method Method, node string) Subject { } return Subject{ - ToNode: node, - Method: method, - publishMessageCh: make(chan Message), + ToNode: node, + Method: method, } } @@ -139,9 +133,8 @@ func newSubjectNoVerifyHandler(method Method, node string) Subject { // Get the Event type for the Method. return Subject{ - ToNode: node, - Method: method, - publishMessageCh: make(chan Message), + ToNode: node, + Method: method, } } diff --git a/message_readers.go b/message_readers.go index 50b3948..223d6a7 100644 --- a/message_readers.go +++ b/message_readers.go @@ -688,9 +688,8 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) { } sub := Subject{ - ToNode: string(m.ToNode), - Method: m.Method, - publishMessageCh: make(chan Message), + ToNode: string(m.ToNode), + Method: m.Method, } sam := subjectAndMessage{ diff --git a/process.go b/process.go index 42e9d49..7b7095c 100644 --- a/process.go +++ b/process.go @@ -176,12 +176,6 @@ func (p process) start() { p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)}) } - // Start a publisher worker, which will start a go routine (process) - // to handle publishing of the messages for the subject it owns. - if p.processKind == processKindPublisher { - p.startPublisher() - } - // Start a subscriber worker, which will start a go routine (process) // to handle executing the request method defined in the message. if p.processKind == processKindSubscriber { @@ -197,28 +191,6 @@ func (p process) start() { p.errorKernel.logDebug(er) } -// startPublisher. -func (p process) startPublisher() { - // If there is a procFunc for the process, start it. - if p.procFunc != nil { - // Initialize the channel for communication between the proc and - // the procFunc. - p.procFuncCh = make(chan Message) - - // Start the procFunc in it's own anonymous func so we are able - // to get the return error. - go func() { - err := p.procFunc(p.ctx, p.procFuncCh) - if err != nil { - er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logError) - } - }() - } - - go p.publishMessages(p.natsConn) -} - func (p process) startSubscriber() { // If there is a procFunc for the process, start it. if p.procFunc != nil { @@ -276,15 +248,17 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n message.RetryWait = 0 } + subject := newSubject(message.Method, string(message.ToNode)) + // The for loop will run until the message is delivered successfully, // or that retries are reached. for { msg := &nats.Msg{ - Subject: string(p.subject.name()), + Subject: string(subject.name()), // Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"), // Structure of the reply message are: // ...reply - Reply: fmt.Sprintf("%s.reply", p.subject.name()), + Reply: fmt.Sprintf("%s.reply", subject.name()), Data: natsMsgPayload, Header: natsMsgHeader, } @@ -384,7 +358,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n switch { case err == nats.ErrNoResponders || err == nats.ErrTimeout: - er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) + er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, subject.name(), err) p.errorKernel.logDebug(er) time.Sleep(time.Second * time.Duration(message.RetryWait)) @@ -393,13 +367,13 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n return ErrACKSubscribeRetry case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: - er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) + er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", subject.name(), err) p.errorKernel.logDebug(er) return er default: - er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", p.subject.name(), err) + er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", subject.name(), err) p.errorKernel.logDebug(er) return er @@ -711,60 +685,6 @@ func (p process) startNatsSubscriber() *nats.Subscription { return natsSubscription } -// publishMessages will do the publishing of messages for one single -// process. The function should be run as a goroutine, and will run -// as long as the process it belongs to is running. -func (p process) publishMessages(natsConn *nats.Conn) { - - // Adding a timer that will be used for when to remove the sub process - // publisher. The timer is reset each time a message is published with - // the process, so the sub process publisher will not be removed until - // it have not received any messages for the given amount of time. - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) - defer ticker.Stop() - - for { - - // Wait and read the next message on the message channel, or - // exit this function if Cancel are received via ctx. - select { - case <-ticker.C: - // If it is a long running publisher we don't want to cancel it. - if p.isLongRunningPublisher { - continue - } - - // We only want to remove subprocesses - // REMOVED 120123: Removed if so all publishers should be canceled if inactive. - //if p.isSubProcess { - p.processes.active.mu.Lock() - p.ctxCancel() - delete(p.processes.active.procNames, p.processName) - p.processes.active.mu.Unlock() - - er := fmt.Errorf("info: canceled publisher: %v", p.processName) - //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er) - - return - //} - - case m := <-p.subject.publishMessageCh: - ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) - // Sign the methodArgs, and add the signature to the message. - m.ArgSignature = p.addMethodArgSignature(m) - // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) - - go p.publishAMessage(m, natsConn) - case <-p.ctx.Done(): - er := fmt.Errorf("info: canceling publisher: %v", p.processName) - //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) - p.errorKernel.logDebug(er) - return - } - } -} - func (p process) addMethodArgSignature(m Message) []byte { argsString := argsToString(m.MethodArgs) sign := ed25519.Sign(p.nodeAuth.SignPrivateKey, []byte(argsString)) diff --git a/server.go b/server.go index d942706..f1c4a6d 100644 --- a/server.go +++ b/server.go @@ -508,114 +508,41 @@ func (s *server) routeMessagesToPublisherProcess() { methodsAvailable := method.GetMethodsAvailable() go func() { - for sam := range s.newMessagesCh { + for sam1 := range s.newMessagesCh { + message := sam1.Message - go func(sam subjectAndMessage) { + go func(message Message) { s.messageID.mu.Lock() s.messageID.id++ - sam.Message.ID = s.messageID.id + message.ID = s.messageID.id s.messageID.mu.Unlock() - s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID)) + s.metrics.promMessagesProcessedIDLast.Set(float64(message.ID)) // Check if the format of the message is correct. - if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { - er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er, logError) + if _, ok := methodsAvailable.CheckIfExists(message.Method); !ok { + er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", message.Method) + s.errorKernel.errSend(s.processInitial, message, er, logError) return } switch { - case sam.Message.Retries < 0: - sam.Message.Retries = s.configuration.DefaultMessageRetries + case message.Retries < 0: + message.Retries = s.configuration.DefaultMessageRetries } - if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 { - sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout + if message.MethodTimeout < 1 && message.MethodTimeout != -1 { + message.MethodTimeout = s.configuration.DefaultMethodTimeout } // --- - m := sam.Message + message.ArgSignature = s.processInitial.addMethodArgSignature(message) + // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) - subjName := sam.Subject.name() - pn := processNameGet(subjName, processKindPublisher) + go s.processInitial.publishAMessage(message, s.natsConn) - sendOK := func() bool { - var ctxCanceled bool - - s.processes.active.mu.Lock() - defer s.processes.active.mu.Unlock() - - // Check if the process exist, if it do not exist return false so a - // new publisher process will be created. - proc, ok := s.processes.active.procNames[pn] - if !ok { - return false - } - - if proc.ctx.Err() != nil { - ctxCanceled = true - } - if ok && ctxCanceled { - er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName) - s.errorKernel.logDebug(er) - delete(proc.processes.active.procNames, proc.processName) - return false - } - - // If found in map above, and go routine for publishing is running, - // put the message on that processes incomming message channel. - if ok && !ctxCanceled { - select { - case proc.subject.publishMessageCh <- m: - er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er) - case <-proc.ctx.Done(): - er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) - s.errorKernel.logDebug(er) - } - - return true - } - - // The process was not found, so we return false here so a new publisher - // process will be created later. - return false - }() - - if sendOK { - return - } - - er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName) - s.errorKernel.logDebug(er) - - sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - var proc process - switch { - case m.IsSubPublishedMsg: - proc = newSubProcess(s.ctx, s, sub, processKindPublisher) - default: - proc = newProcess(s.ctx, s, sub, processKindPublisher) - } - - proc.start() - er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) - s.errorKernel.logDebug(er) - - // Now when the process is spawned we continue, - // and send the message to that new process. - select { - case proc.subject.publishMessageCh <- m: - er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName) - s.errorKernel.logDebug(er) - case <-proc.ctx.Done(): - er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName) - s.errorKernel.logDebug(er) - } - - }(sam) + }(message) } }()