From f1359e45d93591f4dbc8eb913fc5d84d8c65cb44 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 9 Apr 2021 11:30:40 +0200 Subject: [PATCH] moved startup pub/sub into process structure --- process.go | 4 + server.go | 15 +- startup_processes.go | 317 +++++++++--------- subscriber_method_types.go | 6 + var/errorLog/central/errorCentral.REQErrorLog | 2 - 5 files changed, 184 insertions(+), 160 deletions(-) delete mode 100644 var/errorLog/central/errorCentral.REQErrorLog diff --git a/process.go b/process.go index 1332d3f..01a7c11 100644 --- a/process.go +++ b/process.go @@ -69,6 +69,10 @@ type process struct { ctxCancel context.CancelFunc // Process name processName processName + + // startup holds the startup functions for starting up publisher + // or subscriber processes + startup startup } // prepareNewProcess will set the the provided values and the default diff --git a/server.go b/server.go index 0309309..4be7236 100644 --- a/server.go +++ b/server.go @@ -130,7 +130,9 @@ func (s *server) Start() { // Start the checking the input socket for new messages from operator. go s.readSocket(s.toRingbufferCh) - // Start up the predefined subscribers. + // Start up the predefined subscribers. Since all the logic to handle + // processes are tied to the process struct, we need to create an + // initial process to start the rest. sub := newSubject(REQInitial, s.nodeName) p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []node{}, nil) p.ProcessesStart() @@ -183,11 +185,12 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { sam := subjectAndMessage{ Subject: newSubject(REQErrorLog, "errorCentral"), Message: Message{ - Directory: "errorLog", - ToNode: "errorCentral", - FromNode: FromNode, - Data: []string{er}, - Method: REQErrorLog, + Directory: "errorLog", + ToNode: "errorCentral", + FromNode: FromNode, + FileExtension: ".log", + Data: []string{er}, + Method: REQErrorLog, }, } diff --git a/startup_processes.go b/startup_processes.go index f0b8956..17da955 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -13,207 +13,220 @@ func (p process) ProcessesStart() { // --- Subscriber services that can be started via flags + // Allways start an REQOpCommand subscriber { fmt.Printf("Starting REQOpCommand subscriber: %#v\n", p.node) sub := newSubject(REQOpCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []node{"*"}, nil) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []node{node(p.configuration.CentralNodeName)}, nil) go proc.spawnWorker(p.processes, p.natsConn) } // Start a subscriber for textLogging messages if p.configuration.StartSubREQTextToLogFile.OK { - { - fmt.Printf("Starting text logging subscriber: %#v\n", p.node) - sub := newSubject(REQTextToLogFile, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToLogFile.Values, nil) - // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQTextToLogFile(p) } // Start a subscriber for text to file messages if p.configuration.StartSubREQTextToFile.OK { - { - fmt.Printf("Starting text to file subscriber: %#v\n", p.node) - sub := newSubject(REQTextToFile, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToFile.Values, nil) - // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQTextToFile(p) } // Start a subscriber for Hello messages if p.configuration.StartSubREQHello.OK { - { - fmt.Printf("Starting Hello subscriber: %#v\n", p.node) - sub := newSubject(REQHello, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) - proc.procFuncCh = make(chan Message) - - // The reason for running the say hello subscriber as a procFunc is that - // a handler are not able to hold state, and we need to hold the state - // of the nodes we've received hello's from in the sayHelloNodes map, - // which is the information we pass along to generate metrics. - proc.procFunc = func(ctx context.Context) error { - sayHelloNodes := make(map[node]struct{}) - for { - // Receive a copy of the message sent from the method handler. - var m Message - - select { - case m = <-proc.procFuncCh: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - return nil - } - - fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) - - sayHelloNodes[m.FromNode] = struct{}{} - - // update the prometheus metrics - proc.processes.metricsCh <- metricType{ - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes", - Help: "The current number of total nodes who have said hello", - }), - value: float64(len(sayHelloNodes)), - } - } - } - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQHello(p) } if p.configuration.StartSubREQErrorLog.OK { // Start a subscriber for REQErrorLog messages - { - fmt.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) - sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQErrorLog(p) } // Start a subscriber for Ping Request messages if p.configuration.StartSubREQPing.OK { - { - fmt.Printf("Starting Ping Request subscriber: %#v\n", p.node) - sub := newSubject(REQPing, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQPing(p) } // Start a subscriber for REQPong messages if p.configuration.StartSubREQPong.OK { - { - fmt.Printf("Starting Pong subscriber: %#v\n", p.node) - sub := newSubject(REQPong, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQPong(p) } // Start a subscriber for REQCliCommand messages if p.configuration.StartSubREQCliCommand.OK { - { - fmt.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) - sub := newSubject(REQCliCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQCliCommand(p) } // Start a subscriber for Not In Order Cli Command Request messages if p.configuration.StartSubREQnCliCommand.OK { - { - fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) - sub := newSubject(REQnCliCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQnCliCommand(p) } // Start a subscriber for CLICommandReply messages if p.configuration.StartSubREQTextToConsole.OK { - { - fmt.Printf("Starting Text To Console subscriber: %#v\n", p.node) - sub := newSubject(REQTextToConsole, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToConsole.Values, nil) - go proc.spawnWorker(p.processes, p.natsConn) - } + p.startup.subREQTextToConsole(p) } - // --- Publisher services that can be started via flags - - // --------- Testing with publisher ------------ - // Define a process of kind publisher with subject for SayHello to central, - // and register a procFunc with the process that will handle the actual - // sending of say hello. if p.configuration.StartPubREQHello != 0 { - fmt.Printf("Starting Hello Publisher: %#v\n", p.node) - - sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []node{}, nil) - - // Define the procFunc to be used for the process. - proc.procFunc = procFunc( - func(ctx context.Context) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello)) - for { - fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) - - d := fmt.Sprintf("Hello from %v\n", p.node) - - m := Message{ - ToNode: "central", - FromNode: node(p.node), - Data: []string{d}, - Method: REQHello, - } - - sam, err := newSAM(m) - if err != nil { - // In theory the system should drop the message before it reaches here. - log.Printf("error: ProcessesStart: %v\n", err) - } - proc.toRingbufferCh <- []subjectAndMessage{sam} - - select { - case <-ticker.C: - case <-ctx.Done(): - fmt.Printf(" ** DEBUG: got <- ctx.Done\n") - er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - return nil - } - } - }) - go proc.spawnWorker(p.processes, p.natsConn) + p.startup.pubREQHello(p) } // Start a subscriber for Http Get Requests if p.configuration.StartSubREQHttpGet.OK { - { - fmt.Printf("Starting Http Get subscriber: %#v\n", p.node) - sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) - // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(p.processes, p.natsConn) + p.startup.subREQHttpGet(p) + } +} + +// --------------------------------------------------------------------------------------- + +type startup struct{} + +func (s startup) subREQHttpGet(p process) { + + fmt.Printf("Starting Http Get subscriber: %#v\n", p.node) + sub := newSubject(REQHttpGet, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) + +} + +func (s startup) pubREQHello(p process) { + fmt.Printf("Starting Hello Publisher: %#v\n", p.node) + + sub := newSubject(REQHello, p.configuration.CentralNodeName) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []node{}, nil) + + // Define the procFunc to be used for the process. + proc.procFunc = procFunc( + func(ctx context.Context) error { + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello)) + for { + // fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) + + d := fmt.Sprintf("Hello from %v\n", p.node) + + m := Message{ + ToNode: "central", + FromNode: node(p.node), + Data: []string{d}, + Method: REQHello, + } + + sam, err := newSAM(m) + if err != nil { + // In theory the system should drop the message before it reaches here. + log.Printf("error: ProcessesStart: %v\n", err) + } + proc.toRingbufferCh <- []subjectAndMessage{sam} + + select { + case <-ticker.C: + case <-ctx.Done(): + fmt.Printf(" ** DEBUG: got <- ctx.Done\n") + er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return nil + } + } + }) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQTextToConsole(p process) { + fmt.Printf("Starting Text To Console subscriber: %#v\n", p.node) + sub := newSubject(REQTextToConsole, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToConsole.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQnCliCommand(p process) { + fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) + sub := newSubject(REQnCliCommand, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQCliCommand(p process) { + fmt.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) + sub := newSubject(REQCliCommand, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQPong(p process) { + fmt.Printf("Starting Pong subscriber: %#v\n", p.node) + sub := newSubject(REQPong, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQPing(p process) { + fmt.Printf("Starting Ping Request subscriber: %#v\n", p.node) + sub := newSubject(REQPing, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQErrorLog(p process) { + fmt.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) + sub := newSubject(REQErrorLog, "errorCentral") + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQHello(p process) { + fmt.Printf("Starting Hello subscriber: %#v\n", p.node) + sub := newSubject(REQHello, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) + proc.procFuncCh = make(chan Message) + + // The reason for running the say hello subscriber as a procFunc is that + // a handler are not able to hold state, and we need to hold the state + // of the nodes we've received hello's from in the sayHelloNodes map, + // which is the information we pass along to generate metrics. + proc.procFunc = func(ctx context.Context) error { + sayHelloNodes := make(map[node]struct{}) + for { + // Receive a copy of the message sent from the method handler. + var m Message + + select { + case m = <-proc.procFuncCh: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return nil + } + + // fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) + + sayHelloNodes[m.FromNode] = struct{}{} + + // update the prometheus metrics + proc.processes.metricsCh <- metricType{ + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes", + Help: "The current number of total nodes who have said hello", + }), + value: float64(len(sayHelloNodes)), + } } } + go proc.spawnWorker(p.processes, p.natsConn) } -type startups struct{} - -func (s startups) REQHttpGet(p process) { - { - fmt.Printf("Starting Http Get subscriber: %#v\n", p.node) - sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) - // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(p.processes, p.natsConn) - } +func (s startup) subREQTextToFile(p process) { + fmt.Printf("Starting text to file subscriber: %#v\n", p.node) + sub := newSubject(REQTextToFile, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToFile.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) +} + +func (s startup) subREQTextToLogFile(p process) { + fmt.Printf("Starting text logging subscriber: %#v\n", p.node) + sub := newSubject(REQTextToLogFile, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToLogFile.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 5490db7..e022380 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -275,6 +275,9 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) + er := fmt.Errorf("info: startProc: started %v on %v", sub, message.ToNode) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + case message.Data[0] == "stopProc": fmt.Printf(" ** DEBUG 0: got stopProc\n") // Data layout: OPCommand, Method, publisher/subscriber, receivingNode @@ -334,6 +337,9 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri } proc.processes.mu.Unlock() + er := fmt.Errorf("info: stopProc: stoped %v on %v", sub, message.ToNode) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + default: fmt.Printf("error: no such OpCommand specified: " + message.Data[0]) er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) diff --git a/var/errorLog/central/errorCentral.REQErrorLog b/var/errorLog/central/errorCentral.REQErrorLog deleted file mode 100644 index c8ac95b..0000000 --- a/var/errorLog/central/errorCentral.REQErrorLog +++ /dev/null @@ -1,2 +0,0 @@ -2021-04-08 10:18:45.748974 +0000 UTC, info: canceling publisher: central.REQHello.EventNACK -2021-04-08 10:18:46.252822 +0000 UTC, error: subscriberHandler: failed to execute event: open var/errorLog/central/errorCentral.REQErrorLog: permission denied