From 960a2c3588395d4f740a5fb83b05bb05cee69741 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 22 Nov 2024 11:31:07 +0100 Subject: [PATCH] initial poc of jetstream seems to be working. Rerouting messages with the jetstream field specified to the jetstream channel --- message_and_subject.go | 3 ++ process.go | 3 ++ processes.go | 81 +++++++++++++++++++++++++++++++++++------- server.go | 11 ++++++ 4 files changed, 86 insertions(+), 12 deletions(-) diff --git a/message_and_subject.go b/message_and_subject.go index 0ee0604..07458c1 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -84,6 +84,9 @@ type Message struct { PreviousMessage *Message // Schedule Schedule []int `json:"schedule" yaml:"schedule"` + // Is to be used with the stream subject to tell what nodes + // the the message is for. + JetstreamToNode string } // --- Subject diff --git a/process.go b/process.go index 59ec560..ff8c1e5 100644 --- a/process.go +++ b/process.go @@ -90,6 +90,8 @@ type process struct { configuration *Configuration // The new messages channel copied from *Server newMessagesCh chan<- []subjectAndMessage + // JetstreamOut channel + jetstreamOut chan Message // The structure who holds all processes information processes *processes // nats connection @@ -152,6 +154,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, stream str processKind: processKind, methodsAvailable: method.GetMethodsAvailable(), newMessagesCh: server.newMessagesCh, + jetstreamOut: server.jetstreamOutCh, configuration: server.configuration, processes: server.processes, natsConn: server.natsConn, diff --git a/processes.go b/processes.go index 5744c6d..8110d31 100644 --- a/processes.go +++ b/processes.go @@ -2,6 +2,7 @@ package ctrl import ( "context" + "encoding/json" "fmt" "log" "sync" @@ -339,7 +340,9 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, PublicKey, nil) + // -------------------------------------------------- // ProcFunc for Jetstream publishers. + // -------------------------------------------------- pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error { js, err := jetstream.New(proc.natsConn) if err != nil { @@ -358,22 +361,57 @@ func (p *processes) Start(proc process) { log.Fatalf("error: jetstream create or update failed: %v\n", err) } - i := 0 - for { - // TODO: - _, err := js.Publish(proc.ctx, "nodes.btdev1", []byte(fmt.Sprintf("message nr. %v", i))) - if err != nil { - log.Fatalf("error: js failed to publish message: %v\n", err) + // REMOVE: Go routine for injecting messages for testing + go func() { + i := 0 + for { + m := Message{ + ToNode: "btdev1", + FromNode: proc.node, + Method: CliCommand, + MethodArgs: []string{"/bin/ash", "-c", "tree"}, + ReplyMethod: Console, + MethodTimeout: 3, + //Data: []byte("some text in here............"), + } + proc.jetstreamOut <- m + + log.Printf("published message: %v\n", i) + time.Sleep(time.Second * 1) + i++ } - log.Printf("published message: %v\n", i) - time.Sleep(time.Second * 1) - i++ + }() + + for { + // TODO: + select { + case msgJS := <-proc.jetstreamOut: + b, err := json.Marshal(msgJS) + if err != nil { + log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err) + } + + _, err = js.Publish(proc.ctx, "nodes.btdev1", b) + if err != nil { + log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err) + } + case <-ctx.Done(): + return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done") + } } } proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers) + // -------------------------------------------------- // Procfunc for Jetstream consumers. + // -------------------------------------------------- + + // pfJetstreamConsumers connect to a given nats jetstream, and consume messages + // for the node on specified subjects within that stream. + // After a jetstream message is picked up from the stream, the steward message + // will be extracted from the data field, and the ctrl message will be put + // on the local delivery channel, and handled as a normal ctrl message. pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error { js, err := jetstream.New(proc.natsConn) if err != nil { @@ -386,8 +424,8 @@ func (p *processes) Start(proc process) { } consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ - Name: "order_processor", - Durable: "order_processor", + Name: "nodes_processor", + Durable: "nodes_processor", FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"}, }) if err != nil { @@ -395,8 +433,23 @@ func (p *processes) Start(proc process) { } cctx, err := consumer.Consume(func(msg jetstream.Msg) { - log.Printf("Received message: %v, with data: %v\n", string(msg.Subject()), string(msg.Data())) + stewardMessage := Message{} + err := json.Unmarshal(msg.Data(), &stewardMessage) + if err != nil { + log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err) + } + + log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method)) + msg.Ack() + + // Messages received here via jetstream are for this node. Put the message into + // a SubjectAndMessage structure, and we use the deliver local from here. + sam, err := newSubjectAndMessage(stewardMessage) + if err != nil { + log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err) + } + proc.server.samSendLocalCh <- []subjectAndMessage{sam} }) if err != nil { log.Fatalf("error: create or update consumer failed: %v\n", err) @@ -413,6 +466,8 @@ func (p *processes) Start(proc process) { } +// -------------------------------------------------- + // Stop all subscriber processes. func (p *processes) Stop() { log.Printf("info: canceling all subscriber processes...\n") @@ -422,6 +477,8 @@ func (p *processes) Stop() { } +// --------------------------------------------------------------------------------------- +// Helper functions, and other // --------------------------------------------------------------------------------------- // Startup holds all the startup methods for subscribers. diff --git a/server.go b/server.go index fe9dca5..a293cfa 100644 --- a/server.go +++ b/server.go @@ -49,6 +49,8 @@ type server struct { // In general the ringbuffer will read this // channel, unfold each slice, and put single messages on the buffer. newMessagesCh chan []subjectAndMessage + // jetstreamOutCh + jetstreamOutCh chan Message // directSAMSCh samSendLocalCh chan []subjectAndMessage // errorKernel is doing all the error handling like what to do if @@ -218,6 +220,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { natsConn: conn, ctrlSocket: ctrlSocket, newMessagesCh: make(chan []subjectAndMessage), + jetstreamOutCh: make(chan Message), samSendLocalCh: make(chan []subjectAndMessage), metrics: metrics, version: version, @@ -488,6 +491,14 @@ func (s *server) routeMessagesToProcess() { for samSlice := range s.newMessagesCh { for _, sam := range samSlice { + // If the message have the JetstreamToNode field specified + // deliver it via the jet stream processes, and abort trying + // to send it via the normal nats publisher. + if sam.Message.JetstreamToNode != "" { + s.jetstreamOutCh <- sam.Message + continue + } + go func(sam subjectAndMessage) { s.messageID.mu.Lock() s.messageID.id++