From 20172a38068fbdcee75466eb2d241e448bd6e887 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 22 Nov 2024 15:44:37 +0100 Subject: [PATCH] added flag/env for streams to consume --- configuration_flags.go | 17 +++++++++-------- processes.go | 36 ++++++++++++------------------------ 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index 91be26c..332d64e 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -95,8 +95,8 @@ type Configuration struct { KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."` StartProcesses StartProcesses - // Comma separated list of additional streams to listen on. - Jetstreams string + // Comma separated list of additional streams to consume from. + JetstreamsConsume string } type StartProcesses struct { @@ -120,8 +120,7 @@ type StartProcesses struct { StartJetstreamConsumer bool `comment:"Start the nats jetstream consumer"` // IsCentralAuth, enable to make this instance take the role as the central auth server - IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"` - Jetstreams string `comment: "Comma separated list of additional streams to listen on"` + IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"` } // NewConfiguration will return a *Configuration. @@ -169,9 +168,8 @@ func NewConfiguration() *Configuration { flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none") flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr") flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish") - flag.BoolVar(&c.StartProcesses.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartProcesses.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher") - flag.BoolVar(&c.StartProcesses.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartProcesses.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer") - flag.StringVar(&c.Jetstreams, "jetstreams", CheckEnv("JETSTREAMS", c.Jetstreams).(string), "Comma separated list of Jetstrams to consume") + + flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from") // Start of Request publishers/subscribers @@ -193,6 +191,9 @@ func NewConfiguration() *Configuration { flag.BoolVar(&c.StartProcesses.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartProcesses.StartSubTailFile).(bool), "true/false") flag.BoolVar(&c.StartProcesses.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartProcesses.StartSubCliCommandCont).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartProcesses.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher") + flag.BoolVar(&c.StartProcesses.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartProcesses.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer") + // Check that mandatory flag values have been set. switch { case c.NodeName == "": @@ -246,7 +247,7 @@ func newConfigurationDefaults() Configuration { LogLevel: "debug", LogConsoleTimestamps: false, KeepPublishersAliveFor: 10, - Jetstreams: "", + JetstreamsConsume: "", StartProcesses: StartProcesses{ StartPubHello: 30, diff --git a/processes.go b/processes.go index fb7babc..d8788b6 100644 --- a/processes.go +++ b/processes.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "sync" "time" @@ -362,29 +363,6 @@ func (p *processes) Start(proc process) { log.Fatalf("error: jetstream create or update failed: %v\n", err) } - // REMOVE: Go routine for injecting messages for testing - // go func() { - // i := 0 - // for { - // m := Message{ - // ToNode: "btdev1", - // FromNode: proc.node, - // JetstreamToNode: "btdev1", - // Method: CliCommand, - // MethodArgs: []string{"/bin/ash", "-c", "tree"}, - // ReplyMethod: Console, - // MethodTimeout: 3, - // //Data: []byte("some text in here............"), - // } - // proc.jetstreamOut <- m - // - // log.Printf("published message: %v\n", i) - // time.Sleep(time.Second * 1) - // i++ - // } - // - // }() - for { // TODO: select { @@ -428,10 +406,20 @@ func (p *processes) Start(proc process) { log.Fatalf("error: js.Stream failed: %v\n", err) } + // Check for more subjects via flags to listen to, and if defined prefix all + // the values with "nodes." + filterSubjectValues := []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"} + if proc.configuration.JetstreamsConsume != "" { + filterSubjectValues = strings.Split(proc.configuration.JetstreamsConsume, ",") + for i, v := range filterSubjectValues { + filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) + } + } + consumer, err := stream.CreateOrUpdateConsumer(proc.ctx, jetstream.ConsumerConfig{ Name: "nodes_processor", Durable: "nodes_processor", - FilterSubjects: []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"}, + FilterSubjects: filterSubjectValues, }) if err != nil { log.Fatalf("error: create or update consumer failed: %v\n", err)