From c091ebfa38ae70bea06f3b4757baa0048905a69b Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 8 Apr 2021 13:43:47 +0200 Subject: [PATCH] moved all starting of pub/sub procs under process --- server.go | 4 +- startup_processes.go | 136 ++++++++++++++++++++----------------- subscriber_method_types.go | 23 +++++++ 3 files changed, 100 insertions(+), 63 deletions(-) diff --git a/server.go b/server.go index 2cda531..0309309 100644 --- a/server.go +++ b/server.go @@ -131,7 +131,9 @@ func (s *server) Start() { go s.readSocket(s.toRingbufferCh) // Start up the predefined subscribers. - s.ProcessesStart() + sub := newSubject(REQInitial, s.nodeName) + p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []node{}, nil) + p.ProcessesStart() time.Sleep(time.Second * 1) s.processes.printProcessesMap() diff --git a/startup_processes.go b/startup_processes.go index f815831..f0b8956 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -9,45 +9,45 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func (s *server) ProcessesStart() { +func (p process) ProcessesStart() { // --- Subscriber services that can be started via flags { - fmt.Printf("Starting REQOpCommand subscriber: %#v\n", s.nodeName) - sub := newSubject(REQOpCommand, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting REQOpCommand subscriber: %#v\n", p.node) + sub := newSubject(REQOpCommand, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(p.processes, p.natsConn) } // Start a subscriber for textLogging messages - if s.configuration.StartSubREQTextToLogFile.OK { + if p.configuration.StartSubREQTextToLogFile.OK { { - fmt.Printf("Starting text logging subscriber: %#v\n", s.nodeName) - sub := newSubject(REQTextToLogFile, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToLogFile.Values, nil) + fmt.Printf("Starting text logging subscriber: %#v\n", p.node) + sub := newSubject(REQTextToLogFile, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToLogFile.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s.processes, s.natsConn) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for text to file messages - if s.configuration.StartSubREQTextToFile.OK { + if p.configuration.StartSubREQTextToFile.OK { { - fmt.Printf("Starting text to file subscriber: %#v\n", s.nodeName) - sub := newSubject(REQTextToFile, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil) + fmt.Printf("Starting text to file subscriber: %#v\n", p.node) + sub := newSubject(REQTextToFile, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToFile.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s.processes, s.natsConn) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for Hello messages - if s.configuration.StartSubREQHello.OK { + if p.configuration.StartSubREQHello.OK { { - fmt.Printf("Starting Hello subscriber: %#v\n", s.nodeName) - sub := newSubject(REQHello, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHello.Values, nil) + fmt.Printf("Starting Hello subscriber: %#v\n", p.node) + sub := newSubject(REQHello, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) proc.procFuncCh = make(chan Message) // The reason for running the say hello subscriber as a procFunc is that @@ -82,67 +82,67 @@ func (s *server) ProcessesStart() { } } } - go proc.spawnWorker(s.processes, s.natsConn) + go proc.spawnWorker(p.processes, p.natsConn) } } - if s.configuration.StartSubREQErrorLog.OK { + if p.configuration.StartSubREQErrorLog.OK { // Start a subscriber for REQErrorLog messages { - fmt.Printf("Starting REQErrorLog subscriber: %#v\n", s.nodeName) + fmt.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQErrorLog.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for Ping Request messages - if s.configuration.StartSubREQPing.OK { + if p.configuration.StartSubREQPing.OK { { - fmt.Printf("Starting Ping Request subscriber: %#v\n", s.nodeName) - sub := newSubject(REQPing, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPing.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting Ping Request subscriber: %#v\n", p.node) + sub := newSubject(REQPing, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for REQPong messages - if s.configuration.StartSubREQPong.OK { + if p.configuration.StartSubREQPong.OK { { - fmt.Printf("Starting Pong subscriber: %#v\n", s.nodeName) - sub := newSubject(REQPong, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPong.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting Pong subscriber: %#v\n", p.node) + sub := newSubject(REQPong, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for REQCliCommand messages - if s.configuration.StartSubREQCliCommand.OK { + if p.configuration.StartSubREQCliCommand.OK { { - fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName) - sub := newSubject(REQCliCommand, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQCliCommand.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) + sub := newSubject(REQCliCommand, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for Not In Order Cli Command Request messages - if s.configuration.StartSubREQnCliCommand.OK { + if p.configuration.StartSubREQnCliCommand.OK { { - fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", s.nodeName) - sub := newSubject(REQnCliCommand, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQnCliCommand.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) + sub := newSubject(REQnCliCommand, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } // Start a subscriber for CLICommandReply messages - if s.configuration.StartSubREQTextToConsole.OK { + if p.configuration.StartSubREQTextToConsole.OK { { - fmt.Printf("Starting Text To Console subscriber: %#v\n", s.nodeName) - sub := newSubject(REQTextToConsole, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToConsole.Values, nil) - go proc.spawnWorker(s.processes, s.natsConn) + fmt.Printf("Starting Text To Console subscriber: %#v\n", p.node) + sub := newSubject(REQTextToConsole, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTextToConsole.Values, nil) + go proc.spawnWorker(p.processes, p.natsConn) } } @@ -152,24 +152,24 @@ func (s *server) ProcessesStart() { // Define a process of kind publisher with subject for SayHello to central, // and register a procFunc with the process that will handle the actual // sending of say hello. - if s.configuration.StartPubREQHello != 0 { - fmt.Printf("Starting Hello Publisher: %#v\n", s.nodeName) + if p.configuration.StartPubREQHello != 0 { + fmt.Printf("Starting Hello Publisher: %#v\n", p.node) - sub := newSubject(REQHello, s.configuration.CentralNodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) + sub := newSubject(REQHello, p.configuration.CentralNodeName) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []node{}, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( func(ctx context.Context) error { - ticker := time.NewTicker(time.Second * time.Duration(s.configuration.StartPubREQHello)) + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello)) for { fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) - d := fmt.Sprintf("Hello from %v\n", s.nodeName) + d := fmt.Sprintf("Hello from %v\n", p.node) m := Message{ ToNode: "central", - FromNode: node(s.nodeName), + FromNode: node(p.node), Data: []string{d}, Method: REQHello, } @@ -191,17 +191,29 @@ func (s *server) ProcessesStart() { } } }) - go proc.spawnWorker(s.processes, s.natsConn) + go proc.spawnWorker(p.processes, p.natsConn) } // Start a subscriber for Http Get Requests - if s.configuration.StartSubREQHttpGet.OK { + if p.configuration.StartSubREQHttpGet.OK { { - fmt.Printf("Starting Http Get subscriber: %#v\n", s.nodeName) - sub := newSubject(REQHttpGet, s.nodeName) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil) + fmt.Printf("Starting Http Get subscriber: %#v\n", p.node) + sub := newSubject(REQHttpGet, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s.processes, s.natsConn) + go proc.spawnWorker(p.processes, p.natsConn) } } } + +type startups struct{} + +func (s startups) REQHttpGet(p process) { + { + fmt.Printf("Starting Http Get subscriber: %#v\n", p.node) + sub := newSubject(REQHttpGet, string(p.node)) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) + } +} diff --git a/subscriber_method_types.go b/subscriber_method_types.go index b9155be..5490db7 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -52,6 +52,8 @@ type Method string // The constants that will be used throughout the system for // when specifying what kind of Method to send or work with. const ( + // Initial method used to start other processes. + REQInitial Method = "REQInitial" // Command for client operation request of the system. The op // command to execute shall be given in the data field of the // message as string value. For example "ps". @@ -123,6 +125,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { // Event, Used to communicate that an action has been performed. ma := MethodsAvailable{ methodhandlers: map[Method]methodHandler{ + REQInitial: methodREQInitial{ + commandOrEvent: CommandACK, + }, REQOpCommand: methodREQOpCommand{ commandOrEvent: CommandACK, }, @@ -175,6 +180,24 @@ func (m Method) getHandler(method Method) methodHandler { // The structure that works as a reference for all the methods and if // they are of the command or event type, and also if it is a ACK or // NACK message. + +// ---- + +type methodREQInitial struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQInitial) getKind() CommandOrEvent { + return m.commandOrEvent +} + +func (m methodREQInitial) handler(proc process, message Message, node string) ([]byte, error) { + // proc.procFuncCh <- message + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ---- type MethodsAvailable struct { methodhandlers map[Method]methodHandler }