diff --git a/kubeclient/certificates.go b/client/certificates.go similarity index 60% rename from kubeclient/certificates.go rename to client/certificates.go index 9626c1004f..28a1c12111 100644 --- a/kubeclient/certificates.go +++ b/client/certificates.go @@ -1,4 +1,4 @@ -package kubeclient +package client import ( "errors" @@ -7,14 +7,13 @@ import ( tls "github.com/nirmata/kube-policy/pkg/tls" certificates "k8s.io/api/certificates/v1beta1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Issues TLS certificate for webhook server using given PEM private key // Returns signed and approved TLS certificate in PEM format -func (kc *KubeClient) GenerateTlsPemPair(props tls.TlsCertificateProps) (*tls.TlsPemPair, error) { +func (c *Client) GenerateTlsPemPair(props tls.TlsCertificateProps) (*tls.TlsPemPair, error) { privateKey, err := tls.TlsGeneratePrivateKey() if err != nil { return nil, err @@ -25,12 +24,12 @@ func (kc *KubeClient) GenerateTlsPemPair(props tls.TlsCertificateProps) (*tls.Tl return nil, errors.New(fmt.Sprintf("Unable to create certificate request: %v", err)) } - certRequest, err = kc.submitAndApproveCertificateRequest(certRequest) + certRequest, err = c.submitAndApproveCertificateRequest(certRequest) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to submit and approve certificate request: %v", err)) } - tlsCert, err := kc.fetchCertificateFromRequest(certRequest, 10) + tlsCert, err := c.fetchCertificateFromRequest(certRequest, 10) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to fetch certificate from request: %v", err)) } @@ -42,31 +41,44 @@ func (kc *KubeClient) GenerateTlsPemPair(props tls.TlsCertificateProps) (*tls.Tl } // Submits and approves certificate request, returns request which need to be fetched -func (kc *KubeClient) submitAndApproveCertificateRequest(req *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { - certClient := kc.client.CertificatesV1beta1().CertificateSigningRequests() - - csrList, err := certClient.List(metav1.ListOptions{}) +func (c *Client) submitAndApproveCertificateRequest(req *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { + //TODO: using the CSR interface from the kubeclient + certClient, err := c.GetCSRInterface() + if err != nil { + return nil, err + } + // certClient := kc.client.CertificatesV1beta1().CertificateSigningRequests() + csrList, err := c.ListResource("certificatesigningrequest", "") + // csrList, err := certClient.List(metav1.ListOptions{}) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to list existing certificate requests: %v", err)) } for _, csr := range csrList.Items { - if csr.ObjectMeta.Name == req.ObjectMeta.Name { - err := certClient.Delete(csr.ObjectMeta.Name, defaultDeleteOptions()) + csr.GetName() + if csr.GetName() == req.ObjectMeta.Name { + // Delete + err := c.DeleteResouce("certificatesigningrequest", "", csr.GetName()) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to delete existing certificate request: %v", err)) } - kc.logger.Printf("Old certificate request is deleted") + c.logger.Printf("Old certificate request is deleted") break } } - res, err := certClient.Create(req) + // Create + unstrRes, err := c.CreateResource("certificatesigningrequest", "", req) + // res, err := certClient.Create(req) if err != nil { return nil, err } - kc.logger.Printf("Certificate request %s is created", req.ObjectMeta.Name) + c.logger.Printf("Certificate request %s is created", unstrRes.GetName()) + res, err := convertToCSR(unstrRes) + if err != nil { + return nil, err + } res.Status.Conditions = append(res.Status.Conditions, certificates.CertificateSigningRequestCondition{ Type: certificates.CertificateApproved, Reason: "NKP-Approve", @@ -76,7 +88,7 @@ func (kc *KubeClient) submitAndApproveCertificateRequest(req *certificates.Certi if err != nil { return nil, errors.New(fmt.Sprintf("Unable to approve certificate request: %v", err)) } - kc.logger.Printf("Certificate request %s is approved", res.ObjectMeta.Name) + c.logger.Printf("Certificate request %s is approved", res.ObjectMeta.Name) return res, nil } @@ -84,12 +96,16 @@ func (kc *KubeClient) submitAndApproveCertificateRequest(req *certificates.Certi const certificateFetchWaitInterval time.Duration = 200 * time.Millisecond // Fetches certificate from given request. Tries to obtain certificate for maxWaitSeconds -func (kc *KubeClient) fetchCertificateFromRequest(req *certificates.CertificateSigningRequest, maxWaitSeconds uint8) ([]byte, error) { +func (c *Client) fetchCertificateFromRequest(req *certificates.CertificateSigningRequest, maxWaitSeconds uint8) ([]byte, error) { // TODO: react of SIGINT and SIGTERM timeStart := time.Now() - certClient := kc.client.CertificatesV1beta1().CertificateSigningRequests() + c.GetResource("certificatesigningrequest", "", req.ObjectMeta.Name) for time.Now().Sub(timeStart) < time.Duration(maxWaitSeconds)*time.Second { - r, err := certClient.Get(req.ObjectMeta.Name, metav1.GetOptions{}) + unstrR, err := c.GetResource("certificatesigningrequest", "", req.ObjectMeta.Name) + if err != nil { + return nil, err + } + r, err := convertToCSR(unstrR) if err != nil { return nil, err } @@ -111,24 +127,27 @@ const privateKeyField string = "privateKey" const certificateField string = "certificate" // Reads the pair of TLS certificate and key from the specified secret. -func (kc *KubeClient) ReadTlsPair(props tls.TlsCertificateProps) *tls.TlsPemPair { +func (c *Client) ReadTlsPair(props tls.TlsCertificateProps) *tls.TlsPemPair { name := generateSecretName(props) - secret, err := kc.client.CoreV1().Secrets(props.Namespace).Get(name, metav1.GetOptions{}) + unstrSecret, err := c.GetResource("secrets", props.Namespace, name) + if err != nil { + c.logger.Printf("Unable to get secret %s/%s: %s", props.Namespace, name, err) + return nil + } + secret, err := convertToSecret(unstrSecret) if err != nil { - kc.logger.Printf("Unable to get secret %s/%s: %s", props.Namespace, name, err) return nil } - pemPair := tls.TlsPemPair{ Certificate: secret.Data[certificateField], PrivateKey: secret.Data[privateKeyField], } if len(pemPair.Certificate) == 0 { - kc.logger.Printf("TLS Certificate not found in secret %s/%s", props.Namespace, name) + c.logger.Printf("TLS Certificate not found in secret %s/%s", props.Namespace, name) return nil } if len(pemPair.PrivateKey) == 0 { - kc.logger.Printf("TLS PrivateKey not found in secret %s/%s", props.Namespace, name) + c.logger.Printf("TLS PrivateKey not found in secret %s/%s", props.Namespace, name) return nil } return &pemPair @@ -136,24 +155,28 @@ func (kc *KubeClient) ReadTlsPair(props tls.TlsCertificateProps) *tls.TlsPemPair // Writes the pair of TLS certificate and key to the specified secret. // Updates existing secret or creates new one. -func (kc *KubeClient) WriteTlsPair(props tls.TlsCertificateProps, pemPair *tls.TlsPemPair) error { +func (c *Client) WriteTlsPair(props tls.TlsCertificateProps, pemPair *tls.TlsPemPair) error { name := generateSecretName(props) - secret, err := kc.client.CoreV1().Secrets(props.Namespace).Get(name, metav1.GetOptions{}) - + unstrSecret, err := c.GetResource("secrets", props.Namespace, name) if err == nil { // Update existing secret + secret, err := convertToSecret(unstrSecret) + if err != nil { + return nil + } + if secret.Data == nil { secret.Data = make(map[string][]byte) } secret.Data[certificateField] = pemPair.Certificate secret.Data[privateKeyField] = pemPair.PrivateKey - - secret, err = kc.client.CoreV1().Secrets(props.Namespace).Update(secret) + c.UpdateResource("secrets", props.Namespace, secret) if err == nil { - kc.logger.Printf("Secret %s is updated", name) + c.logger.Printf("Secret %s is updated", name) } } else { // Create new secret - secret = &v1.Secret{ + + secret := &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", APIVersion: "v1", @@ -168,9 +191,9 @@ func (kc *KubeClient) WriteTlsPair(props tls.TlsCertificateProps, pemPair *tls.T }, } - secret, err = kc.client.CoreV1().Secrets(props.Namespace).Create(secret) + _, err := c.CreateResource("secrets", props.Namespace, secret) if err == nil { - kc.logger.Printf("Secret %s is created", name) + c.logger.Printf("Secret %s is created", name) } } return err diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000000..10e2e217d5 --- /dev/null +++ b/client/client.go @@ -0,0 +1,304 @@ +package client + +import ( + "fmt" + "log" + "time" + + "github.com/nirmata/kube-policy/config" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + apps "k8s.io/api/apps/v1" + certificates "k8s.io/api/certificates/v1beta1" + v1 "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/dynamic" + csrtype "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" + event "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" +) + +type Client struct { + logger *log.Logger + client dynamic.Interface + clientConfig *rest.Config +} + +func NewDynamicClient(config *rest.Config, logger *log.Logger) (*Client, error) { + client, err := dynamic.NewForConfig(config) + if err != nil { + return nil, err + } + return &Client{ + logger: logger, + client: client, + clientConfig: config, + }, nil +} + +func (c *Client) GetKubePolicyDeployment() (*apps.Deployment, error) { + kubePolicyDeployment, err := c.GetResource("deployments", config.KubePolicyNamespace, config.KubePolicyDeploymentName) + if err != nil { + return nil, err + } + deploy := apps.Deployment{} + if err = runtime.DefaultUnstructuredConverter.FromUnstructured(kubePolicyDeployment.UnstructuredContent(), &deploy); err != nil { + return nil, err + } + return &deploy, nil +} + +//TODO: can we use dynamic client to fetch the typed interface +// or generate a kube client value to access the interface +//GetEventsInterface provides typed interface for events +func (c *Client) GetEventsInterface() (event.EventInterface, error) { + kubeClient, err := newKubeClient(c.clientConfig) + if err != nil { + return nil, err + } + return kubeClient.CoreV1().Events(""), nil +} + +func (c *Client) GetCSRInterface() (csrtype.CertificateSigningRequestInterface, error) { + kubeClient, err := newKubeClient(c.clientConfig) + if err != nil { + return nil, err + } + + return kubeClient.CertificatesV1beta1().CertificateSigningRequests(), nil +} + +func (c *Client) getInterface(kind string) dynamic.NamespaceableResourceInterface { + return c.client.Resource(c.getGroupVersionMapper(kind)) +} + +func (c *Client) getResourceInterface(kind string, namespace string) dynamic.ResourceInterface { + // Get the resource interface + namespaceableInterface := c.getInterface(kind) + // Get the namespacable interface + var resourceInteface dynamic.ResourceInterface + if namespace != "" { + resourceInteface = namespaceableInterface.Namespace(namespace) + } else { + resourceInteface = namespaceableInterface + } + return resourceInteface +} + +// Keep this a stateful as the resource list will be based on the kubernetes version we connect to +func (c *Client) getGroupVersionMapper(kind string) schema.GroupVersionResource { + //TODO: add checks to see if the kind is supported + //TODO: build the resource list dynamically( by querying the registered resource kinds) + //TODO: the error scenarios + return getGrpVersionMapper(kind, c.clientConfig, false) +} + +// GetResource returns the resource in unstructured/json format +func (c *Client) GetResource(kind string, namespace string, name string) (*unstructured.Unstructured, error) { + return c.getResourceInterface(kind, namespace).Get(name, meta.GetOptions{}) +} + +// ListResource returns the list of resources in unstructured/json format +// Access items using []Items +func (c *Client) ListResource(kind string, namespace string) (*unstructured.UnstructuredList, error) { + return c.getResourceInterface(kind, namespace).List(meta.ListOptions{}) +} + +func (c *Client) DeleteResouce(kind string, namespace string, name string) error { + return c.getResourceInterface(kind, namespace).Delete(name, &meta.DeleteOptions{}) + +} + +// CreateResource creates object for the specified kind/namespace +func (c *Client) CreateResource(kind string, namespace string, obj interface{}) (*unstructured.Unstructured, error) { + // convert typed to unstructured obj + if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { + return c.getResourceInterface(kind, namespace).Create(unstructuredObj, meta.CreateOptions{}) + } + return nil, fmt.Errorf("Unable to create resource ") +} + +// UpdateResource updates object for the specified kind/namespace +func (c *Client) UpdateResource(kind string, namespace string, obj interface{}) (*unstructured.Unstructured, error) { + // convert typed to unstructured obj + if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { + return c.getResourceInterface(kind, namespace).Update(unstructuredObj, meta.UpdateOptions{}) + } + return nil, fmt.Errorf("Unable to update resource ") +} + +func convertToUnstructured(obj interface{}) *unstructured.Unstructured { + unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to convert : %v", err)) + return nil + } + return &unstructured.Unstructured{Object: unstructuredObj} +} + +//ConvertToRuntimeObject converts unstructed to runtime.Object runtime instance +func ConvertToRuntimeObject(obj *unstructured.Unstructured) (*runtime.Object, error) { + scheme := runtime.NewScheme() + gvk := obj.GroupVersionKind() + runtimeObj, err := scheme.New(gvk) + if err != nil { + return nil, err + } + if err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &runtimeObj); err != nil { + return nil, err + } + return &runtimeObj, nil +} + +//TODO: make this generic for all resource type +//GenerateSecret to generate secrets + +func (c *Client) GenerateSecret(generator types.Generation, namespace string) error { + c.logger.Printf("Preparing to create secret %s/%s", namespace, generator.Name) + secret := &v1.Secret{} + + // if generator.CopyFrom != nil { + c.logger.Printf("Copying data from secret %s/%s", generator.CopyFrom.Namespace, generator.CopyFrom.Name) + // Get configMap resource + unstrSecret, err := c.GetResource("secret", generator.CopyFrom.Namespace, generator.CopyFrom.Name) + if err != nil { + return err + } + // typed object + secret, err = convertToSecret(unstrSecret) + if err != nil { + return err + } + // } + + secret.ObjectMeta = meta.ObjectMeta{ + Name: generator.Name, + Namespace: namespace, + } + + // Copy data from generator to the new secret + if generator.Data != nil { + if secret.Data == nil { + secret.Data = make(map[string][]byte) + } + + for k, v := range generator.Data { + secret.Data[k] = []byte(v) + } + } + + go c.createSecretAfterNamespaceIsCreated(*secret, namespace) + return nil +} + +//TODO: make this generic for all resource type +//GenerateConfigMap to generate configMap +func (c *Client) GenerateConfigMap(generator types.Generation, namespace string) error { + c.logger.Printf("Preparing to create configmap %s/%s", namespace, generator.Name) + configMap := &v1.ConfigMap{} + + // if generator.CopyFrom != nil { + c.logger.Printf("Copying data from configmap %s/%s", generator.CopyFrom.Namespace, generator.CopyFrom.Name) + // Get configMap resource + unstrConfigMap, err := c.GetResource("configmap", generator.CopyFrom.Namespace, generator.CopyFrom.Name) + if err != nil { + return err + } + // typed object + configMap, err = convertToConfigMap(unstrConfigMap) + if err != nil { + return err + } + + // } + configMap.ObjectMeta = meta.ObjectMeta{ + Name: generator.Name, + Namespace: namespace, + } + + // Copy data from generator to the new configmap + if generator.Data != nil { + if configMap.Data == nil { + configMap.Data = make(map[string]string) + } + + for k, v := range generator.Data { + configMap.Data[k] = v + } + } + go c.createConfigMapAfterNamespaceIsCreated(*configMap, namespace) + return nil +} + +func convertToConfigMap(obj *unstructured.Unstructured) (*v1.ConfigMap, error) { + configMap := v1.ConfigMap{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &configMap); err != nil { + return nil, err + } + return &configMap, nil +} + +func convertToSecret(obj *unstructured.Unstructured) (*v1.Secret, error) { + secret := v1.Secret{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &secret); err != nil { + return nil, err + } + return &secret, nil +} + +func convertToCSR(obj *unstructured.Unstructured) (*certificates.CertificateSigningRequest, error) { + csr := certificates.CertificateSigningRequest{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &csr); err != nil { + return nil, err + } + return &csr, nil +} + +func (c *Client) createConfigMapAfterNamespaceIsCreated(configMap v1.ConfigMap, namespace string) { + err := c.waitUntilNamespaceIsCreated(namespace) + if err == nil { + _, err = c.CreateResource("configmap", namespace, configMap) + } + if err != nil { + c.logger.Printf("Can't create a configmap: %s", err) + } +} + +func (c *Client) createSecretAfterNamespaceIsCreated(secret v1.Secret, namespace string) { + err := c.waitUntilNamespaceIsCreated(namespace) + if err == nil { + _, err = c.CreateResource("secrets", namespace, secret) + } + if err != nil { + c.logger.Printf("Can't create a secret: %s", err) + } +} + +// Waits until namespace is created with maximum duration maxWaitTimeForNamespaceCreation +func (c *Client) waitUntilNamespaceIsCreated(name string) error { + timeStart := time.Now() + + var lastError error = nil + for time.Now().Sub(timeStart) < namespaceCreationMaxWaitTime { + _, lastError = c.GetResource("namespace", "", name) + if lastError == nil { + break + } + time.Sleep(namespaceCreationWaitInterval) + } + return lastError +} + +//GetSupportedKinds provides list of supported types +func GetSupportedKinds() []string { + return rTypes +} + +var rTypes = []string{ + "ConfigMap", "Pods", "Deployment", "CronJob", "Endpoints", "HorizontalPodAutoscaler", + "Ingress", "Job", "LimitRange", "Namespace", "NetworkPolicy", "PersistentVolumeClaim", + "PodDisruptionBudget", "PodTemplate", "ResourceQuota", "Secret", "Service", "StatefulSet", +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000000..75f509ff84 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,6 @@ +package client + +// GetResource +// ListResource +// CreateResource +// getGroupVersionMapper (valid and invalid resources) diff --git a/client/utils.go b/client/utils.go new file mode 100644 index 0000000000..443f2aed63 --- /dev/null +++ b/client/utils.go @@ -0,0 +1,79 @@ +package client + +import ( + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const namespaceCreationMaxWaitTime time.Duration = 30 * time.Second +const namespaceCreationWaitInterval time.Duration = 100 * time.Millisecond + +var groupVersionMapper map[string]schema.GroupVersionResource + +func getGrpVersionMapper(kind string, clientConfig *rest.Config, refresh bool) schema.GroupVersionResource { + grpVersionSchema := schema.GroupVersionResource{} + + if groupVersionMapper == nil || refresh { + groupVersionMapper = make(map[string]schema.GroupVersionResource) + // refesh the mapper + if err := refreshRegisteredResources(groupVersionMapper, clientConfig); err != nil { + utilruntime.HandleError(err) + return grpVersionSchema + } + } + // Query mapper + if val, ok := getValue(kind); ok { + return *val + } + utilruntime.HandleError(fmt.Errorf("Resouce '%s' not registered", kind)) + return grpVersionSchema +} + +func getValue(kind string) (*schema.GroupVersionResource, bool) { + if val, ok := groupVersionMapper[kind]; ok { + return &val, true + } + return nil, false +} + +func refreshRegisteredResources(mapper map[string]schema.GroupVersionResource, clientConfig *rest.Config) error { + // build kubernetes client + client, err := newKubeClient(clientConfig) + if err != nil { + return err + } + // get registered server groups and resources + _, resourceList, err := client.Discovery().ServerGroupsAndResources() + if err != nil { + return err + } + + for _, apiResource := range resourceList { + for _, resource := range apiResource.APIResources { + // fmt.Println(resource.Name + " : " + apiResource.APIVersion + " , " + apiResource.GroupVersion + " , " + resource.Name) + grpVersion := strings.Split(apiResource.GroupVersion, "/") + if len(grpVersion) == 2 { + mapper[resource.Name] = schema.GroupVersionResource{ + Group: grpVersion[0], + Version: grpVersion[1], + Resource: resource.Name, + } + } + } + } + return nil +} + +func newKubeClient(clientConfig *rest.Config) (*kubernetes.Clientset, error) { + client, err := kubernetes.NewForConfig(clientConfig) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/init.go b/init.go index 358e7b2d29..9b70184b17 100644 --- a/init.go +++ b/init.go @@ -5,9 +5,9 @@ import ( "log" "net/url" + client "github.com/nirmata/kube-policy/client" "github.com/nirmata/kube-policy/config" - "github.com/nirmata/kube-policy/kubeclient" - "github.com/nirmata/kube-policy/pkg/tls" + tls "github.com/nirmata/kube-policy/pkg/tls" rest "k8s.io/client-go/rest" clientcmd "k8s.io/client-go/tools/clientcmd" @@ -23,7 +23,7 @@ func createClientConfig(kubeconfig string) (*rest.Config, error) { } } -func initTlsPemPair(certFile, keyFile string, clientConfig *rest.Config, kubeclient *kubeclient.KubeClient) (*tls.TlsPemPair, error) { +func initTlsPemPair(certFile, keyFile string, clientConfig *rest.Config, client *client.Client) (*tls.TlsPemPair, error) { var tlsPair *tls.TlsPemPair if certFile != "" || keyFile != "" { tlsPair = tlsPairFromFiles(certFile, keyFile) @@ -33,7 +33,10 @@ func initTlsPemPair(certFile, keyFile string, clientConfig *rest.Config, kubecli if tlsPair != nil { return tlsPair, nil } else { - tlsPair, err = tlsPairFromCluster(clientConfig, kubeclient) + tlsPair, err = tlsPairFromCluster(clientConfig, client) + if err == nil { + log.Printf("Using TLS key/certificate from cluster") + } return tlsPair, err } } @@ -65,7 +68,7 @@ func tlsPairFromFiles(certFile, keyFile string) *tls.TlsPemPair { // Loads or creates PEM private key and TLS certificate for webhook server. // Created pair is stored in cluster's secret. // Returns struct with key/certificate pair. -func tlsPairFromCluster(configuration *rest.Config, client *kubeclient.KubeClient) (*tls.TlsPemPair, error) { +func tlsPairFromCluster(configuration *rest.Config, client *client.Client) (*tls.TlsPemPair, error) { apiServerUrl, err := url.Parse(configuration.Host) if err != nil { return nil, err @@ -75,7 +78,6 @@ func tlsPairFromCluster(configuration *rest.Config, client *kubeclient.KubeClien Namespace: config.KubePolicyNamespace, ApiServerHost: apiServerUrl.Hostname(), } - tlsPair := client.ReadTlsPair(certProps) if tls.IsTlsPairShouldBeUpdated(tlsPair) { log.Printf("Generating new key/certificate pair for TLS") diff --git a/kubeclient/kubeclient.go b/kubeclient/kubeclient.go deleted file mode 100644 index f32d6950b6..0000000000 --- a/kubeclient/kubeclient.go +++ /dev/null @@ -1,599 +0,0 @@ -package kubeclient - -import ( - "fmt" - "log" - "os" - "time" - - "github.com/nirmata/kube-policy/config" - types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/kubernetes" - event "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -// KubeClient is the api-client for core Kubernetes objects -type KubeClient struct { - client *kubernetes.Clientset - logger *log.Logger -} - -// Checks parameters and creates new instance of KubeClient -func NewKubeClient(config *rest.Config, logger *log.Logger) (*KubeClient, error) { - if logger == nil { - logger = log.New(os.Stdout, "Kubernetes client: ", log.LstdFlags) - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - return &KubeClient{ - logger: logger, - client: client, - }, nil -} - -func (kc *KubeClient) GetEvents(namespace string) event.EventInterface { - return kc.client.CoreV1().Events(namespace) -} - -func (kc *KubeClient) GetKubePolicyDeployment() (*apps.Deployment, error) { - kubePolicyDeployment, err := kc.client. - AppsV1(). - Deployments(config.KubePolicyNamespace). - Get(config.KubePolicyDeploymentName, metav1.GetOptions{}) - - if err != nil { - return nil, err - } - - return kubePolicyDeployment, nil -} - -// Generates new ConfigMap in given namespace. If the namespace does not exists yet, -// waits until it is created for maximum namespaceCreationMaxWaitTime (see below) -func (kc *KubeClient) GenerateConfigMap(generator types.Generation, namespace string) error { - kc.logger.Printf("Preparing to create configmap %s/%s", namespace, generator.Name) - configMap := &v1.ConfigMap{} - - var err error - - kc.logger.Printf("Copying data from configmap %s/%s", generator.CopyFrom.Namespace, generator.CopyFrom.Name) - configMap, err = kc.client.CoreV1().ConfigMaps(generator.CopyFrom.Namespace).Get(generator.CopyFrom.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - configMap.ObjectMeta = metav1.ObjectMeta{ - Name: generator.Name, - Namespace: namespace, - } - - // Copy data from generator to the new configmap - if generator.Data != nil { - if configMap.Data == nil { - configMap.Data = make(map[string]string) - } - - for k, v := range generator.Data { - configMap.Data[k] = v - } - } - - go kc.createConfigMapAfterNamespaceIsCreated(*configMap, namespace) - return nil -} - -// Generates new Secret in given namespace. If the namespace does not exists yet, -// waits until it is created for maximum namespaceCreationMaxWaitTime (see below) -func (kc *KubeClient) GenerateSecret(generator types.Generation, namespace string) error { - kc.logger.Printf("Preparing to create secret %s/%s", namespace, generator.Name) - secret := &v1.Secret{} - - var err error - - kc.logger.Printf("Copying data from secret %s/%s", generator.CopyFrom.Namespace, generator.CopyFrom.Name) - secret, err = kc.client.CoreV1().Secrets(generator.CopyFrom.Namespace).Get(generator.CopyFrom.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - secret.ObjectMeta = metav1.ObjectMeta{ - Name: generator.Name, - Namespace: namespace, - } - - // Copy data from generator to the new secret - if generator.Data != nil { - if secret.Data == nil { - secret.Data = make(map[string][]byte) - } - - for k, v := range generator.Data { - secret.Data[k] = []byte(v) - } - } - - go kc.createSecretAfterNamespaceIsCreated(*secret, namespace) - return nil -} - -func defaultDeleteOptions() *metav1.DeleteOptions { - var deletePeriod int64 = 0 - return &metav1.DeleteOptions{ - GracePeriodSeconds: &deletePeriod, - } -} - -const namespaceCreationMaxWaitTime time.Duration = 30 * time.Second -const namespaceCreationWaitInterval time.Duration = 100 * time.Millisecond - -// Waits until namespace is created with maximum duration maxWaitTimeForNamespaceCreation -func (kc *KubeClient) waitUntilNamespaceIsCreated(name string) error { - timeStart := time.Now() - - var lastError error = nil - for time.Now().Sub(timeStart) < namespaceCreationMaxWaitTime { - _, lastError = kc.client.CoreV1().Namespaces().Get(name, metav1.GetOptions{}) - if lastError == nil { - break - } - time.Sleep(namespaceCreationWaitInterval) - } - return lastError -} - -func (kc *KubeClient) createConfigMapAfterNamespaceIsCreated(configMap v1.ConfigMap, namespace string) { - err := kc.waitUntilNamespaceIsCreated(namespace) - if err == nil { - _, err = kc.client.CoreV1().ConfigMaps(namespace).Create(&configMap) - } - if err != nil { - kc.logger.Printf("Can't create a configmap: %s", err) - } -} - -func (kc *KubeClient) createSecretAfterNamespaceIsCreated(secret v1.Secret, namespace string) { - err := kc.waitUntilNamespaceIsCreated(namespace) - if err == nil { - _, err = kc.client.CoreV1().Secrets(namespace).Create(&secret) - } - if err != nil { - kc.logger.Printf("Can't create a secret: %s", err) - } -} - -var rMapper = map[string]getter{ - "ConfigMap": configMapGetter, - "Pods": podsGetter, - "Deployment": deploymentGetter, - "CronJob": cronJobGetter, - "Endpoints": endpointsbGetter, - "HorizontalPodAutoscaler": horizontalPodAutoscalerGetter, - "Ingress": ingressGetter, - "Job": jobGetter, - "LimitRange": limitRangeGetter, - "Namespace": namespaceGetter, - "NetworkPolicy": networkPolicyGetter, - "PersistentVolumeClaim": persistentVolumeClaimGetter, - "PodDisruptionBudget": podDisruptionBudgetGetter, - "PodTemplate": podTemplateGetter, - "ResourceQuota": resourceQuotaGetter, - "Secret": secretGetter, - "Service": serviceGetter, - "StatefulSet": statefulSetGetter, -} - -var lMapper = map[string]lister{ - "ConfigMap": configMapLister, - "Pods": podLister, - "Deployment": deploymentLister, - "CronJob": cronJobLister, - "Endpoints": endpointsLister, - "HorizontalPodAutoscaler": horizontalPodAutoscalerLister, - "Ingress": ingressLister, - "Job": jobLister, - "LimitRange": limitRangeLister, - "Namespace": namespaceLister, - "NetworkPolicy": networkPolicyLister, - "PersistentVolumeClaim": persistentVolumeClaimLister, - "PodDisruptionBudget": podDisruptionBudgetLister, - "PodTemplate": podTemplateLister, - "ResourceQuota": resourceQuotaLister, - "Secret": secretLister, - "Service": serviceLister, - "StatefulSet": statefulSetLister, -} - -type getter func(*kubernetes.Clientset, string, string) (runtime.Object, error) -type lister func(*kubernetes.Clientset, string) ([]runtime.Object, error) - -//ListResource to return resource list -func (kc *KubeClient) ListResource(kind string, namespace string) ([]runtime.Object, error) { - return lMapper[kind](kc.client, namespace) -} - -//GetResource get the resource object -func (kc *KubeClient) GetResource(kind string, resource string) (runtime.Object, error) { - namespace, name, err := cache.SplitMetaNamespaceKey(resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", resource)) - return nil, err - } - return rMapper[kind](kc.client, namespace, name) -} - -//GetSupportedKinds provides list of supported types -func GetSupportedKinds() (rTypes []string) { - for k := range rMapper { - rTypes = append(rTypes, k) - } - return rTypes -} - -func configMapGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func configMapLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func podsGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func podLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().Pods(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func deploymentGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} -func deploymentLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.AppsV1().Deployments(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func cronJobGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.BatchV1beta1().CronJobs(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func cronJobLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.BatchV1beta1().CronJobs(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func endpointsbGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func endpointsLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().Endpoints(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func horizontalPodAutoscalerGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.AutoscalingV1().HorizontalPodAutoscalers(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func horizontalPodAutoscalerLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func ingressGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.ExtensionsV1beta1().Ingresses(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func ingressLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.ExtensionsV1beta1().Ingresses(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func jobGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func jobLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.BatchV1().Jobs(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func limitRangeGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().LimitRanges(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} -func limitRangeLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().LimitRanges(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func namespaceGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().Namespaces().Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func namespaceLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().Namespaces().List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func networkPolicyGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.NetworkingV1().NetworkPolicies(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func networkPolicyLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.NetworkingV1().NetworkPolicies(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func persistentVolumeClaimGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func persistentVolumeClaimLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func podDisruptionBudgetGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func podDisruptionBudgetLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func podTemplateGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().PodTemplates(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func podTemplateLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().PodTemplates(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func resourceQuotaGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().ResourceQuotas(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func resourceQuotaLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().ResourceQuotas(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func secretGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func secretLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().Secrets(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func serviceGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func serviceLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.CoreV1().Services(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} - -func statefulSetGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { - obj, err := clientSet.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil -} - -func statefulSetLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { - list, err := clientSet.AppsV1().StatefulSets(namespace).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - objList := []runtime.Object{} - for _, obj := range list.Items { - objList = append(objList, &obj) - } - return objList, nil -} diff --git a/main.go b/main.go index 3fd428f433..66e7e1a250 100644 --- a/main.go +++ b/main.go @@ -4,14 +4,15 @@ import ( "flag" "log" - "github.com/nirmata/kube-policy/kubeclient" + "k8s.io/sample-controller/pkg/signals" + + client "github.com/nirmata/kube-policy/client" policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" controller "github.com/nirmata/kube-policy/pkg/controller" event "github.com/nirmata/kube-policy/pkg/event" - violation "github.com/nirmata/kube-policy/pkg/violation" + "github.com/nirmata/kube-policy/pkg/violation" "github.com/nirmata/kube-policy/pkg/webhooks" - "k8s.io/sample-controller/pkg/signals" ) var ( @@ -26,9 +27,9 @@ func main() { log.Fatalf("Error building kubeconfig: %v\n", err) } - kubeclient, err := kubeclient.NewKubeClient(clientConfig, nil) + client, err := client.NewDynamicClient(clientConfig, nil) if err != nil { - log.Fatalf("Error creating kubeclient: %v\n", err) + log.Fatalf("Error creating client: %v\n", err) } policyClientset, err := policyclientset.NewForConfig(clientConfig) @@ -40,21 +41,17 @@ func main() { policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) policyInformer := policyInformerFactory.Kubepolicy().V1alpha1().Policies() - eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil) - violationBuilder := violation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil) + eventController := event.NewEventController(client, policyInformer.Lister(), nil) + violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformer.Lister(), policyClientset, eventController, nil) - policyController := controller.NewPolicyController(policyClientset, + policyController := controller.NewPolicyController( + client, policyInformer, violationBuilder, eventController, - nil, - kubeclient) + nil) - if err != nil { - log.Fatalf("Error creating mutation webhook: %v\n", err) - } - - tlsPair, err := initTlsPemPair(cert, key, clientConfig, kubeclient) + tlsPair, err := initTlsPemPair(cert, key, clientConfig, client) if err != nil { log.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err) } @@ -64,7 +61,7 @@ func main() { log.Fatalf("Unable to create webhook server: %v\n", err) } - webhookRegistrationClient, err := webhooks.NewWebhookRegistrationClient(clientConfig, kubeclient) + webhookRegistrationClient, err := webhooks.NewWebhookRegistrationClient(clientConfig, client) if err != nil { log.Fatalf("Unable to register admission webhooks on cluster: %v\n", err) } @@ -85,6 +82,7 @@ func main() { server.RunAsync() <-stopCh server.Stop() + policyController.Stop() } func init() { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4aede48576..12d2b109a8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,9 +6,8 @@ import ( "os" "time" - kubeClient "github.com/nirmata/kube-policy/kubeclient" + client "github.com/nirmata/kube-policy/client" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" event "github.com/nirmata/kube-policy/pkg/event" @@ -23,9 +22,8 @@ import ( //PolicyController to manage Policy CRD type PolicyController struct { - kubeClient *kubeClient.KubeClient + client *client.Client policyLister lister.PolicyLister - policyInterface policyclientset.Interface policySynced cache.InformerSynced violationBuilder violation.Generator eventBuilder event.Generator @@ -34,21 +32,18 @@ type PolicyController struct { } // NewPolicyController from cmd args -func NewPolicyController(policyInterface policyclientset.Interface, +func NewPolicyController(client *client.Client, policyInformer infomertypes.PolicyInformer, violationBuilder violation.Generator, eventController event.Generator, - logger *log.Logger, - kubeClient *kubeClient.KubeClient) *PolicyController { + logger *log.Logger) *PolicyController { if logger == nil { logger = log.New(os.Stdout, "Policy Controller: ", log.LstdFlags) } - controller := &PolicyController{ - kubeClient: kubeClient, + client: client, policyLister: policyInformer.Lister(), - policyInterface: policyInterface, policySynced: policyInformer.Informer().HasSynced, violationBuilder: violationBuilder, eventBuilder: eventController, @@ -90,6 +85,7 @@ func (pc *PolicyController) deletePolicyHandler(resource interface{}) { func (pc *PolicyController) enqueuePolicy(obj interface{}) { var key string var err error + pc.logger.Println("enque") if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) return @@ -109,11 +105,14 @@ func (pc *PolicyController) Run(stopCh <-chan struct{}) error { for i := 0; i < policyControllerWorkerCount; i++ { go wait.Until(pc.runWorker, time.Second, stopCh) } + pc.logger.Println("started policy controller workers") - pc.logger.Println("Started policy controller") return nil } +func (pc *PolicyController) Stop() { + pc.logger.Println("shutting down policy controller workers") +} func (pc *PolicyController) runWorker() { for pc.processNextWorkItem() { } @@ -181,6 +180,7 @@ func (pc *PolicyController) syncHandler(obj interface{}) error { // process policy on existing resource // get the violations and pass to violation Builder // get the events and pass to event Builder + //TODO: processPolicy fmt.Println(policy) return nil } diff --git a/pkg/controller/processPolicy.go b/pkg/controller/processPolicy.go index c9ff4ceb5b..35f4306b56 100644 --- a/pkg/controller/processPolicy.go +++ b/pkg/controller/processPolicy.go @@ -1,15 +1,14 @@ package controller import ( - "encoding/json" "fmt" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" engine "github.com/nirmata/kube-policy/pkg/engine" + "github.com/nirmata/kube-policy/pkg/engine/mutation" event "github.com/nirmata/kube-policy/pkg/event" violation "github.com/nirmata/kube-policy/pkg/violation" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) @@ -22,7 +21,7 @@ func (pc *PolicyController) runForPolicy(key string) { } if policy == nil { - pc.logger.Printf("Counld not find policy by key %s", key) + pc.logger.Printf("Could not find policy by key %s", key) return } @@ -51,13 +50,12 @@ func (pc *PolicyController) processPolicy(policy types.Policy) ( } for _, resource := range resources { - rawResource, err := json.Marshal(resource) if err != nil { pc.logger.Printf("Failed to marshal resources map to rule %s, err: %v\n", rule.Name, err) continue } - violation, eventInfos, err := engine.ProcessExisting(policy, rawResource) + violation, eventInfos, err := engine.ProcessExisting(policy, resource) if err != nil { pc.logger.Printf("Failed to process rule %s, err: %v\n", rule.Name, err) continue @@ -70,8 +68,8 @@ func (pc *PolicyController) processPolicy(policy types.Policy) ( return violations, events, nil } -func (pc *PolicyController) filterResourceByRule(rule types.Rule) ([]runtime.Object, error) { - var targetResources []runtime.Object +func (pc *PolicyController) filterResourceByRule(rule types.Rule) ([][]byte, error) { + var targetResources [][]byte // TODO: make this namespace all var namespace = "default" if err := rule.Validate(); err != nil { @@ -79,27 +77,26 @@ func (pc *PolicyController) filterResourceByRule(rule types.Rule) ([]runtime.Obj } // Get the resource list from kind - resources, err := pc.kubeClient.ListResource(rule.ResourceDescription.Kind, namespace) + resources, err := pc.client.ListResource(rule.ResourceDescription.Kind, namespace) if err != nil { return nil, err } - for _, resource := range resources { + for _, resource := range resources.Items { // TODO: - //rawResource, err := json.Marshal(resource) + rawResource, err := resource.MarshalJSON() // objKind := resource.GetObjectKind() // codecFactory := serializer.NewCodecFactory(runtime.NewScheme()) // codecFactory.EncoderForVersion() - if err != nil { pc.logger.Printf("failed to marshal object %v", resource) continue } // filter the resource by name and label - //if ok, _ := mutation.ResourceMeetsRules(rawResource, rule.ResourceDescription); ok { - // targetResources = append(targetResources, resource) - //} + if ok, _ := mutation.IsRuleApplicableToResource(rawResource, rule.ResourceDescription); ok { + targetResources = append(targetResources, rawResource) + } } return targetResources, nil } @@ -117,6 +114,61 @@ func (pc *PolicyController) getPolicyByKey(key string) (*types.Policy, error) { return elem, nil } } - return nil, nil } + +//TODO wrap the generate, mutation & validation functions for the existing resources +//ProcessExisting processes the policy rule types for the existing resources +func (pc *PolicyController) processExisting(policy types.Policy, rawResource []byte) ([]violation.Info, []event.Info, error) { + // Generate + // generatedDataList := engine.Generate(pc.logger, policy, rawResource) + // // apply the generateData using the kubeClient + // err = pc.applyGenerate(generatedDataList) + // if err != nil { + // return nil, nil, err + // } + // // Mutation + // mutationPatches, err := engine.Mutation(pc.logger, policy, rawResource) + // if err != nil { + // return nil, nil, err + // } + // // Apply mutationPatches on the rawResource + // err = pc.applyPatches(mutationPatches, rawResource) + // if err != nil { + // return nil, nil, err + // } + // //Validation + // validate, _, _ := engine.Validation(policy, rawResource) + // if !validate { + // // validation has errors -> so there will be violations + // // call the violatio builder to apply the violations + // } + // // Generate events + + return nil, nil, nil +} + +//TODO: return events and policy violations +// func (pc *PolicyController) applyGenerate(generatedDataList []engine.GenerationResponse) error { +// // for _, generateData := range generatedDataList { +// // switch generateData.Generator.Kind { +// // case "ConfigMap": +// // err := pc.client.GenerateConfigMap(generateData.Generator, generateData.Namespace) +// // if err != nil { +// // return err +// // } +// // case "Secret": +// // err := pc.client.GenerateSecret(generateData.Generator, generateData.Namespace) +// // if err != nil { +// // return err +// // } +// // default: +// // return errors.New("Unsuported config kind") +// // } +// // } +// return nil +// } + +func (pc *PolicyController) applyPatches([]mutation.PatchBytes, []byte) error { + return nil +} diff --git a/pkg/engine/generate.go b/pkg/engine/generate.go new file mode 100644 index 0000000000..c20670eddc --- /dev/null +++ b/pkg/engine/generate.go @@ -0,0 +1,85 @@ + package engine + +// import ( +// "fmt" +// "log" + +// types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" +// "github.com/nirmata/kube-policy/pkg/engine/mutation" +// ) + +// // Generate should be called to process generate rules on the resource +// func Generate(logger *log.Logger, policy types.Policy, rawResource []byte) ([]GenerateReturnData, error) { +// patchingSets := mutation.GetPolicyPatchingSets(policy) +// generatedList := []GenerateReturnData{} +// for ruleIdx, rule := range policy.Spec.Rules { +// err := rule.Validate() +// if err != nil { +// logger.Printf("Invalid rule detected: #%d in policy %s, err: %v\n", ruleIdx, policy.ObjectMeta.Name, err) +// continue +// } +// if ok, err := mutation.IsRuleApplicableToResource(rawResource, rule.Resource); !ok { +// logger.Printf("Rule %d of policy %s is not applicable to the request", ruleIdx, policy.Name) +// return nil, err +// } +// resourceKind := mutation.ParseKindFromObject(rawResource) + +// // configMapGenerator and secretGenerator can be applied only to namespaces +// if resourceKind == "Namespace" { +// generatedData, err := applyRuleGenerators(rawResource, rule) +// if err != nil && patchingSets == mutation.PatchingSetsStopOnError { +// return nil, fmt.Errorf("Failed to apply generators from rule #%d: %s", ruleIdx, err) +// } +// generatedList = append(generatedList, generatedData...) +// } +// } +// return generatedList, nil +// } + +// // Applies "configMapGenerator" and "secretGenerator" described in PolicyRule +// func applyRuleGenerators(rawResource []byte, rule types.PolicyRule) ([]GenerateReturnData, error) { +// returnData := []GenerateReturnData{} +// namespaceName := mutation.ParseNameFromObject(rawResource) +// var generator *types.PolicyConfigGenerator +// // Apply config map generator rule +// generator, err := applyConfigGenerator(rule.ConfigMapGenerator, namespaceName, "ConfigMap") +// if err != nil { +// return returnData, err +// } +// returnData = append(returnData, GenerateReturnData{namespaceName, "ConfigMap", *generator}) +// // Apply secrets generator rule +// generator, err = applyConfigGenerator(rule.SecretGenerator, namespaceName, "Secret") +// if err != nil { +// return returnData, err +// } +// returnData = append(returnData, GenerateReturnData{namespaceName, "Secret", *generator}) + +// return returnData, nil +// } + +// // Creates resourceKind (ConfigMap or Secret) with parameters specified in generator in cluster specified in request. +// func applyConfigGenerator(generator *types.PolicyConfigGenerator, namespace string, configKind string) (*types.PolicyConfigGenerator, error) { +// if generator == nil { +// return nil, nil +// } +// err := generator.Validate() +// if err != nil { +// return nil, fmt.Errorf("Generator for '%s' is invalid: %s", configKind, err) +// } +// switch configKind { +// case "ConfigMap": +// return generator, nil +// // err = kubeClient.GenerateConfigMap(*generator, namespace) +// case "Secret": +// return generator, nil +// default: +// return nil, fmt.Errorf("Unsupported config Kind '%s'", configKind) +// } +// } + +// //GenerateReturnData holds the generator details +// type GenerateReturnData struct { +// Namespace string +// ConfigKind string +// Generator types.PolicyConfigGenerator +// } diff --git a/pkg/engine/generation.go b/pkg/engine/generation.go index e5cb259942..450d437b52 100644 --- a/pkg/engine/generation.go +++ b/pkg/engine/generation.go @@ -1,31 +1,24 @@ package engine import ( + "errors" "fmt" "log" + client "github.com/nirmata/kube-policy/client" kubepolicy "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" "github.com/nirmata/kube-policy/pkg/engine/mutation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type GenerationResponse struct { - Generator *kubepolicy.Generation - Namespace string -} - // Generate should be called to process generate rules on the resource -// TODO: extend kubeclient(will change to dynamic client) to create resources -func Generate(policy kubepolicy.Policy, rawResource []byte, gvk metav1.GroupVersionKind) []GenerationResponse { +func Generate(client *client.Client, logger *log.Logger, policy kubepolicy.Policy, rawResource []byte, gvk metav1.GroupVersionKind) { // configMapGenerator and secretGenerator can be applied only to namespaces if gvk.Kind != "Namespace" { - return nil + return } - var generateResps []GenerationResponse - for i, rule := range policy.Spec.Rules { - // Checks for preconditions // TODO: Rework PolicyEngine interface that it receives not a policy, but mutation object for // Mutate, validation for Validate and so on. It will allow to bring this checks outside of PolicyEngine @@ -33,46 +26,49 @@ func Generate(policy kubepolicy.Policy, rawResource []byte, gvk metav1.GroupVers err := rule.Validate() if err != nil { - log.Printf("Rule has invalid structure: rule number = %d, rule name = %s in policy %s, err: %v\n", i, rule.Name, policy.ObjectMeta.Name, err) + logger.Printf("Rule has invalid structure: rule number = %d, rule name = %s in policy %s, err: %v\n", i, rule.Name, policy.ObjectMeta.Name, err) continue } ok, err := mutation.ResourceMeetsRules(rawResource, rule.ResourceDescription, gvk) if err != nil { - log.Printf("Rule has invalid data: rule number = %d, rule name = %s in policy %s, err: %v\n", i, rule.Name, policy.ObjectMeta.Name, err) + logger.Printf("Rule has invalid data: rule number = %d, rule name = %s in policy %s, err: %v\n", i, rule.Name, policy.ObjectMeta.Name, err) continue } if !ok { - log.Printf("Rule is not applicable to the request: rule name = %s in policy %s \n", rule.Name, policy.ObjectMeta.Name) + logger.Printf("Rule is not applicable to the request: rule name = %s in policy %s \n", rule.Name, policy.ObjectMeta.Name) continue } - generateResps, err = applyRuleGenerator(rawResource, rule.Generation) + err = applyRuleGenerator(client, rawResource, rule.Generation, gvk) if err != nil { - log.Printf("Failed to apply rule generator: %v", err) - } else { - generateResps = append(generateResps, generateResps...) + logger.Printf("Failed to apply rule generator: %v", err) } } - - return generateResps } // Applies "configMapGenerator" and "secretGenerator" described in PolicyRule // TODO: plan to support all kinds of generator -func applyRuleGenerator(rawResource []byte, generator *kubepolicy.Generation) ([]GenerationResponse, error) { - var generateResps []GenerationResponse +func applyRuleGenerator(client *client.Client, rawResource []byte, generator *kubepolicy.Generation, gvk metav1.GroupVersionKind) error { if generator == nil { - return nil, nil + return nil } err := generator.Validate() if err != nil { - return nil, fmt.Errorf("Generator for '%s' is invalid: %s", generator.Kind, err) + return fmt.Errorf("Generator for '%s' is invalid: %s", generator.Kind, err) } namespaceName := mutation.ParseNameFromObject(rawResource) - generateResps = append(generateResps, GenerationResponse{generator, namespaceName}) - return generateResps, nil + // Generate the resource + switch gvk.Kind { + case "configmap": + err = client.GenerateConfigMap(*generator, namespaceName) + case "secret": + err = client.GenerateSecret(*generator, namespaceName) + case "default": + err = errors.New("resource not supported") + } + return err } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index e0507af49f..ac726fcf1a 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -6,7 +6,7 @@ import ( "os" "time" - kubeClient "github.com/nirmata/kube-policy/kubeclient" + client "github.com/nirmata/kube-policy/client" "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" @@ -21,7 +21,7 @@ import ( ) type controller struct { - kubeClient *kubeClient.KubeClient + client *client.Client policyLister policylister.PolicyLister queue workqueue.RateLimitingInterface recorder record.EventRecorder @@ -37,36 +37,41 @@ type Generator interface { type Controller interface { Generator Run(stopCh <-chan struct{}) + Stop() } //NewEventController to generate a new event controller -func NewEventController(kubeClient *kubeClient.KubeClient, +func NewEventController(client *client.Client, policyLister policylister.PolicyLister, logger *log.Logger) Controller { if logger == nil { - logger = log.New(os.Stdout, "Event Controller: ", log.LstdFlags) + logger = log.New(os.Stdout, "Event Controller: ", log.LstdFlags) } controller := &controller{ - kubeClient: kubeClient, + client: client, policyLister: policyLister, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), - recorder: initRecorder(kubeClient), + recorder: initRecorder(client), logger: logger, } - return controller } -func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder { +func initRecorder(client *client.Client) record.EventRecorder { // Initliaze Event Broadcaster policyscheme.AddToScheme(scheme.Scheme) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(log.Printf) + eventInterface, err := client.GetEventsInterface() + if err != nil { + utilruntime.HandleError(err) // TODO: add more specific error + return nil + } eventBroadcaster.StartRecordingToSink( &typedcorev1.EventSinkImpl{ - Interface: kubeClient.GetEvents("")}) + Interface: eventInterface}) recorder := eventBroadcaster.NewRecorder( scheme.Scheme, v1.EventSource{Component: eventSource}) @@ -84,9 +89,12 @@ func (c *controller) Run(stopCh <-chan struct{}) { for i := 0; i < eventWorkerThreadCount; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } - c.logger.Println("Started eventbuilder controller") + log.Println("Started eventbuilder controller workers") } +func (c *controller) Stop() { + log.Println("Shutting down eventbuilder controller workers") +} func (c *controller) runWorker() { for c.processNextWorkItem() { } @@ -123,20 +131,32 @@ func (c *controller) processNextWorkItem() bool { func (c *controller) SyncHandler(key Info) error { var resource runtime.Object var err error + namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key.Resource)) + return err + } + switch key.Kind { case "Policy": - namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource)) - return err - } + //TODO: policy is clustered resource so wont need namespace resource, err = c.policyLister.Policies(namespace).Get(name) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource)) return err } default: - resource, err = c.kubeClient.GetResource(key.Kind, key.Resource) + resource, err = c.client.GetResource(key.Kind, namespace, name) + if err != nil { + return err + } + //TODO: Test if conversion from unstructured to runtime.Object is implicit or explicit conversion is required + // resourceobj, err := client.ConvertToRuntimeObject(resource) + // if err != nil { + // return err + // } + + // resource, err = c.kubeClient.GetResource(key.Kind, key.Resource) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) return err diff --git a/pkg/violation/builder.go b/pkg/violation/builder.go index ab66e2fc09..8fe8e4364b 100644 --- a/pkg/violation/builder.go +++ b/pkg/violation/builder.go @@ -3,8 +3,9 @@ package violation import ( "fmt" "log" + "os" - kubeClient "github.com/nirmata/kube-policy/kubeclient" + client "github.com/nirmata/kube-policy/client" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" @@ -19,7 +20,7 @@ type Generator interface { } type builder struct { - kubeClient *kubeClient.KubeClient + client *client.Client policyLister policylister.PolicyLister policyInterface policyclientset.Interface eventBuilder event.Generator @@ -34,15 +35,18 @@ type Builder interface { } //NewPolicyViolationBuilder returns new violation builder -func NewPolicyViolationBuilder( - kubeClient *kubeClient.KubeClient, +func NewPolicyViolationBuilder(client *client.Client, policyLister policylister.PolicyLister, policyInterface policyclientset.Interface, eventController event.Generator, logger *log.Logger) Builder { + if logger == nil { + logger = log.New(os.Stdout, "Violation Builder: ", log.LstdFlags) + } + builder := &builder{ - kubeClient: kubeClient, + client: client, policyLister: policyLister, policyInterface: policyInterface, eventBuilder: eventController, @@ -97,8 +101,15 @@ func (b *builder) processViolation(info Info) error { } func (b *builder) isActive(kind string, resource string) (bool, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", resource)) + return false, err + } // Generate Merge Patch - _, err := b.kubeClient.GetResource(kind, resource) + //TODO: test for namspace and clustered object + _, err = b.client.GetResource(kind, namespace, name) + // _, err := b.kubeClient.GetResource(kind, resource) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource)) return false, err diff --git a/pkg/webhooks/registration.go b/pkg/webhooks/registration.go index 6fe8a64f2b..c36c448b88 100644 --- a/pkg/webhooks/registration.go +++ b/pkg/webhooks/registration.go @@ -4,8 +4,8 @@ import ( "errors" "io/ioutil" + "github.com/nirmata/kube-policy/client" "github.com/nirmata/kube-policy/config" - kubeclient "github.com/nirmata/kube-policy/kubeclient" admregapi "k8s.io/api/admissionregistration/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,12 +16,12 @@ import ( // WebhookRegistrationClient is client for registration webhooks on cluster type WebhookRegistrationClient struct { registrationClient *admregclient.AdmissionregistrationV1beta1Client - kubeclient *kubeclient.KubeClient + client *client.Client clientConfig *rest.Config } // NewWebhookRegistrationClient creates new WebhookRegistrationClient instance -func NewWebhookRegistrationClient(clientConfig *rest.Config, kubeclient *kubeclient.KubeClient) (*WebhookRegistrationClient, error) { +func NewWebhookRegistrationClient(clientConfig *rest.Config, client *client.Client) (*WebhookRegistrationClient, error) { registrationClient, err := admregclient.NewForConfig(clientConfig) if err != nil { return nil, err @@ -29,7 +29,7 @@ func NewWebhookRegistrationClient(clientConfig *rest.Config, kubeclient *kubecli return &WebhookRegistrationClient{ registrationClient: registrationClient, - kubeclient: kubeclient, + client: client, clientConfig: clientConfig, }, nil } @@ -149,7 +149,7 @@ func constructWebhook(name, servicePath string, caData []byte) admregapi.Webhook } func (wrc *WebhookRegistrationClient) constructOwner() meta.OwnerReference { - kubePolicyDeployment, err := wrc.kubeclient.GetKubePolicyDeployment() + kubePolicyDeployment, err := wrc.client.GetKubePolicyDeployment() if err != nil { return meta.OwnerReference{} diff --git a/pkg/webhooks/utils.go b/pkg/webhooks/utils.go index da970e58c9..ed4baa9ba3 100644 --- a/pkg/webhooks/utils.go +++ b/pkg/webhooks/utils.go @@ -1,13 +1,11 @@ package webhooks -import ( - kubeclient "github.com/nirmata/kube-policy/kubeclient" -) +import "github.com/nirmata/kube-policy/client" // KindIsSupported checks kind to be prensent in // SupportedKinds defined in config func KindIsSupported(kind string) bool { - for _, k := range kubeclient.GetSupportedKinds() { + for _, k := range client.GetSupportedKinds() { if k == kind { return true }