From db60d69289b3dc693442117a938ee652c13ec21e Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 19 Jan 2022 05:44:20 +0100 Subject: [PATCH] added errorKernel to the processes struct --- errorkernel.go | 10 ++++++---- processes.go | 9 ++++++--- server.go | 15 ++++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/errorkernel.go b/errorkernel.go index 58cec4d..890eee3 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -25,18 +25,20 @@ type errorKernel struct { // errorCh is used to report errors from a process errorCh chan errorEvent - ctx context.Context - cancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc + metrics *metrics } // newErrorKernel will initialize and return a new error kernel -func newErrorKernel(ctx context.Context) *errorKernel { +func newErrorKernel(ctx context.Context, m *metrics) *errorKernel { ctxC, cancel := context.WithCancel(ctx) return &errorKernel{ errorCh: make(chan errorEvent, 2), ctx: ctxC, cancel: cancel, + metrics: m, } } @@ -100,7 +102,7 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error { newMessagesCh <- []subjectAndMessage{sam} - //metrics.promErrorMessagesSentTotal.Inc() + e.metrics.promErrorMessagesSentTotal.Inc() }() default: diff --git a/processes.go b/processes.go index 93a3bfd..79bed5c 100644 --- a/processes.go +++ b/processes.go @@ -28,14 +28,17 @@ type processes struct { wg sync.WaitGroup // tui tui *tui + // errorKernel + errorKernel *errorKernel } // newProcesses will prepare and return a *processes which // is map containing all the currently running processes. -func newProcesses(ctx context.Context, metrics *metrics, tui *tui) *processes { +func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel) *processes { p := processes{ - active: *newProcsMap(), - tui: tui, + active: *newProcsMap(), + tui: tui, + errorKernel: errorKernel, } // Prepare the parent context for the subscribers. diff --git a/server.go b/server.go index 1fc4c26..45d7a8c 100644 --- a/server.go +++ b/server.go @@ -69,6 +69,12 @@ func NewServer(c *Configuration, version string) (*server, error) { // Set up the main background context. ctx, cancel := context.WithCancel(context.Background()) + metrics := newMetrics(c.PromHostAndPort) + + // Start the error kernel that will do all the error handling + // that is not done within a process. + errorKernel := newErrorKernel(ctx, metrics) + var opt nats.Option if c.RootCAPath != "" { @@ -122,8 +128,6 @@ func NewServer(c *Configuration, version string) (*server, error) { } } - metrics := newMetrics(c.PromHostAndPort) - // Create the tui client structure if enabled. var tuiClient *tui if c.EnableTUI { @@ -142,11 +146,12 @@ func NewServer(c *Configuration, version string) (*server, error) { nodeName: c.NodeName, natsConn: conn, StewardSocket: stewardSocket, - processes: newProcesses(ctx, metrics, tuiClient), + processes: newProcesses(ctx, metrics, tuiClient, errorKernel), newMessagesCh: make(chan []subjectAndMessage), metrics: metrics, version: version, tui: tuiClient, + errorKernel: errorKernel, } // Create the default data folder for where subscribers should @@ -206,10 +211,6 @@ func (s *server) Start() { log.Printf("Starting steward, version=%+v\n", s.version) s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)}) - // Start the error kernel that will do all the error handling - // that is not done within a process. - s.errorKernel = newErrorKernel(s.ctx) - go func() { err := s.errorKernel.start(s.newMessagesCh) if err != nil {