1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 10:28:36 +00:00

feat: add new client for events (#9323)

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>
This commit is contained in:
Khaled Emara 2024-01-03 03:12:05 +02:00 committed by GitHub
parent 7c94783c6a
commit 88798c3e39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 23 deletions

View file

@ -108,6 +108,7 @@ func main() {
internal.WithKyvernoClient(),
internal.WithDynamicClient(),
internal.WithKyvernoDynamicClient(),
internal.WithEventsClient(),
internal.WithFlagSets(flagset),
)
// parse flags
@ -137,7 +138,7 @@ func main() {
emitEventsValues = []string{}
}
eventGenerator := event.NewEventGenerator(
setup.KyvernoDynamicClient,
setup.EventsClient,
logging.WithName("EventGenerator"),
emitEventsValues...,
)

View file

@ -88,6 +88,7 @@ func main() {
internal.WithLeaderElection(),
internal.WithKyvernoClient(),
internal.WithKyvernoDynamicClient(),
internal.WithEventsClient(),
internal.WithConfigMapCaching(),
internal.WithDeferredLoading(),
internal.WithMetadataClient(),
@ -133,7 +134,7 @@ func main() {
genericloggingcontroller.CheckGeneration,
)
eventGenerator := event.NewEventGenerator(
setup.KyvernoDynamicClient,
setup.EventsClient,
logging.WithName("EventGenerator"),
)
// start informers and wait for cache sync

View file

@ -11,22 +11,24 @@ import (
apisrv "github.com/kyverno/kyverno/pkg/clients/apiserver"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dyn "github.com/kyverno/kyverno/pkg/clients/dynamic"
kube "github.com/kyverno/kyverno/pkg/clients/kube"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
kyverno "github.com/kyverno/kyverno/pkg/clients/kyverno"
meta "github.com/kyverno/kyverno/pkg/clients/metadata"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/metrics"
"github.com/kyverno/kyverno/pkg/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
apiserver "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
func createClientConfig(logger logr.Logger) *rest.Config {
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
func createClientConfig(logger logr.Logger, rateLimitQPS float64, rateLimitBurst int) *rest.Config {
clientConfig, err := config.CreateClientConfig(kubeconfig, rateLimitQPS, rateLimitBurst)
checkError(logger, err, "failed to create rest client configuration")
clientConfig.Wrap(
func(base http.RoundTripper) http.RoundTripper {
@ -36,10 +38,10 @@ func createClientConfig(logger logr.Logger) *rest.Config {
return clientConfig
}
func createKubernetesClient(logger logr.Logger, opts ...kube.NewOption) kubernetes.Interface {
func createKubernetesClient(logger logr.Logger, rateLimitQPS float64, rateLimitBurst int, opts ...kubeclient.NewOption) kubernetes.Interface {
logger = logger.WithName("kube-client")
logger.Info("create kube client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := kube.NewForConfig(createClientConfig(logger), opts...)
client, err := kubeclient.NewForConfig(createClientConfig(logger, rateLimitQPS, rateLimitBurst), opts...)
checkError(logger, err, "failed to create kubernetes client")
return client
}
@ -47,7 +49,7 @@ func createKubernetesClient(logger logr.Logger, opts ...kube.NewOption) kubernet
func createKyvernoClient(logger logr.Logger, opts ...kyverno.NewOption) versioned.Interface {
logger = logger.WithName("kyverno-client")
logger.Info("create kyverno client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := kyverno.NewForConfig(createClientConfig(logger), opts...)
client, err := kyverno.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...)
checkError(logger, err, "failed to create kyverno client")
return client
}
@ -55,7 +57,7 @@ func createKyvernoClient(logger logr.Logger, opts ...kyverno.NewOption) versione
func createDynamicClient(logger logr.Logger, opts ...dyn.NewOption) dynamic.Interface {
logger = logger.WithName("dynamic-client")
logger.Info("create dynamic client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := dyn.NewForConfig(createClientConfig(logger), opts...)
client, err := dyn.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...)
checkError(logger, err, "failed to create dynamic client")
return client
}
@ -63,7 +65,7 @@ func createDynamicClient(logger logr.Logger, opts ...dyn.NewOption) dynamic.Inte
func createMetadataClient(logger logr.Logger, opts ...meta.NewOption) metadata.Interface {
logger = logger.WithName("metadata-client")
logger.Info("create metadata client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := meta.NewForConfig(createClientConfig(logger), opts...)
client, err := meta.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...)
checkError(logger, err, "failed to create metadata client")
return client
}
@ -71,7 +73,7 @@ func createMetadataClient(logger logr.Logger, opts ...meta.NewOption) metadata.I
func createApiServerClient(logger logr.Logger, opts ...apisrv.NewOption) apiserver.Interface {
logger = logger.WithName("apiserver-client")
logger.Info("create apiserver client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := apisrv.NewForConfig(createClientConfig(logger), opts...)
client, err := apisrv.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...)
checkError(logger, err, "failed to create apiserver client")
return client
}
@ -84,10 +86,18 @@ func createKyvernoDynamicClient(logger logr.Logger, ctx context.Context, dyn dyn
return client
}
func createEventsClient(logger logr.Logger, kube kubernetes.Interface, metricsManager metrics.MetricsConfigManager) eventsv1.EventsV1Interface {
logger = logger.WithName("events-client")
logger.Info("create the events client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client := kubeclient.From(createKubernetesClient(logger, eventsRateLimitQPS, eventsRateLimitBurst), kubeclient.WithTracing())
client = client.WithMetrics(metricsManager, metrics.KubeClient)
return client.EventsV1()
}
func CreateAggregatorClient(logger logr.Logger, opts ...agg.NewOption) aggregator.Interface {
logger = logger.WithName("aggregator-client")
logger.Info("create aggregator client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := agg.NewForConfig(createClientConfig(logger), opts...)
client, err := agg.NewForConfig(createClientConfig(logger, clientRateLimitQPS, clientRateLimitBurst), opts...)
checkError(logger, err, "failed to create aggregator client")
return client
}

View file

@ -21,6 +21,7 @@ type Configuration interface {
UsesApiServerClient() bool
UsesMetadataClient() bool
UsesKyvernoDynamicClient() bool
UsesEventsClient() bool
FlagSets() []*flag.FlagSet
}
@ -132,6 +133,12 @@ func WithKyvernoDynamicClient() ConfigurationOption {
}
}
func WithEventsClient() ConfigurationOption {
return func(c *configuration) {
c.usesEventsClient = true
}
}
func WithFlagSets(flagsets ...*flag.FlagSet) ConfigurationOption {
return func(c *configuration) {
c.flagSets = append(c.flagSets, flagsets...)
@ -155,6 +162,7 @@ type configuration struct {
usesApiServerClient bool
usesMetadataClient bool
usesKyvernoDynamicClient bool
usesEventsClient bool
flagSets []*flag.FlagSet
}
@ -222,6 +230,10 @@ func (c *configuration) UsesKyvernoDynamicClient() bool {
return c.usesKyvernoDynamicClient
}
func (c *configuration) UsesEventsClient() bool {
return c.usesEventsClient
}
func (c *configuration) FlagSets() []*flag.FlagSet {
return c.flagSets
}

View file

@ -33,6 +33,8 @@ var (
kubeconfig string
clientRateLimitQPS float64
clientRateLimitBurst int
eventsRateLimitQPS float64
eventsRateLimitBurst int
// engine
enablePolicyException bool
exceptionNamespace string
@ -83,10 +85,12 @@ func initMetricsFlags() {
flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
}
func initKubeconfigFlags(qps float64, burst int) {
func initKubeconfigFlags(qps float64, burst int, eventsQPS float64, eventsBurst int) {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", qps, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", burst, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.Float64Var(&eventsRateLimitQPS, "eventsRateLimitQPS", eventsQPS, "Configure the maximum QPS to the Kubernetes API server from Kyverno for events. Uses the client default if zero.")
flag.IntVar(&eventsRateLimitBurst, "eventsRateLimitBurst", eventsBurst, "Configure the maximum burst for throttle for events. Uses the client default if zero.")
}
func initPolicyExceptionsFlags() {
@ -132,12 +136,16 @@ func initCleanupFlags() {
type options struct {
clientRateLimitQPS float64
clientRateLimitBurst int
eventsRateLimitQPS float64
eventsRateLimitBurst int
}
func newOptions() options {
return options{
clientRateLimitQPS: 20,
clientRateLimitBurst: 50,
eventsRateLimitQPS: 1000,
eventsRateLimitBurst: 2000,
}
}
@ -178,7 +186,7 @@ func initFlags(config Configuration, opts ...Option) {
}
// kubeconfig
if config.UsesKubeconfig() {
initKubeconfigFlags(options.clientRateLimitQPS, options.clientRateLimitBurst)
initKubeconfigFlags(options.clientRateLimitQPS, options.clientRateLimitBurst, options.eventsRateLimitQPS, options.eventsRateLimitBurst)
}
// policy exceptions
if config.UsesPolicyExceptions() {

View file

@ -16,6 +16,7 @@ import (
"github.com/kyverno/kyverno/pkg/imageverifycache"
"github.com/kyverno/kyverno/pkg/metrics"
"github.com/kyverno/kyverno/pkg/registryclient"
eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
)
@ -46,6 +47,7 @@ type SetupResult struct {
ApiServerClient apiserverclient.UpstreamInterface
MetadataClient metadataclient.UpstreamInterface
KyvernoDynamicClient dclient.Interface
EventsClient eventsv1.EventsV1Interface
}
func Setup(config Configuration, name string, skipResourceFilters bool) (context.Context, SetupResult, context.CancelFunc) {
@ -57,7 +59,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context
sdownMaxProcs := setupMaxProcs(logger)
setupProfiling(logger)
ctx, sdownSignals := setupSignals(logger)
client := kubeclient.From(createKubernetesClient(logger), kubeclient.WithTracing())
client := kubeclient.From(createKubernetesClient(logger, clientRateLimitQPS, clientRateLimitBurst), kubeclient.WithTracing())
metricsConfiguration := startMetricsConfigController(ctx, logger, client)
metricsManager, sdownMetrics := SetupMetrics(ctx, logger, metricsConfiguration, client)
client = client.WithMetrics(metricsManager, metrics.KubeClient)
@ -77,7 +79,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context
}
var leaderElectionClient kubeclient.UpstreamInterface
if config.UsesLeaderElection() {
leaderElectionClient = createKubernetesClient(logger, kubeclient.WithMetrics(metricsManager, metrics.KubeClient), kubeclient.WithTracing())
leaderElectionClient = createKubernetesClient(logger, clientRateLimitQPS, clientRateLimitBurst, kubeclient.WithMetrics(metricsManager, metrics.KubeClient), kubeclient.WithTracing())
}
var kyvernoClient kyvernoclient.UpstreamInterface
if config.UsesKyvernoClient() {
@ -95,6 +97,10 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context
if config.UsesKyvernoDynamicClient() {
dClient = createKyvernoDynamicClient(logger, ctx, dynamicClient, client, 15*time.Minute)
}
var eventsClient eventsv1.EventsV1Interface
if config.UsesEventsClient() {
eventsClient = createEventsClient(logger, client, metricsManager)
}
var metadataClient metadataclient.UpstreamInterface
if config.UsesMetadataClient() {
metadataClient = createMetadataClient(logger, metadataclient.WithMetrics(metricsManager, metrics.MetadataClient), metadataclient.WithTracing())
@ -116,6 +122,7 @@ func Setup(config Configuration, name string, skipResourceFilters bool) (context
ApiServerClient: apiServerClient,
MetadataClient: metadataClient,
KyvernoDynamicClient: dClient,
EventsClient: eventsClient,
},
shutdown(logger.WithName("shutdown"), sdownMaxProcs, sdownMetrics, sdownTracing, sdownSignals)
}

View file

@ -255,6 +255,7 @@ func main() {
internal.WithKyvernoClient(),
internal.WithDynamicClient(),
internal.WithKyvernoDynamicClient(),
internal.WithEventsClient(),
internal.WithApiServerClient(),
internal.WithFlagSets(flagset),
)
@ -320,7 +321,7 @@ func main() {
omitEventsValues = []string{}
}
eventGenerator := event.NewEventGenerator(
setup.KyvernoDynamicClient,
setup.EventsClient,
logging.WithName("EventGenerator"),
omitEventsValues...,
)

View file

@ -224,6 +224,7 @@ func main() {
internal.WithDynamicClient(),
internal.WithMetadataClient(),
internal.WithKyvernoDynamicClient(),
internal.WithEventsClient(),
internal.WithFlagSets(flagset),
)
// parse flags
@ -254,7 +255,7 @@ func main() {
omitEventsValues = []string{}
}
eventGenerator := event.NewEventGenerator(
setup.KyvernoDynamicClient,
setup.EventsClient,
logging.WithName("EventGenerator"),
omitEventsValues...,
)

View file

@ -6,10 +6,10 @@ import (
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
"github.com/kyverno/kyverno/pkg/clients/dclient"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
)
@ -29,6 +29,9 @@ type generator struct {
// recorders
recorders map[Source]events.EventRecorder
// client
eventsClient eventsv1.EventsV1Interface
// config
omitEvents sets.Set[string]
logger logr.Logger
@ -46,13 +49,14 @@ type Controller interface {
}
// NewEventGenerator to generate a new event controller
func NewEventGenerator(client dclient.Interface, logger logr.Logger, omitEvents ...string) Controller {
func NewEventGenerator(eventsClient eventsv1.EventsV1Interface, logger logr.Logger, omitEvents ...string) Controller {
return &generator{
broadcaster: events.NewBroadcaster(&events.EventSinkImpl{
Interface: client.GetEventsInterface(),
Interface: eventsClient,
}),
omitEvents: sets.New(omitEvents...),
logger: logger,
eventsClient: eventsClient,
omitEvents: sets.New(omitEvents...),
logger: logger,
}
}