From 09e7090f83d3a595954b6a3e84ac1d48e84bf81b Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 24 Mar 2021 10:14:17 +0100 Subject: [PATCH] specify allowed nodes via flag for errorLogger --- cmd/main.go | 6 +++- configurationAndFlags.go | 59 +++++++++++++++++++++++++--------------- etc/config.toml | 11 ++++---- runProcessesAtStartup.go | 4 +-- server.go | 7 +---- 5 files changed, 51 insertions(+), 36 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ef2247d..b5d4e4c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,7 +12,11 @@ import ( func main() { c := steward.NewConfiguration() - c.CheckFlags() + err := c.CheckFlags() + if err != nil { + log.Printf("%v\n", err) + return + } // Start profiling if profiling port is specified if c.ProfilingPort != "" { diff --git a/configurationAndFlags.go b/configurationAndFlags.go index 0021d06..9ee3fc0 100644 --- a/configurationAndFlags.go +++ b/configurationAndFlags.go @@ -3,7 +3,6 @@ package steward import ( "flag" "fmt" - "log" "os" "path/filepath" "strings" @@ -21,13 +20,13 @@ import ( // the []values field, and the `ok` field will be set to true, so // it can be used to check if the flag was used and contained any // values. -type flagStringSlice struct { +type flagNodeSlice struct { value string ok bool - values []string + values []node } -func (f *flagStringSlice) String() string { +func (f *flagNodeSlice) String() string { return "" } @@ -35,13 +34,13 @@ func (f *flagStringSlice) String() string { // It will put the comma separated string value given as input into // the `value`field, then call the Parse function to split those // comma separated values into []values. -func (f *flagStringSlice) Set(s string) error { +func (f *flagNodeSlice) Set(s string) error { f.value = s f.Parse() return nil } -func (f *flagStringSlice) Parse() error { +func (f *flagNodeSlice) Parse() error { if len(f.value) == 0 { return nil } @@ -49,7 +48,10 @@ func (f *flagStringSlice) Parse() error { fv := f.value sp := strings.Split(fv, ",") f.ok = true - f.values = sp + + for _, v := range sp { + f.values = append(f.values, node(v)) + } return nil } @@ -75,9 +77,9 @@ type Configuration struct { SubscribersDataFolder string // central node to receive messages published from nodes CentralNodeName string - // HERE: SHOULD TAKE STRING - // ALSO RENAME ALL STARTING FLAGS TO BEGIN WITH START - StartCentralErrorLogger bool + // Start the central error logger. + // Takes a comma separated string of nodes to receive from or "*" for all nodes. + StartCentralErrorLogger flagNodeSlice // default message timeout in seconds. This can be overridden on the message level") StartSayHelloSubscriber bool } @@ -94,7 +96,7 @@ func newConfigurationDefaults() Configuration { BrokerAddress: "127.0.0.1:4222", ProfilingPort: "", PromHostAndPort: "", - StartCentralErrorLogger: false, + StartCentralErrorLogger: flagNodeSlice{values: []node{}}, DefaultMessageTimeout: 10, DefaultMessageRetries: 1, PublisherServiceSayhello: 30, @@ -104,36 +106,49 @@ func newConfigurationDefaults() Configuration { return c } -func (c *Configuration) CheckFlags() { +func (c *Configuration) CheckFlags() error { // Create an empty default config var fc Configuration + // NB: Disabling the config file options for now. // Read file config. Set system default if it can't find config file. - fc, err := c.ReadConfigFile() - if err != nil { - log.Printf("%v\n", err) - fc = newConfigurationDefaults() - } + //fc, err := c.ReadConfigFile() + //if err != nil { + // log.Printf("%v\n", err) + fc = newConfigurationDefaults() + //} flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.") - flag.StringVar(&c.NodeName, "node", fc.NodeName, "some unique string to identify this Edge unit") + flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit") flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") - flag.BoolVar(&c.StartCentralErrorLogger, "startCentralErrorLogger", fc.StartCentralErrorLogger, "set to true if this is the node that should receive the error log's from other nodes") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", fc.DefaultMessageRetries, "default amount of retries that will be done before a message is thrown away, and out of the system") flag.IntVar(&c.PublisherServiceSayhello, "publisherServiceSayhello", fc.PublisherServiceSayhello, "Make the current node send hello messages to central at given interval in seconds") flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed") flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node") + flag.Var(&c.StartCentralErrorLogger, "startCentralErrorLogger", "When value are given this node will become central error logger. Value can be \"*\" to receive from all hosts, or a comma separated list of hosts to allow processing messages from can be specified, like \"node1,node2\"") + flag.Parse() - if err := c.WriteConfigFile(); err != nil { - log.Printf("error: checkFlags: failed writing config file: %v\n", err) - os.Exit(1) + // Check that mandatory flag values have been set. + switch { + case c.NodeName == "": + return fmt.Errorf("error: the nodeName config option or flag cannot be empty, check -help") + case c.CentralNodeName == "": + return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help") } + + // // NB: Disabling the config file options for now. + // if err := c.WriteConfigFile(); err != nil { + // log.Printf("error: checkFlags: failed writing config file: %v\n", err) + // os.Exit(1) + // } + + return nil } // Reads the current config file from disk. diff --git a/etc/config.toml b/etc/config.toml index 623412c..f6f43c8 100644 --- a/etc/config.toml +++ b/etc/config.toml @@ -1,12 +1,13 @@ -BrokerAddress = "0" +BrokerAddress = "127.0.0.1:4222" CentralNodeName = "central" ConfigFolder = "./etc" DefaultMessageRetries = 3 DefaultMessageTimeout = 5 NodeName = "central" -ProfilingPort = "6060" -PromHostAndPort = ":2112" -PublisherServiceSayhello = 0 -StartCentralErrorLogger = false +ProfilingPort = "" +PromHostAndPort = "" +PublisherServiceSayhello = 30 StartSayHelloSubscriber = false SubscribersDataFolder = "./data" + +[StartCentralErrorLogger] diff --git a/runProcessesAtStartup.go b/runProcessesAtStartup.go index dd0ae36..69ea36e 100644 --- a/runProcessesAtStartup.go +++ b/runProcessesAtStartup.go @@ -56,12 +56,12 @@ func (s *server) ProcessesStart() { go proc.spawnWorker(s) } - if s.configuration.StartCentralErrorLogger { + if s.configuration.StartCentralErrorLogger.ok { // Start a subscriber for ErrorLog messages { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartCentralErrorLogger.values, nil) go proc.spawnWorker(s) } } diff --git a/server.go b/server.go index e84c951..e9c9177 100644 --- a/server.go +++ b/server.go @@ -80,7 +80,7 @@ func NewServer(c *Configuration) (*server, error) { metrics: newMetrics(c.PromHostAndPort), // REMOVED: //publisherServices: newPublisherServices(c.PublisherServiceSayhello), - centralErrorLogger: c.StartCentralErrorLogger, + centralErrorLogger: c.StartCentralErrorLogger.ok, } // Create the default data folder for where subscribers should @@ -99,11 +99,6 @@ func NewServer(c *Configuration) (*server, error) { log.Printf("info: Creating subscribers data folder at %v\n", c.SubscribersDataFolder) } - // The node name of the central server have to be set. - if s.configuration.CentralNodeName == "" { - return nil, fmt.Errorf("error: the centralNodeName config option or flag cannot be empty") - } - return s, nil }