1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-05 20:09:16 +00:00

specify allowed nodes via flag for errorLogger

This commit is contained in:
postmannen 2021-03-24 10:14:17 +01:00
parent 593842e070
commit 09e7090f83
5 changed files with 51 additions and 36 deletions

View file

@ -12,7 +12,11 @@ import (
func main() { func main() {
c := steward.NewConfiguration() c := steward.NewConfiguration()
c.CheckFlags() err := c.CheckFlags()
if err != nil {
log.Printf("%v\n", err)
return
}
// Start profiling if profiling port is specified // Start profiling if profiling port is specified
if c.ProfilingPort != "" { if c.ProfilingPort != "" {

View file

@ -3,7 +3,6 @@ package steward
import ( import (
"flag" "flag"
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -21,13 +20,13 @@ import (
// the []values field, and the `ok` field will be set to true, so // the []values field, and the `ok` field will be set to true, so
// it can be used to check if the flag was used and contained any // it can be used to check if the flag was used and contained any
// values. // values.
type flagStringSlice struct { type flagNodeSlice struct {
value string value string
ok bool ok bool
values []string values []node
} }
func (f *flagStringSlice) String() string { func (f *flagNodeSlice) String() string {
return "" return ""
} }
@ -35,13 +34,13 @@ func (f *flagStringSlice) String() string {
// It will put the comma separated string value given as input into // It will put the comma separated string value given as input into
// the `value`field, then call the Parse function to split those // the `value`field, then call the Parse function to split those
// comma separated values into []values. // comma separated values into []values.
func (f *flagStringSlice) Set(s string) error { func (f *flagNodeSlice) Set(s string) error {
f.value = s f.value = s
f.Parse() f.Parse()
return nil return nil
} }
func (f *flagStringSlice) Parse() error { func (f *flagNodeSlice) Parse() error {
if len(f.value) == 0 { if len(f.value) == 0 {
return nil return nil
} }
@ -49,7 +48,10 @@ func (f *flagStringSlice) Parse() error {
fv := f.value fv := f.value
sp := strings.Split(fv, ",") sp := strings.Split(fv, ",")
f.ok = true f.ok = true
f.values = sp
for _, v := range sp {
f.values = append(f.values, node(v))
}
return nil return nil
} }
@ -75,9 +77,9 @@ type Configuration struct {
SubscribersDataFolder string SubscribersDataFolder string
// central node to receive messages published from nodes // central node to receive messages published from nodes
CentralNodeName string CentralNodeName string
// HERE: SHOULD TAKE STRING // Start the central error logger.
// ALSO RENAME ALL STARTING FLAGS TO BEGIN WITH START // Takes a comma separated string of nodes to receive from or "*" for all nodes.
StartCentralErrorLogger bool StartCentralErrorLogger flagNodeSlice
// default message timeout in seconds. This can be overridden on the message level") // default message timeout in seconds. This can be overridden on the message level")
StartSayHelloSubscriber bool StartSayHelloSubscriber bool
} }
@ -94,7 +96,7 @@ func newConfigurationDefaults() Configuration {
BrokerAddress: "127.0.0.1:4222", BrokerAddress: "127.0.0.1:4222",
ProfilingPort: "", ProfilingPort: "",
PromHostAndPort: "", PromHostAndPort: "",
StartCentralErrorLogger: false, StartCentralErrorLogger: flagNodeSlice{values: []node{}},
DefaultMessageTimeout: 10, DefaultMessageTimeout: 10,
DefaultMessageRetries: 1, DefaultMessageRetries: 1,
PublisherServiceSayhello: 30, PublisherServiceSayhello: 30,
@ -104,36 +106,49 @@ func newConfigurationDefaults() Configuration {
return c return c
} }
func (c *Configuration) CheckFlags() { func (c *Configuration) CheckFlags() error {
// Create an empty default config // Create an empty default config
var fc Configuration var fc Configuration
// NB: Disabling the config file options for now.
// Read file config. Set system default if it can't find config file. // Read file config. Set system default if it can't find config file.
fc, err := c.ReadConfigFile() //fc, err := c.ReadConfigFile()
if err != nil { //if err != nil {
log.Printf("%v\n", err) // log.Printf("%v\n", err)
fc = newConfigurationDefaults() fc = newConfigurationDefaults()
} //}
flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.") flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.")
flag.StringVar(&c.NodeName, "node", fc.NodeName, "some unique string to identify this Edge unit") flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit")
flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker") flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker")
flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port")
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112")
flag.BoolVar(&c.StartCentralErrorLogger, "startCentralErrorLogger", fc.StartCentralErrorLogger, "set to true if this is the node that should receive the error log's from other nodes")
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level")
flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", fc.DefaultMessageRetries, "default amount of retries that will be done before a message is thrown away, and out of the system") flag.IntVar(&c.DefaultMessageRetries, "defaultMessageRetries", fc.DefaultMessageRetries, "default amount of retries that will be done before a message is thrown away, and out of the system")
flag.IntVar(&c.PublisherServiceSayhello, "publisherServiceSayhello", fc.PublisherServiceSayhello, "Make the current node send hello messages to central at given interval in seconds") flag.IntVar(&c.PublisherServiceSayhello, "publisherServiceSayhello", fc.PublisherServiceSayhello, "Make the current node send hello messages to central at given interval in seconds")
flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed") flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed")
flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node") flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node")
flag.Var(&c.StartCentralErrorLogger, "startCentralErrorLogger", "When value are given this node will become central error logger. Value can be \"*\" to receive from all hosts, or a comma separated list of hosts to allow processing messages from can be specified, like \"node1,node2\"")
flag.Parse() flag.Parse()
if err := c.WriteConfigFile(); err != nil { // Check that mandatory flag values have been set.
log.Printf("error: checkFlags: failed writing config file: %v\n", err) switch {
os.Exit(1) case c.NodeName == "":
return fmt.Errorf("error: the nodeName config option or flag cannot be empty, check -help")
case c.CentralNodeName == "":
return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help")
} }
// // NB: Disabling the config file options for now.
// if err := c.WriteConfigFile(); err != nil {
// log.Printf("error: checkFlags: failed writing config file: %v\n", err)
// os.Exit(1)
// }
return nil
} }
// Reads the current config file from disk. // Reads the current config file from disk.

View file

@ -1,12 +1,13 @@
BrokerAddress = "0" BrokerAddress = "127.0.0.1:4222"
CentralNodeName = "central" CentralNodeName = "central"
ConfigFolder = "./etc" ConfigFolder = "./etc"
DefaultMessageRetries = 3 DefaultMessageRetries = 3
DefaultMessageTimeout = 5 DefaultMessageTimeout = 5
NodeName = "central" NodeName = "central"
ProfilingPort = "6060" ProfilingPort = ""
PromHostAndPort = ":2112" PromHostAndPort = ""
PublisherServiceSayhello = 0 PublisherServiceSayhello = 30
StartCentralErrorLogger = false
StartSayHelloSubscriber = false StartSayHelloSubscriber = false
SubscribersDataFolder = "./data" SubscribersDataFolder = "./data"
[StartCentralErrorLogger]

View file

@ -56,12 +56,12 @@ func (s *server) ProcessesStart() {
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
if s.configuration.StartCentralErrorLogger { if s.configuration.StartCentralErrorLogger.ok {
// Start a subscriber for ErrorLog messages // Start a subscriber for ErrorLog messages
{ {
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(ErrorLog, EventNACK, "errorCentral") sub := newSubject(ErrorLog, EventNACK, "errorCentral")
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartCentralErrorLogger.values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }

View file

@ -80,7 +80,7 @@ func NewServer(c *Configuration) (*server, error) {
metrics: newMetrics(c.PromHostAndPort), metrics: newMetrics(c.PromHostAndPort),
// REMOVED: // REMOVED:
//publisherServices: newPublisherServices(c.PublisherServiceSayhello), //publisherServices: newPublisherServices(c.PublisherServiceSayhello),
centralErrorLogger: c.StartCentralErrorLogger, centralErrorLogger: c.StartCentralErrorLogger.ok,
} }
// Create the default data folder for where subscribers should // Create the default data folder for where subscribers should
@ -99,11 +99,6 @@ func NewServer(c *Configuration) (*server, error) {
log.Printf("info: Creating subscribers data folder at %v\n", c.SubscribersDataFolder) log.Printf("info: Creating subscribers data folder at %v\n", c.SubscribersDataFolder)
} }
// The node name of the central server have to be set.
if s.configuration.CentralNodeName == "" {
return nil, fmt.Errorf("error: the centralNodeName config option or flag cannot be empty")
}
return s, nil return s, nil
} }