From 9b41e2e0174c10b3548a5364e205911d6a0047c7 Mon Sep 17 00:00:00 2001 From: Tathagata Paul Date: Mon, 25 Jul 2022 14:55:26 +0530 Subject: [PATCH] Add shutdown methods for exporters and controllers (#4214) * add shutdown methods for exporters and controllers Signed-off-by: Tathagata Paul * remove shutdown exporter and add timeout in main.go Signed-off-by: Tathagata Paul * move ctx timeout to main Signed-off-by: Tathagata Paul * change variable order Signed-off-by: Tathagata Paul --- cmd/kyverno/main.go | 13 +++++++++++-- pkg/metrics/init.go | 13 ++++++++----- pkg/metrics/metrics.go | 22 +++++++++++++++------- pkg/tracing/tracing.go | 15 +++++++++++---- 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 422cb714d1..6e705f1e87 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -273,7 +273,7 @@ func main() { // Metrics Configuration metricsAddr := ":" + metricsPort - metricsConfig, metricsServerMux, err := metrics.InitMetrics( + metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics( disableMetricsExport, otel, metricsAddr, @@ -288,6 +288,12 @@ func main() { os.Exit(1) } + if otel == "grpc" { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer metrics.ShutDownController(ctx, metricsPusher) + defer cancel() + } + if otel == "prometheus" { go func() { setupLog.Info("Enabling Metrics for Kyverno", "address", metricsAddr) @@ -301,11 +307,14 @@ func main() { // Tracing Configuration if enableTracing { setupLog.Info("Enabling tracing for Kyverno...") - err = tracing.NewTraceConfig(otelCollector, transportCreds, kubeClient, log.Log.WithName("Tracing")) + tracerProvider, err := tracing.NewTraceConfig(otelCollector, transportCreds, kubeClient, log.Log.WithName("Tracing")) if err != nil { setupLog.Error(err, "Failed to enable tracing for Kyverno") os.Exit(1) } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer tracing.ShutDownController(ctx, tracerProvider) + defer cancel() } // POLICY CONTROLLER diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go index b6c4c75a73..603c366911 100644 --- a/pkg/metrics/init.go +++ b/pkg/metrics/init.go @@ -5,6 +5,7 @@ import ( "github.com/go-logr/logr" "github.com/kyverno/kyverno/pkg/config" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" "k8s.io/client-go/kubernetes" ) @@ -16,17 +17,19 @@ func InitMetrics( metricsConfigData *config.MetricsConfigData, transportCreds string, kubeClient kubernetes.Interface, - log logr.Logger) (*MetricsConfig, *http.ServeMux, error) { + log logr.Logger) (*MetricsConfig, *http.ServeMux, *controller.Controller, error) { + var metricsConfig *MetricsConfig var err error var metricsServerMux *http.ServeMux + var pusher *controller.Controller if !disableMetricsExport { if otel == "grpc" { // Otlpgrpc metrics will be served on port 4317: default port for otlpgrpcmetrics log.Info("Enabling Metrics for Kyverno", "address", metricsAddr) endpoint := otelCollector + metricsAddr - metricsConfig, err = NewOTLPGRPCConfig( + metricsConfig, pusher, err = NewOTLPGRPCConfig( endpoint, metricsConfigData, transportCreds, @@ -34,16 +37,16 @@ func InitMetrics( log, ) if err != nil { - return nil, nil, err + return nil, nil, pusher, err } } else if otel == "prometheus" { // Prometheus Server will serve metrics on metrics-port metricsConfig, metricsServerMux, err = NewPrometheusConfig(metricsConfigData, log) if err != nil { - return nil, nil, err + return nil, nil, pusher, err } } } - return metricsConfig, metricsServerMux, nil + return metricsConfig, metricsServerMux, pusher, nil } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 3dbc80afae..9bb429cb9e 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" kconfig "github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/utils/kube" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -88,12 +89,19 @@ func initializeMetrics(m *MetricsConfig) (*MetricsConfig, error) { return m, nil } +func ShutDownController(ctx context.Context, pusher *controller.Controller) { + // pushes any last exports to the receiver + if err := pusher.Stop(ctx); err != nil { + otel.Handle(err) + } +} + func NewOTLPGRPCConfig(endpoint string, metricsConfigData *kconfig.MetricsConfigData, certs string, kubeClient kubernetes.Interface, log logr.Logger, -) (*MetricsConfig, error) { +) (*MetricsConfig, *controller.Controller, error) { ctx := context.Background() var client otlpmetric.Client @@ -102,7 +110,7 @@ func NewOTLPGRPCConfig(endpoint string, transportCreds, err := kube.FetchCert(ctx, certs, kubeClient) if err != nil { log.Error(err, "Error fetching certificate from secret") - return nil, err + return nil, nil, err } client = otlpmetricgrpc.NewClient( @@ -120,7 +128,7 @@ func NewOTLPGRPCConfig(endpoint string, metricExp, err := otlpmetric.New(ctx, client) if err != nil { log.Error(err, "Failed to create the collector exporter") - return nil, err + return nil, nil, err } res, err := resource.New(context.Background(), @@ -129,7 +137,7 @@ func NewOTLPGRPCConfig(endpoint string, ) if err != nil { log.Error(err, "failed creating resource") - return nil, err + return nil, nil, err } // create controller and bind the exporter with it @@ -152,15 +160,15 @@ func NewOTLPGRPCConfig(endpoint string, m, err = initializeMetrics(m) if err != nil { log.Error(err, "Failed initializing metrics") - return nil, err + return nil, nil, err } if err := pusher.Start(ctx); err != nil { log.Error(err, "could not start metric exporter") - return nil, err + return nil, nil, err } - return m, nil + return m, pusher, nil } func NewPrometheusConfig(metricsConfigData *kconfig.MetricsConfigData, diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index d7dfd3d07b..99526f8d7c 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -17,8 +17,15 @@ import ( "k8s.io/client-go/kubernetes" ) +func ShutDownController(ctx context.Context, tp *sdktrace.TracerProvider) { + // pushes any last exports to the receiver + if err := tp.Shutdown(ctx); err != nil { + otel.Handle(err) + } +} + // NewTraceConfig generates the initial tracing configuration with 'endpoint' as the endpoint to connect to the Opentelemetry Collector -func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interface, log logr.Logger) error { +func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interface, log logr.Logger) (*sdktrace.TracerProvider, error) { ctx := context.Background() var client otlptrace.Client @@ -45,7 +52,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa traceExp, err := otlptrace.New(ctx, client) if err != nil { log.Error(err, "Failed to create the collector exporter") - return err + return nil, err } res, err := resource.New(context.Background(), @@ -54,7 +61,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa ) if err != nil { log.Error(err, "failed creating resource") - return err + return nil, err } bsp := sdktrace.NewBatchSpanProcessor(traceExp) @@ -68,7 +75,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa // set global propagator to tracecontext (the default is no-op). otel.SetTextMapPropagator(propagation.TraceContext{}) otel.SetTracerProvider(tp) - return nil + return tp, nil } // DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.