1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 23:06:47 +00:00

renamed subject.messageCh to subject.publishMessageCh

This commit is contained in:
postmannen 2024-11-29 13:32:55 +01:00
parent c2031a66e7
commit 3ff67a5f1f
4 changed files with 13 additions and 13 deletions

View file

@ -105,7 +105,7 @@ type Subject struct {
// to be published. The content on this channel have been routed here // to be published. The content on this channel have been routed here
// from routeMessagesToPublish in *server. // from routeMessagesToPublish in *server.
// This channel is only used for publishing processes. // This channel is only used for publishing processes.
messageCh chan Message publishMessageCh chan Message
} }
// newSubject will return a new variable of the type subject, and insert // newSubject will return a new variable of the type subject, and insert
@ -126,7 +126,7 @@ func newSubject(method Method, node string) Subject {
return Subject{ return Subject{
ToNode: node, ToNode: node,
Method: method, Method: method,
messageCh: make(chan Message), publishMessageCh: make(chan Message),
} }
} }
@ -141,7 +141,7 @@ func newSubjectNoVerifyHandler(method Method, node string) Subject {
return Subject{ return Subject{
ToNode: node, ToNode: node,
Method: method, Method: method,
messageCh: make(chan Message), publishMessageCh: make(chan Message),
} }
} }

View file

@ -690,7 +690,7 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
sub := Subject{ sub := Subject{
ToNode: string(m.ToNode), ToNode: string(m.ToNode),
Method: m.Method, Method: m.Method,
messageCh: make(chan Message), publishMessageCh: make(chan Message),
} }
sam := subjectAndMessage{ sam := subjectAndMessage{

View file

@ -749,7 +749,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
return return
//} //}
case m := <-p.subject.messageCh: case m := <-p.subject.publishMessageCh:
ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
// Sign the methodArgs, and add the signature to the message. // Sign the methodArgs, and add the signature to the message.
m.ArgSignature = p.addMethodArgSignature(m) m.ArgSignature = p.addMethodArgSignature(m)

View file

@ -568,7 +568,7 @@ func (s *server) routeMessagesToPublisherProcess() {
// put the message on that processes incomming message channel. // put the message on that processes incomming message channel.
if ok && !ctxCanceled { if ok && !ctxCanceled {
select { select {
case proc.subject.messageCh <- m: case proc.subject.publishMessageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName) er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er) s.errorKernel.logDebug(er)
case <-proc.ctx.Done(): case <-proc.ctx.Done():
@ -607,7 +607,7 @@ func (s *server) routeMessagesToPublisherProcess() {
// Now when the process is spawned we continue, // Now when the process is spawned we continue,
// and send the message to that new process. // and send the message to that new process.
select { select {
case proc.subject.messageCh <- m: case proc.subject.publishMessageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName) er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logDebug(er) s.errorKernel.logDebug(er)
case <-proc.ctx.Done(): case <-proc.ctx.Done():