1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

split out start pub/sub process in separate func's

This commit is contained in:
postmannen 2023-01-11 07:03:01 +01:00
parent d0d3ef8b39
commit 215a4c387a

View file

@ -187,67 +187,13 @@ func (p process) spawnWorker() {
// Start a publisher worker, which will start a go routine (process) // Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
if p.processKind == processKindPublisher { if p.processKind == processKindPublisher {
p.startPublisher()
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
}
}()
}
go p.publishMessages(p.natsConn)
} }
// Start a subscriber worker, which will start a go routine (process) // Start a subscriber worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
if p.processKind == processKindSubscriber { if p.processKind == processKindSubscriber {
// If there is a procFunc for the process, start it. p.startSubscriber()
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
}
}()
}
p.natsSubscription = p.subscribeMessages()
// We also need to be able to remove all the information about this process
// when the process context is canceled.
go func() {
<-p.ctx.Done()
err := p.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
p.processes.active.mu.Lock()
delete(p.processes.active.procNames, p.processName)
p.processes.active.mu.Unlock()
log.Printf("Successfully stopped process: %v\n", p.processName)
}()
} }
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
@ -259,6 +205,67 @@ func (p process) spawnWorker() {
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
} }
func (p process) startPublisher() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
}
}()
}
go p.publishMessages(p.natsConn)
}
func (p process) startSubscriber() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
// Initialize the channel for communication between the proc and
// the procFunc.
p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able
// to get the return error.
go func() {
err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
}
}()
}
p.natsSubscription = p.subscribeMessages()
// We also need to be able to remove all the information about this process
// when the process context is canceled.
go func() {
<-p.ctx.Done()
err := p.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err)
p.errorKernel.errSend(p, Message{}, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
p.processes.active.mu.Lock()
delete(p.processes.active.procNames, p.processName)
p.processes.active.mu.Unlock()
log.Printf("Successfully stopped process: %v\n", p.processName)
}()
}
var ( var (
ErrACKSubscribeRetry = errors.New("steward: retrying to subscribe for ack message") ErrACKSubscribeRetry = errors.New("steward: retrying to subscribe for ack message")
) )