mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
flagNodeSlice flags gets updated in config file. Made RST flag value to reset config values
This commit is contained in:
parent
09e7090f83
commit
04f7ca013b
5 changed files with 76 additions and 21 deletions
36
README.md
36
README.md
|
@ -36,12 +36,20 @@ All code in this repository are to be concidered not-production-ready. The code
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
|
### Messages in order
|
||||||
|
|
||||||
- By default the system guarantees that the order of the messages are handled by the subscriber in the order they where sent. So if a network link is down when the message is being sent, it will automatically be rescheduled at the specified interval with the given number of retries.
|
- By default the system guarantees that the order of the messages are handled by the subscriber in the order they where sent. So if a network link is down when the message is being sent, it will automatically be rescheduled at the specified interval with the given number of retries.
|
||||||
|
|
||||||
|
### Messages not in order
|
||||||
|
|
||||||
- There have been implemented a special type `NOSEQ` which will allow messages to be handled within that process in a not sequential manner. This is handy for jobs that will run for a long time, and where other messages are not dependent on it's result.
|
- There have been implemented a special type `NOSEQ` which will allow messages to be handled within that process in a not sequential manner. This is handy for jobs that will run for a long time, and where other messages are not dependent on it's result.
|
||||||
|
|
||||||
|
### Error messages from nodes
|
||||||
|
|
||||||
- Error messages will be sent back to the central error handler upon failure on a node.
|
- Error messages will be sent back to the central error handler upon failure on a node.
|
||||||
|
|
||||||
|
### Message handling and threads
|
||||||
|
|
||||||
- The handling of all messages is done by spawning up a process for the handling the message in it's own thread. This allows us to individually down to the message level keep the state for each message both in regards to ACK's, error handling, send retries, and rerun of a method for a message if the first run was not successful.
|
- The handling of all messages is done by spawning up a process for the handling the message in it's own thread. This allows us to individually down to the message level keep the state for each message both in regards to ACK's, error handling, send retries, and rerun of a method for a message if the first run was not successful.
|
||||||
|
|
||||||
- Processes for handling messages on a host can be restarted upon failure, or asked to just terminate and send a message back to the operator that something have gone seriously wrong. This is right now just partially implemented to test that the concept works.
|
- Processes for handling messages on a host can be restarted upon failure, or asked to just terminate and send a message back to the operator that something have gone seriously wrong. This is right now just partially implemented to test that the concept works.
|
||||||
|
@ -58,13 +66,33 @@ All code in this repository are to be concidered not-production-ready. The code
|
||||||
|
|
||||||
- Default timeouts to wait for ACK messages and max attempts to retry sending a message specified upon startup. This can be overridden on the message level.
|
- Default timeouts to wait for ACK messages and max attempts to retry sending a message specified upon startup. This can be overridden on the message level.
|
||||||
|
|
||||||
- Report errors happening on some node in to central error handler.
|
|
||||||
|
|
||||||
- Message types of both ACK and NACK, so we can decide if we want or don't want an Acknowledge if a message was delivered succesfully.
|
- Message types of both ACK and NACK, so we can decide if we want or don't want an Acknowledge if a message was delivered succesfully.
|
||||||
Example: We probably want an ACK when sending some CLICommand to be executed, but we don't care for an acknowledge (NACK) when we send an "hello I'm here" event.
|
Example: We probably want an ACK when sending some CLICommand to be executed, but we don't care for an acknowledge (NACK) when we send an "hello I'm here" event.
|
||||||
|
|
||||||
|
### Flags and configuration file
|
||||||
|
|
||||||
|
Steward supports both the use of flags/arguments set at startup, and the use of a config file. But how it is used might be a little different than how similar use is normally done.
|
||||||
|
|
||||||
|
A default config file will be created at first startup if one does not exist, with standard defaults values set. Any value also provided via a flag will also be written to the config file. If Steward is restarted the current content of the config file will be used as the new defaults. Said with other words, if you restart Steward without any flags specified the values of the last run will be read from the config file and used.
|
||||||
|
|
||||||
|
If new values are provided via flags they will take precedence over the ones currently in the config file, and they will also replace the current value in the config file, making it the default for the next restart.
|
||||||
|
|
||||||
|
The only exception from the above are the `startSubscriberX` flags which got one extra value that can be used which is the value `RST` for Reset. This will disable the specified subscriber, and also null out the array for which Nodes the subscriber will allow traffic from.
|
||||||
|
|
||||||
|
The config file can also be edited directly, making the use of flags not needed.
|
||||||
|
|
||||||
|
If just getting back to standard default for all config options needed, then delete the current config file, restart Steward, and a new config file with all the options set to it's default values will be created.
|
||||||
|
|
||||||
|
### Errors reporting
|
||||||
|
|
||||||
|
- Report errors happening on some node in to central error handler.
|
||||||
|
|
||||||
|
### Prometheus metrics
|
||||||
|
|
||||||
- Prometheus exporters for Metrics
|
- Prometheus exporters for Metrics
|
||||||
|
|
||||||
|
### Other
|
||||||
|
|
||||||
- More will come. In active development.
|
- More will come. In active development.
|
||||||
|
|
||||||
## Howto
|
## Howto
|
||||||
|
@ -106,6 +134,10 @@ One the nodes out there
|
||||||
|
|
||||||
Use the `--help` flag to get all possibilities.
|
Use the `--help` flag to get all possibilities.
|
||||||
|
|
||||||
|
#### Start subscriber flags
|
||||||
|
|
||||||
|
The start subscribers flags take a string value of which nodes that it will process messages from. Since using a flag to set a value automatically sets that value also in the config file, a value of RST can be given to turn off the subscriber.
|
||||||
|
|
||||||
### Message fields explanation
|
### Message fields explanation
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
|
|
@ -3,6 +3,7 @@ package steward
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -22,8 +23,8 @@ import (
|
||||||
// values.
|
// values.
|
||||||
type flagNodeSlice struct {
|
type flagNodeSlice struct {
|
||||||
value string
|
value string
|
||||||
ok bool
|
OK bool
|
||||||
values []node
|
Values []node
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *flagNodeSlice) String() string {
|
func (f *flagNodeSlice) String() string {
|
||||||
|
@ -40,17 +41,35 @@ func (f *flagNodeSlice) Set(s string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the flag value "RST" is given, set the default values
|
||||||
|
// for each of the flag var's fields, and return back.
|
||||||
|
// Since we reset the actual flag values, it is also these
|
||||||
|
// blank values that will be written to the config file, and
|
||||||
|
// making the change persistent in the system. This will also
|
||||||
|
// be reflected in values stored in the config file, since the
|
||||||
|
// config file is written after the flags have been parsed.
|
||||||
func (f *flagNodeSlice) Parse() error {
|
func (f *flagNodeSlice) Parse() error {
|
||||||
if len(f.value) == 0 {
|
if len(f.value) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
split := strings.Split(f.value, ",")
|
||||||
|
|
||||||
|
// Reset values if RST was the flag value.
|
||||||
|
if split[0] == "RST" {
|
||||||
|
f.OK = false
|
||||||
|
f.value = ""
|
||||||
|
f.Values = []node{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
fv := f.value
|
fv := f.value
|
||||||
sp := strings.Split(fv, ",")
|
sp := strings.Split(fv, ",")
|
||||||
f.ok = true
|
f.OK = true
|
||||||
|
f.Values = []node{}
|
||||||
|
|
||||||
for _, v := range sp {
|
for _, v := range sp {
|
||||||
f.values = append(f.values, node(v))
|
f.Values = append(f.Values, node(v))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -96,7 +115,7 @@ func newConfigurationDefaults() Configuration {
|
||||||
BrokerAddress: "127.0.0.1:4222",
|
BrokerAddress: "127.0.0.1:4222",
|
||||||
ProfilingPort: "",
|
ProfilingPort: "",
|
||||||
PromHostAndPort: "",
|
PromHostAndPort: "",
|
||||||
StartCentralErrorLogger: flagNodeSlice{values: []node{}},
|
StartCentralErrorLogger: flagNodeSlice{Values: []node{}},
|
||||||
DefaultMessageTimeout: 10,
|
DefaultMessageTimeout: 10,
|
||||||
DefaultMessageRetries: 1,
|
DefaultMessageRetries: 1,
|
||||||
PublisherServiceSayhello: 30,
|
PublisherServiceSayhello: 30,
|
||||||
|
@ -113,11 +132,13 @@ func (c *Configuration) CheckFlags() error {
|
||||||
|
|
||||||
// NB: Disabling the config file options for now.
|
// NB: Disabling the config file options for now.
|
||||||
// Read file config. Set system default if it can't find config file.
|
// Read file config. Set system default if it can't find config file.
|
||||||
//fc, err := c.ReadConfigFile()
|
fc, err := c.ReadConfigFile()
|
||||||
//if err != nil {
|
if err != nil {
|
||||||
// log.Printf("%v\n", err)
|
log.Printf("%v\n", err)
|
||||||
fc = newConfigurationDefaults()
|
fc = newConfigurationDefaults()
|
||||||
//}
|
}
|
||||||
|
|
||||||
|
*c = fc
|
||||||
|
|
||||||
flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.")
|
flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.")
|
||||||
flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit")
|
flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit")
|
||||||
|
@ -130,7 +151,7 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed")
|
flag.StringVar(&c.SubscribersDataFolder, "subscribersDataFolder", fc.SubscribersDataFolder, "The data folder where subscribers are allowed to write their data if needed")
|
||||||
flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node")
|
flag.StringVar(&c.CentralNodeName, "centralNodeName", fc.CentralNodeName, "The name of the central node to receive messages published by this node")
|
||||||
|
|
||||||
flag.Var(&c.StartCentralErrorLogger, "startCentralErrorLogger", "When value are given this node will become central error logger. Value can be \"*\" to receive from all hosts, or a comma separated list of hosts to allow processing messages from can be specified, like \"node1,node2\"")
|
flag.Var(&c.StartCentralErrorLogger, "startCentralErrorLogger", "When value are given this node will become central error logger. Value can be \"*\" to receive from all hosts, or a comma separated list of hosts to allow processing messages from can be specified, like \"node1,node2\". Use the value RST to reset the value set in the config file to turn off the process.")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
@ -142,11 +163,11 @@ func (c *Configuration) CheckFlags() error {
|
||||||
return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help")
|
return fmt.Errorf("error: the centralNodeName config option or flag cannot be empty, check -help")
|
||||||
}
|
}
|
||||||
|
|
||||||
// // NB: Disabling the config file options for now.
|
// NB: Disabling the config file options for now.
|
||||||
// if err := c.WriteConfigFile(); err != nil {
|
if err := c.WriteConfigFile(); err != nil {
|
||||||
// log.Printf("error: checkFlags: failed writing config file: %v\n", err)
|
log.Printf("error: checkFlags: failed writing config file: %v\n", err)
|
||||||
// os.Exit(1)
|
os.Exit(1)
|
||||||
// }
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,3 +11,5 @@ StartSayHelloSubscriber = false
|
||||||
SubscribersDataFolder = "./data"
|
SubscribersDataFolder = "./data"
|
||||||
|
|
||||||
[StartCentralErrorLogger]
|
[StartCentralErrorLogger]
|
||||||
|
OK = false
|
||||||
|
Values = []
|
||||||
|
|
|
@ -56,12 +56,12 @@ func (s *server) ProcessesStart() {
|
||||||
go proc.spawnWorker(s)
|
go proc.spawnWorker(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.configuration.StartCentralErrorLogger.ok {
|
if s.configuration.StartCentralErrorLogger.OK {
|
||||||
// Start a subscriber for ErrorLog messages
|
// Start a subscriber for ErrorLog messages
|
||||||
{
|
{
|
||||||
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
|
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
|
||||||
sub := newSubject(ErrorLog, EventNACK, "errorCentral")
|
sub := newSubject(ErrorLog, EventNACK, "errorCentral")
|
||||||
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartCentralErrorLogger.values, nil)
|
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartCentralErrorLogger.Values, nil)
|
||||||
go proc.spawnWorker(s)
|
go proc.spawnWorker(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ func NewServer(c *Configuration) (*server, error) {
|
||||||
metrics: newMetrics(c.PromHostAndPort),
|
metrics: newMetrics(c.PromHostAndPort),
|
||||||
// REMOVED:
|
// REMOVED:
|
||||||
//publisherServices: newPublisherServices(c.PublisherServiceSayhello),
|
//publisherServices: newPublisherServices(c.PublisherServiceSayhello),
|
||||||
centralErrorLogger: c.StartCentralErrorLogger.ok,
|
centralErrorLogger: c.StartCentralErrorLogger.OK,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the default data folder for where subscribers should
|
// Create the default data folder for where subscribers should
|
||||||
|
|
Loading…
Reference in a new issue