From b02a84f6cc6721fefd7aeb1ebc23c1bc6fb6b344 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 16 Dec 2021 11:01:01 +0100 Subject: [PATCH] added config flags for jitter --- configuration_flags.go | 20 ++++++++++++++++++++ server.go | 5 ++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/configuration_flags.go b/configuration_flags.go index b1b0ddc..2444ad9 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -33,6 +33,10 @@ type Configuration struct { BrokerAddress string // nats connect retry NatsConnectRetryInterval int + // NatsReconnectJitter in milliseconds + NatsReconnectJitter int + // NatsReconnectJitterTLS in seconds + NatsReconnectJitterTLS int // The number of the profiling port ProfilingPort string // host and port for prometheus listener, e.g. localhost:2112 @@ -107,6 +111,8 @@ type ConfigurationFromFile struct { NodeName *string BrokerAddress *string NatsConnectRetryInterval *int + NatsReconnectJitter *int + NatsReconnectJitterTLS *int ProfilingPort *string PromHostAndPort *string DefaultMessageTimeout *int @@ -152,6 +158,8 @@ func newConfigurationDefaults() Configuration { DatabaseFolder: "./var/lib", BrokerAddress: "127.0.0.1:4222", NatsConnectRetryInterval: 10, + NatsReconnectJitter: 100, + NatsReconnectJitterTLS: 1, ProfilingPort: "", PromHostAndPort: "", DefaultMessageTimeout: 10, @@ -230,6 +238,16 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.NatsConnectRetryInterval = *cf.NatsConnectRetryInterval } + if cf.NatsReconnectJitter == nil { + conf.NatsReconnectJitter = cd.NatsReconnectJitter + } else { + conf.NatsReconnectJitter = *cf.NatsReconnectJitter + } + if cf.NatsReconnectJitterTLS == nil { + conf.NatsReconnectJitterTLS = cd.NatsReconnectJitterTLS + } else { + conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS + } if cf.ProfilingPort == nil { conf.ProfilingPort = cd.ProfilingPort } else { @@ -405,6 +423,8 @@ func (c *Configuration) CheckFlags() error { flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit") flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker") flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.") + flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.") + flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") diff --git a/server.go b/server.go index 5e91cac..c9f738f 100644 --- a/server.go +++ b/server.go @@ -91,7 +91,7 @@ func NewServer(c *Configuration, version string) (*server, error) { for { var err error // Setting MaxReconnects to -1 which equals unlimited. - conn, err = nats.Connect(c.BrokerAddress, opt, nats.MaxReconnects(-1)) + conn, err = nats.Connect(c.BrokerAddress, opt, nats.MaxReconnects(-1), nats.ReconnectJitter(time.Duration(c.NatsReconnectJitter)*time.Millisecond, time.Duration(c.NatsReconnectJitterTLS)*time.Second)) // If no servers where available, we loop and retry until succesful. if err != nil { log.Printf("error: could not connect, waiting %v seconds, and retrying: %v\n", c.NatsConnectRetryInterval, err) @@ -102,6 +102,9 @@ func NewServer(c *Configuration, version string) (*server, error) { break } + log.Printf(" * conn.Opts.ReconnectJitterTLS: %v\n", conn.Opts.ReconnectJitterTLS) + log.Printf(" * conn.Opts.ReconnectJitter: %v\n", conn.Opts.ReconnectJitter) + // Prepare the connection to the Steward socket file // Check if socket folder exists, if not create it