mirror of
https://github.com/postmannen/ctrl.git
synced 2025-04-08 09:54:33 +00:00
Added child contexts for errorkernel and startup subscribers
This commit is contained in:
parent
661a3e6fd9
commit
d2846007bd
2 changed files with 79 additions and 53 deletions
|
@ -8,6 +8,7 @@
|
|||
package steward
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
@ -41,51 +42,59 @@ func newErrorKernel() *errorKernel {
|
|||
// process if it should continue or not based not based on how severe
|
||||
// the error where. This should be right after sending the error
|
||||
// sending in the process.
|
||||
func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage) {
|
||||
func (e *errorKernel) start(ctx context.Context, newMessagesCh chan<- []subjectAndMessage) error {
|
||||
// NOTE: For now it will just print the error messages to the
|
||||
// console.
|
||||
go func() {
|
||||
|
||||
for {
|
||||
er := <-e.errorCh
|
||||
|
||||
// We should be able to handle each error individually and
|
||||
// also concurrently, so the handler is started in it's
|
||||
// own go routine
|
||||
go func() {
|
||||
// NOTE: Here we should check the severity of the error,
|
||||
// and also possibly the the error-state of the process
|
||||
// that fails, so we can decide if we should stop and
|
||||
// start a new process to replace to old one, or if we
|
||||
// should just kill the process and send message back to
|
||||
// the operator....or other ?
|
||||
//
|
||||
// Just print the error, and tell the process to continue
|
||||
|
||||
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
|
||||
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
|
||||
|
||||
// // TESTING: Creating an error message to send to errorCentral
|
||||
// fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
|
||||
// sam := subjectAndMessage{
|
||||
// Subject: Subject{
|
||||
// ToNode: "errorCentral",
|
||||
// CommandOrEvent: EventNACK,
|
||||
// Method: ErrorLog,
|
||||
// },
|
||||
// Message: Message{
|
||||
// ToNode: "errorCentral",
|
||||
// Data: []string{"some tull here .............."},
|
||||
// CommandOrEvent: EventNACK,
|
||||
// Method: ErrorLog,
|
||||
// },
|
||||
// }
|
||||
// newMessagesCh <- []subjectAndMessage{sam}
|
||||
|
||||
er.errorActionCh <- errActionContinue
|
||||
}()
|
||||
for {
|
||||
var er errProcess
|
||||
select {
|
||||
case er = <-e.errorCh:
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("info: stopping errorKernel")
|
||||
}
|
||||
}()
|
||||
|
||||
// We should be able to handle each error individually and
|
||||
// also concurrently, so the handler is started in it's
|
||||
// own go routine
|
||||
go func() {
|
||||
// NOTE: Here we should check the severity of the error,
|
||||
// and also possibly the the error-state of the process
|
||||
// that fails, so we can decide if we should stop and
|
||||
// start a new process to replace to old one, or if we
|
||||
// should just kill the process and send message back to
|
||||
// the operator....or other ?
|
||||
//
|
||||
// Just print the error, and tell the process to continue
|
||||
|
||||
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
|
||||
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
|
||||
|
||||
// // TESTING: Creating an error message to send to errorCentral
|
||||
// fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
|
||||
// sam := subjectAndMessage{
|
||||
// Subject: Subject{
|
||||
// ToNode: "errorCentral",
|
||||
// CommandOrEvent: EventNACK,
|
||||
// Method: ErrorLog,
|
||||
// },
|
||||
// Message: Message{
|
||||
// ToNode: "errorCentral",
|
||||
// Data: []string{"some tull here .............."},
|
||||
// CommandOrEvent: EventNACK,
|
||||
// Method: ErrorLog,
|
||||
// },
|
||||
// }
|
||||
// newMessagesCh <- []subjectAndMessage{sam}
|
||||
|
||||
select {
|
||||
case er.errorActionCh <- errActionContinue:
|
||||
case <-ctx.Done():
|
||||
log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n")
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
type errorAction int
|
||||
|
|
41
server.go
41
server.go
|
@ -239,10 +239,25 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
// if there is publisher process for a given message subject, and
|
||||
// not exist it will spawn one.
|
||||
func (s *server) Start() {
|
||||
// Stop main context last when exits.
|
||||
defer func() {
|
||||
s.ctxCancelFunc()
|
||||
log.Printf("info: stopping the main server context with ctxCancelFunc()\n")
|
||||
}()
|
||||
// Start the error kernel that will do all the error handling
|
||||
// not done within a process.
|
||||
s.errorKernel = newErrorKernel()
|
||||
s.errorKernel.startErrorKernel(s.toRingbufferCh)
|
||||
{
|
||||
s.errorKernel = newErrorKernel()
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
|
||||
go func() {
|
||||
err := s.errorKernel.start(ctx, s.toRingbufferCh)
|
||||
if err != nil {
|
||||
log.Printf("%v\n", err)
|
||||
}
|
||||
}()
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
// Start collecting the metrics
|
||||
go s.startMetrics()
|
||||
|
@ -266,15 +281,20 @@ func (s *server) Start() {
|
|||
// Start up the predefined subscribers. Since all the logic to handle
|
||||
// processes are tied to the process struct, we need to create an
|
||||
// initial process to start the rest.
|
||||
sub := newSubject(REQInitial, s.nodeName)
|
||||
p := newProcess(s.ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
|
||||
p.ProcessesStart(s.ctx)
|
||||
{
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
sub := newSubject(REQInitial, s.nodeName)
|
||||
p := newProcess(ctx, s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
|
||||
p.ProcessesStart(ctx)
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
s.processes.printProcessesMap()
|
||||
time.Sleep(time.Second * 1)
|
||||
s.processes.printProcessesMap()
|
||||
|
||||
// Start the processing of new messages from an input channel.
|
||||
s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh)
|
||||
// Start the processing of new messages from an input channel.
|
||||
s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh)
|
||||
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
// Set up channel on which to send signal notifications.
|
||||
// We must use a buffered channel or risk missing the signal
|
||||
|
@ -303,9 +323,6 @@ func (s *server) Start() {
|
|||
// deeply needed to implement this.
|
||||
// But still.. Need to look into this.
|
||||
//time.Sleep(time.Second * 10)
|
||||
s.ctxCancelFunc()
|
||||
fmt.Printf(" *** Done: ctxCancelFunc()\n")
|
||||
|
||||
}
|
||||
|
||||
func (p *processes) printProcessesMap() {
|
||||
|
|
Loading…
Add table
Reference in a new issue