1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

added flag for max jetstream messages to keep on broker per subject

This commit is contained in:
postmannen 2024-11-27 08:54:17 +01:00
parent f80a57e1c2
commit 5fee84c18a
2 changed files with 48 additions and 46 deletions

View file

@ -51,7 +51,9 @@ type Configuration struct {
// Host and port for prometheus listener, e.g. localhost:2112 // Host and port for prometheus listener, e.g. localhost:2112
PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"` PromHostAndPort string `comment:"Host and port for prometheus listener, e.g. localhost:2112"`
// Comma separated list of additional streams to consume from. // Comma separated list of additional streams to consume from.
JetstreamsConsume string JetstreamsConsume string `comment:"a comma separated list of other jetstream subjects to consume"`
// Jetstream MaxMsgsPerSubject
JetStreamMaxMsgsPerSubject int `comment:"max messages to keep on the broker for a jetstream subject"`
// Set to true if this is the node that should receive the error log's from other nodes // Set to true if this is the node that should receive the error log's from other nodes
DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"` DefaultMessageTimeout int `comment:"Set to true if this is the node that should receive the error log's from other nodes"`
// Default value for how long can a request method max be allowed to run in seconds // Default value for how long can a request method max be allowed to run in seconds
@ -168,6 +170,7 @@ func NewConfiguration() *Configuration {
flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port") flag.StringVar(&c.ProfilingPort, "profilingPort", CheckEnv("PROFILING_PORT", c.ProfilingPort).(string), "The number of the profiling port")
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", CheckEnv("PROM_HOST_AND_PORT", c.PromHostAndPort).(string), "host and port for prometheus listener, e.g. localhost:2112")
flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from") flag.StringVar(&c.JetstreamsConsume, "jetstreamsConsume", CheckEnv("JETSTREAMS_CONSUME", c.JetstreamsConsume).(string), "Comma separated list of Jetstrams to consume from")
flag.IntVar(&c.JetStreamMaxMsgsPerSubject, "jetstreamMaxMsgsPerSubject", CheckEnv("JETSTREAM_MAX_MSGS_PER_SUBJECT", c.JetStreamMaxMsgsPerSubject).(int), "max messages to keep on the broker per jetstream subject")
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", CheckEnv("DEFAULT_MESSAGE_TIMEOUT", c.DefaultMessageTimeout).(int), "default message timeout in seconds. This can be overridden on the message level")
flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system") flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", CheckEnv("DEFAULT_MESSAGE_RETRIES", c.DefaultMessageRetries).(int), "default amount of retries that will be done before a message is thrown away, and out of the system")
flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run") flag.IntVar(&c.DefaultMethodTimeout, "defaultMethodTimeout", CheckEnv("DEFAULT_METHOD_TIMEOUT", c.DefaultMethodTimeout).(int), "default amount of seconds a request method max will be allowed to run")
@ -228,46 +231,47 @@ func NewConfiguration() *Configuration {
// Get a Configuration struct with the default values set. // Get a Configuration struct with the default values set.
func newConfigurationDefaults() Configuration { func newConfigurationDefaults() Configuration {
c := Configuration{ c := Configuration{
ConfigFolder: "./etc/", ConfigFolder: "./etc/",
SocketFolder: "./tmp", SocketFolder: "./tmp",
ReadFolder: "./readfolder", ReadFolder: "./readfolder",
EnableReadFolder: true, EnableReadFolder: true,
TCPListener: "", TCPListener: "",
HTTPListener: "", HTTPListener: "",
DatabaseFolder: "./var/lib", DatabaseFolder: "./var/lib",
NodeName: "", NodeName: "",
BrokerAddress: "127.0.0.1:4222", BrokerAddress: "127.0.0.1:4222",
NatsConnOptTimeout: 20, NatsConnOptTimeout: 20,
NatsConnectRetryInterval: 10, NatsConnectRetryInterval: 10,
NatsReconnectJitter: 100, NatsReconnectJitter: 100,
NatsReconnectJitterTLS: 1, NatsReconnectJitterTLS: 1,
KeysUpdateInterval: 60, KeysUpdateInterval: 60,
AclUpdateInterval: 60, AclUpdateInterval: 60,
ProfilingPort: "", ProfilingPort: "",
PromHostAndPort: "", PromHostAndPort: "",
JetstreamsConsume: "", JetstreamsConsume: "",
DefaultMessageTimeout: 10, JetStreamMaxMsgsPerSubject: 100,
DefaultMessageRetries: 1, DefaultMessageTimeout: 10,
DefaultMethodTimeout: 10, DefaultMessageRetries: 1,
SubscribersDataFolder: "./data", DefaultMethodTimeout: 10,
CentralNodeName: "central", SubscribersDataFolder: "./data",
RootCAPath: "", CentralNodeName: "central",
NkeySeedFile: "", RootCAPath: "",
NkeyFromED25519SSHKeyFile: "", NkeySeedFile: "",
NkeySeed: "", NkeyFromED25519SSHKeyFile: "",
ExposeDataFolder: "", NkeySeed: "",
ErrorMessageTimeout: 60, ExposeDataFolder: "",
ErrorMessageRetries: 10, ErrorMessageTimeout: 60,
Compression: "z", ErrorMessageRetries: 10,
Serialization: "cbor", Compression: "z",
SetBlockProfileRate: 0, Serialization: "cbor",
EnableSocket: true, SetBlockProfileRate: 0,
EnableSignatureCheck: false, EnableSocket: true,
EnableAclCheck: false, EnableSignatureCheck: false,
EnableDebug: false, EnableAclCheck: false,
LogLevel: "debug", EnableDebug: false,
LogConsoleTimestamps: false, LogLevel: "debug",
KeepPublishersAliveFor: 10, LogConsoleTimestamps: false,
KeepPublishersAliveFor: 10,
StartProcesses: StartProcesses{ StartProcesses: StartProcesses{
StartPubHello: 30, StartPubHello: 30,

View file

@ -128,11 +128,9 @@ func (s *server) jetstreamPublish() {
// Create a stream // Create a stream
_, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{ _, _ = js.CreateStream(s.ctx, jetstream.StreamConfig{
Name: "NODES", Name: "NODES",
Subjects: []string{"NODES.>"}, Subjects: []string{"NODES.>"},
// TODO: Create Flag ? MaxMsgsPerSubject: int64(s.configuration.JetStreamMaxMsgsPerSubject),
MaxMsgsPerSubject: 100,
// MaxMsgsPerSubject: 1,
}) })
// Publish messages. // Publish messages.