package client import ( "fmt" "strings" "time" "github.com/go-logr/logr" openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2" "github.com/nirmata/kyverno/pkg/config" apps "k8s.io/api/apps/v1" certificates "k8s.io/api/certificates/v1beta1" v1 "k8s.io/api/core/v1" helperv1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" patchTypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" csrtype "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" event "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" ) //Client enables interaction with k8 resource type Client struct { client dynamic.Interface log logr.Logger clientConfig *rest.Config kclient kubernetes.Interface DiscoveryClient IDiscovery } //NewClient creates new instance of client func NewClient(config *rest.Config, resync time.Duration, stopCh <-chan struct{}, log logr.Logger) (*Client, error) { dclient, err := dynamic.NewForConfig(config) if err != nil { return nil, err } kclient, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } client := Client{ client: dclient, clientConfig: config, kclient: kclient, log: log.WithName("dclient"), } // Set discovery client discoveryClient := ServerPreferredResources{cachedClient: memory.NewMemCacheClient(kclient.Discovery()), log: client.log} // client will invalidate registered resources cache every x seconds, // As there is no way to identify if the registered resource is available or not // we will be invalidating the local cache, so the next request get a fresh cache // If a resource is removed then and cache is not invalidate yet, we will not detect the removal // but the re-sync shall re-evaluate go discoveryClient.Poll(resync, stopCh) client.SetDiscovery(discoveryClient) return &client, nil } //NewDynamicSharedInformerFactory returns a new instance of DynamicSharedInformerFactory func (c *Client) NewDynamicSharedInformerFactory(defaultResync time.Duration) dynamicinformer.DynamicSharedInformerFactory { return dynamicinformer.NewDynamicSharedInformerFactory(c.client, defaultResync) } //GetKubePolicyDeployment returns kube policy depoyment value func (c *Client) GetKubePolicyDeployment() (*apps.Deployment, error) { kubePolicyDeployment, err := c.GetResource("", "Deployment", config.KubePolicyNamespace, config.KubePolicyDeploymentName) if err != nil { return nil, err } deploy := apps.Deployment{} if err = runtime.DefaultUnstructuredConverter.FromUnstructured(kubePolicyDeployment.UnstructuredContent(), &deploy); err != nil { return nil, err } return &deploy, nil } //GetEventsInterface provides typed interface for events //TODO: can we use dynamic client to fetch the typed interface // or generate a kube client value to access the interface func (c *Client) GetEventsInterface() (event.EventInterface, error) { return c.kclient.CoreV1().Events(""), nil } //GetCSRInterface provides type interface for CSR func (c *Client) GetCSRInterface() (csrtype.CertificateSigningRequestInterface, error) { return c.kclient.CertificatesV1beta1().CertificateSigningRequests(), nil } func (c *Client) getInterface(apiVersion string, kind string) dynamic.NamespaceableResourceInterface { return c.client.Resource(c.getGroupVersionMapper(apiVersion, kind)) } func (c *Client) getResourceInterface(apiVersion string, kind string, namespace string) dynamic.ResourceInterface { // Get the resource interface from kind namespaceableInterface := c.getInterface(apiVersion, kind) // Get the namespacable interface var resourceInteface dynamic.ResourceInterface if namespace != "" { resourceInteface = namespaceableInterface.Namespace(namespace) } else { resourceInteface = namespaceableInterface } return resourceInteface } // Keep this a stateful as the resource list will be based on the kubernetes version we connect to func (c *Client) getGroupVersionMapper(apiVersion string, kind string) schema.GroupVersionResource { if apiVersion == "" { return c.DiscoveryClient.GetGVRFromKind(kind) } return c.DiscoveryClient.GetGVRFromAPIVersionKind(apiVersion, kind) } // GetResource returns the resource in unstructured/json format func (c *Client) GetResource(apiVersion string, kind string, namespace string, name string, subresources ...string) (*unstructured.Unstructured, error) { return c.getResourceInterface(apiVersion, kind, namespace).Get(name, meta.GetOptions{}, subresources...) } //PatchResource patches the resource func (c *Client) PatchResource(apiVersion string, kind string, namespace string, name string, patch []byte) (*unstructured.Unstructured, error) { return c.getResourceInterface(apiVersion, kind, namespace).Patch(name, patchTypes.JSONPatchType, patch, meta.PatchOptions{}) } // GetDynamicInterface fetches underlying dynamic interface func (c *Client) GetDynamicInterface() dynamic.Interface { return c.client } // ListResource returns the list of resources in unstructured/json format // Access items using []Items func (c *Client) ListResource(apiVersion string, kind string, namespace string, lselector *meta.LabelSelector) (*unstructured.UnstructuredList, error) { options := meta.ListOptions{} if lselector != nil { options = meta.ListOptions{LabelSelector: helperv1.FormatLabelSelector(lselector)} } return c.getResourceInterface(apiVersion, kind, namespace).List(options) } // DeleteResource deletes the specified resource func (c *Client) DeleteResource(apiVersion string, kind string, namespace string, name string, dryRun bool) error { options := meta.DeleteOptions{} if dryRun { options = meta.DeleteOptions{DryRun: []string{meta.DryRunAll}} } return c.getResourceInterface(apiVersion, kind, namespace).Delete(name, &options) } // CreateResource creates object for the specified resource/namespace func (c *Client) CreateResource(apiVersion string, kind string, namespace string, obj interface{}, dryRun bool) (*unstructured.Unstructured, error) { options := meta.CreateOptions{} if dryRun { options = meta.CreateOptions{DryRun: []string{meta.DryRunAll}} } // convert typed to unstructured obj if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { return c.getResourceInterface(apiVersion, kind, namespace).Create(unstructuredObj, options) } return nil, fmt.Errorf("Unable to create resource ") } // UpdateResource updates object for the specified resource/namespace func (c *Client) UpdateResource(apiVersion string, kind string, namespace string, obj interface{}, dryRun bool) (*unstructured.Unstructured, error) { options := meta.UpdateOptions{} if dryRun { options = meta.UpdateOptions{DryRun: []string{meta.DryRunAll}} } // convert typed to unstructured obj if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { return c.getResourceInterface(apiVersion, kind, namespace).Update(unstructuredObj, options) } return nil, fmt.Errorf("Unable to update resource ") } // UpdateStatusResource updates the resource "status" subresource func (c *Client) UpdateStatusResource(apiVersion string, kind string, namespace string, obj interface{}, dryRun bool) (*unstructured.Unstructured, error) { options := meta.UpdateOptions{} if dryRun { options = meta.UpdateOptions{DryRun: []string{meta.DryRunAll}} } // convert typed to unstructured obj if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { return c.getResourceInterface(apiVersion, kind, namespace).UpdateStatus(unstructuredObj, options) } return nil, fmt.Errorf("Unable to update resource ") } func convertToUnstructured(obj interface{}) *unstructured.Unstructured { unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&obj) if err != nil { return nil } return &unstructured.Unstructured{Object: unstructuredObj} } //To-Do remove this to use unstructured type func convertToSecret(obj *unstructured.Unstructured) (v1.Secret, error) { secret := v1.Secret{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &secret); err != nil { return secret, err } return secret, nil } //To-Do remove this to use unstructured type func convertToCSR(obj *unstructured.Unstructured) (*certificates.CertificateSigningRequest, error) { csr := certificates.CertificateSigningRequest{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &csr); err != nil { return nil, err } return &csr, nil } //IDiscovery provides interface to mange Kind and GVR mapping type IDiscovery interface { FindResource(apiVersion string, kind string) (*meta.APIResource, schema.GroupVersionResource, error) GetGVRFromKind(kind string) schema.GroupVersionResource GetGVRFromAPIVersionKind(apiVersion string, kind string) schema.GroupVersionResource GetServerVersion() (*version.Info, error) OpenAPISchema() (*openapi_v2.Document, error) } // SetDiscovery sets the discovery client implementation func (c *Client) SetDiscovery(discoveryClient IDiscovery) { c.DiscoveryClient = discoveryClient } //ServerPreferredResources stores the cachedClient instance for discovery client type ServerPreferredResources struct { cachedClient discovery.CachedDiscoveryInterface log logr.Logger } //Poll will keep invalidate the local cache func (c ServerPreferredResources) Poll(resync time.Duration, stopCh <-chan struct{}) { logger := c.log.WithName("Poll") // start a ticker ticker := time.NewTicker(resync) defer func() { ticker.Stop() }() logger.Info("starting registered resources sync", "period", resync) for { select { case <-stopCh: logger.Info("stopping registered resources sync") return case <-ticker.C: // set cache as stale logger.V(6).Info("invalidating local client cache for registered resources") c.cachedClient.Invalidate() } } } // OpenAPISchema returns the API server OpenAPI schema document func (c ServerPreferredResources) OpenAPISchema() (*openapi_v2.Document, error) { return c.cachedClient.OpenAPISchema() } // GetGVRFromKind get the Group Version Resource from kind func (c ServerPreferredResources) GetGVRFromKind(kind string) schema.GroupVersionResource { _, gvr, err := c.FindResource("", kind) if err != nil { c.log.Info("schema not found", "kind", kind) return schema.GroupVersionResource{} } return gvr } // GetGVRFromAPIVersionKind get the Group Version Resource from APIVersion and kind func (c ServerPreferredResources) GetGVRFromAPIVersionKind(apiVersion string, kind string) schema.GroupVersionResource { _, gvr, err := c.FindResource(apiVersion, kind) if err != nil { c.log.Info("schema not found", "kind", kind, "apiVersion", apiVersion, "Error : ", err) return schema.GroupVersionResource{} } return gvr } // GetServerVersion returns the server version of the cluster func (c ServerPreferredResources) GetServerVersion() (*version.Info, error) { return c.cachedClient.ServerVersion() } // FindResource finds an API resource that matches 'kind'. If the resource is not // found and the Cache is not fresh, the cache is invalidated and a retry is attempted func (c ServerPreferredResources) FindResource(apiVersion string, kind string) (*meta.APIResource, schema.GroupVersionResource, error) { r, gvr, err := c.findResource(apiVersion, kind) if err == nil { return r, gvr, nil } if !c.cachedClient.Fresh() { c.cachedClient.Invalidate() if r, gvr, err = c.findResource(apiVersion, kind); err == nil { return r, gvr, nil } } return nil, schema.GroupVersionResource{}, err } func (c ServerPreferredResources) findResource(apiVersion string, kind string) (*meta.APIResource, schema.GroupVersionResource, error) { var serverresources []*meta.APIResourceList var err error if apiVersion == "" { serverresources, err = c.cachedClient.ServerPreferredResources() } else { serverresources, err = c.cachedClient.ServerResources() } if err != nil { c.log.Error(err, "failed to get registered preferred resources") return nil, schema.GroupVersionResource{}, err } for _, serverresource := range serverresources { if apiVersion != "" && serverresource.GroupVersion != apiVersion { continue } for _, resource := range serverresource.APIResources { // skip the resource names with "/", to avoid comparison with subresources if resource.Kind == kind && !strings.Contains(resource.Name, "/") { gv, err := schema.ParseGroupVersion(serverresource.GroupVersion) if err != nil { c.log.Error(err, "failed to parse groupVersion", "groupVersion", serverresource.GroupVersion) return nil, schema.GroupVersionResource{}, err } return &resource, gv.WithResource(resource.Name), nil } } } return nil, schema.GroupVersionResource{}, fmt.Errorf("kind '%s' not found in apiVersion '%s'", kind, apiVersion) }