mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
initial ctx for subscribers, and handFuncs implemented
This commit is contained in:
parent
12db6fbd6a
commit
a44e003ff5
2 changed files with 25 additions and 7 deletions
|
@ -111,7 +111,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
|
||||||
// work with that come from the outside. An example can be handling
|
// work with that come from the outside. An example can be handling
|
||||||
// of metrics which the message have no notion of, but a procFunc
|
// of metrics which the message have no notion of, but a procFunc
|
||||||
// can have that wrapped in from when it was constructed.
|
// can have that wrapped in from when it was constructed.
|
||||||
type procFunc func() error
|
type procFunc func(ctx context.Context) error
|
||||||
|
|
||||||
// The purpose of this function is to check if we should start a
|
// The purpose of this function is to check if we should start a
|
||||||
// publisher or subscriber process, where a process is a go routine
|
// publisher or subscriber process, where a process is a go routine
|
||||||
|
@ -147,7 +147,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
// Start the procFunc in it's own anonymous func so we are able
|
// Start the procFunc in it's own anonymous func so we are able
|
||||||
// to get the return error.
|
// to get the return error.
|
||||||
go func() {
|
go func() {
|
||||||
err := p.procFunc()
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||||
|
@ -167,7 +167,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
// Start the procFunc in it's own anonymous func so we are able
|
// Start the procFunc in it's own anonymous func so we are able
|
||||||
// to get the return error.
|
// to get the return error.
|
||||||
go func() {
|
go func() {
|
||||||
err := p.procFunc()
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
@ -53,11 +54,20 @@ func (s *server) ProcessesStart() {
|
||||||
// a handler are not able to hold state, and we need to hold the state
|
// a handler are not able to hold state, and we need to hold the state
|
||||||
// of the nodes we've received hello's from in the sayHelloNodes map,
|
// of the nodes we've received hello's from in the sayHelloNodes map,
|
||||||
// which is the information we pass along to generate metrics.
|
// which is the information we pass along to generate metrics.
|
||||||
proc.procFunc = func() error {
|
proc.procFunc = func(ctx context.Context) error {
|
||||||
sayHelloNodes := make(map[node]struct{})
|
sayHelloNodes := make(map[node]struct{})
|
||||||
for {
|
for {
|
||||||
// Receive a copy of the message sent from the method handler.
|
// Receive a copy of the message sent from the method handler.
|
||||||
m := <-proc.procFuncCh
|
var m Message
|
||||||
|
|
||||||
|
select {
|
||||||
|
case m = <-proc.procFuncCh:
|
||||||
|
case <-ctx.Done():
|
||||||
|
er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name())
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
|
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
|
||||||
|
|
||||||
sayHelloNodes[m.FromNode] = struct{}{}
|
sayHelloNodes[m.FromNode] = struct{}{}
|
||||||
|
@ -150,7 +160,8 @@ func (s *server) ProcessesStart() {
|
||||||
|
|
||||||
// Define the procFunc to be used for the process.
|
// Define the procFunc to be used for the process.
|
||||||
proc.procFunc = procFunc(
|
proc.procFunc = procFunc(
|
||||||
func() error {
|
func(ctx context.Context) error {
|
||||||
|
ticker := time.NewTicker(time.Second * time.Duration(s.configuration.StartPubREQHello))
|
||||||
for {
|
for {
|
||||||
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
|
fmt.Printf("--- DEBUG : procFunc call:kind=%v, Subject=%v, toNode=%v\n", proc.processKind, proc.subject, proc.subject.ToNode)
|
||||||
|
|
||||||
|
@ -169,7 +180,14 @@ func (s *server) ProcessesStart() {
|
||||||
log.Printf("error: ProcessesStart: %v\n", err)
|
log.Printf("error: ProcessesStart: %v\n", err)
|
||||||
}
|
}
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
time.Sleep(time.Second * time.Duration(s.configuration.StartPubREQHello))
|
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
case <-ctx.Done():
|
||||||
|
er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name())
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
go proc.spawnWorker(s.processes, s.natsConn)
|
go proc.spawnWorker(s.processes, s.natsConn)
|
||||||
|
|
Loading…
Reference in a new issue