1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 02:18:15 +00:00

refactor: dynamic client use instrumented clients (#5436)

* refactor: improve instrumented clients creation

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* refactor: instrumented clients code part 3

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* refactor: dynamic client

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-11-22 14:37:27 +01:00 committed by GitHub
parent 8a09d76198
commit 2178b9fe77
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 81 additions and 85 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/cmd/internal"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/logging"
@ -75,11 +76,19 @@ func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientCo
if err != nil {
return nil, nil, err
}
dynamicClient, err := dclient.NewClient(ctx, clientConfig, kubeClient, metricsConfig, resyncPeriod)
dynamicClient, err := dynamicclient.NewForConfig(
clientConfig,
dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
dynamicclient.WithTracing(),
)
if err != nil {
return nil, nil, err
}
return kubeClient, dynamicClient, nil
dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, resyncPeriod)
if err != nil {
return nil, nil, err
}
return kubeClient, dClient, nil
}
func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {

View file

@ -19,6 +19,7 @@ import (
policy2 "github.com/kyverno/kyverno/pkg/policy"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
log "sigs.k8s.io/controller-runtime/pkg/log"
yaml1 "sigs.k8s.io/yaml"
@ -203,7 +204,11 @@ func (c *ApplyCommandConfig) applyCommandHelper() (rc *common.ResultCounts, reso
if err != nil {
return rc, resources, skipInvalidPolicies, pvInfos, err
}
dClient, err = dclient.NewClient(context.Background(), restConfig, kubeClient, nil, 15*time.Minute)
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return rc, resources, skipInvalidPolicies, pvInfos, err
}
dClient, err = dclient.NewClient(context.Background(), dynamicClient, kubeClient, 15*time.Minute)
if err != nil {
return rc, resources, skipInvalidPolicies, pvInfos, err
}

View file

@ -27,6 +27,7 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)
@ -81,9 +82,20 @@ func main() {
os.Exit(1)
}
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
logger.Error(err, "Failed to create dynamic client")
os.Exit(1)
}
// DYNAMIC CLIENT
// - client for all registered resources
client, err := dclient.NewClient(signalCtx, clientConfig, kubeClient, nil, 15*time.Minute)
client, err := dclient.NewClient(
signalCtx,
dynamicClient,
kubeClient,
15*time.Minute,
)
if err != nil {
logger.Error(err, "Failed to create client")
os.Exit(1)

View file

@ -32,7 +32,7 @@ func CreateKyvernoClient(logger logr.Logger, opts ...kyverno.NewOption) versione
logger = logger.WithName("kyverno-client")
logger.Info("create kyverno client...", "kubeconfig", kubeconfig, "qps", clientRateLimitQPS, "burst", clientRateLimitBurst)
client, err := kyverno.NewForConfig(CreateClientConfig(logger), opts...)
checkError(logger, err, "failed to create kubernetes client")
checkError(logger, err, "failed to create kyverno client")
return client
}

View file

@ -20,6 +20,7 @@ import (
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
kyvernoclient "github.com/kyverno/kyverno/pkg/clients/kyverno"
"github.com/kyverno/kyverno/pkg/config"
@ -171,11 +172,19 @@ func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientCo
if err != nil {
return nil, nil, nil, nil, err
}
dynamicClient, err := dclient.NewClient(ctx, clientConfig, kubeClient, metricsConfig, metadataResyncPeriod)
dynamicClient, err := dynamicclient.NewForConfig(
clientConfig,
dynamicclient.WithMetrics(metricsConfig, metrics.KubeClient),
dynamicclient.WithTracing(),
)
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClient, kubeClientLeaderElection, kyvernoClient, dynamicClient, nil
dClient, err := dclient.NewClient(ctx, dynamicClient, kubeClient, metadataResyncPeriod)
if err != nil {
return nil, nil, nil, nil, err
}
return kubeClient, kubeClientLeaderElection, kyvernoClient, dClient, nil
}
func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics.MetricsConfig, context.CancelFunc, error) {

View file

@ -6,7 +6,6 @@ import (
"fmt"
"time"
"github.com/kyverno/kyverno/pkg/metrics"
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -16,22 +15,20 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
certsv1beta1 "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
)
type Interface interface {
// NewDynamicSharedInformerFactory returns a new instance of DynamicSharedInformerFactory
NewDynamicSharedInformerFactory(time.Duration) dynamicinformer.DynamicSharedInformerFactory
// GetEventsInterface provides typed interface for events
GetEventsInterface() (corev1.EventInterface, error)
GetEventsInterface() corev1.EventInterface
// GetDynamicInterface fetches underlying dynamic interface
GetDynamicInterface() dynamic.Interface
// Discovery return the discovery client implementation
Discovery() IDiscovery
// SetDiscovery sets the discovery client implementation
SetDiscovery(discoveryClient IDiscovery)
// RawAbsPath performs a raw call to the kubernetes API
RawAbsPath(path string) ([]byte, error)
// GetResource returns the resource in unstructured/json format
GetResource(apiVersion string, kind string, namespace string, name string, subresources ...string) (*unstructured.Unstructured, error)
@ -48,36 +45,32 @@ type Interface interface {
UpdateResource(apiVersion string, kind string, namespace string, obj interface{}, dryRun bool) (*unstructured.Unstructured, error)
// UpdateStatusResource updates the resource "status" subresource
UpdateStatusResource(apiVersion string, kind string, namespace string, obj interface{}, dryRun bool) (*unstructured.Unstructured, error)
// RecordClientQuery publish the client query to the metric
RecordClientQuery(clientQueryOperation metrics.ClientQueryOperation, clientType metrics.ClientType, resourceKind string, resourceNamespace string)
}
// Client enables interaction with k8 resource
type client struct {
client dynamic.Interface
discoveryClient IDiscovery
kclient kubernetes.Interface
metricsConfig metrics.MetricsConfigManager
restClient rest.Interface
dyn dynamic.Interface
disco IDiscovery
rest rest.Interface
kube kubernetes.Interface
}
// NewClient creates new instance of client
func NewClient(ctx context.Context, config *rest.Config, kclient kubernetes.Interface, metricsConfig metrics.MetricsConfigManager, resync time.Duration) (Interface, error) {
dclient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
func NewClient(
ctx context.Context,
dyn dynamic.Interface,
kube kubernetes.Interface,
resync time.Duration,
) (Interface, error) {
disco := kube.Discovery()
client := client{
client: dclient,
kclient: kclient,
restClient: kclient.Discovery().RESTClient(),
}
if metricsConfig != nil {
client.metricsConfig = metricsConfig
dyn: dyn,
kube: kube,
rest: disco.RESTClient(),
}
// Set discovery client
discoveryClient := &serverPreferredResources{
cachedClient: memory.NewMemCacheClient(kclient.Discovery()),
cachedClient: memory.NewMemCacheClient(disco),
}
// client will invalidate registered resources cache every x seconds,
// As there is no way to identify if the registered resource is available or not
@ -91,21 +84,16 @@ func NewClient(ctx context.Context, config *rest.Config, kclient kubernetes.Inte
// NewDynamicSharedInformerFactory returns a new instance of DynamicSharedInformerFactory
func (c *client) NewDynamicSharedInformerFactory(defaultResync time.Duration) dynamicinformer.DynamicSharedInformerFactory {
return dynamicinformer.NewDynamicSharedInformerFactory(c.client, defaultResync)
return dynamicinformer.NewDynamicSharedInformerFactory(c.dyn, defaultResync)
}
// GetEventsInterface provides typed interface for events
func (c *client) GetEventsInterface() (corev1.EventInterface, error) {
return c.kclient.CoreV1().Events(""), nil
}
// GetCSRInterface provides type interface for CSR
func (c *client) GetCSRInterface() (certsv1beta1.CertificateSigningRequestInterface, error) {
return c.kclient.CertificatesV1beta1().CertificateSigningRequests(), nil
func (c *client) GetEventsInterface() corev1.EventInterface {
return c.kube.CoreV1().Events(metav1.NamespaceAll)
}
func (c *client) getInterface(apiVersion string, kind string) dynamic.NamespaceableResourceInterface {
return c.client.Resource(c.getGroupVersionMapper(apiVersion, kind))
return c.dyn.Resource(c.getGroupVersionMapper(apiVersion, kind))
}
func (c *client) getResourceInterface(apiVersion string, kind string, namespace string) dynamic.ResourceInterface {
@ -124,35 +112,34 @@ func (c *client) getResourceInterface(apiVersion string, kind string, namespace
// Keep this a stateful as the resource list will be based on the kubernetes version we connect to
func (c *client) getGroupVersionMapper(apiVersion string, kind string) schema.GroupVersionResource {
if apiVersion == "" {
gvr, _ := c.discoveryClient.GetGVRFromKind(kind)
gvr, _ := c.disco.GetGVRFromKind(kind)
return gvr
}
return c.discoveryClient.GetGVRFromAPIVersionKind(apiVersion, kind)
return c.disco.GetGVRFromAPIVersionKind(apiVersion, kind)
}
// GetResource returns the resource in unstructured/json format
func (c *client) GetResource(apiVersion string, kind string, namespace string, name string, subresources ...string) (*unstructured.Unstructured, error) {
c.RecordClientQuery(metrics.ClientGet, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).Get(context.TODO(), name, metav1.GetOptions{}, subresources...)
}
// RawAbsPath performs a raw call to the kubernetes API
func (c *client) RawAbsPath(path string) ([]byte, error) {
if c.restClient == nil {
if c.rest == nil {
return nil, errors.New("rest client not supported")
}
return c.restClient.Get().RequestURI(path).DoRaw(context.TODO())
return c.rest.Get().RequestURI(path).DoRaw(context.TODO())
}
// PatchResource patches the resource
func (c *client) PatchResource(apiVersion string, kind string, namespace string, name string, patch []byte) (*unstructured.Unstructured, error) {
c.RecordClientQuery(metrics.ClientUpdate, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).Patch(context.TODO(), name, types.JSONPatchType, patch, metav1.PatchOptions{})
}
// GetDynamicInterface fetches underlying dynamic interface
func (c *client) GetDynamicInterface() dynamic.Interface {
return c.client
return c.dyn
}
// ListResource returns the list of resources in unstructured/json format
@ -162,8 +149,6 @@ func (c *client) ListResource(apiVersion string, kind string, namespace string,
if lselector != nil {
options = metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(lselector)}
}
c.RecordClientQuery(metrics.ClientList, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).List(context.TODO(), options)
}
@ -173,7 +158,6 @@ func (c *client) DeleteResource(apiVersion string, kind string, namespace string
if dryRun {
options = metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}}
}
c.RecordClientQuery(metrics.ClientDelete, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).Delete(context.TODO(), name, options)
}
@ -185,7 +169,6 @@ func (c *client) CreateResource(apiVersion string, kind string, namespace string
}
// convert typed to unstructured obj
if unstructuredObj, err := kubeutils.ConvertToUnstructured(obj); err == nil && unstructuredObj != nil {
c.RecordClientQuery(metrics.ClientCreate, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).Create(context.TODO(), unstructuredObj, options)
}
return nil, fmt.Errorf("unable to create resource ")
@ -199,7 +182,6 @@ func (c *client) UpdateResource(apiVersion string, kind string, namespace string
}
// convert typed to unstructured obj
if unstructuredObj, err := kubeutils.ConvertToUnstructured(obj); err == nil && unstructuredObj != nil {
c.RecordClientQuery(metrics.ClientUpdate, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).Update(context.TODO(), unstructuredObj, options)
}
return nil, fmt.Errorf("unable to update resource ")
@ -213,7 +195,6 @@ func (c *client) UpdateStatusResource(apiVersion string, kind string, namespace
}
// convert typed to unstructured obj
if unstructuredObj, err := kubeutils.ConvertToUnstructured(obj); err == nil && unstructuredObj != nil {
c.RecordClientQuery(metrics.ClientUpdateStatus, metrics.KubeDynamicClient, kind, namespace)
return c.getResourceInterface(apiVersion, kind, namespace).UpdateStatus(context.TODO(), unstructuredObj, options)
}
return nil, fmt.Errorf("unable to update resource ")
@ -221,17 +202,10 @@ func (c *client) UpdateStatusResource(apiVersion string, kind string, namespace
// Discovery return the discovery client implementation
func (c *client) Discovery() IDiscovery {
return c.discoveryClient
return c.disco
}
// SetDiscovery sets the discovery client implementation
func (c *client) SetDiscovery(discoveryClient IDiscovery) {
c.discoveryClient = discoveryClient
}
func (c *client) RecordClientQuery(clientQueryOperation metrics.ClientQueryOperation, clientType metrics.ClientType, resourceKind string, resourceNamespace string) {
if c.metricsConfig == nil {
return
}
c.metricsConfig.RecordClientQueries(clientQueryOperation, clientType, resourceKind, resourceNamespace)
c.disco = discoveryClient
}

View file

@ -107,11 +107,8 @@ func TestCRUDResource(t *testing.T) {
func TestEventInterface(t *testing.T) {
f := newFixture(t)
iEvent, err := f.client.GetEventsInterface()
if err != nil {
t.Errorf("GetEventsInterface not working: %s", err)
}
_, err = iEvent.List(context.TODO(), metav1.ListOptions{})
iEvent := f.client.GetEventsInterface()
_, err := iEvent.List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Errorf("Testing Event interface not working: %s", err)
}

View file

@ -20,8 +20,8 @@ func NewFakeClient(scheme *runtime.Scheme, gvrToListKind map[schema.GroupVersion
// the typed and dynamic client are initialized with similar resources
kclient := kubefake.NewSimpleClientset(objects...)
return &client{
client: c,
kclient: kclient,
dyn: c,
kube: kclient,
}, nil
}
@ -29,11 +29,11 @@ func NewEmptyFakeClient() Interface {
gvrToListKind := map[schema.GroupVersionResource]string{}
objects := []runtime.Object{}
scheme := runtime.NewScheme()
kclient := kubefake.NewSimpleClientset(objects...)
return &client{
client: fake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objects...),
kclient: kubefake.NewSimpleClientset(objects...),
discoveryClient: NewFakeDiscoveryClient(nil),
dyn: fake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objects...),
disco: NewFakeDiscoveryClient(nil),
kube: kclient,
}
}

View file

@ -9,7 +9,6 @@ import (
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/controllers"
"github.com/kyverno/kyverno/pkg/metrics"
util "github.com/kyverno/kyverno/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtimeSchema "k8s.io/apimachinery/pkg/runtime/schema"
@ -80,8 +79,6 @@ func (c *controller) sync() {
Version: "v1",
Resource: "customresourcedefinitions",
}).List(context.TODO(), metav1.ListOptions{})
c.client.RecordClientQuery(metrics.ClientList, metrics.KubeDynamicClient, "CustomResourceDefinition", "")
if err != nil {
logger.Error(err, "could not fetch crd's from server")
return

View file

@ -76,11 +76,7 @@ func initRecorder(client dclient.Interface, eventSource Source, log logr.Logger)
return nil
}
eventBroadcaster := record.NewBroadcaster()
eventInterface, err := client.GetEventsInterface()
if err != nil {
log.Error(err, "failed to get event interface for logging")
return nil
}
eventInterface := client.GetEventsInterface()
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: eventInterface,

View file

@ -106,10 +106,7 @@ func NewPolicyController(
// Event broad caster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.V(5).Info)
eventInterface, err := client.GetEventsInterface()
if err != nil {
return nil, err
}
eventInterface := client.GetEventsInterface()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventInterface})
pc := PolicyController{