From 172dfd1b8c35ebf871d2a9330f7408df2b81d90f Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 21 Nov 2024 21:29:06 +0100 Subject: [PATCH] initial test of jetstream consmumer and publisher --- process-jetstream.go | 6 ---- processes.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ request-jetstream.go | 15 +++++++++ requests.go | 6 ++++ 4 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 request-jetstream.go diff --git a/process-jetstream.go b/process-jetstream.go index 05541c1..0ba7228 100644 --- a/process-jetstream.go +++ b/process-jetstream.go @@ -12,12 +12,6 @@ import ( // received, check the MessageType field in the message to decide what // kind of message it is and then it will check how to handle that message type, // and then call the correct method handler for it. -// -// This handler function should be started in it's own go routine,so -// one individual handler is started per message received so we can keep -// the state of the message being processed, and then reply back to the -// correct sending process's reply, meaning so we ACK back to the correct -// publisher. func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstream.Msg, subject string) { // Variable to hold a copy of the message data. diff --git a/processes.go b/processes.go index 700e5d1..c451f68 100644 --- a/processes.go +++ b/processes.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" ) @@ -337,6 +338,78 @@ func (p *processes) Start(proc process) { } proc.startup.subscriber(proc, PublicKey, nil) + + // ProcFunc for Jetstream publishers. + pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error { + js, err := jetstream.New(proc.natsConn) + if err != nil { + log.Fatalf("error: jetstream new failed: %v\n", err) + } + + _, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{ + Name: "orders", + Description: "orders stream", + Subjects: []string{"orders.>"}, + // Discard older messages and keep only the last one. + MaxMsgsPerSubject: 1, + }) + + if err != nil { + log.Fatalf("error: jetstream create or update failed: %v\n", err) + } + + i := 0 + for { + // TODO: + _, err := js.Publish(proc.ctx, "orders.shop1", []byte(fmt.Sprintf("order nr. %v", i))) + if err != nil { + log.Fatalf("error: js failed to publish message: %v\n", err) + } + + log.Printf("published message: %v\n", i) + time.Sleep(time.Second * 1) + i++ + } + } + proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers) + + // Procfunc for Jetstream consumers. + pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error { + js, err := jetstream.New(proc.natsConn) + if err != nil { + log.Fatalf("error: jetstream new failed: %v\n", err) + } + + stream, err := js.Stream(proc.ctx, "orders") + if err != nil { + log.Fatalf("error: js.Stream failed: %v\n", err) + } + + consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ + Name: "order_processor", + Durable: "order_processor", + }) + if err != nil { + log.Fatalf("error: create or update consumer failed: %v\n", err) + } + + cctx, err := consumer.Consume(func(msg jetstream.Msg) { + log.Printf("Received message: %v, with data: %v\n", string(msg.Subject()), string(msg.Data())) + msg.Ack() + }) + if err != nil { + log.Fatalf("error: create or update consumer failed: %v\n", err) + } + + defer cctx.Stop() + + <-proc.ctx.Done() + + return nil + } + + proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers) + } // Stop all subscriber processes. diff --git a/request-jetstream.go b/request-jetstream.go new file mode 100644 index 0000000..1823943 --- /dev/null +++ b/request-jetstream.go @@ -0,0 +1,15 @@ +package ctrl + +// jetstreamsConsumers will start up the netstream consumers. +// The consumer logic are put in the procFunc. +func jetstreamsConsumers(proc process, message Message, node string) ([]byte, error) { + + return []byte{}, nil +} + +// jetstreamPublishers will start up the netstream publishers. +// The publisher logic are put in the procFunc. +func jetstreamPublishers(proc process, message Message, node string) ([]byte, error) { + + return []byte{}, nil +} diff --git a/requests.go b/requests.go index 505236c..7ff69cb 100644 --- a/requests.go +++ b/requests.go @@ -159,6 +159,10 @@ const ( AclExport = "aclExport" // REQAclImport AclImport = "aclImport" + // Jetstreams Consumers + JetstreamConsumers = "jetstreamConsumers" + // JetstreamPublishers + JetStreamPublishers = "jetstreamPublishers" ) type HandlerFunc func(proc process, message Message, node string) ([]byte, error) @@ -212,6 +216,8 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { AclExport: HandlerFunc(methodAclExport), AclImport: HandlerFunc(methodAclImport), Test: HandlerFunc(methodTest), + JetstreamConsumers: HandlerFunc(jetstreamsConsumers), + JetStreamPublishers: HandlerFunc(jetstreamPublishers), }, }