1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

added errorKernel to the processes struct

This commit is contained in:
postmannen 2022-01-19 05:44:20 +01:00
parent 08749f517e
commit db60d69289
3 changed files with 20 additions and 14 deletions

View file

@ -25,18 +25,20 @@ type errorKernel struct {
// errorCh is used to report errors from a process // errorCh is used to report errors from a process
errorCh chan errorEvent errorCh chan errorEvent
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
metrics *metrics
} }
// newErrorKernel will initialize and return a new error kernel // newErrorKernel will initialize and return a new error kernel
func newErrorKernel(ctx context.Context) *errorKernel { func newErrorKernel(ctx context.Context, m *metrics) *errorKernel {
ctxC, cancel := context.WithCancel(ctx) ctxC, cancel := context.WithCancel(ctx)
return &errorKernel{ return &errorKernel{
errorCh: make(chan errorEvent, 2), errorCh: make(chan errorEvent, 2),
ctx: ctxC, ctx: ctxC,
cancel: cancel, cancel: cancel,
metrics: m,
} }
} }
@ -100,7 +102,7 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
newMessagesCh <- []subjectAndMessage{sam} newMessagesCh <- []subjectAndMessage{sam}
//metrics.promErrorMessagesSentTotal.Inc() e.metrics.promErrorMessagesSentTotal.Inc()
}() }()
default: default:

View file

@ -28,14 +28,17 @@ type processes struct {
wg sync.WaitGroup wg sync.WaitGroup
// tui // tui
tui *tui tui *tui
// errorKernel
errorKernel *errorKernel
} }
// newProcesses will prepare and return a *processes which // newProcesses will prepare and return a *processes which
// is map containing all the currently running processes. // is map containing all the currently running processes.
func newProcesses(ctx context.Context, metrics *metrics, tui *tui) *processes { func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel) *processes {
p := processes{ p := processes{
active: *newProcsMap(), active: *newProcsMap(),
tui: tui, tui: tui,
errorKernel: errorKernel,
} }
// Prepare the parent context for the subscribers. // Prepare the parent context for the subscribers.

View file

@ -69,6 +69,12 @@ func NewServer(c *Configuration, version string) (*server, error) {
// Set up the main background context. // Set up the main background context.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
metrics := newMetrics(c.PromHostAndPort)
// Start the error kernel that will do all the error handling
// that is not done within a process.
errorKernel := newErrorKernel(ctx, metrics)
var opt nats.Option var opt nats.Option
if c.RootCAPath != "" { if c.RootCAPath != "" {
@ -122,8 +128,6 @@ func NewServer(c *Configuration, version string) (*server, error) {
} }
} }
metrics := newMetrics(c.PromHostAndPort)
// Create the tui client structure if enabled. // Create the tui client structure if enabled.
var tuiClient *tui var tuiClient *tui
if c.EnableTUI { if c.EnableTUI {
@ -142,11 +146,12 @@ func NewServer(c *Configuration, version string) (*server, error) {
nodeName: c.NodeName, nodeName: c.NodeName,
natsConn: conn, natsConn: conn,
StewardSocket: stewardSocket, StewardSocket: stewardSocket,
processes: newProcesses(ctx, metrics, tuiClient), processes: newProcesses(ctx, metrics, tuiClient, errorKernel),
newMessagesCh: make(chan []subjectAndMessage), newMessagesCh: make(chan []subjectAndMessage),
metrics: metrics, metrics: metrics,
version: version, version: version,
tui: tuiClient, tui: tuiClient,
errorKernel: errorKernel,
} }
// Create the default data folder for where subscribers should // Create the default data folder for where subscribers should
@ -206,10 +211,6 @@ func (s *server) Start() {
log.Printf("Starting steward, version=%+v\n", s.version) log.Printf("Starting steward, version=%+v\n", s.version)
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)}) s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
// Start the error kernel that will do all the error handling
// that is not done within a process.
s.errorKernel = newErrorKernel(s.ctx)
go func() { go func() {
err := s.errorKernel.start(s.newMessagesCh) err := s.errorKernel.start(s.newMessagesCh)
if err != nil { if err != nil {