1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed publisher channel on subject, and messages to publish are now directly published from the newMessagesCh

This commit is contained in:
postmannen 2024-12-01 02:15:53 +01:00
parent 5776baad37
commit 9fca6d0b7f
4 changed files with 29 additions and 190 deletions

View file

@ -101,11 +101,6 @@ type Subject struct {
ToNode string `json:"node" yaml:"toNode"`
// method, what is this message doing, etc. CLICommand, Syslog, etc.
Method Method `json:"method" yaml:"method"`
// messageCh is used by publisher kind processes to read new messages
// to be published. The content on this channel have been routed here
// from routeMessagesToPublish in *server.
// This channel is only used for publishing processes.
publishMessageCh chan Message
}
// newSubject will return a new variable of the type subject, and insert
@ -124,9 +119,8 @@ func newSubject(method Method, node string) Subject {
}
return Subject{
ToNode: node,
Method: method,
publishMessageCh: make(chan Message),
ToNode: node,
Method: method,
}
}
@ -139,9 +133,8 @@ func newSubjectNoVerifyHandler(method Method, node string) Subject {
// Get the Event type for the Method.
return Subject{
ToNode: node,
Method: method,
publishMessageCh: make(chan Message),
ToNode: node,
Method: method,
}
}

View file

@ -688,9 +688,8 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
}
sub := Subject{
ToNode: string(m.ToNode),
Method: m.Method,
publishMessageCh: make(chan Message),
ToNode: string(m.ToNode),
Method: m.Method,
}
sam := subjectAndMessage{

View file

@ -176,12 +176,6 @@ func (p process) start() {
p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)})
}
// Start a publisher worker, which will start a go routine (process)
// to handle publishing of the messages for the subject it owns.
if p.processKind == processKindPublisher {
p.startPublisher()
}
// Start a subscriber worker, which will start a go routine (process)
// to handle executing the request method defined in the message.
if p.processKind == processKindSubscriber {
@ -197,28 +191,6 @@ func (p process) start() {
p.errorKernel.logDebug(er)
}
// startPublisher.
func (p process) startPublisher() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logError)
}
}()
}
go p.publishMessages(p.natsConn)
}
func (p process) startSubscriber() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
@ -276,15 +248,17 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
message.RetryWait = 0
}
subject := newSubject(message.Method, string(message.ToNode))
// The for loop will run until the message is delivered successfully,
// or that retries are reached.
for {
msg := &nats.Msg{
Subject: string(p.subject.name()),
Subject: string(subject.name()),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"),
// Structure of the reply message are:
// <nodename>.<message type>.<method>.reply
Reply: fmt.Sprintf("%s.reply", p.subject.name()),
Reply: fmt.Sprintf("%s.reply", subject.name()),
Data: natsMsgPayload,
Header: natsMsgHeader,
}
@ -384,7 +358,7 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
switch {
case err == nats.ErrNoResponders || err == nats.ErrTimeout:
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, subject.name(), err)
p.errorKernel.logDebug(er)
time.Sleep(time.Second * time.Duration(message.RetryWait))
@ -393,13 +367,13 @@ func (p process) publishNats(natsMsgPayload []byte, natsMsgHeader nats.Header, n
return ErrACKSubscribeRetry
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", subject.name(), err)
p.errorKernel.logDebug(er)
return er
default:
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", p.subject.name(), err)
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update ctrl to handle the new error type: subject=%v: %v", subject.name(), err)
p.errorKernel.logDebug(er)
return er
@ -711,60 +685,6 @@ func (p process) startNatsSubscriber() *nats.Subscription {
return natsSubscription
}
// publishMessages will do the publishing of messages for one single
// process. The function should be run as a goroutine, and will run
// as long as the process it belongs to is running.
func (p process) publishMessages(natsConn *nats.Conn) {
// Adding a timer that will be used for when to remove the sub process
// publisher. The timer is reset each time a message is published with
// the process, so the sub process publisher will not be removed until
// it have not received any messages for the given amount of time.
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
defer ticker.Stop()
for {
// Wait and read the next message on the message channel, or
// exit this function if Cancel are received via ctx.
select {
case <-ticker.C:
// If it is a long running publisher we don't want to cancel it.
if p.isLongRunningPublisher {
continue
}
// We only want to remove subprocesses
// REMOVED 120123: Removed if so all publishers should be canceled if inactive.
//if p.isSubProcess {
p.processes.active.mu.Lock()
p.ctxCancel()
delete(p.processes.active.procNames, p.processName)
p.processes.active.mu.Unlock()
er := fmt.Errorf("info: canceled publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
p.errorKernel.logDebug(er)
return
//}
case m := <-p.subject.publishMessageCh:
ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
// Sign the methodArgs, and add the signature to the message.
m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
go p.publishAMessage(m, natsConn)
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
p.errorKernel.logDebug(er)
return
}
}
}
func (p process) addMethodArgSignature(m Message) []byte {
argsString := argsToString(m.MethodArgs)
sign := ed25519.Sign(p.nodeAuth.SignPrivateKey, []byte(argsString))

105
server.go
View file

@ -508,114 +508,41 @@ func (s *server) routeMessagesToPublisherProcess() {
methodsAvailable := method.GetMethodsAvailable()
go func() {
for sam := range s.newMessagesCh {
for sam1 := range s.newMessagesCh {
message := sam1.Message
go func(sam subjectAndMessage) {
go func(message Message) {
s.messageID.mu.Lock()
s.messageID.id++
sam.Message.ID = s.messageID.id
message.ID = s.messageID.id
s.messageID.mu.Unlock()
s.metrics.promMessagesProcessedIDLast.Set(float64(sam.Message.ID))
s.metrics.promMessagesProcessedIDLast.Set(float64(message.ID))
// Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
if _, ok := methodsAvailable.CheckIfExists(message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", message.Method)
s.errorKernel.errSend(s.processInitial, message, er, logError)
return
}
switch {
case sam.Message.Retries < 0:
sam.Message.Retries = s.configuration.DefaultMessageRetries
case message.Retries < 0:
message.Retries = s.configuration.DefaultMessageRetries
}
if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
sam.Message.MethodTimeout = s.configuration.DefaultMethodTimeout
if message.MethodTimeout < 1 && message.MethodTimeout != -1 {
message.MethodTimeout = s.configuration.DefaultMethodTimeout
}
// ---
m := sam.Message
message.ArgSignature = s.processInitial.addMethodArgSignature(message)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
go s.processInitial.publishAMessage(message, s.natsConn)
sendOK := func() bool {
var ctxCanceled bool
s.processes.active.mu.Lock()
defer s.processes.active.mu.Unlock()
// Check if the process exist, if it do not exist return false so a
// new publisher process will be created.
proc, ok := s.processes.active.procNames[pn]
if !ok {
return false
}
if proc.ctx.Err() != nil {
ctxCanceled = true
}
if ok && ctxCanceled {
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
s.errorKernel.logDebug(er)
delete(proc.processes.active.procNames, proc.processName)
return false
}
// If found in map above, and go routine for publishing is running,
// put the message on that processes incomming message channel.
if ok && !ctxCanceled {
select {
case proc.subject.publishMessageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
return true
}
// The process was not found, so we return false here so a new publisher
// process will be created later.
return false
}()
if sendOK {
return
}
er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
s.errorKernel.logDebug(er)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher)
}
proc.start()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logDebug(er)
// Now when the process is spawned we continue,
// and send the message to that new process.
select {
case proc.subject.publishMessageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logDebug(er)
}
}(sam)
}(message)
}
}()