diff --git a/process.go b/process.go index 9a4f395..18e0802 100644 --- a/process.go +++ b/process.go @@ -111,7 +111,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- // work with that come from the outside. An example can be handling // of metrics which the message have no notion of, but a procFunc // can have that wrapped in from when it was constructed. -type procFunc func() error +type procFunc func(ctx context.Context) error // The purpose of this function is to check if we should start a // publisher or subscriber process, where a process is a go routine @@ -147,7 +147,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { - err := p.procFunc() + err := p.procFunc(p.ctx) if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) @@ -167,7 +167,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { - err := p.procFunc() + err := p.procFunc(p.ctx) if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) diff --git a/startup_processes.go b/startup_processes.go index eb2d438..c020068 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -1,6 +1,7 @@ package steward import ( + "context" "fmt" "log" "time" @@ -53,11 +54,20 @@ func (s *server) ProcessesStart() { // a handler are not able to hold state, and we need to hold the state // of the nodes we've received hello's from in the sayHelloNodes map, // which is the information we pass along to generate metrics. - proc.procFunc = func() error { + proc.procFunc = func(ctx context.Context) error { sayHelloNodes := make(map[node]struct{}) for { // Receive a copy of the message sent from the method handler. - m := <-proc.procFuncCh + var m Message + + select { + case m = <-proc.procFuncCh: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return nil + } + fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) sayHelloNodes[m.FromNode] = struct{}{} @@ -150,7 +160,8 @@ func (s *server) ProcessesStart() { // Define the procFunc to be used for the process. proc.procFunc = procFunc( - func() error { + func(ctx context.Context) error { + ticker := time.NewTicker(time.Second * time.Duration(s.configuration.StartPubREQHello)) for { fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode) @@ -169,7 +180,14 @@ func (s *server) ProcessesStart() { log.Printf("error: ProcessesStart: %v\n", err) } proc.toRingbufferCh <- []subjectAndMessage{sam} - time.Sleep(time.Second * time.Duration(s.configuration.StartPubREQHello)) + + select { + case <-ticker.C: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return nil + } } }) go proc.spawnWorker(s.processes, s.natsConn)