1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

add procFuncCh creation part of start the procFunc

This commit is contained in:
postmannen 2022-01-02 11:57:25 +01:00
parent dea0c094fd
commit e961111d96
2 changed files with 15 additions and 8 deletions

View file

@ -56,7 +56,8 @@ type process struct {
// a whole for sharing a map from the *server level.
procFunc procFunc
// The channel to send a messages to the procFunc go routine.
// This is typically used within the methodHandler.
// This is typically used within the methodHandler for so we
// can pass messages between the procFunc and the handler.
procFuncCh chan Message
// copy of the configuration from server
configuration *Configuration
@ -136,7 +137,7 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc
// procFunc's can also be used to wrap in other types which we want to
// work with. An example can be handling of metrics which the message
// have no notion of, but a procFunc can have that wrapped in from when it was constructed.
type procFunc func(ctx context.Context) error
type procFunc func(ctx context.Context, procFuncCh chan Message) error
// The purpose of this function is to check if we should start a
// publisher or subscriber process, where a process is a go routine
@ -168,10 +169,14 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx)
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)
@ -187,11 +192,14 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
if p.processKind == processKindSubscriber {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx)
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
sendErrorLogMessage(p.configuration, procs.metrics, p.toRingbufferCh, Node(p.node), er)

View file

@ -213,7 +213,7 @@ func (s startup) pubREQHello(p process) {
// Define the procFunc to be used for the process.
proc.procFunc = procFunc(
func(ctx context.Context) error {
func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello))
for {
@ -297,13 +297,12 @@ func (s startup) subREQHello(p process) {
log.Printf("Starting Hello subscriber: %#v\n", p.node)
sub := newSubject(REQHello, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
proc.procFuncCh = make(chan Message)
// The reason for running the say hello subscriber as a procFunc is that
// a handler are not able to hold state, and we need to hold the state
// of the nodes we've received hello's from in the sayHelloNodes map,
// which is the information we pass along to generate metrics.
proc.procFunc = func(ctx context.Context) error {
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
sayHelloNodes := make(map[Node]struct{})
for {
@ -311,7 +310,7 @@ func (s startup) subREQHello(p process) {
var m Message
select {
case m = <-proc.procFuncCh:
case m = <-procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)