mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
Added main ctx, and child ctx derived from main to procs
This commit is contained in:
parent
4929191269
commit
684b3f896f
4 changed files with 45 additions and 28 deletions
|
@ -78,7 +78,7 @@ type process struct {
|
|||
|
||||
// prepareNewProcess will set the the provided values and the default
|
||||
// values for a process.
|
||||
func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process {
|
||||
func newProcess(ctx context.Context, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process {
|
||||
// create the initial configuration for a sessions communicating with 1 host process.
|
||||
processes.lastProcessID++
|
||||
|
||||
|
@ -88,7 +88,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
|
|||
m[a] = struct{}{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
var method Method
|
||||
|
||||
|
|
25
server.go
25
server.go
|
@ -2,6 +2,7 @@
|
|||
package steward
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
|
@ -62,6 +63,10 @@ func newProcesses(promRegistry *prometheus.Registry) *processes {
|
|||
// server is the structure that will hold the state about spawned
|
||||
// processes on a local instance.
|
||||
type server struct {
|
||||
// The main background context
|
||||
ctx context.Context
|
||||
// The CancelFunc for the main context
|
||||
ctxCancelFunc context.CancelFunc
|
||||
// Configuration options used for running the server
|
||||
configuration *Configuration
|
||||
// The nats connection to the broker
|
||||
|
@ -85,6 +90,8 @@ type server struct {
|
|||
|
||||
// newServer will prepare and return a server type
|
||||
func NewServer(c *Configuration) (*server, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
var opt nats.Option
|
||||
if c.RootCAPath != "" {
|
||||
opt = nats.RootCAs(c.RootCAPath)
|
||||
|
@ -102,6 +109,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
// }
|
||||
opt, err = nats.NkeyOptionFromSeed(c.NkeySeedFile)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to read nkey seed file: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -125,6 +133,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
}
|
||||
|
||||
er := fmt.Errorf("error: nats.Connect failed: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
|
||||
|
@ -136,6 +145,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.SocketFolder, 0700)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
|
||||
}
|
||||
}
|
||||
|
@ -146,6 +156,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
err = os.Remove(socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete sock file: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
@ -153,6 +164,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
nl, err := net.Listen("unix", socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open socket: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
|
||||
|
@ -164,6 +176,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.SocketFolder, 0700)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
|
||||
}
|
||||
}
|
||||
|
@ -174,6 +187,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
err = os.Remove(stewSocketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete stew.sock file: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
@ -181,6 +195,7 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
stewNL, err := net.Listen("unix", stewSocketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open stew socket: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
|
||||
|
@ -189,6 +204,8 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
metrics := newMetrics(c.PromHostAndPort)
|
||||
|
||||
s := &server{
|
||||
ctx: ctx,
|
||||
ctxCancelFunc: cancel,
|
||||
configuration: c,
|
||||
nodeName: c.NodeName,
|
||||
natsConn: conn,
|
||||
|
@ -250,8 +267,8 @@ func (s *server) Start() {
|
|||
// processes are tied to the process struct, we need to create an
|
||||
// initial process to start the rest.
|
||||
sub := newSubject(REQInitial, s.nodeName)
|
||||
p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
|
||||
p.ProcessesStart()
|
||||
p := newProcess(s.ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
|
||||
p.ProcessesStart(s.ctx)
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
s.processes.printProcessesMap()
|
||||
|
@ -268,7 +285,7 @@ func (s *server) Start() {
|
|||
//Block until we receive a signal
|
||||
sig := <-sigCh
|
||||
fmt.Printf("Got exit signal, terminating all processes, %v\n", sig)
|
||||
return
|
||||
s.ctxCancelFunc()
|
||||
|
||||
}
|
||||
|
||||
|
@ -410,7 +427,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
|
|||
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
||||
|
||||
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
||||
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
|
||||
proc := newProcess(s.ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
proc.spawnWorker(s.processes, s.natsConn)
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
func (p process) ProcessesStart() {
|
||||
func (p process) ProcessesStart(ctx context.Context) {
|
||||
|
||||
// --- Subscriber services that can be started via flags
|
||||
|
||||
|
@ -18,7 +18,7 @@ func (p process) ProcessesStart() {
|
|||
{
|
||||
log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQOpCommand, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil)
|
||||
proc := newProcess(ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
|
@ -91,7 +91,7 @@ func (s startup) subREQHttpGet(p process) {
|
|||
|
||||
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQHttpGet, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHttpGet.Values, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
|
||||
|
@ -101,7 +101,7 @@ func (s startup) pubREQHello(p process) {
|
|||
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
||||
|
||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil)
|
||||
|
||||
// Define the procFunc to be used for the process.
|
||||
proc.procFunc = procFunc(
|
||||
|
@ -141,49 +141,49 @@ func (s startup) pubREQHello(p process) {
|
|||
func (s startup) subREQToConsole(p process) {
|
||||
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToConsole, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToConsole.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQnCliCommand(p process) {
|
||||
log.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQnCliCommand, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQnCliCommand.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQCliCommand(p process) {
|
||||
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCliCommand, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQCliCommand.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQPong(p process) {
|
||||
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQPong, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPong.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQPing(p process) {
|
||||
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQPing, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQPing.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQErrorLog(p process) {
|
||||
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQErrorLog, "errorCentral")
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQErrorLog.Values, nil)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQHello(p process) {
|
||||
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQHello, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQHello.Values, nil)
|
||||
proc.procFuncCh = make(chan Message)
|
||||
|
||||
// The reason for running the say hello subscriber as a procFunc is that
|
||||
|
@ -231,7 +231,7 @@ func (s startup) subREQHello(p process) {
|
|||
func (s startup) subREQToFile(p process) {
|
||||
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToFile, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFile.Values, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -239,7 +239,7 @@ func (s startup) subREQToFile(p process) {
|
|||
func (s startup) subREQToFileAppend(p process) {
|
||||
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToFileAppend, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQToFileAppend.Values, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ func (s startup) subREQToFileAppend(p process) {
|
|||
func (s startup) subREQTailFile(p process) {
|
||||
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQTailFile, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -255,7 +255,7 @@ func (s startup) subREQTailFile(p process) {
|
|||
func (s startup) subREQToSocket(p process) {
|
||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToSocket, string(p.node))
|
||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil)
|
||||
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{"*"}, nil)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
|
||||
// Create the process and start it.
|
||||
sub := newSubject(arg.Method, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil)
|
||||
procNew := newProcess(proc.ctx, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil)
|
||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||
|
||||
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
|
@ -766,7 +766,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
c := message.Data[0]
|
||||
a := message.Data[1:]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
|
||||
outCh := make(chan []byte)
|
||||
|
||||
|
@ -827,7 +827,7 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
|
|||
c := message.Data[0]
|
||||
a := message.Data[1:]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
|
||||
outCh := make(chan []byte)
|
||||
|
||||
|
@ -899,7 +899,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
Timeout: time.Second * 5,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
|
@ -979,9 +979,9 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
var cancel context.CancelFunc
|
||||
|
||||
if message.MethodTimeout != 0 {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
} else {
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
ctx, cancel = context.WithCancel(proc.ctx)
|
||||
}
|
||||
|
||||
outCh := make(chan []byte)
|
||||
|
@ -1050,7 +1050,7 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
|
|||
c := message.Data[0]
|
||||
a := message.Data[1:]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
|
||||
|
||||
outCh := make(chan []byte)
|
||||
|
||||
|
|
Loading…
Reference in a new issue