From d0f1ed32d199a82be744b0be81e290e0bba1dc8e Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 8 Sep 2021 17:57:21 +0200 Subject: [PATCH] removed the values field from flagslice --- configuration_flags.go | 52 ++++++++++---------------------------- process.go | 9 +------ processes.go | 30 +++++++++++----------- server.go | 4 +-- steward_test.go | 20 +++++++-------- subscriber_method_types.go | 2 +- 6 files changed, 43 insertions(+), 74 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index 7bc8ded..6d3dd70 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -6,7 +6,6 @@ import ( "log" "os" "path/filepath" - "strings" toml "github.com/pelletier/go-toml" ) @@ -22,9 +21,8 @@ import ( // it can be used to check if the flag was used and contained any // values. type flagNodeSlice struct { - value string - OK bool - Values []Node + value string + OK bool } // String method @@ -50,28 +48,6 @@ func (f *flagNodeSlice) Set(s string) error { // be reflected in values stored in the config file, since the // config file is written after the flags have been parsed. func (f *flagNodeSlice) Parse() error { - if len(f.value) == 0 { - return nil - } - - split := strings.Split(f.value, ",") - - // Reset values if RST was the flag value. - if split[0] == "RST" { - f.OK = false - f.value = "" - f.Values = []Node{} - return nil - } - - fv := f.value - sp := strings.Split(fv, ",") - f.OK = true - f.Values = []Node{} - - for _, v := range sp { - f.Values = append(f.Values, Node(v)) - } return nil } @@ -171,18 +147,18 @@ func newConfigurationDefaults() Configuration { ErrorMessageTimeout: 60, ErrorMessageRetries: 10, - StartSubREQErrorLog: flagNodeSlice{Values: []Node{}}, - StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQErrorLog: flagNodeSlice{OK: true}, + StartSubREQHello: flagNodeSlice{OK: true}, + StartSubREQToFileAppend: flagNodeSlice{OK: true}, + StartSubREQToFile: flagNodeSlice{OK: true}, + StartSubREQPing: flagNodeSlice{OK: true}, + StartSubREQPong: flagNodeSlice{OK: true}, + StartSubREQCliCommand: flagNodeSlice{OK: true}, + StartSubREQnCliCommand: flagNodeSlice{OK: true}, + StartSubREQToConsole: flagNodeSlice{OK: true}, + StartSubREQHttpGet: flagNodeSlice{OK: true}, + StartSubREQTailFile: flagNodeSlice{OK: true}, + StartSubREQnCliCommandCont: flagNodeSlice{OK: true}, } return c } diff --git a/process.go b/process.go index 611fb3a..6538f87 100644 --- a/process.go +++ b/process.go @@ -79,16 +79,10 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process { +func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ - // make the slice of allowedReceivers into a map value for easy lookup. - m := make(map[Node]struct{}) - for _, a := range allowedReceivers { - m[a] = struct{}{} - } - ctx, cancel := context.WithCancel(ctx) var method Method @@ -100,7 +94,6 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc processID: processes.lastProcessID, errorCh: errCh, processKind: processKind, - allowedReceivers: m, methodsAvailable: method.GetMethodsAvailable(), toRingbufferCh: toRingbufferCh, configuration: configuration, diff --git a/processes.go b/processes.go index 3f910db..a8f14ba 100644 --- a/processes.go +++ b/processes.go @@ -59,7 +59,7 @@ func (p *processes) Start(proc process) { { log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node) sub := newSubject(REQOpCommand, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) go proc.spawnWorker(proc.processes, proc.natsConn) } @@ -155,7 +155,7 @@ func (s startup) subREQHttpGet(p process) { log.Printf("Starting Http Get subscriber: %#v\n", p.node) sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) @@ -165,7 +165,7 @@ func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -210,49 +210,49 @@ func (s startup) pubREQHello(p process) { func (s startup) subREQToConsole(p process) { log.Printf("Starting Text To Console subscriber: %#v\n", p.node) sub := newSubject(REQToConsole, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQnCliCommand(p process) { log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) sub := newSubject(REQnCliCommand, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQCliCommand(p process) { log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) sub := newSubject(REQCliCommand, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPong(p process) { log.Printf("Starting Pong subscriber: %#v\n", p.node) sub := newSubject(REQPong, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPing(p process) { log.Printf("Starting Ping Request subscriber: %#v\n", p.node) sub := newSubject(REQPing, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQErrorLog(p process) { log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQHello(p process) { log.Printf("Starting Hello subscriber: %#v\n", p.node) sub := newSubject(REQHello, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) proc.procFuncCh = make(chan Message) // The reason for running the say hello subscriber as a procFunc is that @@ -290,7 +290,7 @@ func (s startup) subREQHello(p process) { func (s startup) subREQToFile(p process) { log.Printf("Starting text to file subscriber: %#v\n", p.node) sub := newSubject(REQToFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -298,7 +298,7 @@ func (s startup) subREQToFile(p process) { func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -306,7 +306,7 @@ func (s startup) subREQToFileAppend(p process) { func (s startup) subREQTailFile(p process) { log.Printf("Starting tail log files subscriber: %#v\n", p.node) sub := newSubject(REQTailFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -314,7 +314,7 @@ func (s startup) subREQTailFile(p process) { func (s startup) subREQnCliCommandCont(p process) { log.Printf("Starting cli command with continous delivery: %#v\n", p.node) sub := newSubject(REQnCliCommandCont, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -322,7 +322,7 @@ func (s startup) subREQnCliCommandCont(p process) { func (s startup) subREQToSocket(p process) { log.Printf("Starting write to socket subscriber: %#v\n", p.node) sub := newSubject(REQToSocket, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } diff --git a/server.go b/server.go index 5f20e4f..a90bc05 100644 --- a/server.go +++ b/server.go @@ -226,7 +226,7 @@ func (s *server) Start() { // // NB: The context of the initial process are set in processes.Start. sub := newSubject(REQInitial, s.nodeName) - p := newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) + p := newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, "", nil) // Start all wanted subscriber processes. s.processes.Start(p) @@ -407,7 +407,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil) // fmt.Printf("*** %#v\n", proc) proc.spawnWorker(s.processes, s.natsConn) diff --git a/steward_test.go b/steward_test.go index af8698e..d35a80f 100644 --- a/steward_test.go +++ b/steward_test.go @@ -55,16 +55,16 @@ func TestStewardServer(t *testing.T) { DefaultMessageRetries: 1, DefaultMessageTimeout: 3, - StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQCliCommand: flagNodeSlice{OK: true}, + StartSubREQnCliCommand: flagNodeSlice{OK: true}, + StartSubREQnCliCommandCont: flagNodeSlice{OK: true}, + StartSubREQToConsole: flagNodeSlice{OK: true}, + StartSubREQToFileAppend: flagNodeSlice{OK: true}, + StartSubREQToFile: flagNodeSlice{OK: true}, + StartSubREQHello: flagNodeSlice{OK: true}, + StartSubREQErrorLog: flagNodeSlice{OK: true}, + StartSubREQHttpGet: flagNodeSlice{OK: true}, + StartSubREQTailFile: flagNodeSlice{OK: true}, // StartSubREQToSocket: flagNodeSlice{OK: true, Values: []Node{"*"}}, } stewardServer, err := NewServer(conf) diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 9c5930a..cc347bd 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -412,7 +412,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri // Create the process and start it. sub := newSubject(arg.Method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil) + procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)