1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 14:56:49 +00:00

added separate timer and starting for acl updates

This commit is contained in:
postmannen 2022-05-26 08:07:23 +02:00
parent 9d71f11145
commit 587e23c91d
2 changed files with 120 additions and 79 deletions

View file

@ -41,8 +41,10 @@ type Configuration struct {
NatsReconnectJitter int NatsReconnectJitter int
// NatsReconnectJitterTLS in seconds // NatsReconnectJitterTLS in seconds
NatsReconnectJitterTLS int NatsReconnectJitterTLS int
// PublicKeysGetInterval in seconds // REQKeysRequestUpdateInterval in seconds
PublicKeysGetInterval int REQKeysRequestUpdateInterval int
// REQAclRequestUpdateInterval in seconds
REQAclRequestUpdateInterval int
// The number of the profiling port // The number of the profiling port
ProfilingPort string ProfilingPort string
// host and port for prometheus listener, e.g. localhost:2112 // host and port for prometheus listener, e.g. localhost:2112
@ -93,6 +95,12 @@ type Configuration struct {
StartPubREQKeysRequestUpdate bool StartPubREQKeysRequestUpdate bool
// Subscriber for receiving updates of public keys from central // Subscriber for receiving updates of public keys from central
StartSubREQKeysDeliverUpdate bool StartSubREQKeysDeliverUpdate bool
// Publisher for asking central for public acl updates
StartPubREQAclRequestUpdate bool
// Subscriber for receiving updates of acl's from central
StartSubREQAclDeliverUpdate bool
// Start the central error logger. // Start the central error logger.
StartSubREQErrorLog bool StartSubREQErrorLog bool
// Subscriber for hello messages // Subscriber for hello messages
@ -132,43 +140,46 @@ type Configuration struct {
// configuration values from file, so we are able to detect // configuration values from file, so we are able to detect
// if a value were given or not when parsing. // if a value were given or not when parsing.
type ConfigurationFromFile struct { type ConfigurationFromFile struct {
ConfigFolder *string ConfigFolder *string
RingBufferSize *int RingBufferSize *int
SocketFolder *string SocketFolder *string
TCPListener *string TCPListener *string
HTTPListener *string HTTPListener *string
DatabaseFolder *string DatabaseFolder *string
NodeName *string NodeName *string
BrokerAddress *string BrokerAddress *string
NatsConnOptTimeout *int NatsConnOptTimeout *int
NatsConnectRetryInterval *int NatsConnectRetryInterval *int
NatsReconnectJitter *int NatsReconnectJitter *int
NatsReconnectJitterTLS *int NatsReconnectJitterTLS *int
PublicKeysGetInterval *int REQKeysRequestUpdateInterval *int
ProfilingPort *string REQAclRequestUpdateInterval *int
PromHostAndPort *string ProfilingPort *string
DefaultMessageTimeout *int PromHostAndPort *string
DefaultMessageRetries *int DefaultMessageTimeout *int
DefaultMethodTimeout *int DefaultMessageRetries *int
SubscribersDataFolder *string DefaultMethodTimeout *int
CentralNodeName *string SubscribersDataFolder *string
RootCAPath *string CentralNodeName *string
NkeySeedFile *string RootCAPath *string
ExposeDataFolder *string NkeySeedFile *string
ErrorMessageTimeout *int ExposeDataFolder *string
ErrorMessageRetries *int ErrorMessageTimeout *int
Compression *string ErrorMessageRetries *int
Serialization *string Compression *string
SetBlockProfileRate *int Serialization *string
EnableSocket *bool SetBlockProfileRate *int
EnableTUI *bool EnableSocket *bool
EnableSignatureCheck *bool EnableTUI *bool
IsCentralAuth *bool EnableSignatureCheck *bool
EnableDebug *bool IsCentralAuth *bool
EnableDebug *bool
StartPubREQHello *int StartPubREQHello *int
StartPubREQKeysRequestUpdate *bool StartPubREQKeysRequestUpdate *bool
StartSubREQKeysDeliverUpdate *bool StartSubREQKeysDeliverUpdate *bool
StartPubREQAclRequestUpdate *bool
StartSubREQAclDeliverUpdate *bool
StartSubREQErrorLog *bool StartSubREQErrorLog *bool
StartSubREQHello *bool StartSubREQHello *bool
StartSubREQToFileAppend *bool StartSubREQToFileAppend *bool
@ -196,43 +207,46 @@ func NewConfiguration() *Configuration {
// Get a Configuration struct with the default values set. // Get a Configuration struct with the default values set.
func newConfigurationDefaults() Configuration { func newConfigurationDefaults() Configuration {
c := Configuration{ c := Configuration{
ConfigFolder: "./etc/", ConfigFolder: "./etc/",
RingBufferSize: 1000, RingBufferSize: 1000,
SocketFolder: "./tmp", SocketFolder: "./tmp",
TCPListener: "", TCPListener: "",
HTTPListener: "", HTTPListener: "",
DatabaseFolder: "./var/lib", DatabaseFolder: "./var/lib",
NodeName: "", NodeName: "",
BrokerAddress: "127.0.0.1:4222", BrokerAddress: "127.0.0.1:4222",
NatsConnOptTimeout: 20, NatsConnOptTimeout: 20,
NatsConnectRetryInterval: 10, NatsConnectRetryInterval: 10,
NatsReconnectJitter: 100, NatsReconnectJitter: 100,
NatsReconnectJitterTLS: 1, NatsReconnectJitterTLS: 1,
PublicKeysGetInterval: 60, REQKeysRequestUpdateInterval: 60,
ProfilingPort: "", REQAclRequestUpdateInterval: 60,
PromHostAndPort: "", ProfilingPort: "",
DefaultMessageTimeout: 10, PromHostAndPort: "",
DefaultMessageRetries: 1, DefaultMessageTimeout: 10,
DefaultMethodTimeout: 10, DefaultMessageRetries: 1,
SubscribersDataFolder: "./data", DefaultMethodTimeout: 10,
CentralNodeName: "", SubscribersDataFolder: "./data",
RootCAPath: "", CentralNodeName: "",
NkeySeedFile: "", RootCAPath: "",
ExposeDataFolder: "", NkeySeedFile: "",
ErrorMessageTimeout: 60, ExposeDataFolder: "",
ErrorMessageRetries: 10, ErrorMessageTimeout: 60,
Compression: "", ErrorMessageRetries: 10,
Serialization: "", Compression: "",
SetBlockProfileRate: 0, Serialization: "",
EnableSocket: true, SetBlockProfileRate: 0,
EnableTUI: false, EnableSocket: true,
EnableSignatureCheck: false, EnableTUI: false,
IsCentralAuth: false, EnableSignatureCheck: false,
EnableDebug: false, IsCentralAuth: false,
EnableDebug: false,
StartPubREQHello: 30, StartPubREQHello: 30,
StartPubREQKeysRequestUpdate: true, StartPubREQKeysRequestUpdate: true,
StartSubREQKeysDeliverUpdate: true, StartSubREQKeysDeliverUpdate: true,
StartPubREQAclRequestUpdate: true,
StartSubREQAclDeliverUpdate: true,
StartSubREQErrorLog: false, StartSubREQErrorLog: false,
StartSubREQHello: true, StartSubREQHello: true,
StartSubREQToFileAppend: true, StartSubREQToFileAppend: true,
@ -319,10 +333,15 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else { } else {
conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS
} }
if cf.PublicKeysGetInterval == nil { if cf.REQKeysRequestUpdateInterval == nil {
conf.PublicKeysGetInterval = cd.PublicKeysGetInterval conf.REQKeysRequestUpdateInterval = cd.REQKeysRequestUpdateInterval
} else { } else {
conf.PublicKeysGetInterval = *cf.PublicKeysGetInterval conf.REQKeysRequestUpdateInterval = *cf.REQKeysRequestUpdateInterval
}
if cf.REQAclRequestUpdateInterval == nil {
conf.REQAclRequestUpdateInterval = cd.REQAclRequestUpdateInterval
} else {
conf.REQAclRequestUpdateInterval = *cf.REQAclRequestUpdateInterval
} }
if cf.ProfilingPort == nil { if cf.ProfilingPort == nil {
conf.ProfilingPort = cd.ProfilingPort conf.ProfilingPort = cd.ProfilingPort
@ -442,6 +461,18 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else { } else {
conf.StartSubREQKeysDeliverUpdate = *cf.StartSubREQKeysDeliverUpdate conf.StartSubREQKeysDeliverUpdate = *cf.StartSubREQKeysDeliverUpdate
} }
if cf.StartPubREQAclRequestUpdate == nil {
conf.StartPubREQAclRequestUpdate = cd.StartPubREQAclRequestUpdate
} else {
conf.StartPubREQAclRequestUpdate = *cf.StartPubREQAclRequestUpdate
}
if cf.StartSubREQAclDeliverUpdate == nil {
conf.StartSubREQAclDeliverUpdate = cd.StartSubREQAclDeliverUpdate
} else {
conf.StartSubREQAclDeliverUpdate = *cf.StartSubREQAclDeliverUpdate
}
if cf.StartSubREQErrorLog == nil { if cf.StartSubREQErrorLog == nil {
conf.StartSubREQErrorLog = cd.StartSubREQErrorLog conf.StartSubREQErrorLog = cd.StartSubREQErrorLog
} else { } else {
@ -566,7 +597,8 @@ func (c *Configuration) CheckFlags() error {
flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.") flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.")
flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.") flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.")
flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.") flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.")
flag.IntVar(&c.PublicKeysGetInterval, "publicKeysGetInterval", fc.PublicKeysGetInterval, "default interval in seconds for asking the central for public keys") flag.IntVar(&c.REQKeysRequestUpdateInterval, "REQKeysRequestUpdateInterval", fc.REQKeysRequestUpdateInterval, "default interval in seconds for asking the central for public keys")
flag.IntVar(&c.REQAclRequestUpdateInterval, "REQAclRequestUpdateInterval", fc.REQAclRequestUpdateInterval, "default interval in seconds for asking the central for acl updates")
flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port")
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112")
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level")
@ -588,10 +620,16 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server")
flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR")
// Start of Request publishers/subscribers
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds") flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
flag.BoolVar(&c.StartPubREQKeysRequestUpdate, "startPubREQKeysRequestUpdate", fc.StartPubREQKeysRequestUpdate, "true/false") flag.BoolVar(&c.StartPubREQKeysRequestUpdate, "startPubREQKeysRequestUpdate", fc.StartPubREQKeysRequestUpdate, "true/false")
flag.BoolVar(&c.StartSubREQKeysDeliverUpdate, "startSubREQKeysDeliverUpdate", fc.StartSubREQKeysDeliverUpdate, "true/false") flag.BoolVar(&c.StartSubREQKeysDeliverUpdate, "startSubREQKeysDeliverUpdate", fc.StartSubREQKeysDeliverUpdate, "true/false")
flag.BoolVar(&c.StartPubREQAclRequestUpdate, "startPubREQAclRequestUpdate", fc.StartPubREQAclRequestUpdate, "true/false")
flag.BoolVar(&c.StartSubREQAclDeliverUpdate, "startSubREQAclDeliverUpdate", fc.StartSubREQAclDeliverUpdate, "true/false")
flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false") flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false")
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")

View file

@ -175,8 +175,10 @@ func (p *processes) Start(proc process) {
if proc.configuration.StartPubREQKeysRequestUpdate { if proc.configuration.StartPubREQKeysRequestUpdate {
proc.startup.pubREQKeysRequestUpdate(proc) proc.startup.pubREQKeysRequestUpdate(proc)
// TODO: Putting the acl publisher here. proc.startup.subREQKeysDeliverUpdate(proc)
// Maybe we should also change the name of the configuration flag to something auth related ? }
if proc.configuration.StartPubREQAclRequestUpdate {
proc.startup.pubREQAclRequestUpdate(proc) proc.startup.pubREQAclRequestUpdate(proc)
proc.startup.subREQAclDeliverUpdate(proc) proc.startup.subREQAclDeliverUpdate(proc)
} }
@ -200,9 +202,10 @@ func (p *processes) Start(proc process) {
proc.startup.subREQAclImport(proc) proc.startup.subREQAclImport(proc)
} }
if proc.configuration.StartSubREQKeysDeliverUpdate { // Moved this together with proc.configuration.StartPubREQKeysRequestUpdate since they belong together.
proc.startup.subREQKeysDeliverUpdate(proc) // if proc.configuration.StartSubREQKeysDeliverUpdate {
} // proc.startup.subREQKeysDeliverUpdate(proc)
// }
if proc.configuration.StartSubREQHttpGet { if proc.configuration.StartSubREQHttpGet {
proc.startup.subREQHttpGet(proc) proc.startup.subREQHttpGet(proc)
@ -335,7 +338,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval)) ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQKeysRequestUpdateInterval))
for { for {
// TODO: We could send with the hash of the currently stored keys, // TODO: We could send with the hash of the currently stored keys,
@ -390,7 +393,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval)) ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQAclRequestUpdateInterval))
for { for {
// TODO: We could send with the hash of the currently stored hash, // TODO: We could send with the hash of the currently stored hash,