From ab0b0fbe245451d94ebf4c5c267a6696e4d8be9b Mon Sep 17 00:00:00 2001
From: postmannen <postmannen@gmail.com>
Date: Fri, 22 Nov 2024 13:42:42 +0100
Subject: [PATCH] added flags/env for starting up jetstream processes

---
 configuration_flags.go |  81 ++++++++---------
 processes.go           | 194 +++++++++++++++++++++--------------------
 2 files changed, 137 insertions(+), 138 deletions(-)

diff --git a/configuration_flags.go b/configuration_flags.go
index efa589e..91be26c 100644
--- a/configuration_flags.go
+++ b/configuration_flags.go
@@ -95,44 +95,33 @@ type Configuration struct {
 	KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
 
 	StartProcesses StartProcesses
-
-	Jetstreams string `comment:"Comma separated list of streams to consume messages from"`
+	// Comma separated list of additional streams to listen on.
+	Jetstreams string
 }
 
 type StartProcesses struct {
-	// StartPubHello, sets the interval in seconds for how often we send hello messages to central server
-	StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
-	// Enable the updates of public keys
+	StartPubHello    int  `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
 	EnableKeyUpdates bool `comment:"Enable the updates of public keys"`
 
-	// Enable the updates of acl's
 	EnableAclUpdates bool `comment:"Enable the updates of acl's"`
 
-	// Start the central error logger.
-	IsCentralErrorLogger bool `comment:"Start the central error logger."`
-	// Start subscriber for hello messages
-	StartSubHello bool `comment:"Start subscriber for hello messages"`
-	// Start subscriber for text logging
-	StartSubFileAppend bool `comment:"Start subscriber for text logging"`
-	// Start subscriber for writing to file
-	StartSubFile bool `comment:"Start subscriber for writing to file"`
-	// Start subscriber for reading files to copy
-	StartSubCopySrc bool `comment:"Start subscriber for reading files to copy"`
-	// Start subscriber for writing copied files to disk
-	StartSubCopyDst bool `comment:"Start subscriber for writing copied files to disk"`
-	// Start subscriber for Echo Request
-	StartSubCliCommand bool `comment:"Start subscriber for CLICommand"`
-	// Start subscriber for Console
-	StartSubConsole bool `comment:"Start subscriber for Console"`
-	// Start subscriber for HttpGet
-	StartSubHttpGet bool `comment:"Start subscriber for HttpGet"`
-	// Start subscriber for tailing log files
-	StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
-	// Start subscriber for continously delivery of output from cli commands.
-	StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
+	IsCentralErrorLogger    bool `comment:"Start the central error logger."`
+	StartSubHello           bool `comment:"Start subscriber for hello messages"`
+	StartSubFileAppend      bool `comment:"Start subscriber for text logging"`
+	StartSubFile            bool `comment:"Start subscriber for writing to file"`
+	StartSubCopySrc         bool `comment:"Start subscriber for reading files to copy"`
+	StartSubCopyDst         bool `comment:"Start subscriber for writing copied files to disk"`
+	StartSubCliCommand      bool `comment:"Start subscriber for CLICommand"`
+	StartSubConsole         bool `comment:"Start subscriber for Console"`
+	StartSubHttpGet         bool `comment:"Start subscriber for HttpGet"`
+	StartSubTailFile        bool `comment:"Start subscriber for tailing log files"`
+	StartSubCliCommandCont  bool `comment:"Start subscriber for continously delivery of output from cli commands."`
+	StartJetstreamPublisher bool `comment:"Start the nats jetstream publisher"`
+	StartJetstreamConsumer  bool `comment:"Start the nats jetstream consumer"`
 
 	// IsCentralAuth, enable to make this instance take the role as the central auth server
-	IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
+	IsCentralAuth bool   `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
+	Jetstreams    string `comment: "Comma separated list of additional streams to listen on"`
 }
 
 // NewConfiguration will return a *Configuration.
@@ -180,6 +169,8 @@ func NewConfiguration() *Configuration {
 	flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
 	flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
 	flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
+	flag.BoolVar(&c.StartProcesses.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartProcesses.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher")
+	flag.BoolVar(&c.StartProcesses.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartProcesses.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer")
 	flag.StringVar(&c.Jetstreams, "jetstreams", CheckEnv("JETSTREAMS", c.Jetstreams).(string), "Comma separated list of Jetstrams to consume")
 
 	// Start of Request publishers/subscribers
@@ -258,21 +249,23 @@ func newConfigurationDefaults() Configuration {
 		Jetstreams:                "",
 
 		StartProcesses: StartProcesses{
-			StartPubHello:          30,
-			EnableKeyUpdates:       false,
-			EnableAclUpdates:       false,
-			IsCentralErrorLogger:   false,
-			StartSubHello:          true,
-			StartSubFileAppend:     true,
-			StartSubFile:           true,
-			StartSubCopySrc:        true,
-			StartSubCopyDst:        true,
-			StartSubCliCommand:     true,
-			StartSubConsole:        true,
-			StartSubHttpGet:        true,
-			StartSubTailFile:       true,
-			StartSubCliCommandCont: true,
-			IsCentralAuth:          false,
+			StartPubHello:           30,
+			EnableKeyUpdates:        false,
+			EnableAclUpdates:        false,
+			IsCentralErrorLogger:    false,
+			StartSubHello:           true,
+			StartSubFileAppend:      true,
+			StartSubFile:            true,
+			StartSubCopySrc:         true,
+			StartSubCopyDst:         true,
+			StartSubCliCommand:      true,
+			StartSubConsole:         true,
+			StartSubHttpGet:         true,
+			StartSubTailFile:        true,
+			StartSubCliCommandCont:  true,
+			IsCentralAuth:           false,
+			StartJetstreamPublisher: true,
+			StartJetstreamConsumer:  true,
 		},
 	}
 	return c
diff --git a/processes.go b/processes.go
index 8110d31..fb7babc 100644
--- a/processes.go
+++ b/processes.go
@@ -343,65 +343,69 @@ func (p *processes) Start(proc process) {
 	// --------------------------------------------------
 	// ProcFunc for Jetstream publishers.
 	// --------------------------------------------------
-	pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
-		js, err := jetstream.New(proc.natsConn)
-		if err != nil {
-			log.Fatalf("error: jetstream new failed: %v\n", err)
-		}
+	if proc.configuration.StartProcesses.StartJetstreamPublisher {
+		pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
+			js, err := jetstream.New(proc.natsConn)
+			if err != nil {
+				log.Fatalf("error: jetstream new failed: %v\n", err)
+			}
 
-		_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
-			Name:        "nodes",
-			Description: "nodes stream",
-			Subjects:    []string{"nodes.>"},
-			// Discard older messages and keep only the last one.
-			MaxMsgsPerSubject: 1,
-		})
+			_, err = js.CreateOrUpdateStream(proc.ctx, jetstream.StreamConfig{
+				Name:        "nodes",
+				Description: "nodes stream",
+				Subjects:    []string{"nodes.>"},
+				// Discard older messages and keep only the last one.
+				MaxMsgsPerSubject: 1,
+			})
 
-		if err != nil {
-			log.Fatalf("error: jetstream create or update failed: %v\n", err)
-		}
+			if err != nil {
+				log.Fatalf("error: jetstream create or update failed: %v\n", err)
+			}
+
+			// REMOVE: Go routine for injecting messages for testing
+			// go func() {
+			// 	i := 0
+			// 	for {
+			// 		m := Message{
+			// 			ToNode:          "btdev1",
+			// 			FromNode:        proc.node,
+			// 			JetstreamToNode: "btdev1",
+			// 			Method:          CliCommand,
+			// 			MethodArgs:      []string{"/bin/ash", "-c", "tree"},
+			// 			ReplyMethod:     Console,
+			// 			MethodTimeout:   3,
+			// 			//Data:   []byte("some text in here............"),
+			// 		}
+			// 		proc.jetstreamOut <- m
+			//
+			// 		log.Printf("published message: %v\n", i)
+			// 		time.Sleep(time.Second * 1)
+			// 		i++
+			// 	}
+			//
+			// }()
 
-		// REMOVE: Go routine for injecting messages for testing
-		go func() {
-			i := 0
 			for {
-				m := Message{
-					ToNode:        "btdev1",
-					FromNode:      proc.node,
-					Method:        CliCommand,
-					MethodArgs:    []string{"/bin/ash", "-c", "tree"},
-					ReplyMethod:   Console,
-					MethodTimeout: 3,
-					//Data:   []byte("some text in here............"),
+				// TODO:
+				select {
+				case msg := <-proc.jetstreamOut:
+					b, err := json.Marshal(msg)
+					if err != nil {
+						log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
+					}
+
+					subject := fmt.Sprintf("nodes.%v", msg.JetstreamToNode)
+					_, err = js.Publish(proc.ctx, subject, b)
+					if err != nil {
+						log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
+					}
+				case <-ctx.Done():
+					return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
 				}
-				proc.jetstreamOut <- m
-
-				log.Printf("published message: %v\n", i)
-				time.Sleep(time.Second * 1)
-				i++
-			}
-
-		}()
-
-		for {
-			// TODO:
-			select {
-			case msgJS := <-proc.jetstreamOut:
-				b, err := json.Marshal(msgJS)
-				if err != nil {
-					log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
-				}
-
-				_, err = js.Publish(proc.ctx, "nodes.btdev1", b)
-				if err != nil {
-					log.Fatalf("error: pfJetstreamPublishers:js failed to publish message: %v\n", err)
-				}
-			case <-ctx.Done():
-				return fmt.Errorf("%v", "info: pfJetstreamPublishers: got <-ctx.done")
 			}
 		}
+		proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
 	}
-	proc.startup.publisher(proc, JetStreamPublishers, pfJetstreamPublishers)
 
 	// --------------------------------------------------
 	// Procfunc for Jetstream consumers.
@@ -412,58 +416,60 @@ func (p *processes) Start(proc process) {
 	// After a jetstream message is picked up from the stream, the steward message
 	// will be extracted from the data field, and the ctrl message will be put
 	// on the local delivery channel, and handled as a normal ctrl message.
-	pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
-		js, err := jetstream.New(proc.natsConn)
-		if err != nil {
-			log.Fatalf("error: jetstream new failed: %v\n", err)
-		}
-
-		stream, err := js.Stream(proc.ctx, "nodes")
-		if err != nil {
-			log.Fatalf("error: js.Stream failed: %v\n", err)
-		}
-
-		consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
-			Name:           "nodes_processor",
-			Durable:        "nodes_processor",
-			FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"},
-		})
-		if err != nil {
-			log.Fatalf("error: create or update consumer failed: %v\n", err)
-		}
-
-		cctx, err := consumer.Consume(func(msg jetstream.Msg) {
-			stewardMessage := Message{}
-			err := json.Unmarshal(msg.Data(), &stewardMessage)
+	if proc.configuration.StartProcesses.StartJetstreamConsumer {
+		pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
+			js, err := jetstream.New(proc.natsConn)
 			if err != nil {
-				log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
+				log.Fatalf("error: jetstream new failed: %v\n", err)
 			}
 
-			log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
-
-			msg.Ack()
-
-			// Messages received here via jetstream are for this node. Put the message into
-			// a SubjectAndMessage structure, and we use the deliver local from here.
-			sam, err := newSubjectAndMessage(stewardMessage)
+			stream, err := js.Stream(proc.ctx, "nodes")
 			if err != nil {
-				log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
+				log.Fatalf("error: js.Stream failed: %v\n", err)
 			}
-			proc.server.samSendLocalCh <- []subjectAndMessage{sam}
-		})
-		if err != nil {
-			log.Fatalf("error: create or update consumer failed: %v\n", err)
+
+			consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{
+				Name:           "nodes_processor",
+				Durable:        "nodes_processor",
+				FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"},
+			})
+			if err != nil {
+				log.Fatalf("error: create or update consumer failed: %v\n", err)
+			}
+
+			cctx, err := consumer.Consume(func(msg jetstream.Msg) {
+				stewardMessage := Message{}
+				err := json.Unmarshal(msg.Data(), &stewardMessage)
+				if err != nil {
+					log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
+				}
+
+				log.Printf("Received jetstream message to convert and handle as normal nats message: %v, with ctrl method: %v\n", string(msg.Subject()), string(stewardMessage.Method))
+
+				msg.Ack()
+
+				// Messages received here via jetstream are for this node. Put the message into
+				// a SubjectAndMessage structure, and we use the deliver local from here.
+				sam, err := newSubjectAndMessage(stewardMessage)
+				if err != nil {
+					log.Fatalf("error: pfJetstreamConsumers: newSubjectAndMessage failed: %v\n", err)
+				}
+				proc.server.samSendLocalCh <- []subjectAndMessage{sam}
+			})
+			if err != nil {
+				log.Fatalf("error: create or update consumer failed: %v\n", err)
+			}
+
+			defer cctx.Stop()
+
+			<-proc.ctx.Done()
+
+			return nil
 		}
 
-		defer cctx.Stop()
-
-		<-proc.ctx.Done()
-
-		return nil
+		proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
 	}
 
-	proc.startup.subscriber(proc, JetstreamConsumers, pfJetstreamConsumers)
-
 }
 
 // --------------------------------------------------