From 587e23c91d6dcf346c36cae9c21e0d57ee383e8a Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 26 May 2022 08:07:23 +0200 Subject: [PATCH] added separate timer and starting for acl updates --- configuration_flags.go | 182 +++++++++++++++++++++++++---------------- processes.go | 17 ++-- 2 files changed, 120 insertions(+), 79 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index d759268..c1a3200 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -41,8 +41,10 @@ type Configuration struct { NatsReconnectJitter int // NatsReconnectJitterTLS in seconds NatsReconnectJitterTLS int - // PublicKeysGetInterval in seconds - PublicKeysGetInterval int + // REQKeysRequestUpdateInterval in seconds + REQKeysRequestUpdateInterval int + // REQAclRequestUpdateInterval in seconds + REQAclRequestUpdateInterval int // The number of the profiling port ProfilingPort string // host and port for prometheus listener, e.g. localhost:2112 @@ -93,6 +95,12 @@ type Configuration struct { StartPubREQKeysRequestUpdate bool // Subscriber for receiving updates of public keys from central StartSubREQKeysDeliverUpdate bool + + // Publisher for asking central for public acl updates + StartPubREQAclRequestUpdate bool + // Subscriber for receiving updates of acl's from central + StartSubREQAclDeliverUpdate bool + // Start the central error logger. StartSubREQErrorLog bool // Subscriber for hello messages @@ -132,43 +140,46 @@ type Configuration struct { // configuration values from file, so we are able to detect // if a value were given or not when parsing. type ConfigurationFromFile struct { - ConfigFolder *string - RingBufferSize *int - SocketFolder *string - TCPListener *string - HTTPListener *string - DatabaseFolder *string - NodeName *string - BrokerAddress *string - NatsConnOptTimeout *int - NatsConnectRetryInterval *int - NatsReconnectJitter *int - NatsReconnectJitterTLS *int - PublicKeysGetInterval *int - ProfilingPort *string - PromHostAndPort *string - DefaultMessageTimeout *int - DefaultMessageRetries *int - DefaultMethodTimeout *int - SubscribersDataFolder *string - CentralNodeName *string - RootCAPath *string - NkeySeedFile *string - ExposeDataFolder *string - ErrorMessageTimeout *int - ErrorMessageRetries *int - Compression *string - Serialization *string - SetBlockProfileRate *int - EnableSocket *bool - EnableTUI *bool - EnableSignatureCheck *bool - IsCentralAuth *bool - EnableDebug *bool + ConfigFolder *string + RingBufferSize *int + SocketFolder *string + TCPListener *string + HTTPListener *string + DatabaseFolder *string + NodeName *string + BrokerAddress *string + NatsConnOptTimeout *int + NatsConnectRetryInterval *int + NatsReconnectJitter *int + NatsReconnectJitterTLS *int + REQKeysRequestUpdateInterval *int + REQAclRequestUpdateInterval *int + ProfilingPort *string + PromHostAndPort *string + DefaultMessageTimeout *int + DefaultMessageRetries *int + DefaultMethodTimeout *int + SubscribersDataFolder *string + CentralNodeName *string + RootCAPath *string + NkeySeedFile *string + ExposeDataFolder *string + ErrorMessageTimeout *int + ErrorMessageRetries *int + Compression *string + Serialization *string + SetBlockProfileRate *int + EnableSocket *bool + EnableTUI *bool + EnableSignatureCheck *bool + IsCentralAuth *bool + EnableDebug *bool StartPubREQHello *int StartPubREQKeysRequestUpdate *bool StartSubREQKeysDeliverUpdate *bool + StartPubREQAclRequestUpdate *bool + StartSubREQAclDeliverUpdate *bool StartSubREQErrorLog *bool StartSubREQHello *bool StartSubREQToFileAppend *bool @@ -196,43 +207,46 @@ func NewConfiguration() *Configuration { // Get a Configuration struct with the default values set. func newConfigurationDefaults() Configuration { c := Configuration{ - ConfigFolder: "./etc/", - RingBufferSize: 1000, - SocketFolder: "./tmp", - TCPListener: "", - HTTPListener: "", - DatabaseFolder: "./var/lib", - NodeName: "", - BrokerAddress: "127.0.0.1:4222", - NatsConnOptTimeout: 20, - NatsConnectRetryInterval: 10, - NatsReconnectJitter: 100, - NatsReconnectJitterTLS: 1, - PublicKeysGetInterval: 60, - ProfilingPort: "", - PromHostAndPort: "", - DefaultMessageTimeout: 10, - DefaultMessageRetries: 1, - DefaultMethodTimeout: 10, - SubscribersDataFolder: "./data", - CentralNodeName: "", - RootCAPath: "", - NkeySeedFile: "", - ExposeDataFolder: "", - ErrorMessageTimeout: 60, - ErrorMessageRetries: 10, - Compression: "", - Serialization: "", - SetBlockProfileRate: 0, - EnableSocket: true, - EnableTUI: false, - EnableSignatureCheck: false, - IsCentralAuth: false, - EnableDebug: false, + ConfigFolder: "./etc/", + RingBufferSize: 1000, + SocketFolder: "./tmp", + TCPListener: "", + HTTPListener: "", + DatabaseFolder: "./var/lib", + NodeName: "", + BrokerAddress: "127.0.0.1:4222", + NatsConnOptTimeout: 20, + NatsConnectRetryInterval: 10, + NatsReconnectJitter: 100, + NatsReconnectJitterTLS: 1, + REQKeysRequestUpdateInterval: 60, + REQAclRequestUpdateInterval: 60, + ProfilingPort: "", + PromHostAndPort: "", + DefaultMessageTimeout: 10, + DefaultMessageRetries: 1, + DefaultMethodTimeout: 10, + SubscribersDataFolder: "./data", + CentralNodeName: "", + RootCAPath: "", + NkeySeedFile: "", + ExposeDataFolder: "", + ErrorMessageTimeout: 60, + ErrorMessageRetries: 10, + Compression: "", + Serialization: "", + SetBlockProfileRate: 0, + EnableSocket: true, + EnableTUI: false, + EnableSignatureCheck: false, + IsCentralAuth: false, + EnableDebug: false, StartPubREQHello: 30, StartPubREQKeysRequestUpdate: true, StartSubREQKeysDeliverUpdate: true, + StartPubREQAclRequestUpdate: true, + StartSubREQAclDeliverUpdate: true, StartSubREQErrorLog: false, StartSubREQHello: true, StartSubREQToFileAppend: true, @@ -319,10 +333,15 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS } - if cf.PublicKeysGetInterval == nil { - conf.PublicKeysGetInterval = cd.PublicKeysGetInterval + if cf.REQKeysRequestUpdateInterval == nil { + conf.REQKeysRequestUpdateInterval = cd.REQKeysRequestUpdateInterval } else { - conf.PublicKeysGetInterval = *cf.PublicKeysGetInterval + conf.REQKeysRequestUpdateInterval = *cf.REQKeysRequestUpdateInterval + } + if cf.REQAclRequestUpdateInterval == nil { + conf.REQAclRequestUpdateInterval = cd.REQAclRequestUpdateInterval + } else { + conf.REQAclRequestUpdateInterval = *cf.REQAclRequestUpdateInterval } if cf.ProfilingPort == nil { conf.ProfilingPort = cd.ProfilingPort @@ -442,6 +461,18 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQKeysDeliverUpdate = *cf.StartSubREQKeysDeliverUpdate } + + if cf.StartPubREQAclRequestUpdate == nil { + conf.StartPubREQAclRequestUpdate = cd.StartPubREQAclRequestUpdate + } else { + conf.StartPubREQAclRequestUpdate = *cf.StartPubREQAclRequestUpdate + } + if cf.StartSubREQAclDeliverUpdate == nil { + conf.StartSubREQAclDeliverUpdate = cd.StartSubREQAclDeliverUpdate + } else { + conf.StartSubREQAclDeliverUpdate = *cf.StartSubREQAclDeliverUpdate + } + if cf.StartSubREQErrorLog == nil { conf.StartSubREQErrorLog = cd.StartSubREQErrorLog } else { @@ -566,7 +597,8 @@ func (c *Configuration) CheckFlags() error { flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.") flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.") flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.") - flag.IntVar(&c.PublicKeysGetInterval, "publicKeysGetInterval", fc.PublicKeysGetInterval, "default interval in seconds for asking the central for public keys") + flag.IntVar(&c.REQKeysRequestUpdateInterval, "REQKeysRequestUpdateInterval", fc.REQKeysRequestUpdateInterval, "default interval in seconds for asking the central for public keys") + flag.IntVar(&c.REQAclRequestUpdateInterval, "REQAclRequestUpdateInterval", fc.REQAclRequestUpdateInterval, "default interval in seconds for asking the central for acl updates") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") @@ -588,10 +620,16 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") + // Start of Request publishers/subscribers + flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds") flag.BoolVar(&c.StartPubREQKeysRequestUpdate, "startPubREQKeysRequestUpdate", fc.StartPubREQKeysRequestUpdate, "true/false") flag.BoolVar(&c.StartSubREQKeysDeliverUpdate, "startSubREQKeysDeliverUpdate", fc.StartSubREQKeysDeliverUpdate, "true/false") + + flag.BoolVar(&c.StartPubREQAclRequestUpdate, "startPubREQAclRequestUpdate", fc.StartPubREQAclRequestUpdate, "true/false") + flag.BoolVar(&c.StartSubREQAclDeliverUpdate, "startSubREQAclDeliverUpdate", fc.StartSubREQAclDeliverUpdate, "true/false") + flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false") flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") diff --git a/processes.go b/processes.go index 698db7b..4961e1e 100644 --- a/processes.go +++ b/processes.go @@ -175,8 +175,10 @@ func (p *processes) Start(proc process) { if proc.configuration.StartPubREQKeysRequestUpdate { proc.startup.pubREQKeysRequestUpdate(proc) - // TODO: Putting the acl publisher here. - // Maybe we should also change the name of the configuration flag to something auth related ? + proc.startup.subREQKeysDeliverUpdate(proc) + } + + if proc.configuration.StartPubREQAclRequestUpdate { proc.startup.pubREQAclRequestUpdate(proc) proc.startup.subREQAclDeliverUpdate(proc) } @@ -200,9 +202,10 @@ func (p *processes) Start(proc process) { proc.startup.subREQAclImport(proc) } - if proc.configuration.StartSubREQKeysDeliverUpdate { - proc.startup.subREQKeysDeliverUpdate(proc) - } + // Moved this together with proc.configuration.StartPubREQKeysRequestUpdate since they belong together. + // if proc.configuration.StartSubREQKeysDeliverUpdate { + // proc.startup.subREQKeysDeliverUpdate(proc) + // } if proc.configuration.StartSubREQHttpGet { proc.startup.subREQHttpGet(proc) @@ -335,7 +338,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) { // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval)) + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQKeysRequestUpdateInterval)) for { // TODO: We could send with the hash of the currently stored keys, @@ -390,7 +393,7 @@ func (s startup) pubREQAclRequestUpdate(p process) { // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval)) + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQAclRequestUpdateInterval)) for { // TODO: We could send with the hash of the currently stored hash,