diff --git a/startup_processes.go b/processes.go similarity index 76% rename from startup_processes.go rename to processes.go index d5d6a00..77b9f8a 100644 --- a/startup_processes.go +++ b/processes.go @@ -4,91 +4,131 @@ import ( "context" "fmt" "log" + "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) -func (p process) ProcessesStart(ctx context.Context) { +// processes holds all the information about running processes +type processes struct { + // The active spawned processes + active map[processName]map[int]process + // mutex to lock the map + mu sync.RWMutex + // The last processID created + lastProcessID int + // + promTotalProcesses prometheus.Gauge + // + promProcessesVec *prometheus.GaugeVec +} + +// newProcesses will prepare and return a *processes which +// is map containing all the currently running processes. +func newProcesses(promRegistry *prometheus.Registry) *processes { + p := processes{ + active: make(map[processName]map[int]process), + } + + p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "total_running_processes", + Help: "The current number of total running processes", + }) + + p.promProcessesVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "running_process", + Help: "Name of the running process", + }, []string{"processName"}, + ) + + return &p +} + +// Start all the subscriber processes. +// Takes an initial process as it's input. All processes +// will be tied to this single process's context. +func (p *processes) Start(proc process) { // --- Subscriber services that can be started via flags // Allways start an REQOpCommand subscriber { - log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node) - sub := newSubject(REQOpCommand, string(p.node)) - proc := newProcess(ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil) - go proc.spawnWorker(p.processes, p.natsConn) + log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node) + sub := newSubject(REQOpCommand, string(proc.node)) + proc := newProcess(proc.ctx, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, []Node{Node(proc.configuration.CentralNodeName)}, nil) + go proc.spawnWorker(proc.processes, proc.natsConn) } // Start a subscriber for textLogging messages - if p.configuration.StartSubREQToFileAppend.OK { - p.startup.subREQToFileAppend(p) + if proc.configuration.StartSubREQToFileAppend.OK { + proc.startup.subREQToFileAppend(proc) } // Start a subscriber for text to file messages - if p.configuration.StartSubREQToFile.OK { - p.startup.subREQToFile(p) + if proc.configuration.StartSubREQToFile.OK { + proc.startup.subREQToFile(proc) } // Start a subscriber for Hello messages - if p.configuration.StartSubREQHello.OK { - p.startup.subREQHello(p) + if proc.configuration.StartSubREQHello.OK { + proc.startup.subREQHello(proc) } - if p.configuration.StartSubREQErrorLog.OK { + if proc.configuration.StartSubREQErrorLog.OK { // Start a subscriber for REQErrorLog messages - p.startup.subREQErrorLog(p) + proc.startup.subREQErrorLog(proc) } // Start a subscriber for Ping Request messages - if p.configuration.StartSubREQPing.OK { - p.startup.subREQPing(p) + if proc.configuration.StartSubREQPing.OK { + proc.startup.subREQPing(proc) } // Start a subscriber for REQPong messages - if p.configuration.StartSubREQPong.OK { - p.startup.subREQPong(p) + if proc.configuration.StartSubREQPong.OK { + proc.startup.subREQPong(proc) } // Start a subscriber for REQCliCommand messages - if p.configuration.StartSubREQCliCommand.OK { - p.startup.subREQCliCommand(p) + if proc.configuration.StartSubREQCliCommand.OK { + proc.startup.subREQCliCommand(proc) } // Start a subscriber for Not In Order Cli Command Request messages - if p.configuration.StartSubREQnCliCommand.OK { - p.startup.subREQnCliCommand(p) + if proc.configuration.StartSubREQnCliCommand.OK { + proc.startup.subREQnCliCommand(proc) } // Start a subscriber for CLICommandReply messages - if p.configuration.StartSubREQToConsole.OK { - p.startup.subREQToConsole(p) + if proc.configuration.StartSubREQToConsole.OK { + proc.startup.subREQToConsole(proc) } - if p.configuration.StartPubREQHello != 0 { - p.startup.pubREQHello(p) + if proc.configuration.StartPubREQHello != 0 { + proc.startup.pubREQHello(proc) } // Start a subscriber for Http Get Requests - if p.configuration.StartSubREQHttpGet.OK { - p.startup.subREQHttpGet(p) + if proc.configuration.StartSubREQHttpGet.OK { + proc.startup.subREQHttpGet(proc) } - if p.configuration.StartSubREQTailFile.OK { - p.startup.subREQTailFile(p) + if proc.configuration.StartSubREQTailFile.OK { + proc.startup.subREQTailFile(proc) } - if p.configuration.StartSubREQnCliCommandCont.OK { - p.startup.subREQnCliCommandCont(p) + if proc.configuration.StartSubREQnCliCommandCont.OK { + proc.startup.subREQnCliCommandCont(proc) } - p.startup.subREQToSocket(p) + proc.startup.subREQToSocket(proc) } // --------------------------------------------------------------------------------------- +// Startup holds all the startup methods for subscribers. type startup struct{} func (s startup) subREQHttpGet(p process) { diff --git a/server.go b/server.go index ca9cc70..51cc762 100644 --- a/server.go +++ b/server.go @@ -9,12 +9,9 @@ import ( "os" "path/filepath" "strings" - "sync" "time" "github.com/nats-io/nats.go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) type processName string @@ -25,41 +22,6 @@ func processNameGet(sn subjectName, pk processKind) processName { return processName(pn) } -// processes holds all the information about running processes -type processes struct { - // The active spawned processes - active map[processName]map[int]process - // mutex to lock the map - mu sync.RWMutex - // The last processID created - lastProcessID int - // - promTotalProcesses prometheus.Gauge - // - promProcessesVec *prometheus.GaugeVec -} - -// newProcesses will prepare and return a *processes which -// is map containing all the currently running processes. -func newProcesses(promRegistry *prometheus.Registry) *processes { - p := processes{ - active: make(map[processName]map[int]process), - } - - p.promTotalProcesses = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "total_running_processes", - Help: "The current number of total running processes", - }) - - p.promProcessesVec = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "running_process", - Help: "Name of the running process", - }, []string{"processName"}, - ) - - return &p -} - // server is the structure that will hold the state about spawned // processes on a local instance. type server struct { @@ -266,13 +228,15 @@ func (s *server) Start() { // Start the checking the input socket for new messages from operator. go s.readSocket(s.toRingbufferCh) - // Start up the predefined subscribers. Since all the logic to handle - // processes are tied to the process struct, we need to create an - // initial process to start the rest. + // Start up the predefined subscribers. + // + // Since all the logic to handle processes are tied to the process + // struct, we need to create an initial process to start the rest. s.ctxSubscribers, s.ctxSubscribersCancelFunc = context.WithCancel(s.ctx) sub := newSubject(REQInitial, s.nodeName) p := newProcess(s.ctxSubscribers, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) - p.ProcessesStart(s.ctxSubscribers) + // Start all wanted subscriber processes. + s.processes.Start(p) time.Sleep(time.Second * 1) s.processes.printProcessesMap()