From 215a4c387a750e3de0cad9643f50b865026440e8 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 11 Jan 2023 07:03:01 +0100 Subject: [PATCH] split out start pub/sub process in separate func's --- process.go | 119 ++++++++++++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 56 deletions(-) diff --git a/process.go b/process.go index 62d33f9..67c4365 100644 --- a/process.go +++ b/process.go @@ -187,67 +187,13 @@ func (p process) spawnWorker() { // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindPublisher { - - // If there is a procFunc for the process, start it. - if p.procFunc != nil { - // Initialize the channel for communication between the proc and - // the procFunc. - p.procFuncCh = make(chan Message) - - // Start the procFunc in it's own anonymous func so we are able - // to get the return error. - go func() { - err := p.procFunc(p.ctx, p.procFuncCh) - if err != nil { - er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) - } - }() - } - - go p.publishMessages(p.natsConn) + p.startPublisher() } // Start a subscriber worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindSubscriber { - // If there is a procFunc for the process, start it. - if p.procFunc != nil { - // Initialize the channel for communication between the proc and - // the procFunc. - p.procFuncCh = make(chan Message) - - // Start the procFunc in it's own anonymous func so we are able - // to get the return error. - go func() { - err := p.procFunc(p.ctx, p.procFuncCh) - if err != nil { - er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) - } - }() - } - - p.natsSubscription = p.subscribeMessages() - - // We also need to be able to remove all the information about this process - // when the process context is canceled. - go func() { - <-p.ctx.Done() - err := p.natsSubscription.Unsubscribe() - if err != nil { - er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - } - - p.processes.active.mu.Lock() - delete(p.processes.active.procNames, p.processName) - p.processes.active.mu.Unlock() - - log.Printf("Successfully stopped process: %v\n", p.processName) - - }() + p.startSubscriber() } // Add information about the new process to the started processes map. @@ -259,6 +205,67 @@ func (p process) spawnWorker() { p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } +func (p process) startPublisher() { + // If there is a procFunc for the process, start it. + if p.procFunc != nil { + // Initialize the channel for communication between the proc and + // the procFunc. + p.procFuncCh = make(chan Message) + + // Start the procFunc in it's own anonymous func so we are able + // to get the return error. + go func() { + err := p.procFunc(p.ctx, p.procFuncCh) + if err != nil { + er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) + p.errorKernel.errSend(p, Message{}, er) + } + }() + } + + go p.publishMessages(p.natsConn) +} + +func (p process) startSubscriber() { + // If there is a procFunc for the process, start it. + if p.procFunc != nil { + // Initialize the channel for communication between the proc and + // the procFunc. + p.procFuncCh = make(chan Message) + + // Start the procFunc in it's own anonymous func so we are able + // to get the return error. + go func() { + err := p.procFunc(p.ctx, p.procFuncCh) + if err != nil { + er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) + p.errorKernel.errSend(p, Message{}, er) + } + }() + } + + p.natsSubscription = p.subscribeMessages() + + // We also need to be able to remove all the information about this process + // when the process context is canceled. + go func() { + <-p.ctx.Done() + err := p.natsSubscription.Unsubscribe() + if err != nil { + er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err) + p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } + + p.processes.active.mu.Lock() + delete(p.processes.active.procNames, p.processName) + p.processes.active.mu.Unlock() + + log.Printf("Successfully stopped process: %v\n", p.processName) + + }() +} + var ( ErrACKSubscribeRetry = errors.New("steward: retrying to subscribe for ack message") )