
feat: add dynamic client support to internal cmd package (#5477)

* feat: add dynamic client support to internal cmd package

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* Update cmd/internal/client.go

Signed-off-by: shuting <shutting06@gmail.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Signed-off-by: shuting <shutting06@gmail.com>
Co-authored-by: shuting <shutting06@gmail.com>
Charles-Edouard Brétéché 2022-11-29 10:16:07 +01:00 committed by GitHub
parent 21da0f335e
commit 8f6c3e648c
11 changed files with 107 additions and 130 deletions
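The net effect for command entry points: metrics setup and client construction move out of each main() and into the shared cmd/internal package. A minimal sketch of the resulting wiring, assembled from the hunks below (the helpers log and exit on failure, so call sites need no error handling):

package main

import (
    "time"

    "github.com/kyverno/kyverno/cmd/internal"
    dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
    kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
    "github.com/kyverno/kyverno/pkg/metrics"
)

func main() {
    // Setup now also wires metrics and returns the metrics manager
    ctx, logger, metricsConfig, sdown := internal.Setup()
    defer sdown()
    // instrumented clients come from the internal package
    kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
    dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
    // dclient wraps the dynamic client with discovery; exits internally on error
    dClient := internal.CreateDClient(logger, ctx, dynamicClient, kubeClient, 15*time.Minute)
    _ = dClient // handed to handlers/controllers as in the hunks below
}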


@@ -22,19 +22,14 @@ func NewHandlers(client dclient.Interface) CleanupPolicyHandlers {
}
func (h *cleanupPolicyHandlers) Validate(ctx context.Context, logger logr.Logger, request *admissionv1.AdmissionRequest, _ time.Time) *admissionv1.AdmissionResponse {
if request.SubResource != "" {
logger.V(4).Info("skip policy validation on status update")
return admissionutils.ResponseSuccess()
}
policy, _, err := admissionutils.GetCleanupPolicies(request)
if err != nil {
logger.Error(err, "failed to unmarshal policies from admission request")
return admissionutils.Response(err)
}
err = validate.ValidateCleanupPolicy(policy, h.client, false)
err = validate.ValidateCleanupPolicy(logger, policy, h.client, false)
if err != nil {
logger.Error(err, "policy validation errors")
return admissionutils.Response(err)
}
return admissionutils.Response(err)
}


@@ -1,7 +0,0 @@
package logger
import (
"github.com/kyverno/kyverno/pkg/logging"
)
var Logger = logging.WithName("cleanuppolicywebhooks")


@@ -1,98 +1,33 @@
package main
import (
"context"
"flag"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/cmd/internal"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
kyvernoclient "github.com/kyverno/kyverno/pkg/clients/kyverno"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers/cleanup"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/metrics"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
var (
otel string
otelCollector string
metricsPort string
transportCreds string
disableMetricsExport bool
)
const (
resyncPeriod = 15 * time.Minute
)
func setupMetrics(ctx context.Context, logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
metricsConfiguration := internal.GetMetricsConfiguration(logger, kubeClient)
metricsAddr := ":" + metricsPort
metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics(
ctx,
disableMetricsExport,
otel,
metricsAddr,
otelCollector,
metricsConfiguration,
transportCreds,
kubeClient,
logging.WithName("metrics"),
)
if err != nil {
return nil, nil, err
}
var cancel context.CancelFunc
if otel == "grpc" {
cancel = func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
metrics.ShutDownController(ctx, metricsPusher)
}
}
if otel == "prometheus" {
go func() {
if err := http.ListenAndServe(metricsAddr, metricsServerMux); err != nil {
logger.Error(err, "failed to enable metrics", "address", metricsAddr)
}
}()
}
return metricsConfig, cancel, nil
}
func setupSignals() (context.Context, context.CancelFunc) {
return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
}
func main() {
// application flags
flagset := flag.NewFlagSet("application", flag.ExitOnError)
flagset.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
flagset.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
flagset.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
flagset.StringVar(&metricsPort, "metricsPort", "8000", "Expose prometheus metrics at the given port, default to 8000.")
flagset.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
// config
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
internal.WithMetrics(),
internal.WithTracing(),
internal.WithKubeconfig(),
internal.WithFlagSets(flagset),
)
// parse flags
internal.ParseFlags(appConfig)
@@ -101,40 +36,15 @@ func main() {
// start profiling
// setup signals
// setup maxprocs
ctx, logger, sdown := internal.Setup()
defer sdown()
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(ctx, logger, rawClient)
if err != nil {
logger.Error(err, "failed to setup metrics")
os.Exit(1)
}
if metricsShutdown != nil {
defer metricsShutdown()
}
// setup signals
signalCtx, signalCancel := setupSignals()
defer signalCancel()
ctx, logger, metricsConfig, sdown := internal.Setup()
defer sdown()
// create instrumented clients
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
dClient, err := dclient.NewClient(signalCtx, dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
logger.Error(err, "failed to create dynamic client")
os.Exit(1)
}
clientConfig := internal.CreateClientConfig(logger)
kyvernoClient, err := kyvernoclient.NewForConfig(
clientConfig,
kyvernoclient.WithMetrics(metricsConfig, metrics.KubeClient),
kyvernoclient.WithTracing(),
)
if err != nil {
logger.Error(err, "failed to create kyverno client")
os.Exit(1)
}
kyvernoClient := internal.CreateKyvernoClient(logger, kyvernoclient.WithMetrics(metricsConfig, metrics.KubeClient), kyvernoclient.WithTracing())
dClient := internal.CreateDClient(logger, ctx, dynamicClient, kubeClient, 15*time.Minute)
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
@@ -144,18 +54,18 @@ func main() {
kyvernoInformer.Kyverno().V1alpha1().CleanupPolicies(),
kubeInformer.Batch().V1().CronJobs(),
)
// controllers
controller := newController(cleanup.ControllerName, *cleanupController, cleanup.Workers)
policyHandlers := NewHandlers(
dClient,
)
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
// start informers and wait for cache sync
// we need to call start again because we potentially registered new informers
if !internal.StartInformersAndWaitForCacheSync(ctx, kubeKyvernoInformer) {
if !internal.StartInformersAndWaitForCacheSync(ctx, kubeKyvernoInformer, kubeInformer, kyvernoInformer) {
os.Exit(1)
}
var wg sync.WaitGroup
controller.run(signalCtx, logger.WithName("cleanup-controller"), &wg)
controller.run(ctx, logger.WithName("cleanup-controller"), &wg)
server := NewServer(
policyHandlers,
func() ([]byte, []byte, error) {
@@ -169,6 +79,5 @@ func main() {
// start webhooks server
server.Run(ctx.Done())
// wait for termination signal
<-ctx.Done()
wg.Wait()
}
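For context, the controller wrapper used above starts its workers on a WaitGroup so that main can block on wg.Wait() after the termination signal. A rough sketch of what run likely looks like (the field names here are assumptions, not the actual implementation):

// sketch: start the wrapped controller's workers in a goroutine and release
// the WaitGroup when they return
func (c *controller) run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        logger.Info("starting controller", "workers", c.workers)
        c.controller.Run(ctx, c.workers)
    }()
}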


@@ -8,7 +8,6 @@ import (
"github.com/go-logr/logr"
"github.com/julienschmidt/httprouter"
"github.com/kyverno/kyverno/cmd/cleanup-controller/logger"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/webhooks/handlers"
admissionv1 "k8s.io/api/admission/v1"
@@ -40,12 +39,14 @@ func NewServer(
policyHandlers CleanupPolicyHandlers,
tlsProvider TlsProvider,
) Server {
policyLogger := logging.WithName("cleanup-policy")
mux := httprouter.New()
mux.HandlerFunc(
"POST",
ValidatingWebhookServicePath,
handlers.FromAdmissionFunc("VALIDATE", policyHandlers.Validate).
WithAdmission(logger.Logger.WithName("validate")).
WithSubResourceFilter().
WithAdmission(policyLogger.WithName("validate")).
ToHandlerFunc(),
)
return &server{
@@ -70,7 +71,7 @@ func NewServer(
WriteTimeout: 30 * time.Second,
ReadHeaderTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
// ErrorLog: logging.StdLogger(logger.WithName("server"), ""),
ErrorLog: logging.StdLogger(logging.WithName("server"), ""),
},
}
}
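The subresource check deleted from the Validate handler earlier in this commit now lives in the WithSubResourceFilter decorator, so the handler is assembled as a chain: FromAdmissionFunc wraps the raw validate function, WithSubResourceFilter short-circuits subresource requests, WithAdmission attaches the logger, and ToHandlerFunc produces the final handler. As a generic illustration of this decorator style (names below are illustrative, not kyverno's actual API):

// illustrative middleware in the same shape: wrap a handler and
// short-circuit requests that match a predicate
func filter(pred func(*http.Request) bool, onSkip, next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if pred(r) {
            onSkip(w, r) // e.g. respond with an "allowed" AdmissionReview
            return
        }
        next(w, r)
    }
}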


@@ -5,7 +5,6 @@ import (
"github.com/go-logr/logr"
kyvernov1alpha1 "github.com/kyverno/kyverno/api/kyverno/v1alpha1"
"github.com/kyverno/kyverno/cmd/cleanup-controller/logger"
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/engine/variables"
"github.com/kyverno/kyverno/pkg/policy/generate"
@@ -57,7 +56,7 @@ func (c *Cleanup) CanIDelete(kind, namespace string) error {
}
// Validate checks the policy and rules declarations for required configurations
func ValidateCleanupPolicy(cleanuppolicy kyvernov1alpha1.CleanupPolicyInterface, client dclient.Interface, mock bool) error {
func ValidateCleanupPolicy(logger logr.Logger, cleanuppolicy kyvernov1alpha1.CleanupPolicyInterface, client dclient.Interface, mock bool) error {
// namespace := cleanuppolicy.GetNamespace()
var res []*metav1.APIResourceList
clusterResources := sets.NewString()
@@ -68,7 +67,7 @@ func ValidateCleanupPolicy(cleanuppolicy kyvernov1alpha1.CleanupPolicyInterface, client dclient.Interface, mock bool) error {
if discovery.IsGroupDiscoveryFailedError(err) {
err := err.(*discovery.ErrGroupDiscoveryFailed)
for gv, err := range err.Groups {
logger.Logger.Error(err, "failed to list api resources", "group", gv)
logger.Error(err, "failed to list api resources", "group", gv)
}
} else {
return err


@@ -45,22 +45,17 @@ func main() {
// start profiling
// setup signals
// setup maxprocs
ctx, logger, sdown := internal.Setup()
ctx, logger, _, sdown := internal.Setup()
defer sdown()
// create clients
kubeClient := internal.CreateKubernetesClient(logger)
dynamicClient := internal.CreateDynamicClient(logger)
kyvernoClient := internal.CreateKyvernoClient(logger)
client, err := dclient.NewClient(ctx, dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
logger.Error(err, "Failed to create client")
os.Exit(1)
}
client := internal.CreateDClient(logger, ctx, dynamicClient, kubeClient, 15*time.Minute)
// Exit for unsupported version of kubernetes cluster
if !utils.HigherThanKubernetesVersion(kubeClient.Discovery(), logging.GlobalLogger(), 1, 16, 0) {
os.Exit(1)
}
requests := []request{
{policyReportKind},
{clusterPolicyReportKind},
@@ -78,7 +73,7 @@ func main() {
run := func(context.Context) {
name := tls.GenerateRootCASecretName()
_, err = kubeClient.CoreV1().Secrets(config.KyvernoNamespace()).Get(context.TODO(), name, metav1.GetOptions{})
_, err := kubeClient.CoreV1().Secrets(config.KyvernoNamespace()).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
logging.V(2).Info("failed to fetch root CA secret", "name", name, "error", err.Error())
if !errors.IsNotFound(err) {


@@ -1,8 +1,12 @@
package internal
import (
"context"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dyn "github.com/kyverno/kyverno/pkg/clients/dynamic"
kube "github.com/kyverno/kyverno/pkg/clients/kube"
kyverno "github.com/kyverno/kyverno/pkg/clients/kyverno"
@@ -51,3 +55,11 @@ func CreateMetadataClient(logger logr.Logger, opts ...meta.NewOption) metadata.Interface {
checkError(logger, err, "failed to create metadata client")
return client
}
func CreateDClient(logger logr.Logger, ctx context.Context, dyn dynamic.Interface, kube kubernetes.Interface, resync time.Duration) dclient.Interface {
logger = logger.WithName("d-client")
logger.Info("create the kyverno dynamic client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := dclient.NewClient(ctx, dyn, kube, resync)
checkError(logger, err, "failed to create d client")
return client
}
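CreateDClient follows the same pattern as the other constructors in this file: it returns the client directly and relies on checkError to terminate the process, which is what lets call sites shrink from five lines to one. A sketch of checkError, assuming it simply logs and exits (the actual helper is defined elsewhere in cmd/internal):

// sketch, assuming checkError logs fatal errors and exits
func checkError(logger logr.Logger, err error, msg string) {
    if err != nil {
        logger.Error(err, msg)
        os.Exit(1)
    }
}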


@@ -3,6 +3,7 @@ package internal
import "flag"
type Configuration interface {
UsesMetrics() bool
UsesTracing() bool
UsesProfiling() bool
UsesKubeconfig() bool
@@ -19,6 +20,12 @@ func NewConfiguration(options ...ConfigurationOption) Configuration {
type ConfigurationOption func(c *configuration)
func WithMetrics() ConfigurationOption {
return func(c *configuration) {
c.usesMetrics = true
}
}
func WithTracing() ConfigurationOption {
return func(c *configuration) {
c.usesTracing = true
@@ -44,12 +51,17 @@ func WithFlagSets(flagsets ...*flag.FlagSet) ConfigurationOption {
}
type configuration struct {
usesMetrics bool
usesTracing bool
usesProfiling bool
usesKubeconfig bool
flagSets []*flag.FlagSet
}
func (c *configuration) UsesMetrics() bool {
return c.usesMetrics
}
func (c *configuration) UsesTracing() bool {
return c.usesTracing
}


@@ -18,6 +18,12 @@ var (
tracingAddress string
tracingPort string
tracingCreds string
// metrics
otel string
otelCollector string
metricsPort string
transportCreds string
disableMetricsExport bool
// kubeconfig
kubeconfig string
clientRateLimitQPS float64
@@ -43,6 +49,14 @@ func initTracingFlags() {
flag.StringVar(&tracingCreds, "tracingCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Tracing Client. If empty string is set, means an insecure connection will be used")
}
func initMetricsFlags() {
flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
flag.StringVar(&metricsPort, "metricsPort", "8000", "Expose prometheus metrics at the given port, default to 8000.")
flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
}
func initKubeconfigFlags() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
@@ -60,6 +74,10 @@ func InitFlags(config Configuration) {
if config.UsesTracing() {
initTracingFlags()
}
// metrics
if config.UsesMetrics() {
initMetricsFlags()
}
// kubeconfig
if config.UsesKubeconfig() {
initKubeconfigFlags()
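With the metrics flags owned by the internal package, a command opts into each flag group through the functional options on Configuration, and InitFlags registers only the groups requested; for example (mirroring the cleanup-controller main above):

appConfig := internal.NewConfiguration(
    internal.WithProfiling(),
    internal.WithMetrics(), // registers otelConfig, otelCollector, metricsPort, transportCreds, disableMetrics
    internal.WithTracing(),
    internal.WithKubeconfig(),
)
internal.ParseFlags(appConfig)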


@@ -1,15 +1,55 @@
package internal
import (
"context"
"net/http"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/metrics"
"k8s.io/client-go/kubernetes"
)
func GetMetricsConfiguration(logger logr.Logger, client kubernetes.Interface) config.MetricsConfiguration {
logger = logger.WithName("metrics")
logger.Info("load metrics configuration...")
metricsConfiguration, err := config.NewMetricsConfiguration(client)
checkError(logger, err, "failed to load metrics configuration")
return metricsConfiguration
}
func SetupMetrics(ctx context.Context, logger logr.Logger, kubeClient kubernetes.Interface) (metrics.MetricsConfigManager, context.CancelFunc) {
logger = logger.WithName("metrics")
logger.Info("setup metrics...", "otel", otel, "port", metricsPort, "collector", otelCollector, "creds", transportCreds)
metricsConfiguration := GetMetricsConfiguration(logger, kubeClient)
metricsAddr := ":" + metricsPort
metricsConfig, metricsServerMux, metricsPusher, err := metrics.InitMetrics(
ctx,
disableMetricsExport,
otel,
metricsAddr,
otelCollector,
metricsConfiguration,
transportCreds,
kubeClient,
logging.WithName("metrics"),
)
checkError(logger, err, "failed to init metrics")
var cancel context.CancelFunc
if otel == "grpc" {
cancel = func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
metrics.ShutDownController(ctx, metricsPusher)
}
}
if otel == "prometheus" {
go func() {
if err := http.ListenAndServe(metricsAddr, metricsServerMux); err != nil {
logger.Error(err, "failed to enable metrics", "address", metricsAddr)
}
}()
}
return metricsConfig, cancel
}
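Note that SetupMetrics only returns a non-nil CancelFunc when otel is "grpc" (prometheus mode starts a serving goroutine instead), so a caller deferring it directly should guard against nil, as the old cleanup-controller main did:

metricsConfig, sdownMetrics := internal.SetupMetrics(ctx, logger, kubeClient)
if sdownMetrics != nil {
    defer sdownMetrics()
}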


@@ -4,6 +4,7 @@ import (
"context"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/metrics"
)
func shutdown(logger logr.Logger, sdowns ...context.CancelFunc) context.CancelFunc {
@@ -15,11 +16,13 @@ func shutdown(logger logr.Logger, sdowns ...context.CancelFunc) context.CancelFunc {
}
}
func Setup() (context.Context, logr.Logger, context.CancelFunc) {
func Setup() (context.Context, logr.Logger, metrics.MetricsConfigManager, context.CancelFunc) {
logger := SetupLogger()
ShowVersion(logger)
sdownMaxProcs := SetupMaxProcs(logger)
SetupProfiling(logger)
client := CreateKubernetesClient(logger)
ctx, sdownSignals := SetupSignals(logger)
return ctx, logger, shutdown(logger.WithName("shutdown"), sdownMaxProcs, sdownSignals)
metricsManager, sdownMetrics := SetupMetrics(ctx, logger, client)
return ctx, logger, metricsManager, shutdown(logger.WithName("shutdown"), sdownMaxProcs, sdownMetrics, sdownSignals)
}
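The body of shutdown is elided in this hunk; a plausible sketch, assuming it tolerates nil entries (SetupMetrics can hand back a nil cancel) and invokes the functions in order:

// sketch only: the real body is not shown in this diff
func shutdown(logger logr.Logger, sdowns ...context.CancelFunc) context.CancelFunc {
    return func() {
        for _, sdown := range sdowns {
            if sdown != nil {
                logger.Info("shutting down...")
                sdown()
            }
        }
    }
}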