1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

added Nats to the naming of the processKind constants, and added added 2 new for Jetstream

This commit is contained in:
postmannen 2024-11-21 05:28:56 +01:00
parent 7b2d5fc96f
commit 6b1c259373
5 changed files with 27 additions and 25 deletions

View file

@ -23,8 +23,10 @@ import (
type processKind string
const (
processKindSubscriber processKind = "subscriber"
processKindPublisher processKind = "publisher"
processKindSubscriberNats processKind = "subscriberNats"
processKindPublisherNats processKind = "publisherNats"
processKindConsumerJetstream processKind = "consumerJetstream"
processKindPublisherJetstream processKind = "publisherJetstream"
)
// process holds all the logic to handle a message type and it's
@ -165,11 +167,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
// process. We can do that since a process can only handle
// one message queue.
if proc.processKind == processKindPublisher {
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
if proc.processKind == processKindPublisherNats {
proc.processName = processNameGet(proc.subject.name(), processKindPublisherNats)
}
if proc.processKind == processKindSubscriber {
proc.processName = processNameGet(proc.subject.name(), processKindSubscriber)
if proc.processKind == processKindSubscriberNats {
proc.processName = processNameGet(proc.subject.name(), processKindSubscriberNats)
}
return proc
@ -191,13 +193,13 @@ func (p process) Start() {
// Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
if p.processKind == processKindPublisher {
p.startPublisher()
if p.processKind == processKindPublisherNats {
p.startPublisherNats()
}
// Start a subscriber worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
if p.processKind == processKindSubscriber {
if p.processKind == processKindSubscriberNats {
p.startSubscriberNats()
}
@ -210,7 +212,7 @@ func (p process) Start() {
p.errorKernel.logDebug(er)
}
func (p process) startPublisher() {
func (p process) startPublisherNats() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
@ -228,7 +230,7 @@ func (p process) startPublisher() {
}()
}
go p.publishMessages(p.natsConn)
go p.publishMessagesNats(p.natsConn)
}
func (p process) startSubscriberNats() {
@ -741,7 +743,7 @@ func (p process) subscribeMessagesNats() *nats.Subscription {
// publishMessages will do the publishing of messages for one single
// process. The function should be run as a goroutine, and will run
// as long as the process it belongs to is running.
func (p process) publishMessages(natsConn *nats.Conn) {
func (p process) publishMessagesNats(natsConn *nats.Conn) {
var zEnc *zstd.Encoder
// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
@ -797,7 +799,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
go p.publishAMessage(m, zEnc, natsConn)
go p.publishAMessageNats(m, zEnc, natsConn)
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
@ -814,7 +816,7 @@ func (p process) addMethodArgSignature(m Message) []byte {
return sign
}
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) {
func (p process) publishAMessageNats(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := make(nats.Header)
@ -835,7 +837,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.C
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
pn := processNameGet(p.subject.name(), processKindPublisherNats)
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when

View file

@ -382,7 +382,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
}
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriberNats)
proc.procFunc = pf
go proc.Start()
@ -394,7 +394,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er)
sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisher)
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisherNats)
proc.procFunc = pf
proc.isLongRunningPublisher = true

View file

@ -225,7 +225,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// Create a new sub process that will do the actual file copying.
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats)
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
@ -333,7 +333,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
// previous message is then fully up and running, so we just discard
// that second message in those cases.
pn := processNameGet(sub.name(), processKindSubscriber)
pn := processNameGet(sub.name(), processKindSubscriberNats)
// fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn)
proc.processes.active.mu.Lock()
@ -352,7 +352,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
}
// Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber)
copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriberNats)
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.

View file

@ -68,7 +68,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriberNats)
go procNew.Start()
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)

View file

@ -415,7 +415,7 @@ func (s *server) directSAMSChRead() {
// Range over all the sams, find the process, check if the method exists, and
// handle the message by starting the correct method handler.
for i := range sams {
processName := processNameGet(sams[i].Subject.name(), processKindSubscriber)
processName := processNameGet(sams[i].Subject.name(), processKindSubscriberNats)
s.processes.active.mu.Lock()
p := s.processes.active.procNames[processName]
@ -516,7 +516,7 @@ func (s *server) routeMessagesToProcess() {
m := sam.Message
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
pn := processNameGet(subjName, processKindPublisherNats)
sendOK := func() bool {
var ctxCanceled bool
@ -572,9 +572,9 @@ func (s *server) routeMessagesToProcess() {
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher)
proc = newSubProcess(s.ctx, s, sub, processKindPublisherNats)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher)
proc = newProcess(s.ctx, s, sub, processKindPublisherNats)
}
proc.Start()