From 5fee84c18af740a072320e8ad04fe133a625e970 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 27 Nov 2024 08:54:17 +0100 Subject: [PATCH] added flag for max jetstream messages to keep on broker per subject --- configuration_flags.go | 86 ++++++++++++++++++++++-------------------- message_readers.go | 8 ++-- 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index 92e03d5..8444128 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -51,7 +51,9 @@ type Configuration struct { // Host and port for prometheus listener, e.g. localhost:2112 PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"` // Comma separated list of additional streams to consume from. - JetstreamsConsume string + JetstreamsConsume string `comment:"a comma separated list of other jetstream subjects to consume"` + // Jetstream MaxMsgsPerSubject + JetStreamMaxMsgsPerSubject int `comment:"max messages to keep on the broker for a jetstream subject"` // Set to true if this is the node that should receive the error log's from other nodes DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"` // Default value for how long can a request method max be allowed to run in seconds @@ -168,6 +170,7 @@ func NewConfiguration() *Configuration { flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112") flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from") + flag.IntVar(&c.JetStreamMaxMsgsPerSubject, "jetstreamMaxMsgsPerSubject", CheckEnv("JETSTREAM_MAX_MSGS_PER_SUBJECT", c.JetStreamMaxMsgsPerSubject).(int), "max messages to keep on the broker per jetstream subject") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system") flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run") @@ -228,46 +231,47 @@ func NewConfiguration() *Configuration { // Get a Configuration struct with the default values set. func newConfigurationDefaults() Configuration { c := Configuration{ - ConfigFolder: "./etc/", - SocketFolder: "./tmp", - ReadFolder: "./readfolder", - EnableReadFolder: true, - TCPListener: "", - HTTPListener: "", - DatabaseFolder: "./var/lib", - NodeName: "", - BrokerAddress: "127.0.0.1:4222", - NatsConnOptTimeout: 20, - NatsConnectRetryInterval: 10, - NatsReconnectJitter: 100, - NatsReconnectJitterTLS: 1, - KeysUpdateInterval: 60, - AclUpdateInterval: 60, - ProfilingPort: "", - PromHostAndPort: "", - JetstreamsConsume: "", - DefaultMessageTimeout: 10, - DefaultMessageRetries: 1, - DefaultMethodTimeout: 10, - SubscribersDataFolder: "./data", - CentralNodeName: "central", - RootCAPath: "", - NkeySeedFile: "", - NkeyFromED25519SSHKeyFile: "", - NkeySeed: "", - ExposeDataFolder: "", - ErrorMessageTimeout: 60, - ErrorMessageRetries: 10, - Compression: "z", - Serialization: "cbor", - SetBlockProfileRate: 0, - EnableSocket: true, - EnableSignatureCheck: false, - EnableAclCheck: false, - EnableDebug: false, - LogLevel: "debug", - LogConsoleTimestamps: false, - KeepPublishersAliveFor: 10, + ConfigFolder: "./etc/", + SocketFolder: "./tmp", + ReadFolder: "./readfolder", + EnableReadFolder: true, + TCPListener: "", + HTTPListener: "", + DatabaseFolder: "./var/lib", + NodeName: "", + BrokerAddress: "127.0.0.1:4222", + NatsConnOptTimeout: 20, + NatsConnectRetryInterval: 10, + NatsReconnectJitter: 100, + NatsReconnectJitterTLS: 1, + KeysUpdateInterval: 60, + AclUpdateInterval: 60, + ProfilingPort: "", + PromHostAndPort: "", + JetstreamsConsume: "", + JetStreamMaxMsgsPerSubject: 100, + DefaultMessageTimeout: 10, + DefaultMessageRetries: 1, + DefaultMethodTimeout: 10, + SubscribersDataFolder: "./data", + CentralNodeName: "central", + RootCAPath: "", + NkeySeedFile: "", + NkeyFromED25519SSHKeyFile: "", + NkeySeed: "", + ExposeDataFolder: "", + ErrorMessageTimeout: 60, + ErrorMessageRetries: 10, + Compression: "z", + Serialization: "cbor", + SetBlockProfileRate: 0, + EnableSocket: true, + EnableSignatureCheck: false, + EnableAclCheck: false, + EnableDebug: false, + LogLevel: "debug", + LogConsoleTimestamps: false, + KeepPublishersAliveFor: 10, StartProcesses: StartProcesses{ StartPubHello: 30, diff --git a/message_readers.go b/message_readers.go index 65c257f..9f9df95 100644 --- a/message_readers.go +++ b/message_readers.go @@ -128,11 +128,9 @@ func (s *server) jetstreamPublish() { // Create a stream _, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{ - Name: "NODES", - Subjects: []string{"NODES.>"}, - // TODO: Create Flag ? - MaxMsgsPerSubject: 100, - // MaxMsgsPerSubject: 1, + Name: "NODES", + Subjects: []string{"NODES.>"}, + MaxMsgsPerSubject: int64(s.configuration.JetStreamMaxMsgsPerSubject), }) // Publish messages.