1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-04-15 00:36:28 +00:00

feat: use client funcs from internal cmd package (#5443)

* refactor: improve instrumented clients creation

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* refactor: instrumented clients code part 3

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* metadata

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* metadata

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* feat: use client funcs from internal cmd package

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fixes

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-11-23 11:36:15 +01:00 committed by GitHub
parent 83a68c6707
commit 25a10e7cd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 182 deletions

View file

@ -18,13 +18,9 @@ import (
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
var (
kubeconfig string
clientRateLimitQPS float64
clientRateLimitBurst int
otel string
otelCollector string
metricsPort string
@ -38,9 +34,6 @@ const (
func parseFlags(config internal.Configuration) {
internal.InitFlags(config)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
@ -49,46 +42,6 @@ func parseFlags(config internal.Configuration) {
flag.Parse()
}
func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, error) {
logger = logger.WithName("kube-clients")
logger.Info("create kube clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil {
return nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, nil, err
}
return clientConfig, kubeClient, nil
}
func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientConfig *rest.Config, metricsConfig *metrics.MetricsConfig) (kubernetes.Interface, dclient.Interface, error) {
logger = logger.WithName("instrumented-clients")
logger.Info("create instrumented clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
kubeClient, err := kubeclient.NewForConfig(
clientConfig,
kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
kubeclient.WithTracing(),
)
if err != nil {
return nil, nil, err
}
dynamicClient, err := dynamicclient.NewForConfig(
clientConfig,
dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
dynamicclient.WithTracing(),
)
if err != nil {
return nil, nil, err
}
dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, resyncPeriod)
if err != nil {
return nil, nil, err
}
return kubeClient, dClient, nil
}
func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
@ -130,7 +83,11 @@ func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics
func main() {
// config
appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
internal.WithTracing(),
internal.WithKubeconfig(),
)
// parse flags
parseFlags(appConfig)
// setup logger
@ -142,11 +99,8 @@ func main() {
internal.ShowVersion(logger)
// start profiling
internal.SetupProfiling(logger)
// create client config and kube clients
clientConfig, rawClient, err := createKubeClients(logger)
if err != nil {
os.Exit(1)
}
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup signals
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
@ -160,14 +114,16 @@ func main() {
defer metricsShutdown()
}
// create instrumented clients
kubeClient, dynamicClient, err := createInstrumentedClients(signalCtx, logger, clientConfig, metricsConfig)
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
logger.Error(err, "failed to create instrument clients")
logger.Error(err, "failed to create dynamic client")
os.Exit(1)
}
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
policyHandlers := NewHandlers(
dynamicClient,
dClient,
)
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
// start informers and wait for cache sync

View file

@ -25,16 +25,9 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)
var (
kubeconfig string
clientRateLimitQPS float64
clientRateLimitBurst int
)
const (
policyReportKind string = "PolicyReport"
clusterPolicyReportKind string = "ClusterPolicyReport"
@ -43,15 +36,14 @@ const (
func parseFlags(config internal.Configuration) {
internal.InitFlags(config)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 0, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.Parse()
}
func main() {
// config
appConfig := internal.NewConfiguration()
appConfig := internal.NewConfiguration(
internal.WithKubeconfig(),
)
// parse flags
parseFlags(appConfig)
// setup logger
@ -64,24 +56,10 @@ func main() {
// os signal handler
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// create client config
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil {
logger.Error(err, "Failed to build kubeconfig")
os.Exit(1)
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create kubernetes client")
os.Exit(1)
}
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create dynamic client")
os.Exit(1)
}
kubeClient := internal.CreateKubernetesClient(logger)
dynamicClient := internal.CreateDynamicClient(logger)
kyvernoClient := internal.CreateKyvernoClient(logger)
// DYNAMIC CLIENT
// - client for all registered resources
@ -96,12 +74,6 @@ func main() {
os.Exit(1)
}
pclient, err := kyvernoclient.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create client")
os.Exit(1)
}
// Exit for unsupported version of kubernetes cluster
if !utils.HigherThanKubernetesVersion(kubeClient.Discovery(), logging.GlobalLogger(), 1, 16, 0) {
os.Exit(1)
@ -150,8 +122,8 @@ func main() {
in := gen(done, signalCtx.Done(), requests...)
// process requests
// processing routine count : 2
p1 := process(client, pclient, done, signalCtx.Done(), in)
p2 := process(client, pclient, done, signalCtx.Done(), in)
p1 := process(client, kyvernoClient, done, signalCtx.Done(), in)
p2 := process(client, kyvernoClient, done, signalCtx.Done(), in)
// merge results from processing routines
for err := range merge(done, signalCtx.Done(), p1, p2) {
if err != nil {

View file

@ -21,6 +21,7 @@ import (
dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
kyvernoclient "github.com/kyverno/kyverno/pkg/clients/kyverno"
metadataclient "github.com/kyverno/kyverno/pkg/clients/metadata"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers/certmanager"
configcontroller "github.com/kyverno/kyverno/pkg/controllers/config"
@ -52,20 +53,16 @@ import (
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
metadataclient "k8s.io/client-go/metadata"
metadatainformers "k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
)
const (
resyncPeriod = 15 * time.Minute
metadataResyncPeriod = 15 * time.Minute
resyncPeriod = 15 * time.Minute
)
var (
// TODO: this has been added to backward support command line arguments
// will be removed in future and the configuration will be set only via configmaps
kubeconfig string
serverIP string
metricsPort string
webhookTimeout int
@ -79,8 +76,6 @@ var (
imagePullSecrets string
imageSignatureRepository string
allowInsecureRegistry bool
clientRateLimitQPS float64
clientRateLimitBurst int
webhookRegistrationTimeout time.Duration
backgroundScan bool
admissionReports bool
@ -98,7 +93,6 @@ func parseFlags(config internal.Configuration) {
flag.IntVar(&webhookTimeout, "webhookTimeout", webhookcontroller.DefaultWebhookTimeout, "Timeout for webhook configurations.")
flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.")
flag.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
@ -109,8 +103,6 @@ func parseFlags(config internal.Configuration) {
flag.StringVar(&imageSignatureRepository, "imageSignatureRepository", "", "Alternate repository for image signatures. Can be overridden per rule via `verifyImages.Repository`.")
flag.BoolVar(&allowInsecureRegistry, "allowInsecureRegistry", false, "Whether to allow insecure connections to registries. Don't use this for anything but testing.")
flag.BoolVar(&autoUpdateWebhooks, "autoUpdateWebhooks", true, "Set this flag to 'false' to disable auto-configuration of the webhook.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.DurationVar(&webhookRegistrationTimeout, "webhookRegistrationTimeout", 120*time.Second, "Timeout for webhook registration, e.g., 30s, 1m, 5m.")
flag.Func(toggle.ProtectManagedResourcesFlagName, toggle.ProtectManagedResourcesDescription, toggle.ProtectManagedResources.Parse)
flag.BoolVar(&backgroundScan, "backgroundScan", true, "Enable or disable backgound scan.")
@ -124,67 +116,6 @@ func parseFlags(config internal.Configuration) {
flag.Parse()
}
func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, metadataclient.Interface, error) {
logger = logger.WithName("kube-clients")
logger.Info("create kube clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil {
return nil, nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, nil, nil, err
}
metadataClient, err := metadataclient.NewForConfig(clientConfig)
if err != nil {
return nil, nil, nil, err
}
return clientConfig, kubeClient, metadataClient, nil
}
func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientConfig *rest.Config, metricsConfig *metrics.MetricsConfig) (kubernetes.Interface, kubernetes.Interface, versioned.Interface, dclient.Interface, error) {
logger = logger.WithName("instrumented-clients")
logger.Info("create instrumented clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
kubeClient, err := kubeclient.NewForConfig(
clientConfig,
kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
kubeclient.WithTracing(),
)
if err != nil {
return nil, nil, nil, nil, err
}
// The leader queries/updates the lease object quite frequently. So we use a separate kube-client to eliminate the throttle issue
kubeClientLeaderElection, err := kubeclient.NewForConfig(
clientConfig,
kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
kubeclient.WithTracing(),
)
if err != nil {
return nil, nil, nil, nil, err
}
kyvernoClient, err := kyvernoclient.NewForConfig(
clientConfig,
kyvernoclient.WithMetrics(metricsConfig, metrics.KubeClient),
kyvernoclient.WithTracing(),
)
if err != nil {
return nil, nil, nil, nil, err
}
dynamicClient, err := dynamicclient.NewForConfig(
clientConfig,
dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
dynamicclient.WithTracing(),
)
if err != nil {
return nil, nil, nil, nil, err
}
dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, metadataResyncPeriod)
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClient, kubeClientLeaderElection, kyvernoClient, dClient, nil
}
func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
@ -463,7 +394,11 @@ func createrLeaderControllers(
func main() {
// config
appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
internal.WithTracing(),
internal.WithKubeconfig(),
)
// parse flags
parseFlags(appConfig)
// setup logger
@ -477,12 +412,8 @@ func main() {
internal.ShowVersion(logger)
// start profiling
internal.SetupProfiling(logger)
// create client config and kube clients
clientConfig, rawClient, metadataClient, err := createKubeClients(logger)
if err != nil {
logger.Error(err, "failed to create kubernetes clients")
os.Exit(1)
}
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
if err != nil {
@ -496,9 +427,14 @@ func main() {
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// create instrumented clients
kubeClient, kubeClientLeaderElection, kyvernoClient, dynamicClient, err := createInstrumentedClients(signalCtx, logger, clientConfig, metricsConfig)
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
leaderElectionClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
kyvernoClient := internal.CreateKyvernoClient(logger, kyvernoclient.WithMetrics(metricsConfig, metrics.KyvernoClient), kyvernoclient.WithTracing())
metadataClient := internal.CreateMetadataClient(logger, metadataclient.WithMetrics(metricsConfig, metrics.KyvernoClient), metadataclient.WithTracing())
dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
logger.Error(err, "failed to create instrument clients")
logger.Error(err, "failed to create dynamic client")
os.Exit(1)
}
// setup tracing
@ -512,7 +448,7 @@ func main() {
// setup cosign
setupCosign(logger)
// check we can run
if err := sanityChecks(dynamicClient); err != nil {
if err := sanityChecks(dClient); err != nil {
logger.Error(err, "sanity checks failed")
os.Exit(1)
}
@ -520,9 +456,7 @@ func main() {
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
configuration, err := config.NewConfiguration(
kubeClient,
)
configuration, err := config.NewConfiguration(kubeClient)
if err != nil {
logger.Error(err, "failed to initialize configuration")
os.Exit(1)
@ -541,7 +475,7 @@ func main() {
)
policyCache := policycache.NewCache()
eventGenerator := event.NewEventGenerator(
dynamicClient,
dClient,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
maxQueuedEvents,
@ -566,7 +500,7 @@ func main() {
kyvernoInformer,
kubeClient,
kyvernoClient,
dynamicClient,
dClient,
configuration,
policyCache,
eventGenerator,
@ -591,7 +525,7 @@ func main() {
logger.WithName("leader-election"),
"kyverno",
config.KyvernoNamespace(),
kubeClientLeaderElection,
leaderElectionClient,
config.KyvernoPodName(),
leaderElectionRetryPeriod,
func(ctx context.Context) {
@ -614,7 +548,7 @@ func main() {
metadataInformer,
kubeClient,
kyvernoClient,
dynamicClient,
dClient,
configuration,
metricsConfig,
eventGenerator,
@ -669,11 +603,11 @@ func main() {
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
)
policyHandlers := webhookspolicy.NewHandlers(
dynamicClient,
dClient,
openApiManager,
)
resourceHandlers := webhooksresource.NewHandlers(
dynamicClient,
dClient,
kyvernoClient,
configuration,
metricsConfig,