1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

Add shutdown methods for exporters and controllers (#4214)

* add shutdown methods for exporters and controllers

Signed-off-by: Tathagata Paul <tathagatapaul7@gmail.com>

* remove shutdown exporter and add timeout in main.go

Signed-off-by: Tathagata Paul <tathagatapaul7@gmail.com>

* move ctx timeout to main

Signed-off-by: Tathagata Paul <tathagatapaul7@gmail.com>

* change variable order

Signed-off-by: Tathagata Paul <tathagatapaul7@gmail.com>
This commit is contained in:
Tathagata Paul 2022-07-25 14:55:26 +05:30 committed by GitHub
parent a190b6ed56
commit 9b41e2e017
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 18 deletions

View file

@ -273,7 +273,7 @@ func main() {
// Metrics Configuration // Metrics Configuration
metricsAddr := ":" + metricsPort metricsAddr := ":" + metricsPort
metricsConfig, metricsServerMux, err := metrics.InitMetrics( metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics(
disableMetricsExport, disableMetricsExport,
otel, otel,
metricsAddr, metricsAddr,
@ -288,6 +288,12 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if otel == "grpc" {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer metrics.ShutDownController(ctx, metricsPusher)
defer cancel()
}
if otel == "prometheus" { if otel == "prometheus" {
go func() { go func() {
setupLog.Info("Enabling Metrics for Kyverno", "address", metricsAddr) setupLog.Info("Enabling Metrics for Kyverno", "address", metricsAddr)
@ -301,11 +307,14 @@ func main() {
// Tracing Configuration // Tracing Configuration
if enableTracing { if enableTracing {
setupLog.Info("Enabling tracing for Kyverno...") setupLog.Info("Enabling tracing for Kyverno...")
err = tracing.NewTraceConfig(otelCollector, transportCreds, kubeClient, log.Log.WithName("Tracing")) tracerProvider, err := tracing.NewTraceConfig(otelCollector, transportCreds, kubeClient, log.Log.WithName("Tracing"))
if err != nil { if err != nil {
setupLog.Error(err, "Failed to enable tracing for Kyverno") setupLog.Error(err, "Failed to enable tracing for Kyverno")
os.Exit(1) os.Exit(1)
} }
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer tracing.ShutDownController(ctx, tracerProvider)
defer cancel()
} }
// POLICY CONTROLLER // POLICY CONTROLLER

View file

@ -5,6 +5,7 @@ import (
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/config"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
@ -16,17 +17,19 @@ func InitMetrics(
metricsConfigData *config.MetricsConfigData, metricsConfigData *config.MetricsConfigData,
transportCreds string, transportCreds string,
kubeClient kubernetes.Interface, kubeClient kubernetes.Interface,
log logr.Logger) (*MetricsConfig, *http.ServeMux, error) { log logr.Logger) (*MetricsConfig, *http.ServeMux, *controller.Controller, error) {
var metricsConfig *MetricsConfig var metricsConfig *MetricsConfig
var err error var err error
var metricsServerMux *http.ServeMux var metricsServerMux *http.ServeMux
var pusher *controller.Controller
if !disableMetricsExport { if !disableMetricsExport {
if otel == "grpc" { if otel == "grpc" {
// Otlpgrpc metrics will be served on port 4317: default port for otlpgrpcmetrics // Otlpgrpc metrics will be served on port 4317: default port for otlpgrpcmetrics
log.Info("Enabling Metrics for Kyverno", "address", metricsAddr) log.Info("Enabling Metrics for Kyverno", "address", metricsAddr)
endpoint := otelCollector + metricsAddr endpoint := otelCollector + metricsAddr
metricsConfig, err = NewOTLPGRPCConfig( metricsConfig, pusher, err = NewOTLPGRPCConfig(
endpoint, endpoint,
metricsConfigData, metricsConfigData,
transportCreds, transportCreds,
@ -34,16 +37,16 @@ func InitMetrics(
log, log,
) )
if err != nil { if err != nil {
return nil, nil, err return nil, nil, pusher, err
} }
} else if otel == "prometheus" { } else if otel == "prometheus" {
// Prometheus Server will serve metrics on metrics-port // Prometheus Server will serve metrics on metrics-port
metricsConfig, metricsServerMux, err = NewPrometheusConfig(metricsConfigData, log) metricsConfig, metricsServerMux, err = NewPrometheusConfig(metricsConfigData, log)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, pusher, err
} }
} }
} }
return metricsConfig, metricsServerMux, nil return metricsConfig, metricsServerMux, pusher, nil
} }

View file

@ -8,6 +8,7 @@ import (
"github.com/go-logr/logr" "github.com/go-logr/logr"
kconfig "github.com/kyverno/kyverno/pkg/config" kconfig "github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/utils/kube" "github.com/kyverno/kyverno/pkg/utils/kube"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
@ -88,12 +89,19 @@ func initializeMetrics(m *MetricsConfig) (*MetricsConfig, error) {
return m, nil return m, nil
} }
func ShutDownController(ctx context.Context, pusher *controller.Controller) {
// pushes any last exports to the receiver
if err := pusher.Stop(ctx); err != nil {
otel.Handle(err)
}
}
func NewOTLPGRPCConfig(endpoint string, func NewOTLPGRPCConfig(endpoint string,
metricsConfigData *kconfig.MetricsConfigData, metricsConfigData *kconfig.MetricsConfigData,
certs string, certs string,
kubeClient kubernetes.Interface, kubeClient kubernetes.Interface,
log logr.Logger, log logr.Logger,
) (*MetricsConfig, error) { ) (*MetricsConfig, *controller.Controller, error) {
ctx := context.Background() ctx := context.Background()
var client otlpmetric.Client var client otlpmetric.Client
@ -102,7 +110,7 @@ func NewOTLPGRPCConfig(endpoint string,
transportCreds, err := kube.FetchCert(ctx, certs, kubeClient) transportCreds, err := kube.FetchCert(ctx, certs, kubeClient)
if err != nil { if err != nil {
log.Error(err, "Error fetching certificate from secret") log.Error(err, "Error fetching certificate from secret")
return nil, err return nil, nil, err
} }
client = otlpmetricgrpc.NewClient( client = otlpmetricgrpc.NewClient(
@ -120,7 +128,7 @@ func NewOTLPGRPCConfig(endpoint string,
metricExp, err := otlpmetric.New(ctx, client) metricExp, err := otlpmetric.New(ctx, client)
if err != nil { if err != nil {
log.Error(err, "Failed to create the collector exporter") log.Error(err, "Failed to create the collector exporter")
return nil, err return nil, nil, err
} }
res, err := resource.New(context.Background(), res, err := resource.New(context.Background(),
@ -129,7 +137,7 @@ func NewOTLPGRPCConfig(endpoint string,
) )
if err != nil { if err != nil {
log.Error(err, "failed creating resource") log.Error(err, "failed creating resource")
return nil, err return nil, nil, err
} }
// create controller and bind the exporter with it // create controller and bind the exporter with it
@ -152,15 +160,15 @@ func NewOTLPGRPCConfig(endpoint string,
m, err = initializeMetrics(m) m, err = initializeMetrics(m)
if err != nil { if err != nil {
log.Error(err, "Failed initializing metrics") log.Error(err, "Failed initializing metrics")
return nil, err return nil, nil, err
} }
if err := pusher.Start(ctx); err != nil { if err := pusher.Start(ctx); err != nil {
log.Error(err, "could not start metric exporter") log.Error(err, "could not start metric exporter")
return nil, err return nil, nil, err
} }
return m, nil return m, pusher, nil
} }
func NewPrometheusConfig(metricsConfigData *kconfig.MetricsConfigData, func NewPrometheusConfig(metricsConfigData *kconfig.MetricsConfigData,

View file

@ -17,8 +17,15 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func ShutDownController(ctx context.Context, tp *sdktrace.TracerProvider) {
// pushes any last exports to the receiver
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}
// NewTraceConfig generates the initial tracing configuration with 'endpoint' as the endpoint to connect to the Opentelemetry Collector // NewTraceConfig generates the initial tracing configuration with 'endpoint' as the endpoint to connect to the Opentelemetry Collector
func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interface, log logr.Logger) error { func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interface, log logr.Logger) (*sdktrace.TracerProvider, error) {
ctx := context.Background() ctx := context.Background()
var client otlptrace.Client var client otlptrace.Client
@ -45,7 +52,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
traceExp, err := otlptrace.New(ctx, client) traceExp, err := otlptrace.New(ctx, client)
if err != nil { if err != nil {
log.Error(err, "Failed to create the collector exporter") log.Error(err, "Failed to create the collector exporter")
return err return nil, err
} }
res, err := resource.New(context.Background(), res, err := resource.New(context.Background(),
@ -54,7 +61,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
) )
if err != nil { if err != nil {
log.Error(err, "failed creating resource") log.Error(err, "failed creating resource")
return err return nil, err
} }
bsp := sdktrace.NewBatchSpanProcessor(traceExp) bsp := sdktrace.NewBatchSpanProcessor(traceExp)
@ -68,7 +75,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
// set global propagator to tracecontext (the default is no-op). // set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.TraceContext{}) otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTracerProvider(tp) otel.SetTracerProvider(tp)
return nil return tp, nil
} }
// DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. // DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.