diff --git a/process.go b/process.go index 040e2cc..0bddd8a 100644 --- a/process.go +++ b/process.go @@ -78,7 +78,7 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process { +func newProcess(ctx context.Context, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -88,7 +88,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- m[a] = struct{}{} } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) var method Method diff --git a/server.go b/server.go index 24cc122..2eff28c 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package steward import ( + "context" "fmt" "log" "net" @@ -62,6 +63,10 @@ func newProcesses(promRegistry *prometheus.Registry) *processes { // server is the structure that will hold the state about spawned // processes on a local instance. type server struct { + // The main background context + ctx context.Context + // The CancelFunc for the main context + ctxCancelFunc context.CancelFunc // Configuration options used for running the server configuration *Configuration // The nats connection to the broker @@ -85,6 +90,8 @@ type server struct { // newServer will prepare and return a server type func NewServer(c *Configuration) (*server, error) { + ctx, cancel := context.WithCancel(context.Background()) + var opt nats.Option if c.RootCAPath != "" { opt = nats.RootCAs(c.RootCAPath) @@ -102,6 +109,7 @@ func NewServer(c *Configuration) (*server, error) { // } opt, err = nats.NkeyOptionFromSeed(c.NkeySeedFile) if err != nil { + cancel() return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err) } } @@ -125,6 +133,7 @@ func NewServer(c *Configuration) (*server, error) { } er := fmt.Errorf("error: nats.Connect failed: %v", err) + cancel() return nil, er } @@ -136,6 +145,7 @@ func NewServer(c *Configuration) (*server, error) { if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) { err := os.MkdirAll(c.SocketFolder, 0700) if err != nil { + cancel() return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err) } } @@ -146,6 +156,7 @@ func NewServer(c *Configuration) (*server, error) { err = os.Remove(socketFilepath) if err != nil { er := fmt.Errorf("error: could not delete sock file: %v", err) + cancel() return nil, er } } @@ -153,6 +164,7 @@ func NewServer(c *Configuration) (*server, error) { nl, err := net.Listen("unix", socketFilepath) if err != nil { er := fmt.Errorf("error: failed to open socket: %v", err) + cancel() return nil, er } @@ -164,6 +176,7 @@ func NewServer(c *Configuration) (*server, error) { if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) { err := os.MkdirAll(c.SocketFolder, 0700) if err != nil { + cancel() return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err) } } @@ -174,6 +187,7 @@ func NewServer(c *Configuration) (*server, error) { err = os.Remove(stewSocketFilepath) if err != nil { er := fmt.Errorf("error: could not delete stew.sock file: %v", err) + cancel() return nil, er } } @@ -181,6 +195,7 @@ func NewServer(c *Configuration) (*server, error) { stewNL, err := net.Listen("unix", stewSocketFilepath) if err != nil { er := fmt.Errorf("error: failed to open stew socket: %v", err) + cancel() return nil, er } @@ -189,6 +204,8 @@ func NewServer(c *Configuration) (*server, error) { metrics := newMetrics(c.PromHostAndPort) s := &server{ + ctx: ctx, + ctxCancelFunc: cancel, configuration: c, nodeName: c.NodeName, natsConn: conn, @@ -250,8 +267,8 @@ func (s *server) Start() { // processes are tied to the process struct, we need to create an // initial process to start the rest. sub := newSubject(REQInitial, s.nodeName) - p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) - p.ProcessesStart() + p := newProcess(s.ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) + p.ProcessesStart(s.ctx) time.Sleep(time.Second * 1) s.processes.printProcessesMap() @@ -268,7 +285,7 @@ func (s *server) Start() { //Block until we receive a signal sig := <-sigCh fmt.Printf("Got exit signal, terminating all processes, %v\n", sig) - return + s.ctxCancelFunc() } @@ -410,7 +427,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + proc := newProcess(s.ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) // fmt.Printf("*** %#v\n", proc) proc.spawnWorker(s.processes, s.natsConn) diff --git a/startup_processes.go b/startup_processes.go index 5234308..6ff3e8c 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -10,7 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -func (p process) ProcessesStart() { +func (p process) ProcessesStart(ctx context.Context) { // --- Subscriber services that can be started via flags @@ -18,7 +18,7 @@ func (p process) ProcessesStart() { { log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node) sub := newSubject(REQOpCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil) + proc := newProcess(ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil) go proc.spawnWorker(p.processes, p.natsConn) } @@ -91,7 +91,7 @@ func (s startup) subREQHttpGet(p process) { log.Printf("Starting Http Get subscriber: %#v\n", p.node) sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) @@ -101,7 +101,7 @@ func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -141,49 +141,49 @@ func (s startup) pubREQHello(p process) { func (s startup) subREQToConsole(p process) { log.Printf("Starting Text To Console subscriber: %#v\n", p.node) sub := newSubject(REQToConsole, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQnCliCommand(p process) { log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node) sub := newSubject(REQnCliCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQCliCommand(p process) { log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) sub := newSubject(REQCliCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPong(p process) { log.Printf("Starting Pong subscriber: %#v\n", p.node) sub := newSubject(REQPong, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPing(p process) { log.Printf("Starting Ping Request subscriber: %#v\n", p.node) sub := newSubject(REQPing, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQErrorLog(p process) { log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQHello(p process) { log.Printf("Starting Hello subscriber: %#v\n", p.node) sub := newSubject(REQHello, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil) proc.procFuncCh = make(chan Message) // The reason for running the say hello subscriber as a procFunc is that @@ -231,7 +231,7 @@ func (s startup) subREQHello(p process) { func (s startup) subREQToFile(p process) { log.Printf("Starting text to file subscriber: %#v\n", p.node) sub := newSubject(REQToFile, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -239,7 +239,7 @@ func (s startup) subREQToFile(p process) { func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -247,7 +247,7 @@ func (s startup) subREQToFileAppend(p process) { func (s startup) subREQTailFile(p process) { log.Printf("Starting tail log files subscriber: %#v\n", p.node) sub := newSubject(REQTailFile, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } @@ -255,7 +255,7 @@ func (s startup) subREQTailFile(p process) { func (s startup) subREQToSocket(p process) { log.Printf("Starting write to socket subscriber: %#v\n", p.node) sub := newSubject(REQToSocket, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(p.processes, p.natsConn) } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index d756edb..64149e3 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -338,7 +338,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri // Create the process and start it. sub := newSubject(arg.Method, proc.configuration.NodeName) - procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil) + procNew := newProcess(proc.ctx, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) @@ -766,7 +766,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) c := message.Data[0] a := message.Data[1:] - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) outCh := make(chan []byte) @@ -827,7 +827,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string c := message.Data[0] a := message.Data[1:] - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) outCh := make(chan []byte) @@ -899,7 +899,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ Timeout: time.Second * 5, } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -979,9 +979,9 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( var cancel context.CancelFunc if message.MethodTimeout != 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) } else { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(proc.ctx) } outCh := make(chan []byte) @@ -1050,7 +1050,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st c := message.Data[0] a := message.Data[1:] - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) outCh := make(chan []byte)