From 7b2d5fc96ff9fd8e287c77bea03781ac46552f83 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 21 Nov 2024 04:54:55 +0100 Subject: [PATCH] added initial jetstream field to process struct, and renamed process.spawnWorker to process.start. --- process.go | 28 +++++++++++++++++++--------- processes.go | 4 ++-- requests_copy.go | 4 ++-- requests_operator.go | 2 +- server.go | 2 +- 5 files changed, 25 insertions(+), 15 deletions(-) diff --git a/process.go b/process.go index 50c93dc..71a6359 100644 --- a/process.go +++ b/process.go @@ -5,12 +5,14 @@ import ( "crypto/ed25519" "errors" "fmt" + "log" "os" "time" "github.com/fxamacker/cbor/v2" "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" // "google.golang.org/protobuf/internal/errors" ) @@ -115,6 +117,8 @@ type process struct { errorKernel *errorKernel // metrics metrics *metrics + // jetstream + js jetstream.JetStream } // prepareNewProcess will set the the provided values and the default @@ -130,6 +134,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin var method Method + js, err := jetstream.New(server.natsConn) + if err != nil { + log.Fatalf("error: failed to create jetstream.New: %v\n", err) + } + proc := process{ server: server, messageID: 0, @@ -149,6 +158,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin centralAuth: server.centralAuth, errorKernel: server.errorKernel, metrics: server.metrics, + js: js, } // We use the full name of the subject to identify a unique @@ -172,7 +182,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin // // It will give the process the next available ID, and also add the // process to the processes map in the server structure. -func (p process) spawnWorker() { +func (p process) Start() { // Add prometheus metrics for the process. if !p.isSubProcess { @@ -188,7 +198,7 @@ func (p process) spawnWorker() { // Start a subscriber worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindSubscriber { - p.startSubscriber() + p.startSubscriberNats() } // Add information about the new process to the started processes map. @@ -221,7 +231,7 @@ func (p process) startPublisher() { go p.publishMessages(p.natsConn) } -func (p process) startSubscriber() { +func (p process) startSubscriberNats() { // If there is a procFunc for the process, start it. if p.procFunc != nil { // Initialize the channel for communication between the proc and @@ -239,7 +249,7 @@ func (p process) startSubscriber() { }() } - p.natsSubscription = p.subscribeMessages() + p.natsSubscription = p.subscribeMessagesNats() // We also need to be able to remove all the information about this process // when the process context is canceled. @@ -710,17 +720,17 @@ func (p process) verifySigOrAclFlag(message Message) bool { // SubscribeMessage will register the Nats callback function for the specified // nats subject. This allows us to receive Nats messages for a given subject // on a node. -func (p process) subscribeMessages() *nats.Subscription { +func (p process) subscribeMessagesNats() *nats.Subscription { subject := string(p.subject.name()) - // natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { - natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) { - //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // Register the callback function that NATS will use when new messages arrive. + natsSubscription, err := p.natsConn.QueueSubscribe(subject, subject, func(msg *nats.Msg) { // Start up the subscriber handler. go p.messageSubscriberHandlerNats(p.natsConn, p.configuration.NodeName, msg, subject) }) + if err != nil { - er := fmt.Errorf("error: Subscribe failed: %v", err) + er := fmt.Errorf("error: nats queue subscribe failed: %v", err) p.errorKernel.logDebug(er) return nil } diff --git a/processes.go b/processes.go index 49745b7..c386b6e 100644 --- a/processes.go +++ b/processes.go @@ -385,7 +385,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber) proc.procFunc = pf - go proc.spawnWorker() + go proc.Start() } // publisher will start a publisher process. It takes the initial process, request method, @@ -398,7 +398,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr proc.procFunc = pf proc.isLongRunningPublisher = true - go proc.spawnWorker() + go proc.Start() } // --------------------------------------------------------------- diff --git a/requests_copy.go b/requests_copy.go index 8733b80..c7c93a5 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -235,7 +235,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { copySrcSubProc.handler = copySrcSubHandler() // The process will be killed when the context expires. - go copySrcSubProc.spawnWorker() + go copySrcSubProc.Start() // Send a message over the the node where the destination file will be written, // to also start up a sub process on the destination node. @@ -362,7 +362,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) { copyDstSubProc.handler = copyDstSubHandler() // The process will be killed when the context expires. - go copyDstSubProc.spawnWorker() + go copyDstSubProc.Start() fp := filepath.Join(cia.DstDir, cia.DstFile) replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName) diff --git a/requests_operator.go b/requests_operator.go index 743a032..378d715 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -69,7 +69,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber) - go procNew.spawnWorker() + go procNew.Start() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf("%v", txt) diff --git a/server.go b/server.go index fe5d294..9e24e5b 100644 --- a/server.go +++ b/server.go @@ -577,7 +577,7 @@ func (s *server) routeMessagesToProcess() { proc = newProcess(s.ctx, s, sub, processKindPublisher) } - proc.spawnWorker() + proc.Start() er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) s.errorKernel.logDebug(er)