diff --git a/cmd/cleanup-controller/main.go b/cmd/cleanup-controller/main.go
index e6121d0f6a..374c8d02c8 100644
--- a/cmd/cleanup-controller/main.go
+++ b/cmd/cleanup-controller/main.go
@@ -18,13 +18,9 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 )
 
 var (
-	kubeconfig           string
-	clientRateLimitQPS   float64
-	clientRateLimitBurst int
 	otel                 string
 	otelCollector        string
 	metricsPort          string
@@ -38,9 +34,6 @@ const (
 
 func parseFlags(config internal.Configuration) {
 	internal.InitFlags(config)
-	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
-	flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
-	flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
 	flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
 	flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
 	flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
@@ -49,46 +42,6 @@ func parseFlags(config internal.Configuration) {
 	flag.Parse()
 }
 
-func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, error) {
-	logger = logger.WithName("kube-clients")
-	logger.Info("create kube clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
-	clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
-	if err != nil {
-		return nil, nil, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(clientConfig)
-	if err != nil {
-		return nil, nil, err
-	}
-	return clientConfig, kubeClient, nil
-}
-
-func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientConfig *rest.Config, metricsConfig *metrics.MetricsConfig) (kubernetes.Interface, dclient.Interface, error) {
-	logger = logger.WithName("instrumented-clients")
-	logger.Info("create instrumented clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
-	kubeClient, err := kubeclient.NewForConfig(
-		clientConfig,
-		kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		kubeclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, err
-	}
-	dynamicClient, err := dynamicclient.NewForConfig(
-		clientConfig,
-		dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		dynamicclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, err
-	}
-	dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, resyncPeriod)
-	if err != nil {
-		return nil, nil, err
-	}
-	return kubeClient, dClient, nil
-}
-
 func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
 	logger = logger.WithName("metrics")
 	logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
@@ -130,7 +83,11 @@ func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics
 
 func main() {
 	// config
-	appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
+	appConfig := internal.NewConfiguration(
+		internal.WithProfiling(),
+		internal.WithTracing(),
+		internal.WithKubeconfig(),
+	)
 	// parse flags
 	parseFlags(appConfig)
 	// setup logger
@@ -142,11 +99,8 @@ func main() {
 	internal.ShowVersion(logger)
 	// start profiling
 	internal.SetupProfiling(logger)
-	// create client config and kube clients
-	clientConfig, rawClient, err := createKubeClients(logger)
-	if err != nil {
-		os.Exit(1)
-	}
+	// create raw client
+	rawClient := internal.CreateKubernetesClient(logger)
 	// setup signals
 	signalCtx, signalCancel := internal.SetupSignals(logger)
 	defer signalCancel()
@@ -160,14 +114,16 @@ func main() {
 		defer metricsShutdown()
 	}
 	// create instrumented clients
-	kubeClient, dynamicClient, err := createInstrumentedClients(signalCtx, logger, clientConfig, metricsConfig)
+	kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
+	dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
+	dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
 	if err != nil {
-		logger.Error(err, "failed to create instrument clients")
+		logger.Error(err, "failed to create dynamic client")
 		os.Exit(1)
 	}
 	kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
 	policyHandlers := NewHandlers(
-		dynamicClient,
+		dClient,
 	)
 	secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
 	// start informers and wait for cache sync
diff --git a/cmd/initContainer/main.go b/cmd/initContainer/main.go
index aea14dabd4..32b5eb2c92 100644
--- a/cmd/initContainer/main.go
+++ b/cmd/initContainer/main.go
@@ -25,16 +25,9 @@ import (
 	coordinationv1 "k8s.io/api/coordination/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 )
 
-var (
-	kubeconfig           string
-	clientRateLimitQPS   float64
-	clientRateLimitBurst int
-)
-
 const (
 	policyReportKind        string = "PolicyReport"
 	clusterPolicyReportKind string = "ClusterPolicyReport"
@@ -43,15 +36,14 @@ const (
 
 func parseFlags(config internal.Configuration) {
 	internal.InitFlags(config)
-	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
-	flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 0, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
-	flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
 	flag.Parse()
 }
 
 func main() {
 	// config
-	appConfig := internal.NewConfiguration()
+	appConfig := internal.NewConfiguration(
+		internal.WithKubeconfig(),
+	)
 	// parse flags
 	parseFlags(appConfig)
 	// setup logger
@@ -64,24 +56,10 @@ func main() {
 	// os signal handler
 	signalCtx, signalCancel := internal.SetupSignals(logger)
 	defer signalCancel()
-	// create client config
-	clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
-	if err != nil {
-		logger.Error(err, "Failed to build kubeconfig")
-		os.Exit(1)
-	}
 
-	kubeClient, err := kubernetes.NewForConfig(clientConfig)
-	if err != nil {
-		logger.Error(err, "Failed to create kubernetes client")
-		os.Exit(1)
-	}
-
-	dynamicClient, err := dynamic.NewForConfig(clientConfig)
-	if err != nil {
-		logger.Error(err, "Failed to create dynamic client")
-		os.Exit(1)
-	}
+	kubeClient := internal.CreateKubernetesClient(logger)
+	dynamicClient := internal.CreateDynamicClient(logger)
+	kyvernoClient := internal.CreateKyvernoClient(logger)
 
 	// DYNAMIC CLIENT
 	// - client for all registered resources
@@ -96,12 +74,6 @@ func main() {
 		os.Exit(1)
 	}
 
-	pclient, err := kyvernoclient.NewForConfig(clientConfig)
-	if err != nil {
-		logger.Error(err, "Failed to create client")
-		os.Exit(1)
-	}
-
 	// Exit for unsupported version of kubernetes cluster
 	if !utils.HigherThanKubernetesVersion(kubeClient.Discovery(), logging.GlobalLogger(), 1, 16, 0) {
 		os.Exit(1)
@@ -150,8 +122,8 @@ func main() {
 		in := gen(done, signalCtx.Done(), requests...)
 		// process requests
 		// processing routine count : 2
-		p1 := process(client, pclient, done, signalCtx.Done(), in)
-		p2 := process(client, pclient, done, signalCtx.Done(), in)
+		p1 := process(client, kyvernoClient, done, signalCtx.Done(), in)
+		p2 := process(client, kyvernoClient, done, signalCtx.Done(), in)
 		// merge results from processing routines
 		for err := range merge(done, signalCtx.Done(), p1, p2) {
 			if err != nil {
diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go
index 4ced3a5a11..89bdbc534a 100644
--- a/cmd/kyverno/main.go
+++ b/cmd/kyverno/main.go
@@ -21,6 +21,7 @@ import (
 	dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
 	kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
 	kyvernoclient "github.com/kyverno/kyverno/pkg/clients/kyverno"
+	metadataclient "github.com/kyverno/kyverno/pkg/clients/metadata"
 	"github.com/kyverno/kyverno/pkg/config"
 	"github.com/kyverno/kyverno/pkg/controllers/certmanager"
 	configcontroller "github.com/kyverno/kyverno/pkg/controllers/config"
@@ -52,20 +53,16 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
-	metadataclient "k8s.io/client-go/metadata"
 	metadatainformers "k8s.io/client-go/metadata/metadatainformer"
-	"k8s.io/client-go/rest"
 )
 
 const (
-	resyncPeriod         = 15 * time.Minute
-	metadataResyncPeriod = 15 * time.Minute
+	resyncPeriod = 15 * time.Minute
 )
 
 var (
 	// TODO: this has been added to backward support command line arguments
 	// will be removed in future and the configuration will be set only via configmaps
-	kubeconfig                 string
 	serverIP                   string
 	metricsPort                string
 	webhookTimeout             int
@@ -79,8 +76,6 @@ var (
 	imagePullSecrets           string
 	imageSignatureRepository   string
 	allowInsecureRegistry      bool
-	clientRateLimitQPS         float64
-	clientRateLimitBurst       int
 	webhookRegistrationTimeout time.Duration
 	backgroundScan             bool
 	admissionReports           bool
@@ -98,7 +93,6 @@ func parseFlags(config internal.Configuration) {
 	flag.IntVar(&webhookTimeout, "webhookTimeout", webhookcontroller.DefaultWebhookTimeout, "Timeout for webhook configurations.")
 	flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.")
 	flag.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
-	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
 	flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
 	flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
 	flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
@@ -109,8 +103,6 @@ func parseFlags(config internal.Configuration) {
 	flag.StringVar(&imageSignatureRepository, "imageSignatureRepository", "", "Alternate repository for image signatures. Can be overridden per rule via `verifyImages.Repository`.")
 	flag.BoolVar(&allowInsecureRegistry, "allowInsecureRegistry", false, "Whether to allow insecure connections to registries. Don't use this for anything but testing.")
 	flag.BoolVar(&autoUpdateWebhooks, "autoUpdateWebhooks", true, "Set this flag to 'false' to disable auto-configuration of the webhook.")
-	flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
-	flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
 	flag.DurationVar(&webhookRegistrationTimeout, "webhookRegistrationTimeout", 120*time.Second, "Timeout for webhook registration, e.g., 30s, 1m, 5m.")
 	flag.Func(toggle.ProtectManagedResourcesFlagName, toggle.ProtectManagedResourcesDescription, toggle.ProtectManagedResources.Parse)
 	flag.BoolVar(&backgroundScan, "backgroundScan", true, "Enable or disable backgound scan.")
@@ -124,67 +116,6 @@ func parseFlags(config internal.Configuration) {
 	flag.Parse()
 }
 
-func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, metadataclient.Interface, error) {
-	logger = logger.WithName("kube-clients")
-	logger.Info("create kube clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
-	clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(clientConfig)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	metadataClient, err := metadataclient.NewForConfig(clientConfig)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	return clientConfig, kubeClient, metadataClient, nil
-}
-
-func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientConfig *rest.Config, metricsConfig *metrics.MetricsConfig) (kubernetes.Interface, kubernetes.Interface, versioned.Interface, dclient.Interface, error) {
-	logger = logger.WithName("instrumented-clients")
-	logger.Info("create instrumented clients...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
-	kubeClient, err := kubeclient.NewForConfig(
-		clientConfig,
-		kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		kubeclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, nil, nil, err
-	}
-	// The leader queries/updates the lease object quite frequently. So we use a separate kube-client to eliminate the throttle issue
-	kubeClientLeaderElection, err := kubeclient.NewForConfig(
-		clientConfig,
-		kubeclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		kubeclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, nil, nil, err
-	}
-	kyvernoClient, err := kyvernoclient.NewForConfig(
-		clientConfig,
-		kyvernoclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		kyvernoclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, nil, nil, err
-	}
-	dynamicClient, err := dynamicclient.NewForConfig(
-		clientConfig,
-		dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
-		dynamicclient.WithTracing(),
-	)
-	if err != nil {
-		return nil, nil, nil, nil, err
-	}
-	dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, metadataResyncPeriod)
-	if err != nil {
-		return nil, nil, nil, nil, err
-	}
-	return kubeClient, kubeClientLeaderElection, kyvernoClient, dClient, nil
-}
-
 func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
 	logger = logger.WithName("metrics")
 	logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
@@ -463,7 +394,11 @@ func createrLeaderControllers(
 
 func main() {
 	// config
-	appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
+	appConfig := internal.NewConfiguration(
+		internal.WithProfiling(),
+		internal.WithTracing(),
+		internal.WithKubeconfig(),
+	)
 	// parse flags
 	parseFlags(appConfig)
 	// setup logger
@@ -477,12 +412,8 @@ func main() {
 	internal.ShowVersion(logger)
 	// start profiling
 	internal.SetupProfiling(logger)
-	// create client config and kube clients
-	clientConfig, rawClient, metadataClient, err := createKubeClients(logger)
-	if err != nil {
-		logger.Error(err, "failed to create kubernetes clients")
-		os.Exit(1)
-	}
+	// create raw client
+	rawClient := internal.CreateKubernetesClient(logger)
 	// setup metrics
 	metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
 	if err != nil {
@@ -496,9 +427,14 @@ func main() {
 	signalCtx, signalCancel := internal.SetupSignals(logger)
 	defer signalCancel()
 	// create instrumented clients
-	kubeClient, kubeClientLeaderElection, kyvernoClient, dynamicClient, err := createInstrumentedClients(signalCtx, logger, clientConfig, metricsConfig)
+	kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
+	leaderElectionClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
+	kyvernoClient := internal.CreateKyvernoClient(logger, kyvernoclient.WithMetrics(metricsConfig, metrics.KyvernoClient), kyvernoclient.WithTracing())
+	metadataClient := internal.CreateMetadataClient(logger, metadataclient.WithMetrics(metricsConfig, metrics.KyvernoClient), metadataclient.WithTracing())
+	dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
+	dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
 	if err != nil {
-		logger.Error(err, "failed to create instrument clients")
+		logger.Error(err, "failed to create dynamic client")
 		os.Exit(1)
 	}
 	// setup tracing
@@ -512,7 +448,7 @@ func main() {
 	// setup cosign
 	setupCosign(logger)
 	// check we can run
-	if err := sanityChecks(dynamicClient); err != nil {
+	if err := sanityChecks(dClient); err != nil {
 		logger.Error(err, "sanity checks failed")
 		os.Exit(1)
 	}
@@ -520,9 +456,7 @@ func main() {
 	kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
 	kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
 	kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
-	configuration, err := config.NewConfiguration(
-		kubeClient,
-	)
+	configuration, err := config.NewConfiguration(kubeClient)
 	if err != nil {
 		logger.Error(err, "failed to initialize configuration")
 		os.Exit(1)
@@ -541,7 +475,7 @@ func main() {
 	)
 	policyCache := policycache.NewCache()
 	eventGenerator := event.NewEventGenerator(
-		dynamicClient,
+		dClient,
 		kyvernoInformer.Kyverno().V1().ClusterPolicies(),
 		kyvernoInformer.Kyverno().V1().Policies(),
 		maxQueuedEvents,
@@ -566,7 +500,7 @@ func main() {
 		kyvernoInformer,
 		kubeClient,
 		kyvernoClient,
-		dynamicClient,
+		dClient,
 		configuration,
 		policyCache,
 		eventGenerator,
@@ -591,7 +525,7 @@ func main() {
 		logger.WithName("leader-election"),
 		"kyverno",
 		config.KyvernoNamespace(),
-		kubeClientLeaderElection,
+		leaderElectionClient,
 		config.KyvernoPodName(),
 		leaderElectionRetryPeriod,
 		func(ctx context.Context) {
@@ -614,7 +548,7 @@ func main() {
 				metadataInformer,
 				kubeClient,
 				kyvernoClient,
-				dynamicClient,
+				dClient,
 				configuration,
 				metricsConfig,
 				eventGenerator,
@@ -669,11 +603,11 @@ func main() {
 		kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
 	)
 	policyHandlers := webhookspolicy.NewHandlers(
-		dynamicClient,
+		dClient,
 		openApiManager,
 	)
 	resourceHandlers := webhooksresource.NewHandlers(
-		dynamicClient,
+		dClient,
 		kyvernoClient,
 		configuration,
 		metricsConfig,