1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed the values field from flagslice

This commit is contained in:
postmannen 2021-09-08 17:57:21 +02:00
parent cd3de53b30
commit d0f1ed32d1
6 changed files with 43 additions and 74 deletions

View file

@ -6,7 +6,6 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strings"
toml "github.com/pelletier/go-toml" toml "github.com/pelletier/go-toml"
) )
@ -22,9 +21,8 @@ import (
// it can be used to check if the flag was used and contained any // it can be used to check if the flag was used and contained any
// values. // values.
type flagNodeSlice struct { type flagNodeSlice struct {
value string value string
OK bool OK bool
Values []Node
} }
// String method // String method
@ -50,28 +48,6 @@ func (f *flagNodeSlice) Set(s string) error {
// be reflected in values stored in the config file, since the // be reflected in values stored in the config file, since the
// config file is written after the flags have been parsed. // config file is written after the flags have been parsed.
func (f *flagNodeSlice) Parse() error { func (f *flagNodeSlice) Parse() error {
if len(f.value) == 0 {
return nil
}
split := strings.Split(f.value, ",")
// Reset values if RST was the flag value.
if split[0] == "RST" {
f.OK = false
f.value = ""
f.Values = []Node{}
return nil
}
fv := f.value
sp := strings.Split(fv, ",")
f.OK = true
f.Values = []Node{}
for _, v := range sp {
f.Values = append(f.Values, Node(v))
}
return nil return nil
} }
@ -171,18 +147,18 @@ func newConfigurationDefaults() Configuration {
ErrorMessageTimeout: 60, ErrorMessageTimeout: 60,
ErrorMessageRetries: 10, ErrorMessageRetries: 10,
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}}, StartSubREQErrorLog: flagNodeSlice{OK: true},
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQHello: flagNodeSlice{OK: true},
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToFileAppend: flagNodeSlice{OK: true},
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToFile: flagNodeSlice{OK: true},
StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQPing: flagNodeSlice{OK: true},
StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQPong: flagNodeSlice{OK: true},
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQCliCommand: flagNodeSlice{OK: true},
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQnCliCommand: flagNodeSlice{OK: true},
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToConsole: flagNodeSlice{OK: true},
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQHttpGet: flagNodeSlice{OK: true},
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQTailFile: flagNodeSlice{OK: true},
StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQnCliCommandCont: flagNodeSlice{OK: true},
} }
return c return c
} }

View file

@ -79,16 +79,10 @@ type process struct {
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process { func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, procFunc func() error) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
processes.lastProcessID++ processes.lastProcessID++
// make the slice of allowedReceivers into a map value for easy lookup.
m := make(map[Node]struct{})
for _, a := range allowedReceivers {
m[a] = struct{}{}
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
var method Method var method Method
@ -100,7 +94,6 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc
processID: processes.lastProcessID, processID: processes.lastProcessID,
errorCh: errCh, errorCh: errCh,
processKind: processKind, processKind: processKind,
allowedReceivers: m,
methodsAvailable: method.GetMethodsAvailable(), methodsAvailable: method.GetMethodsAvailable(),
toRingbufferCh: toRingbufferCh, toRingbufferCh: toRingbufferCh,
configuration: configuration, configuration: configuration,

View file

@ -59,7 +59,7 @@ func (p *processes) Start(proc process) {
{ {
log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node) log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node)
sub := newSubject(REQOpCommand, string(proc.node)) sub := newSubject(REQOpCommand, string(proc.node))
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil) proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(proc.processes, proc.natsConn) go proc.spawnWorker(proc.processes, proc.natsConn)
} }
@ -155,7 +155,7 @@ func (s startup) subREQHttpGet(p process) {
log.Printf("Starting Http Get subscriber: %#v\n", p.node) log.Printf("Starting Http Get subscriber: %#v\n", p.node)
sub := newSubject(REQHttpGet, string(p.node)) sub := newSubject(REQHttpGet, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
@ -165,7 +165,7 @@ func (s startup) pubREQHello(p process) {
log.Printf("Starting Hello Publisher: %#v\n", p.node) log.Printf("Starting Hello Publisher: %#v\n", p.node)
sub := newSubject(REQHello, p.configuration.CentralNodeName) sub := newSubject(REQHello, p.configuration.CentralNodeName)
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil)
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = procFunc( proc.procFunc = procFunc(
@ -210,49 +210,49 @@ func (s startup) pubREQHello(p process) {
func (s startup) subREQToConsole(p process) { func (s startup) subREQToConsole(p process) {
log.Printf("Starting Text To Console subscriber: %#v\n", p.node) log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
sub := newSubject(REQToConsole, string(p.node)) sub := newSubject(REQToConsole, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQnCliCommand(p process) { func (s startup) subREQnCliCommand(p process) {
log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node)
sub := newSubject(REQnCliCommand, string(p.node)) sub := newSubject(REQnCliCommand, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQCliCommand(p process) { func (s startup) subREQCliCommand(p process) {
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
sub := newSubject(REQCliCommand, string(p.node)) sub := newSubject(REQCliCommand, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQPong(p process) { func (s startup) subREQPong(p process) {
log.Printf("Starting Pong subscriber: %#v\n", p.node) log.Printf("Starting Pong subscriber: %#v\n", p.node)
sub := newSubject(REQPong, string(p.node)) sub := newSubject(REQPong, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQPing(p process) { func (s startup) subREQPing(p process) {
log.Printf("Starting Ping Request subscriber: %#v\n", p.node) log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
sub := newSubject(REQPing, string(p.node)) sub := newSubject(REQPing, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQErrorLog(p process) { func (s startup) subREQErrorLog(p process) {
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
sub := newSubject(REQErrorLog, "errorCentral") sub := newSubject(REQErrorLog, "errorCentral")
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
func (s startup) subREQHello(p process) { func (s startup) subREQHello(p process) {
log.Printf("Starting Hello subscriber: %#v\n", p.node) log.Printf("Starting Hello subscriber: %#v\n", p.node)
sub := newSubject(REQHello, string(p.node)) sub := newSubject(REQHello, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
proc.procFuncCh = make(chan Message) proc.procFuncCh = make(chan Message)
// The reason for running the say hello subscriber as a procFunc is that // The reason for running the say hello subscriber as a procFunc is that
@ -290,7 +290,7 @@ func (s startup) subREQHello(p process) {
func (s startup) subREQToFile(p process) { func (s startup) subREQToFile(p process) {
log.Printf("Starting text to file subscriber: %#v\n", p.node) log.Printf("Starting text to file subscriber: %#v\n", p.node)
sub := newSubject(REQToFile, string(p.node)) sub := newSubject(REQToFile, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
@ -298,7 +298,7 @@ func (s startup) subREQToFile(p process) {
func (s startup) subREQToFileAppend(p process) { func (s startup) subREQToFileAppend(p process) {
log.Printf("Starting text logging subscriber: %#v\n", p.node) log.Printf("Starting text logging subscriber: %#v\n", p.node)
sub := newSubject(REQToFileAppend, string(p.node)) sub := newSubject(REQToFileAppend, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
@ -306,7 +306,7 @@ func (s startup) subREQToFileAppend(p process) {
func (s startup) subREQTailFile(p process) { func (s startup) subREQTailFile(p process) {
log.Printf("Starting tail log files subscriber: %#v\n", p.node) log.Printf("Starting tail log files subscriber: %#v\n", p.node)
sub := newSubject(REQTailFile, string(p.node)) sub := newSubject(REQTailFile, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
@ -314,7 +314,7 @@ func (s startup) subREQTailFile(p process) {
func (s startup) subREQnCliCommandCont(p process) { func (s startup) subREQnCliCommandCont(p process) {
log.Printf("Starting cli command with continous delivery: %#v\n", p.node) log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
sub := newSubject(REQnCliCommandCont, string(p.node)) sub := newSubject(REQnCliCommandCont, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }
@ -322,7 +322,7 @@ func (s startup) subREQnCliCommandCont(p process) {
func (s startup) subREQToSocket(p process) { func (s startup) subREQToSocket(p process) {
log.Printf("Starting write to socket subscriber: %#v\n", p.node) log.Printf("Starting write to socket subscriber: %#v\n", p.node)
sub := newSubject(REQToSocket, string(p.node)) sub := newSubject(REQToSocket, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil) proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(p.processes, p.natsConn) go proc.spawnWorker(p.processes, p.natsConn)
} }

View file

@ -226,7 +226,7 @@ func (s *server) Start() {
// //
// NB: The context of the initial process are set in processes.Start. // NB: The context of the initial process are set in processes.Start.
sub := newSubject(REQInitial, s.nodeName) sub := newSubject(REQInitial, s.nodeName)
p := newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) p := newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, "", nil)
// Start all wanted subscriber processes. // Start all wanted subscriber processes.
s.processes.Start(p) s.processes.Start(p)
@ -407,7 +407,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s.processes, s.natsConn) proc.spawnWorker(s.processes, s.natsConn)

View file

@ -55,16 +55,16 @@ func TestStewardServer(t *testing.T) {
DefaultMessageRetries: 1, DefaultMessageRetries: 1,
DefaultMessageTimeout: 3, DefaultMessageTimeout: 3,
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQCliCommand: flagNodeSlice{OK: true},
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQnCliCommand: flagNodeSlice{OK: true},
StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQnCliCommandCont: flagNodeSlice{OK: true},
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToConsole: flagNodeSlice{OK: true},
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToFileAppend: flagNodeSlice{OK: true},
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToFile: flagNodeSlice{OK: true},
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQHello: flagNodeSlice{OK: true},
StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQErrorLog: flagNodeSlice{OK: true},
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQHttpGet: flagNodeSlice{OK: true},
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQTailFile: flagNodeSlice{OK: true},
// StartSubREQToSocket: flagNodeSlice{OK: true, Values: []Node{"*"}}, // StartSubREQToSocket: flagNodeSlice{OK: true, Values: []Node{"*"}},
} }
stewardServer, err := NewServer(conf) stewardServer, err := NewServer(conf)

View file

@ -412,7 +412,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
// Create the process and start it. // Create the process and start it.
sub := newSubject(arg.Method, proc.configuration.NodeName) sub := newSubject(arg.Method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil) procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn) go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)