From 88798c3e39db2189392c82347a1aeffe6f1b9f04 Mon Sep 17 00:00:00 2001 From: Khaled Emara Date: Wed, 3 Jan 2024 03:12:05 +0200 Subject: [PATCH] feat: add new client for events (#9323) Signed-off-by: Khaled Emara --- cmd/background-controller/main.go | 3 ++- cmd/cleanup-controller/main.go | 3 ++- cmd/internal/client.go | 30 ++++++++++++++++++++---------- cmd/internal/config.go | 12 ++++++++++++ cmd/internal/flag.go | 12 ++++++++++-- cmd/internal/setup.go | 11 +++++++++-- cmd/kyverno/main.go | 3 ++- cmd/reports-controller/main.go | 3 ++- pkg/event/controller.go | 14 +++++++++----- 9 files changed, 68 insertions(+), 23 deletions(-) diff --git a/cmd/background-controller/main.go b/cmd/background-controller/main.go index c973299b6b..c57796200e 100644 --- a/cmd/background-controller/main.go +++ b/cmd/background-controller/main.go @@ -108,6 +108,7 @@ func main() { internal.WithKyvernoClient(), internal.WithDynamicClient(), internal.WithKyvernoDynamicClient(), + internal.WithEventsClient(), internal.WithFlagSets(flagset), ) // parse flags @@ -137,7 +138,7 @@ func main() { emitEventsValues = []string{} } eventGenerator := event.NewEventGenerator( - setup.KyvernoDynamicClient, + setup.EventsClient, logging.WithName("EventGenerator"), emitEventsValues..., ) diff --git a/cmd/cleanup-controller/main.go b/cmd/cleanup-controller/main.go index 20668c1601..ac96679a92 100644 --- a/cmd/cleanup-controller/main.go +++ b/cmd/cleanup-controller/main.go @@ -88,6 +88,7 @@ func main() { internal.WithLeaderElection(), internal.WithKyvernoClient(), internal.WithKyvernoDynamicClient(), + internal.WithEventsClient(), internal.WithConfigMapCaching(), internal.WithDeferredLoading(), internal.WithMetadataClient(), @@ -133,7 +134,7 @@ func main() { genericloggingcontroller.CheckGeneration, ) eventGenerator := event.NewEventGenerator( - setup.KyvernoDynamicClient, + setup.EventsClient, logging.WithName("EventGenerator"), ) // start informers and wait for cache sync diff --git a/cmd/internal/client.go b/cmd/internal/client.go index b35cf47887..ec5616e1e6 100644 --- a/cmd/internal/client.go +++ b/cmd/internal/client.go @@ -11,22 +11,24 @@ import ( apisrv "github.com/kyverno/kyverno/pkg/clients/apiserver" "github.com/kyverno/kyverno/pkg/clients/dclient" dyn "github.com/kyverno/kyverno/pkg/clients/dynamic" - kube "github.com/kyverno/kyverno/pkg/clients/kube" + kubeclient "github.com/kyverno/kyverno/pkg/clients/kube" kyverno "github.com/kyverno/kyverno/pkg/clients/kyverno" meta "github.com/kyverno/kyverno/pkg/clients/metadata" "github.com/kyverno/kyverno/pkg/config" + "github.com/kyverno/kyverno/pkg/metrics" "github.com/kyverno/kyverno/pkg/tracing" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" apiserver "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/metadata" "k8s.io/client-go/rest" aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" ) -func createClientConfig(logger logr.Logger) *rest.Config { - clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst) +func createClientConfig(logger logr.Logger, rateLimitQPS float64, rateLimitBurst int) *rest.Config { + clientConfig, err := config.CreateClientConfig(kubeconfig, rateLimitQPS, rateLimitBurst) checkError(logger, err, "failed to create rest client configuration") clientConfig.Wrap( func(base http.RoundTripper) http.RoundTripper { @@ -36,10 +38,10 @@ func createClientConfig(logger logr.Logger) *rest.Config { return clientConfig } -func createKubernetesClient(logger logr.Logger, opts ...kube.NewOption) kubernetes.Interface { +func createKubernetesClient(logger logr.Logger, rateLimitQPS float64, rateLimitBurst int, opts ...kubeclient.NewOption) kubernetes.Interface { logger = logger.WithName("kube-client") logger.Info("create kube client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := kube.NewForConfig(createClientConfig(logger), opts...) + client, err := kubeclient.NewForConfig(createClientConfig(logger, rateLimitQPS, rateLimitBurst), opts...) checkError(logger, err, "failed to create kubernetes client") return client } @@ -47,7 +49,7 @@ func createKubernetesClient(logger logr.Logger, opts ...kube.NewOption) kubernet func createKyvernoClient(logger logr.Logger, opts ...kyverno.NewOption) versioned.Interface { logger = logger.WithName("kyverno-client") logger.Info("create kyverno client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := kyverno.NewForConfig(createClientConfig(logger), opts...) + client, err := kyverno.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...) checkError(logger, err, "failed to create kyverno client") return client } @@ -55,7 +57,7 @@ func createKyvernoClient(logger logr.Logger, opts ...kyverno.NewOption) versione func createDynamicClient(logger logr.Logger, opts ...dyn.NewOption) dynamic.Interface { logger = logger.WithName("dynamic-client") logger.Info("create dynamic client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := dyn.NewForConfig(createClientConfig(logger), opts...) + client, err := dyn.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...) checkError(logger, err, "failed to create dynamic client") return client } @@ -63,7 +65,7 @@ func createDynamicClient(logger logr.Logger, opts ...dyn.NewOption) dynamic.Inte func createMetadataClient(logger logr.Logger, opts ...meta.NewOption) metadata.Interface { logger = logger.WithName("metadata-client") logger.Info("create metadata client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := meta.NewForConfig(createClientConfig(logger), opts...) + client, err := meta.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...) checkError(logger, err, "failed to create metadata client") return client } @@ -71,7 +73,7 @@ func createMetadataClient(logger logr.Logger, opts ...meta.NewOption) metadata.I func createApiServerClient(logger logr.Logger, opts ...apisrv.NewOption) apiserver.Interface { logger = logger.WithName("apiserver-client") logger.Info("create apiserver client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := apisrv.NewForConfig(createClientConfig(logger), opts...) + client, err := apisrv.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...) checkError(logger, err, "failed to create apiserver client") return client } @@ -84,10 +86,18 @@ func createKyvernoDynamicClient(logger logr.Logger, ctx context.Context, dyn dyn return client } +func createEventsClient(logger logr.Logger, kube kubernetes.Interface, metricsManager metrics.MetricsConfigManager) eventsv1.EventsV1Interface { + logger = logger.WithName("events-client") + logger.Info("create the events client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) + client := kubeclient.From(createKubernetesClient(logger, eventsRateLimitQPS, eventsRateLimitBurst), kubeclient.WithTracing()) + client = client.WithMetrics(metricsManager, metrics.KubeClient) + return client.EventsV1() +} + func CreateAggregatorClient(logger logr.Logger, opts ...agg.NewOption) aggregator.Interface { logger = logger.WithName("aggregator-client") logger.Info("create aggregator client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst) - client, err := agg.NewForConfig(createClientConfig(logger), opts...) + client, err := agg.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...) checkError(logger, err, "failed to create aggregator client") return client } diff --git a/cmd/internal/config.go b/cmd/internal/config.go index f32a165a3b..be92d66aee 100644 --- a/cmd/internal/config.go +++ b/cmd/internal/config.go @@ -21,6 +21,7 @@ type Configuration interface { UsesApiServerClient() bool UsesMetadataClient() bool UsesKyvernoDynamicClient() bool + UsesEventsClient() bool FlagSets() []*flag.FlagSet } @@ -132,6 +133,12 @@ func WithKyvernoDynamicClient() ConfigurationOption { } } +func WithEventsClient() ConfigurationOption { + return func(c *configuration) { + c.usesEventsClient = true + } +} + func WithFlagSets(flagsets ...*flag.FlagSet) ConfigurationOption { return func(c *configuration) { c.flagSets = append(c.flagSets, flagsets...) @@ -155,6 +162,7 @@ type configuration struct { usesApiServerClient bool usesMetadataClient bool usesKyvernoDynamicClient bool + usesEventsClient bool flagSets []*flag.FlagSet } @@ -222,6 +230,10 @@ func (c *configuration) UsesKyvernoDynamicClient() bool { return c.usesKyvernoDynamicClient } +func (c *configuration) UsesEventsClient() bool { + return c.usesEventsClient +} + func (c *configuration) FlagSets() []*flag.FlagSet { return c.flagSets } diff --git a/cmd/internal/flag.go b/cmd/internal/flag.go index 7f69dba7b0..f00fac4865 100644 --- a/cmd/internal/flag.go +++ b/cmd/internal/flag.go @@ -33,6 +33,8 @@ var ( kubeconfig string clientRateLimitQPS float64 clientRateLimitBurst int + eventsRateLimitQPS float64 + eventsRateLimitBurst int // engine enablePolicyException bool exceptionNamespace string @@ -83,10 +85,12 @@ func initMetricsFlags() { flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.") } -func initKubeconfigFlags(qps float64, burst int) { +func initKubeconfigFlags(qps float64, burst int, eventsQPS float64, eventsBurst int) { flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", qps, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.") flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", burst, "Configure the maximum burst for throttle. Uses the client default if zero.") + flag.Float64Var(&eventsRateLimitQPS, "eventsRateLimitQPS", eventsQPS, "Configure the maximum QPS to the Kubernetes API server from Kyverno for events. Uses the client default if zero.") + flag.IntVar(&eventsRateLimitBurst, "eventsRateLimitBurst", eventsBurst, "Configure the maximum burst for throttle for events. Uses the client default if zero.") } func initPolicyExceptionsFlags() { @@ -132,12 +136,16 @@ func initCleanupFlags() { type options struct { clientRateLimitQPS float64 clientRateLimitBurst int + eventsRateLimitQPS float64 + eventsRateLimitBurst int } func newOptions() options { return options{ clientRateLimitQPS: 20, clientRateLimitBurst: 50, + eventsRateLimitQPS: 1000, + eventsRateLimitBurst: 2000, } } @@ -178,7 +186,7 @@ func initFlags(config Configuration, opts ...Option) { } // kubeconfig if config.UsesKubeconfig() { - initKubeconfigFlags(options.clientRateLimitQPS, options.clientRateLimitBurst) + initKubeconfigFlags(options.clientRateLimitQPS, options.clientRateLimitBurst, options.eventsRateLimitQPS, options.eventsRateLimitBurst) } // policy exceptions if config.UsesPolicyExceptions() { diff --git a/cmd/internal/setup.go b/cmd/internal/setup.go index fa67d64bed..b30a443ffd 100644 --- a/cmd/internal/setup.go +++ b/cmd/internal/setup.go @@ -16,6 +16,7 @@ import ( "github.com/kyverno/kyverno/pkg/imageverifycache" "github.com/kyverno/kyverno/pkg/metrics" "github.com/kyverno/kyverno/pkg/registryclient" + eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" corev1listers "k8s.io/client-go/listers/core/v1" ) @@ -46,6 +47,7 @@ type SetupResult struct { ApiServerClient apiserverclient.UpstreamInterface MetadataClient metadataclient.UpstreamInterface KyvernoDynamicClient dclient.Interface + EventsClient eventsv1.EventsV1Interface } func Setup(config Configuration, name string, skipResourceFilters bool) (context.Context, SetupResult, context.CancelFunc) { @@ -57,7 +59,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context sdownMaxProcs := setupMaxProcs(logger) setupProfiling(logger) ctx, sdownSignals := setupSignals(logger) - client := kubeclient.From(createKubernetesClient(logger), kubeclient.WithTracing()) + client := kubeclient.From(createKubernetesClient(logger, clientRateLimitQPS, clientRateLimitBurst), kubeclient.WithTracing()) metricsConfiguration := startMetricsConfigController(ctx, logger, client) metricsManager, sdownMetrics := SetupMetrics(ctx, logger, metricsConfiguration, client) client = client.WithMetrics(metricsManager, metrics.KubeClient) @@ -77,7 +79,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context } var leaderElectionClient kubeclient.UpstreamInterface if config.UsesLeaderElection() { - leaderElectionClient = createKubernetesClient(logger, kubeclient.WithMetrics(metricsManager, metrics.KubeClient), kubeclient.WithTracing()) + leaderElectionClient = createKubernetesClient(logger, clientRateLimitQPS, clientRateLimitBurst, kubeclient.WithMetrics(metricsManager, metrics.KubeClient), kubeclient.WithTracing()) } var kyvernoClient kyvernoclient.UpstreamInterface if config.UsesKyvernoClient() { @@ -95,6 +97,10 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context if config.UsesKyvernoDynamicClient() { dClient = createKyvernoDynamicClient(logger, ctx, dynamicClient, client, 15*time.Minute) } + var eventsClient eventsv1.EventsV1Interface + if config.UsesEventsClient() { + eventsClient = createEventsClient(logger, client, metricsManager) + } var metadataClient metadataclient.UpstreamInterface if config.UsesMetadataClient() { metadataClient = createMetadataClient(logger, metadataclient.WithMetrics(metricsManager, metrics.MetadataClient), metadataclient.WithTracing()) @@ -116,6 +122,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context ApiServerClient: apiServerClient, MetadataClient: metadataClient, KyvernoDynamicClient: dClient, + EventsClient: eventsClient, }, shutdown(logger.WithName("shutdown"), sdownMaxProcs, sdownMetrics, sdownTracing, sdownSignals) } diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 6ada959df6..f607bd54b6 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -255,6 +255,7 @@ func main() { internal.WithKyvernoClient(), internal.WithDynamicClient(), internal.WithKyvernoDynamicClient(), + internal.WithEventsClient(), internal.WithApiServerClient(), internal.WithFlagSets(flagset), ) @@ -320,7 +321,7 @@ func main() { omitEventsValues = []string{} } eventGenerator := event.NewEventGenerator( - setup.KyvernoDynamicClient, + setup.EventsClient, logging.WithName("EventGenerator"), omitEventsValues..., ) diff --git a/cmd/reports-controller/main.go b/cmd/reports-controller/main.go index 5f75a1c9b8..2310c1cf66 100644 --- a/cmd/reports-controller/main.go +++ b/cmd/reports-controller/main.go @@ -224,6 +224,7 @@ func main() { internal.WithDynamicClient(), internal.WithMetadataClient(), internal.WithKyvernoDynamicClient(), + internal.WithEventsClient(), internal.WithFlagSets(flagset), ) // parse flags @@ -254,7 +255,7 @@ func main() { omitEventsValues = []string{} } eventGenerator := event.NewEventGenerator( - setup.KyvernoDynamicClient, + setup.EventsClient, logging.WithName("EventGenerator"), omitEventsValues..., ) diff --git a/pkg/event/controller.go b/pkg/event/controller.go index de7ac0e156..8606ca6b70 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -6,10 +6,10 @@ import ( "github.com/go-logr/logr" "github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme" - "github.com/kyverno/kyverno/pkg/clients/dclient" corev1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/tools/events" "k8s.io/klog/v2" ) @@ -29,6 +29,9 @@ type generator struct { // recorders recorders map[Source]events.EventRecorder + // client + eventsClient eventsv1.EventsV1Interface + // config omitEvents sets.Set[string] logger logr.Logger @@ -46,13 +49,14 @@ type Controller interface { } // NewEventGenerator to generate a new event controller -func NewEventGenerator(client dclient.Interface, logger logr.Logger, omitEvents ...string) Controller { +func NewEventGenerator(eventsClient eventsv1.EventsV1Interface, logger logr.Logger, omitEvents ...string) Controller { return &generator{ broadcaster: events.NewBroadcaster(&events.EventSinkImpl{ - Interface: client.GetEventsInterface(), + Interface: eventsClient, }), - omitEvents: sets.New(omitEvents...), - logger: logger, + eventsClient: eventsClient, + omitEvents: sets.New(omitEvents...), + logger: logger, } }