diff --git a/controller/controller.go b/controller/controller.go deleted file mode 100644 index c2b687a67c..0000000000 --- a/controller/controller.go +++ /dev/null @@ -1,175 +0,0 @@ -package controller - -import ( - "errors" - "log" - "os" - "sort" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - - types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - clientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" - policies "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/typed/policy/v1alpha1" - informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" - lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" - violation "github.com/nirmata/kube-policy/pkg/violation" -) - -// PolicyController for CRD -type PolicyController struct { - policyInformerFactory informers.SharedInformerFactory - policyLister lister.PolicyLister - policiesInterface policies.PolicyInterface - logger *log.Logger - violationBuilder *violation.Builder -} - -// NewPolicyController from cmd args -func NewPolicyController(config *rest.Config, logger *log.Logger) (*PolicyController, error) { - if logger == nil { - logger = log.New(os.Stdout, "Policy Controller: ", log.LstdFlags|log.Lshortfile) - } - - if config == nil { - return nil, errors.New("Client Config should be set for controller") - } - - policyClientset, err := clientset.NewForConfig(config) - if err != nil { - return nil, err - } - // Initialize Kube Client - kubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, time.Second*30) - policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() - - // generate Violation builder - builder, err := violation.NewViolationHelper(kubeClient, policyClientset, logger, policyInformer) - if err != nil { - return nil, err - } - controller := &PolicyController{ - policyInformerFactory: policyInformerFactory, - policyLister: policyInformer.Lister(), - policiesInterface: policyClientset.NirmataV1alpha1().Policies("default"), - logger: logger, - violationBuilder: builder, - } - - policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.createPolicyHandler, - UpdateFunc: controller.updatePolicyHandler, - DeleteFunc: controller.deletePolicyHandler, - }) - - return controller, nil -} - -// Run is main controller thread -func (c *PolicyController) Run(stopCh <-chan struct{}) { - c.policyInformerFactory.Start(stopCh) - // Un-comment to run the violation Builder - c.violationBuilder.Run(1, stopCh) -} - -// GetPolicies retrieves all policy resources -// from cache. Cache is refreshed by informer -func (c *PolicyController) GetPolicies() []types.Policy { - // Create nil Selector to grab all the policies - selector := labels.NewSelector() - cachedPolicies, err := c.policyLister.List(selector) - - if err != nil { - c.logger.Printf("Error: %v", err) - return nil - } - - var policies []types.Policy - for _, elem := range cachedPolicies { - policies = append(policies, *elem.DeepCopy()) - } - - sort.Slice(policies, func(i, j int) bool { - return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time) - }) - - return policies -} - -// Writes error message to the policy logs in status section -func (c *PolicyController) LogPolicyError(name, text string) { - c.addPolicyLog(name, "[ERROR] "+text) -} - -// Writes info message to the policy logs in status section -func (c *PolicyController) LogPolicyInfo(name, text string) { - c.addPolicyLog(name, "[ INFO] "+text) -} - -// This is the maximum number of records that can be written to the log object of the policy. -// If this number is exceeded, the older entries will be deleted. -const policyLogMaxRecords int = 50 - -// Appends given log text to the status/logs array. -func (c *PolicyController) addPolicyLog(name, text string) { - getOptions := metav1.GetOptions{ - ResourceVersion: "1", - IncludeUninitialized: true, - } - policy, err := c.policiesInterface.Get(name, getOptions) - if err != nil { - c.logger.Printf("Unable to get policy %s: %s", name, err) - return - } - - // Add new log record - text = time.Now().Format("2006 Jan 02 15:04:05.999 ") + text - //policy.Status.Logs = append(policy.Status.Logs, text) - // Pop front extra log records - // logsCount := len(policy.Status.Logs) - // if logsCount > policyLogMaxRecords { - // policy.Status.Logs = policy.Status.Logs[logsCount-policyLogMaxRecords:] - // } - // Save logs to policy object - _, err = c.policiesInterface.UpdateStatus(policy) - if err != nil { - c.logger.Printf("Unable to update logs for policy %s: %s", name, err) - } -} - -func (c *PolicyController) createPolicyHandler(resource interface{}) { - key := c.getResourceKey(resource) - c.logger.Printf("Policy created: %s", key) -} - -func (c *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) { - oldKey := c.getResourceKey(oldResource) - newKey := c.getResourceKey(newResource) - - c.logger.Printf("Policy %s updated to %s", oldKey, newKey) -} - -func (c *PolicyController) deletePolicyHandler(resource interface{}) { - key := c.getResourceKey(resource) - c.logger.Printf("Policy deleted: %s", key) -} - -func (c *PolicyController) getResourceKey(resource interface{}) string { - if key, err := cache.MetaNamespaceKeyFunc(resource); err != nil { - c.logger.Fatalf("Error retrieving policy key: %v", err) - } else { - return key - } - - return "" -} diff --git a/kubeclient/kubeclient.go b/kubeclient/kubeclient.go index 44eb018658..2edc317900 100644 --- a/kubeclient/kubeclient.go +++ b/kubeclient/kubeclient.go @@ -1,6 +1,7 @@ package kubeclient import ( + "fmt" "log" "os" "time" @@ -11,8 +12,12 @@ import ( v1 "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + event "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // KubeClient is the api-client for core Kubernetes objects @@ -38,6 +43,10 @@ func NewKubeClient(config *rest.Config, logger *log.Logger) (*KubeClient, error) }, nil } +func (kc *KubeClient) GetEventsInterface(namespace string) event.EventInterface { + return kc.client.CoreV1().Events(namespace) +} + func (kc *KubeClient) GetKubePolicyDeployment() (*apps.Deployment, error) { kubePolicyDeployment, err := kc.client. Apps(). @@ -175,3 +184,429 @@ func (kc *KubeClient) createSecretAfterNamespaceIsCreated(secret v1.Secret, name kc.logger.Printf("Can't create a secret: %s", err) } } + +var rMapper = map[string]getter{ + "ConfigMap": configMapGetter, + "Pods": podsGetter, + "Deployment": deploymentGetter, + "CronJob": cronJobGetter, + "Endpoints": endpointsbGetter, + "HorizontalPodAutoscaler": horizontalPodAutoscalerGetter, + "Ingress": ingressGetter, + "Job": jobGetter, + "LimitRange": limitRangeGetter, + "Namespace": namespaceGetter, + "NetworkPolicy": networkPolicyGetter, + "PersistentVolumeClaim": persistentVolumeClaimGetter, + "PodDisruptionBudget": podDisruptionBudgetGetter, + "PodTemplate": podTemplateGetter, + "ResourceQuota": resourceQuotaGetter, + "Secret": secretGetter, + "Service": serviceGetter, + "StatefulSet": statefulSetGetter, +} + +var lMapper = map[string]lister{ + "ConfigMap": configMapLister, + "Pods": podLister, + "Deployment": deploymentLister, + "CronJob": cronJobLister, + "Endpoints": endpointsLister, + "HorizontalPodAutoscaler": horizontalPodAutoscalerLister, + "Ingress": ingressLister, + "Job": jobLister, + "LimitRange": limitRangeLister, + "Namespace": namespaceLister, + "NetworkPolicy": networkPolicyLister, + "PersistentVolumeClaim": persistentVolumeClaimLister, + "PodDisruptionBudget": podDisruptionBudgetLister, + "PodTemplate": podTemplateLister, + "ResourceQuota": resourceQuotaLister, + "Secret": secretLister, + "Service": serviceLister, + "StatefulSet": statefulSetLister, +} + +type getter func(*kubernetes.Clientset, string, string) (runtime.Object, error) +type lister func(*kubernetes.Clientset, string) ([]runtime.Object, error) + +//ListResource to return resource list +func (kc *KubeClient) ListResource(kind string, namespace string) ([]runtime.Object, error) { + return lMapper[kind](kc.client, namespace) +} + +//GetResource get the resource object +func (kc *KubeClient) GetResource(kind string, resource string) (runtime.Object, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", resource)) + return nil, err + } + return rMapper[kind](kc.client, namespace, name) +} + +//GetSupportedKinds provides list of supported types +func GetSupportedKinds() (rTypes []string) { + for k := range rMapper { + rTypes = append(rTypes, k) + } + return rTypes +} + +func configMapGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func configMapLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func podsGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func podLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func deploymentGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} +func deploymentLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.AppsV1().Deployments(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func cronJobGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.BatchV1beta1().CronJobs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func cronJobLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.BatchV1beta1().CronJobs(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func endpointsbGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func endpointsLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().Endpoints(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func horizontalPodAutoscalerGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.AutoscalingV1().HorizontalPodAutoscalers(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func horizontalPodAutoscalerLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func ingressGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.ExtensionsV1beta1().Ingresses(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func ingressLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.ExtensionsV1beta1().Ingresses(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func jobGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func jobLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.BatchV1().Jobs(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func limitRangeGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().LimitRanges(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} +func limitRangeLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().LimitRanges(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func namespaceGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().Namespaces().Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func namespaceLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().Namespaces().List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func networkPolicyGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.NetworkingV1().NetworkPolicies(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func networkPolicyLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.NetworkingV1().NetworkPolicies(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func persistentVolumeClaimGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func persistentVolumeClaimLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func podDisruptionBudgetGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func podDisruptionBudgetLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func podTemplateGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().PodTemplates(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func podTemplateLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().PodTemplates(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func resourceQuotaGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().ResourceQuotas(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func resourceQuotaLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().ResourceQuotas(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func secretGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func secretLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().Secrets(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func serviceGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func serviceLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.CoreV1().Services(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} + +func statefulSetGetter(clientSet *kubernetes.Clientset, namespace string, name string) (runtime.Object, error) { + obj, err := clientSet.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func statefulSetLister(clientSet *kubernetes.Clientset, namespace string) ([]runtime.Object, error) { + list, err := clientSet.AppsV1().StatefulSets(namespace).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + objList := []runtime.Object{} + for _, obj := range list.Items { + objList = append(objList, &obj) + } + return objList, nil +} diff --git a/main.go b/main.go index e0ba632a68..4e04177455 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,18 @@ import ( "flag" "log" - "github.com/nirmata/kube-policy/controller" "github.com/nirmata/kube-policy/kubeclient" + "github.com/nirmata/kube-policy/policycontroller" "github.com/nirmata/kube-policy/server" "github.com/nirmata/kube-policy/webhooks" - signals "k8s.io/sample-controller/pkg/signals" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" + policyengine "github.com/nirmata/kube-policy/pkg/policyengine" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" + + event "github.com/nirmata/kube-policy/pkg/event" + "k8s.io/sample-controller/pkg/signals" ) var ( @@ -24,17 +30,38 @@ func main() { log.Fatalf("Error building kubeconfig: %v\n", err) } - controller, err := controller.NewPolicyController(clientConfig, nil) - if err != nil { - log.Fatalf("Error creating PolicyController: %s\n", err) - } - kubeclient, err := kubeclient.NewKubeClient(clientConfig, nil) if err != nil { log.Fatalf("Error creating kubeclient: %v\n", err) } - mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig, kubeclient, controller, nil) + policyClientset, err := policyclientset.NewForConfig(clientConfig) + if err != nil { + log.Fatalf("Error creating policyClient: %v\n", err) + } + + //TODO wrap the policyInformer inside a factory + policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) + policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() + + eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil) + violationBuilder := policyviolation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil) + policyEngine := policyengine.NewPolicyEngine(kubeclient, nil) + + policyController := policycontroller.NewPolicyController(policyClientset, + policyInformer, + policyEngine, + violationBuilder, + eventController, + nil, + kubeclient) + + mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig, + kubeclient, + policyInformer.Lister(), + violationBuilder, + eventController, + nil) if err != nil { log.Fatalf("Error creating mutation webhook: %v\n", err) } @@ -51,17 +78,17 @@ func main() { server.RunAsync() stopCh := signals.SetupSignalHandler() - controller.Run(stopCh) - - if err != nil { - log.Fatalf("Error running PolicyController: %s\n", err) + policyInformerFactory.Start(stopCh) + if err = eventController.Run(stopCh); err != nil { + log.Fatalf("Error running EventController: %v\n", err) + } + + if err = policyController.Run(stopCh); err != nil { + log.Fatalf("Error running PolicyController: %v\n", err) } - log.Println("Policy Controller has started") <-stopCh - server.Stop() - log.Println("Policy Controller has stopped") } func init() { diff --git a/pkg/apis/policy/v1alpha1/types.go b/pkg/apis/policy/v1alpha1/types.go index b503a1fa86..7fcb3fc380 100644 --- a/pkg/apis/policy/v1alpha1/types.go +++ b/pkg/apis/policy/v1alpha1/types.go @@ -12,20 +12,8 @@ import ( type Policy struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec PolicySpec `json:"spec"` - // Status PolicyStatus `json:"status"` - Status PolicyViolations `json:"status,omitempty"` -} - -type PolicyViolations struct { - Violations []Violation `json:"violations,omitempty"` -} -type Violation struct { - Kind string `json:"kind,omitempty"` - Resource string `json:"resource,omitempty"` - Source string `json:"source,omitempty"` - Rule string `json:"rule,omitempty"` - Reason string `json:"reason,omitempty"` + Spec PolicySpec `json:"spec"` + Status PolicyStatus `json:"status"` } // Specification of the Policy. @@ -86,7 +74,8 @@ type PolicyCopyFrom struct { // Contains logs about policy application type PolicyStatus struct { - Logs []string `json:"log"` + Logs []string `json:"log"` + Violations []Violation `json:"violations,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -97,3 +86,10 @@ type PolicyList struct { metav1.ListMeta `json:"metadata"` Items []Policy `json:"items"` } + +// Violation for the policy +type Violation struct { + Kind string `json:"kind,omitempty"` + Resource string `json:"resource,omitempty"` + Rule string `json:"rule,omitempty"` +} diff --git a/pkg/event/eventcontroller.go b/pkg/event/eventcontroller.go new file mode 100644 index 0000000000..af517ccad6 --- /dev/null +++ b/pkg/event/eventcontroller.go @@ -0,0 +1,160 @@ +package event + +import ( + "fmt" + "log" + "time" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" + policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" +) + +type controller struct { + kubeClient *kubeClient.KubeClient + policyLister policylister.PolicyLister + queue workqueue.RateLimitingInterface + recorder record.EventRecorder + logger *log.Logger +} + +//Generator to generate event +type Generator interface { + Add(info Info) +} + +//Controller api +type Controller interface { + Generator + Run(stopCh <-chan struct{}) error +} + +//NewEventController to generate a new event controller +func NewEventController(kubeClient *kubeClient.KubeClient, + policyLister policylister.PolicyLister, + logger *log.Logger) Controller { + controller := &controller{ + kubeClient: kubeClient, + policyLister: policyLister, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), + recorder: initRecorder(kubeClient), + logger: logger, + } + return controller +} + +func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder { + // Initliaze Event Broadcaster + policyscheme.AddToScheme(scheme.Scheme) + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(log.Printf) + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{ + Interface: kubeClient.GetEventsInterface("")}) + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, + v1.EventSource{Component: eventSource}) + return recorder +} + +func (c *controller) Add(info Info) { + c.queue.Add(info) +} + +func (c *controller) Run(stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + log.Println("starting eventbuilder controller") + + log.Println("Starting eventbuilder controller workers") + for i := 0; i < eventWorkerThreadCount; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + log.Println("Started eventbuilder controller workers") + <-stopCh + log.Println("Shutting down eventbuilder controller workers") + return nil +} + +func (c *controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *controller) processNextWorkItem() bool { + obj, shutdown := c.queue.Get() + if shutdown { + return false + } + err := func(obj interface{}) error { + defer c.queue.Done(obj) + var key Info + var ok bool + if key, ok = obj.(Info); !ok { + c.queue.Forget(obj) + log.Printf("Expecting type info by got %v", obj) + return nil + } + // Run the syncHandler, passing the resource and the policy + if err := c.SyncHandler(key); err != nil { + c.queue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error()) + } + return nil + }(obj) + + if err != nil { + log.Println((err)) + } + return true +} + +func (c *controller) SyncHandler(key Info) error { + var resource runtime.Object + var err error + switch key.Kind { + case "Policy": + namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource)) + return err + } + resource, err = c.policyLister.Policies(namespace).Get(name) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource)) + return err + } + default: + resource, err = c.kubeClient.GetResource(key.Kind, key.Resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) + return err + } + } + c.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message) + return nil +} + +//NewEvent returns a new event +func NewEvent(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) Info { + msgText, err := getEventMsg(message, args) + if err != nil { + utilruntime.HandleError(err) + } + return Info{ + Kind: kind, + Resource: resource, + Reason: reason.String(), + Message: msgText, + } +} diff --git a/pkg/event/eventmsgbuilder.go b/pkg/event/eventmsgbuilder.go new file mode 100644 index 0000000000..1e06c3c5a4 --- /dev/null +++ b/pkg/event/eventmsgbuilder.go @@ -0,0 +1,32 @@ +package event + +import ( + "fmt" + "regexp" +) + +func (k MsgKey) String() string { + return [...]string{ + "Failed to satisfy policy on resource %s.The following rules %s failed to apply. Created Policy Violation", + "Failed to process rule %s of policy %s. Created Policy Violation %s", + "Policy applied successfully on the resource %s", + "Rule %s of Policy %s applied successfull", + "Failed to apply policy, blocked creation of resource %s. The following rules %s failed to apply", + "Failed to apply rule %s of policy %s Blocked update of the resource", + "Failed to apply policy on resource %s.Blocked update of the resource. The following rules %s failed to apply", + }[k] +} + +const argRegex = "%[s,d,v]" + +//GetEventMsg return the application message based on the message id and the arguments, +// if the number of arguments passed to the message are incorrect generate an error +func getEventMsg(key MsgKey, args ...interface{}) (string, error) { + // Verify the number of arguments + re := regexp.MustCompile(argRegex) + argsCount := len(re.FindAllString(key.String(), -1)) + if argsCount != len(args) { + return "", fmt.Errorf("message expects %d arguments, but %d arguments passed", argsCount, len(args)) + } + return fmt.Sprintf(key.String(), args...), nil +} diff --git a/pkg/event/eventmsgbuilder_test.go b/pkg/event/eventmsgbuilder_test.go new file mode 100644 index 0000000000..dcedd1e377 --- /dev/null +++ b/pkg/event/eventmsgbuilder_test.go @@ -0,0 +1,23 @@ +package event + +import ( + "fmt" + "testing" + + "gotest.tools/assert" +) + +func TestPositive(t *testing.T) { + resourceName := "test_resource" + expectedMsg := fmt.Sprintf("Policy applied successfully on the resource %s", resourceName) + msg, err := getEventMsg(SPolicyApply, resourceName) + assert.NilError(t, err) + assert.Equal(t, expectedMsg, msg) +} + +// passing incorrect args +func TestIncorrectArgs(t *testing.T) { + resourceName := "test_resource" + _, err := getEventMsg(SPolicyApply, resourceName, "extra_args") + assert.Error(t, err, "message expects 1 arguments, but 2 arguments passed") +} diff --git a/pkg/event/reason.go b/pkg/event/reason.go new file mode 100644 index 0000000000..ceac4cb0d9 --- /dev/null +++ b/pkg/event/reason.go @@ -0,0 +1,21 @@ +package event + +//Reason types of Event Reasons +type Reason int + +const ( + //PolicyViolation there is a violation of policy + PolicyViolation Reason = iota + //PolicyApplied policy applied + PolicyApplied + //RequestBlocked the request to create/update the resource was blocked( generated from admission-controller) + RequestBlocked +) + +func (r Reason) String() string { + return [...]string{ + "PolicyViolation", + "PolicyApplied", + "RequestBlocked", + }[r] +} diff --git a/pkg/event/util.go b/pkg/event/util.go new file mode 100644 index 0000000000..20727b8a26 --- /dev/null +++ b/pkg/event/util.go @@ -0,0 +1,28 @@ +package event + +const eventSource = "policy-controller" + +const eventWorkQueueName = "policy-controller-events" + +const eventWorkerThreadCount = 1 + +//Info defines the event details +type Info struct { + Kind string + Resource string + Reason string + Message string +} + +//MsgKey is an identified to determine the preset message formats +type MsgKey int + +const ( + FResourcePolcy MsgKey = iota + FProcessRule + SPolicyApply + SRuleApply + FPolicyApplyBlockCreate + FPolicyApplyBlockUpdate + FPolicyApplyBlockUpdateRule +) diff --git a/pkg/policyengine/mutation.go b/pkg/policyengine/mutation.go new file mode 100644 index 0000000000..85483bf53a --- /dev/null +++ b/pkg/policyengine/mutation.go @@ -0,0 +1,96 @@ +package policyengine + +import ( + "errors" + "fmt" + + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + "github.com/nirmata/kube-policy/pkg/policyengine/mutation" +) + +func (p *policyEngine) ProcessMutation(policy types.Policy, rawResource []byte) ([]mutation.PatchBytes, error) { + patchingSets := mutation.GetPolicyPatchingSets(policy) + var policyPatches []mutation.PatchBytes + + for ruleIdx, rule := range policy.Spec.Rules { + err := rule.Validate() + if err != nil { + p.logger.Printf("Invalid rule detected: #%s in policy %s, err: %v\n", rule.Name, policy.ObjectMeta.Name, err) + continue + } + + if ok, err := mutation.IsRuleApplicableToResource(rawResource, rule.Resource); !ok { + p.logger.Printf("Rule %d of policy %s is not applicable to the request", ruleIdx, policy.Name) + return nil, err + } + + err = p.applyRuleGenerators(rawResource, rule) + if err != nil && patchingSets == mutation.PatchingSetsStopOnError { + return nil, fmt.Errorf("Failed to apply generators from rule #%s: %v", rule.Name, err) + } + + rulePatchesProcessed, err := mutation.ProcessPatches(rule.Patches, rawResource, patchingSets) + if err != nil { + return nil, fmt.Errorf("Failed to process patches from rule #%s: %v", rule.Name, err) + } + + if rulePatchesProcessed != nil { + policyPatches = append(policyPatches, rulePatchesProcessed...) + p.logger.Printf("Rule %d: prepared %d patches", ruleIdx, len(rulePatchesProcessed)) + // TODO: add PolicyApplied events per rule for policy and resource + } else { + p.logger.Printf("Rule %d: no patches prepared", ruleIdx) + } + } + + // empty patch, return error to deny resource creation + if policyPatches == nil { + return nil, fmt.Errorf("no patches prepared") + } + + return policyPatches, nil +} + +// Applies "configMapGenerator" and "secretGenerator" described in PolicyRule +func (p *policyEngine) applyRuleGenerators(rawResource []byte, rule types.PolicyRule) error { + kind := mutation.ParseKindFromObject(rawResource) + + // configMapGenerator and secretGenerator can be applied only to namespaces + if kind == "Namespace" { + namespaceName := mutation.ParseNameFromObject(rawResource) + + err := p.applyConfigGenerator(rule.ConfigMapGenerator, namespaceName, "ConfigMap") + if err == nil { + err = p.applyConfigGenerator(rule.SecretGenerator, namespaceName, "Secret") + } + return err + } + return nil +} + +// Creates resourceKind (ConfigMap or Secret) with parameters specified in generator in cluster specified in request. +func (p *policyEngine) applyConfigGenerator(generator *types.PolicyConfigGenerator, namespace string, configKind string) error { + if generator == nil { + return nil + } + + err := generator.Validate() + if err != nil { + return errors.New(fmt.Sprintf("Generator for '%s' is invalid: %s", configKind, err)) + } + + switch configKind { + case "ConfigMap": + err = p.kubeClient.GenerateConfigMap(*generator, namespace) + case "Secret": + err = p.kubeClient.GenerateSecret(*generator, namespace) + default: + err = errors.New(fmt.Sprintf("Unsupported config Kind '%s'", configKind)) + } + + if err != nil { + return errors.New(fmt.Sprintf("Unable to apply generator for %s '%s/%s' : %s", configKind, namespace, generator.Name, err)) + } + + return nil +} diff --git a/pkg/policyengine/mutation/checkRules.go b/pkg/policyengine/mutation/checkRules.go new file mode 100644 index 0000000000..ecb7f7ff35 --- /dev/null +++ b/pkg/policyengine/mutation/checkRules.go @@ -0,0 +1,44 @@ +package mutation + +import ( + "github.com/minio/minio/pkg/wildcard" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// kind is the type of object being manipulated +// Checks requests kind, name and labels to fit the policy +func IsRuleApplicableToResource(resourceRaw []byte, policyResource types.PolicyResource) (bool, error) { + // kind := ParseKindFromObject(resourceRaw) + // if policyResource.Kind != kind { + // return false, nil + // } + + if resourceRaw != nil { + meta := ParseMetadataFromObject(resourceRaw) + name := ParseNameFromObject(resourceRaw) + + if policyResource.Name != nil { + + if !wildcard.Match(*policyResource.Name, name) { + return false, nil + } + } + + if policyResource.Selector != nil { + selector, err := metav1.LabelSelectorAsSelector(policyResource.Selector) + + if err != nil { + return false, err + } + + labelMap := ParseLabelsFromMetadata(meta) + + if !selector.Matches(labelMap) { + return false, nil + } + + } + } + return true, nil +} diff --git a/webhooks/patches.go b/pkg/policyengine/mutation/patches.go similarity index 85% rename from webhooks/patches.go rename to pkg/policyengine/mutation/patches.go index 8e7e09cb27..83f66863dc 100644 --- a/webhooks/patches.go +++ b/pkg/policyengine/mutation/patches.go @@ -1,4 +1,4 @@ -package webhooks +package mutation import ( "encoding/json" @@ -20,6 +20,15 @@ const ( type PatchBytes []byte +func GetPolicyPatchingSets(policy types.Policy) PatchingSets { + // failurePolicy property is the only available way for now to define behavior on patching error. + // TODO: define new failurePolicy values specific for patching and other policy features. + if policy.Spec.FailurePolicy != nil && *policy.Spec.FailurePolicy == "continueOnError" { + return PatchingSetsContinueAlways + } + return PatchingSetsDefault +} + // Test patches on given document according to given sets. // Returns array from separate patches that can be applied to the document // Returns error ONLY in case when creation of resource should be denied. @@ -27,7 +36,6 @@ func ProcessPatches(patches []types.PolicyPatch, originalDocument []byte, sets P if len(originalDocument) == 0 { return nil, errors.New("Source document for patching is empty") } - var appliedPatches []PatchBytes patchedDocument := originalDocument for _, patch := range patches { diff --git a/webhooks/patches_test.go b/pkg/policyengine/mutation/patches_test.go similarity index 75% rename from webhooks/patches_test.go rename to pkg/policyengine/mutation/patches_test.go index 0c1c29d6be..986b15594f 100644 --- a/webhooks/patches_test.go +++ b/pkg/policyengine/mutation/patches_test.go @@ -1,10 +1,9 @@ -package webhooks_test +package mutation import ( - "gotest.tools/assert" "testing" - "github.com/nirmata/kube-policy/webhooks" + "gotest.tools/assert" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" ) @@ -36,7 +35,7 @@ const endpointsDocument string = `{ func TestProcessPatches_EmptyPatches(t *testing.T) { var empty []types.PolicyPatch - patches, err := webhooks.ProcessPatches(empty, []byte(endpointsDocument), webhooks.PatchingSetsDefault) + patches, err := ProcessPatches(empty, []byte(endpointsDocument), PatchingSetsDefault) assert.NilError(t, err) assert.Assert(t, len(patches) == 0) } @@ -52,13 +51,13 @@ func makeAddIsMutatedLabelPatch() types.PolicyPatch { func TestProcessPatches_EmptyDocument(t *testing.T) { var patches []types.PolicyPatch patches = append(patches, makeAddIsMutatedLabelPatch()) - patchesBytes, err := webhooks.ProcessPatches(patches, nil, webhooks.PatchingSetsDefault) + patchesBytes, err := ProcessPatches(patches, nil, PatchingSetsDefault) assert.Assert(t, err != nil) assert.Assert(t, len(patchesBytes) == 0) } func TestProcessPatches_AllEmpty(t *testing.T) { - patchesBytes, err := webhooks.ProcessPatches(nil, nil, webhooks.PatchingSetsDefault) + patchesBytes, err := ProcessPatches(nil, nil, PatchingSetsDefault) assert.Assert(t, err != nil) assert.Assert(t, len(patchesBytes) == 0) } @@ -67,7 +66,7 @@ func TestProcessPatches_AddPathDoesntExist_StopOnError(t *testing.T) { patch := makeAddIsMutatedLabelPatch() patch.Path = "/metadata/additional/is-mutated" patches := []types.PolicyPatch{patch} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsStopOnError) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsStopOnError) assert.Assert(t, err != nil) assert.Assert(t, len(patchesBytes) == 0) } @@ -76,7 +75,7 @@ func TestProcessPatches_AddPathDoesntExist_ContinueOnError(t *testing.T) { patch := makeAddIsMutatedLabelPatch() patch.Path = "/metadata/additional/is-mutated" patches := []types.PolicyPatch{patch} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsContinueAlways) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsContinueAlways) assert.NilError(t, err) assert.Assert(t, len(patchesBytes) == 0) } @@ -84,7 +83,7 @@ func TestProcessPatches_AddPathDoesntExist_ContinueOnError(t *testing.T) { func TestProcessPatches_RemovePathDoesntExist_StopOnError(t *testing.T) { patch := types.PolicyPatch{Path: "/metadata/labels/is-mutated", Operation: "remove"} patches := []types.PolicyPatch{patch} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsStopOnError) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsStopOnError) assert.Assert(t, err != nil) assert.Assert(t, len(patchesBytes) == 0) } @@ -93,7 +92,7 @@ func TestProcessPatches_AddAndRemovePathsDontExist_ContinueOnError_EmptyResult(t patch1 := types.PolicyPatch{Path: "/metadata/labels/is-mutated", Operation: "remove"} patch2 := types.PolicyPatch{Path: "/spec/labels/label3", Operation: "add", Value: "label3Value"} patches := []types.PolicyPatch{patch1, patch2} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsContinueAlways) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsContinueAlways) assert.NilError(t, err) assert.Assert(t, len(patchesBytes) == 0) } @@ -103,7 +102,7 @@ func TestProcessPatches_AddAndRemovePathsDontExist_ContinueOnError_NotEmptyResul patch2 := types.PolicyPatch{Path: "/spec/labels/label2", Operation: "remove", Value: "label2Value"} patch3 := types.PolicyPatch{Path: "/metadata/labels/label3", Operation: "add", Value: "label3Value"} patches := []types.PolicyPatch{patch1, patch2, patch3} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsContinueAlways) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsContinueAlways) assert.NilError(t, err) assert.Assert(t, len(patchesBytes) == 1) assertEqStringAndData(t, `{"path":"/metadata/labels/label3","op":"add","value":"label3Value"}`, patchesBytes[0]) @@ -112,7 +111,7 @@ func TestProcessPatches_AddAndRemovePathsDontExist_ContinueOnError_NotEmptyResul func TestProcessPatches_RemovePathDoesntExist_IgnoreRemoveFailures_EmptyResult(t *testing.T) { patch := types.PolicyPatch{Path: "/metadata/labels/is-mutated", Operation: "remove"} patches := []types.PolicyPatch{patch} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsContinueOnRemoveFailure) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsContinueOnRemoveFailure) assert.NilError(t, err) assert.Assert(t, len(patchesBytes) == 0) } @@ -121,7 +120,7 @@ func TestProcessPatches_RemovePathDoesntExist_IgnoreRemoveFailures_NotEmptyResul patch1 := types.PolicyPatch{Path: "/metadata/labels/is-mutated", Operation: "remove"} patch2 := types.PolicyPatch{Path: "/metadata/labels/label2", Operation: "add", Value: "label2Value"} patches := []types.PolicyPatch{patch1, patch2} - patchesBytes, err := webhooks.ProcessPatches(patches, []byte(endpointsDocument), webhooks.PatchingSetsContinueOnRemoveFailure) + patchesBytes, err := ProcessPatches(patches, []byte(endpointsDocument), PatchingSetsContinueOnRemoveFailure) assert.NilError(t, err) assert.Assert(t, len(patchesBytes) == 1) assertEqStringAndData(t, `{"path":"/metadata/labels/label2","op":"add","value":"label2Value"}`, patchesBytes[0]) diff --git a/webhooks/utils.go b/pkg/policyengine/mutation/utils.go similarity index 77% rename from webhooks/utils.go rename to pkg/policyengine/mutation/utils.go index 0f163514ba..5abc2ee03c 100644 --- a/webhooks/utils.go +++ b/pkg/policyengine/mutation/utils.go @@ -1,4 +1,4 @@ -package webhooks +package mutation import ( "encoding/json" @@ -7,21 +7,21 @@ import ( "k8s.io/apimachinery/pkg/labels" ) -func parseMetadataFromObject(bytes []byte) map[string]interface{} { +func ParseMetadataFromObject(bytes []byte) map[string]interface{} { var objectJSON map[string]interface{} json.Unmarshal(bytes, &objectJSON) return objectJSON["metadata"].(map[string]interface{}) } -func parseKindFromObject(bytes []byte) string { +func ParseKindFromObject(bytes []byte) string { var objectJSON map[string]interface{} json.Unmarshal(bytes, &objectJSON) return objectJSON["kind"].(string) } -func parseLabelsFromMetadata(meta map[string]interface{}) labels.Set { +func ParseLabelsFromMetadata(meta map[string]interface{}) labels.Set { if interfaceMap, ok := meta["labels"].(map[string]interface{}); ok { labelMap := make(labels.Set, len(interfaceMap)) @@ -33,7 +33,7 @@ func parseLabelsFromMetadata(meta map[string]interface{}) labels.Set { return nil } -func parseNameFromObject(bytes []byte) string { +func ParseNameFromObject(bytes []byte) string { var objectJSON map[string]interface{} json.Unmarshal(bytes, &objectJSON) @@ -45,7 +45,7 @@ func parseNameFromObject(bytes []byte) string { return "" } -func parseNamespaceFromObject(bytes []byte) string { +func ParseNamespaceFromObject(bytes []byte) string { var objectJSON map[string]interface{} json.Unmarshal(bytes, &objectJSON) @@ -58,7 +58,7 @@ func parseNamespaceFromObject(bytes []byte) string { } // returns true if policyResourceName is a regexp -func parseRegexPolicyResourceName(policyResourceName string) (string, bool) { +func ParseRegexPolicyResourceName(policyResourceName string) (string, bool) { regex := strings.Split(policyResourceName, "regex:") if len(regex) == 1 { return regex[0], false diff --git a/webhooks/utils_test.go b/pkg/policyengine/mutation/utils_test.go similarity index 96% rename from webhooks/utils_test.go rename to pkg/policyengine/mutation/utils_test.go index ef000ecb08..f8473ae287 100644 --- a/webhooks/utils_test.go +++ b/pkg/policyengine/mutation/utils_test.go @@ -1,4 +1,4 @@ -package webhooks_test +package mutation import ( "testing" diff --git a/pkg/policyengine/policyengine.go b/pkg/policyengine/policyengine.go new file mode 100644 index 0000000000..67a9084a51 --- /dev/null +++ b/pkg/policyengine/policyengine.go @@ -0,0 +1,103 @@ +package policyengine + +import ( + "fmt" + "log" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" + "github.com/nirmata/kube-policy/pkg/policyengine/mutation" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" +) + +type PolicyEngine interface { + // ProcessMutation should be called from admission contoller + // when there is an creation / update of the resource + // ProcessMutation(policy types.Policy, rawResource []byte) (patchBytes []byte, events []Events, err error) + ProcessMutation(policy types.Policy, rawResource []byte) ([]mutation.PatchBytes, error) + + // ProcessValidation should be called from admission contoller + // when there is an creation / update of the resource + ProcessValidation(policy types.Policy, rawResource []byte) + + // ProcessExisting should be called from policy controller + // when there is an create / update of the policy + // we should process the policy on matched resources, generate violations accordingly + ProcessExisting(policy types.Policy, rawResource []byte) ([]policyviolation.Info, []event.Info, error) +} + +type policyEngine struct { + kubeClient *kubeClient.KubeClient + logger *log.Logger +} + +func NewPolicyEngine(kubeClient *kubeClient.KubeClient, logger *log.Logger) PolicyEngine { + return &policyEngine{ + kubeClient: kubeClient, + logger: logger, + } +} + +func (p *policyEngine) ProcessExisting(policy types.Policy, rawResource []byte) ([]policyviolation.Info, []event.Info, error) { + var violations []policyviolation.Info + var events []event.Info + + patchingSets := mutation.GetPolicyPatchingSets(policy) + + for _, rule := range policy.Spec.Rules { + err := rule.Validate() + if err != nil { + p.logger.Printf("Invalid rule detected: #%s in policy %s, err: %v\n", rule.Name, policy.ObjectMeta.Name, err) + continue + } + + if ok, err := mutation.IsRuleApplicableToResource(rawResource, rule.Resource); !ok { + p.logger.Printf("Rule %s of policy %s is not applicable to the request", rule.Name, policy.Name) + return nil, nil, err + } + + violation, eventInfos, err := p.processRuleOnResource(policy.Name, rule, rawResource, patchingSets) + if err != nil { + p.logger.Printf("Failed to process rule %s, err: %v\n", rule.Name, err) + continue + } + // } else { + // policyPatches = append(policyPatches, processedPatches...) + // } + violations = append(violations, violation) + events = append(events, eventInfos...) + } + return violations, events, nil +} + +func (p *policyEngine) processRuleOnResource(policyName string, rule types.PolicyRule, rawResource []byte, patchingSets mutation.PatchingSets) ( + policyviolation.Info, []event.Info, error) { + + var violationInfo policyviolation.Info + var eventInfos []event.Info + + resourceKind := mutation.ParseKindFromObject(rawResource) + resourceName := mutation.ParseNameFromObject(rawResource) + resourceNamespace := mutation.ParseNamespaceFromObject(rawResource) + + rulePatchesProcessed, err := mutation.ProcessPatches(rule.Patches, nil, patchingSets) + if err != nil { + return violationInfo, eventInfos, fmt.Errorf("Failed to process patches from rule %s: %v", rule.Name, err) + } + + if rulePatchesProcessed != nil { + log.Printf("Rule %s: prepared %d patches", rule.Name, len(rulePatchesProcessed)) + + violationInfo = policyviolation.NewViolation(policyName, resourceKind, resourceNamespace+"/"+resourceName, rule.Name) + // add a violation to queue + + // add an event to policy + //TODO: event msg + eventInfos = append(eventInfos, event.NewEvent("Policy", policyName, event.PolicyViolation, event.FResourcePolcy)) + // add an event to resource + eventInfos = append(eventInfos, event.NewEvent(resourceKind, resourceNamespace+"/"+resourceName, event.PolicyViolation, event.FResourcePolcy)) + } + + return violationInfo, eventInfos, nil +} diff --git a/pkg/policyengine/validation.go b/pkg/policyengine/validation.go new file mode 100644 index 0000000000..282a4496a4 --- /dev/null +++ b/pkg/policyengine/validation.go @@ -0,0 +1,5 @@ +package policyengine + +import types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + +func (p *policyEngine) ProcessValidation(policy types.Policy, rawResource []byte) {} diff --git a/pkg/policyviolation/builder.go b/pkg/policyviolation/builder.go new file mode 100644 index 0000000000..ff43c5dfd0 --- /dev/null +++ b/pkg/policyviolation/builder.go @@ -0,0 +1,115 @@ +package policyviolation + +import ( + "fmt" + "log" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" +) + +//Generator to generate policy violation +type Generator interface { + Add(info Info) error +} + +type builder struct { + kubeClient *kubeClient.KubeClient + policyLister policylister.PolicyLister + policyInterface policyclientset.Interface + eventBuilder event.Generator + logger *log.Logger +} + +//Builder is to build policy violations +type Builder interface { + Generator + processViolation(info Info) error + isActive(kind string, resource string) (bool, error) +} + +//NewPolicyViolationBuilder returns new violation builder +func NewPolicyViolationBuilder( + kubeClient *kubeClient.KubeClient, + policyLister policylister.PolicyLister, + policyInterface policyclientset.Interface, + eventController event.Generator, + logger *log.Logger) Builder { + + builder := &builder{ + kubeClient: kubeClient, + policyLister: policyLister, + policyInterface: policyInterface, + eventBuilder: eventController, + logger: logger, + } + return builder +} + +func (b *builder) Add(info Info) error { + return b.processViolation(info) +} + +func (b *builder) processViolation(info Info) error { + // Get the policy + namespace, name, err := cache.SplitMetaNamespaceKey(info.Policy) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", info.Policy)) + return err + } + policy, err := b.policyLister.Policies(namespace).Get(name) + if err != nil { + utilruntime.HandleError(err) + return err + } + modifiedPolicy := policy.DeepCopy() + modifiedViolations := []types.Violation{} + + // Create new violation + newViolation := info.Violation + + for _, violation := range modifiedPolicy.Status.Violations { + ok, err := b.isActive(info.Kind, violation.Resource) + if err != nil { + utilruntime.HandleError(err) + continue + } + if !ok { + b.logger.Printf("removed violation") + } + } + // If violation already exists for this rule, we update the violation + //TODO: update violation, instead of re-creating one every time + modifiedViolations = append(modifiedViolations, newViolation) + + modifiedPolicy.Status.Violations = modifiedViolations + // Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object + _, err = b.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy) + if err != nil { + return err + } + return nil +} + +func (b *builder) isActive(kind string, resource string) (bool, error) { + // Generate Merge Patch + _, err := b.kubeClient.GetResource(kind, resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource)) + return false, err + } + return true, nil +} + +//NewViolation return new policy violation +func NewViolation(policyName string, kind string, resource string, rule string) Info { + return Info{Policy: policyName, + Violation: types.Violation{ + Kind: kind, Resource: resource, Rule: rule}, + } +} diff --git a/pkg/policyviolation/util.go b/pkg/policyviolation/util.go new file mode 100644 index 0000000000..7fa1ebd333 --- /dev/null +++ b/pkg/policyviolation/util.go @@ -0,0 +1,18 @@ +package policyviolation + +import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + +// Source for the events recorder +const violationEventSource = "policy-controller" + +// Name for the workqueue to store the events +const workqueueViolationName = "Policy-Violations" + +// Event Reason +const violationEventResrouce = "Violation" + +//ViolationInfo describes the policyviolation details +type Info struct { + Policy string + policytype.Violation +} diff --git a/pkg/resourceClient/client.go b/pkg/resourceClient/client.go deleted file mode 100644 index 23a7af0e54..0000000000 --- a/pkg/resourceClient/client.go +++ /dev/null @@ -1,159 +0,0 @@ -package resourceClient - -import ( - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" -) - -func GetResouce(clientSet *kubernetes.Clientset, kind string, resourceNamespace string, resourceName string) (runtime.Object, error) { - switch kind { - case "Deployment": - { - obj, err := clientSet.AppsV1().Deployments(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Pods": - { - obj, err := clientSet.CoreV1().Pods(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "ConfigMap": - { - obj, err := clientSet.CoreV1().ConfigMaps(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "CronJob": - { - obj, err := clientSet.BatchV1beta1().CronJobs(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Endpoints": - { - obj, err := clientSet.CoreV1().Endpoints(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "HorizontalPodAutoscaler": - { - obj, err := clientSet.AutoscalingV1().HorizontalPodAutoscalers(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Ingress": - { - obj, err := clientSet.ExtensionsV1beta1().Ingresses(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Job": - { - obj, err := clientSet.BatchV1().Jobs(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "LimitRange": - { - obj, err := clientSet.CoreV1().LimitRanges(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Namespace": - { - obj, err := clientSet.CoreV1().Namespaces().Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "NetworkPolicy": - { - obj, err := clientSet.NetworkingV1().NetworkPolicies(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "PersistentVolumeClaim": - { - obj, err := clientSet.CoreV1().PersistentVolumeClaims(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "PodDisruptionBudget": - { - obj, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "PodTemplate": - { - obj, err := clientSet.CoreV1().PodTemplates(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "ResourceQuota": - { - obj, err := clientSet.CoreV1().ResourceQuotas(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Secret": - { - obj, err := clientSet.CoreV1().Secrets(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "Service": - { - obj, err := clientSet.CoreV1().Services(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - case "StatefulSet": - { - obj, err := clientSet.AppsV1().StatefulSets(resourceNamespace).Get(resourceName, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - return obj, nil - } - - default: - return nil, nil - } -} diff --git a/pkg/violation/util.go b/pkg/violation/util.go deleted file mode 100644 index c5c3bcbe2b..0000000000 --- a/pkg/violation/util.go +++ /dev/null @@ -1,56 +0,0 @@ -package violation - -// Mode to identify the CRUD event when the violation was identified -type Mode string - -const ( - // Create resource - Create Mode = "create" - // Update resource - Update Mode = "update" - // Delete resource - Delete Mode = "delete" -) - -// ResourceMode to identify the source of violatino check -type ResourceMode string - -const ( - // Resource type is kubernetes resource - Resource ResourceMode = "resource" - // Policy type is policy custom resource - Policy ResourceMode = "policy" -) - -type Target int - -const ( - ResourceTarget Target = 1 - PolicyTarget Target = 2 -) - -// Source for the events recorder -const violationEventSource = "policy-controller" - -// Name for the workqueue to store the events -const workqueueViolationName = "Policy-Violations" - -// Event Reason -const violationEventResrouce = "Violation" - -type EventInfo struct { - Resource string - Kind string - Reason string - Source string - ResourceTarget Target -} - -// Info input details -type Info struct { - Kind string - Resource string - Policy string - RuleName string - Reason string -} diff --git a/pkg/violation/violation.go b/pkg/violation/violation.go deleted file mode 100644 index f4c1c1a21d..0000000000 --- a/pkg/violation/violation.go +++ /dev/null @@ -1,284 +0,0 @@ -package violation - -import ( - "fmt" - "log" - "time" - - types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - clientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" - policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" - informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" - lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" - resourceClient "github.com/nirmata/kube-policy/pkg/resourceClient" - v1 "k8s.io/api/core/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcc1orev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" -) - -type Violations []Violation - -type Violation struct { -} - -// Builder to generate violations -type Builder struct { - kubeClient *kubernetes.Clientset - policyClientset *clientset.Clientset - workqueue workqueue.RateLimitingInterface - logger *log.Logger - recorder record.EventRecorder - policyLister lister.PolicyLister - policySynced cache.InformerSynced -} - -func NewViolationHelper(kubeClient *kubernetes.Clientset, policyClientSet *clientset.Clientset, logger *log.Logger, policyInformer informers.PolicyInformer) (*Builder, error) { - - // Initialize Event Broadcaster - policyscheme.AddToScheme(scheme.Scheme) - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(log.Printf) - eventBroadcaster.StartRecordingToSink( - &typedcc1orev1.EventSinkImpl{ - Interface: kubeClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder( - scheme.Scheme, - v1.EventSource{Component: violationEventSource}) - // Build the builder - builder := &Builder{ - kubeClient: kubeClient, - policyClientset: policyClientSet, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workqueueViolationName), - logger: logger, - recorder: recorder, - policyLister: policyInformer.Lister(), - policySynced: policyInformer.Informer().HasSynced, - } - return builder, nil -} - -// Create Violation -> (Info) - -// Create to generate violation jsonpatch script & -// queue events to generate events -// TODO: create should validate the rule number and update the violation if one exists -func (b *Builder) Create(info Info) error { - // generate patch - // we can generate the patch as the policy resource will alwasy exist - // Apply Patch - err := b.patchViolation(info) - if err != nil { - return err - } - - // Generate event for policy - b.workqueue.Add( - EventInfo{ - Resource: info.Policy, - Reason: info.Reason, - ResourceTarget: PolicyTarget, - }) - // Generat event for resource - b.workqueue.Add( - EventInfo{ - Kind: info.Kind, - Resource: info.Resource, - Reason: info.Reason, - ResourceTarget: ResourceTarget, - }) - - return nil -} - -// Remove the violation -func (b *Builder) Remove(info Info) ([]byte, error) { - b.workqueue.Add(info) - return nil, nil -} - -func (b *Builder) patchViolation(info Info) error { - // policy-controller handlers are post events - // adm-ctr will always have policy resource created - // Get Policy namespace and name - policyNamespace, policyName, err := cache.SplitMetaNamespaceKey(info.Policy) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", info.Policy)) - return err - } - // Try to access the policy - // Try to access the resource - // if the above resource objects have not been created then we reque the request to create the event - policy, err := b.policyLister.Policies(policyNamespace).Get(policyName) - if err != nil { - utilruntime.HandleError(err) - return err - } - // Add violation - updatedPolicy := policy.DeepCopy() - // var update bool - // inactiveViolationindex := []int{} - updatedViolations := []types.Violation{} - // Check if the violation with the same rule exists for the same resource and rule name - for _, violation := range updatedPolicy.Status.Violations { - - if ok, err := b.IsActive(violation); ok { - if err != nil { - fmt.Println(err) - } - updatedViolations = append(updatedViolations, violation) - } else { - fmt.Println("Remove violation") - b.workqueue.Add( - EventInfo{ - Resource: info.Policy, - Reason: "Removing violation for rule " + info.RuleName, - ResourceTarget: PolicyTarget, - }) - } - } - // Rule is updated TO-DO - // Dont validate if the resouce is active as a new Violation will not be created if it did not - updatedViolations = append(updatedViolations, - types.Violation{ - Kind: info.Kind, - Resource: info.Resource, - Rule: info.RuleName, - Reason: info.Reason, - }) - updatedPolicy.Status.Violations = updatedViolations - // Patch - return b.patch(policy, updatedPolicy) -} - -func (b *Builder) getPolicyEvent(info Info) EventInfo { - return EventInfo{Resource: info.Resource} -} - -func (b *Builder) IsActive(violation types.Violation) (bool, error) { - if ok, err := b.ValidationResourceActive(violation); !ok { - return false, err - } - return true, nil -} - -func (b *Builder) ValidationResourceActive(violation types.Violation) (bool, error) { - resourceNamespace, resourceName, err := cache.SplitMetaNamespaceKey(violation.Resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", violation.Resource)) - // Remove the corresponding violation - return false, err - } - - // Check if the corresponding resource is still present - _, err = resourceClient.GetResouce(b.kubeClient, violation.Kind, resourceNamespace, resourceName) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", violation.Resource)) - return false, err - } - - return true, nil -} - -func (b *Builder) patch(policy *types.Policy, updatedPolicy *types.Policy) error { - _, err := b.policyClientset.Nirmata().Policies(updatedPolicy.Namespace).UpdateStatus(updatedPolicy) - if err != nil { - return err - } - return nil -} - -// Run : Initialize the worker routines to process the event creation -func (b *Builder) Run(threadiness int, stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - defer b.workqueue.ShutDown() - log.Println("Starting violation builder") - - fmt.Println(("Wait for informer cache to sync")) - if ok := cache.WaitForCacheSync(stopCh, b.policySynced); !ok { - fmt.Println("Unable to sync the cache") - } - - log.Println("Starting workers") - for i := 0; i < threadiness; i++ { - go wait.Until(b.runWorker, time.Second, stopCh) - } - log.Println("Started workers") - <-stopCh - log.Println("Shutting down workers") - return nil -} - -func (b *Builder) runWorker() { - for b.processNextWorkItem() { - } -} - -func (b *Builder) processNextWorkItem() bool { - // get info object - obj, shutdown := b.workqueue.Get() - if shutdown { - return false - } - err := func(obj interface{}) error { - defer b.workqueue.Done(obj) - var key EventInfo - var ok bool - if key, ok = obj.(EventInfo); !ok { - b.workqueue.Forget(obj) - log.Printf("Expecting type info but got %v", obj) - return nil - } - - // Run the syncHandler, passing the resource and the policy - if err := b.syncHandler(key); err != nil { - b.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error()) - } - - return nil - }(obj) - - if err != nil { - log.Println((err)) - } - return true - -} - -// TO-DO: how to handle events if the resource has been delted, and clean the dirty object -func (b *Builder) syncHandler(key EventInfo) error { - fmt.Println(key) - // Get Policy namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid policy key: %s", key.Resource)) - return nil - } - if key.ResourceTarget == ResourceTarget { - // Resource Event - resource, err := resourceClient.GetResouce(b.kubeClient, key.Kind, namespace, name) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) - return err - } - b.recorder.Event(resource, v1.EventTypeNormal, violationEventResrouce, key.Reason) - } else { - // Policy Event - policy, err := b.policyLister.Policies(namespace).Get(name) - if err != nil { - // TO-DO: this scenario will not exist as the policy will always exist - // unless the namespace and resource name are invalid - utilruntime.HandleError(err) - return err - } - b.recorder.Event(policy, v1.EventTypeNormal, violationEventResrouce, key.Reason) - } - - return nil -} diff --git a/policycontroller/policycontroller.go b/policycontroller/policycontroller.go new file mode 100644 index 0000000000..e4df2c7d08 --- /dev/null +++ b/policycontroller/policycontroller.go @@ -0,0 +1,191 @@ +package policycontroller + +import ( + "fmt" + "log" + "time" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" + lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" + policyengine "github.com/nirmata/kube-policy/pkg/policyengine" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +//PolicyController to manage Policy CRD +type PolicyController struct { + kubeClient *kubeClient.KubeClient + policyLister lister.PolicyLister + policyInterface policyclientset.Interface + policySynced cache.InformerSynced + policyEngine policyengine.PolicyEngine + violationBuilder policyviolation.Generator + eventBuilder event.Generator + logger *log.Logger + queue workqueue.RateLimitingInterface +} + +// NewPolicyController from cmd args +func NewPolicyController(policyInterface policyclientset.Interface, + policyInformer infomertypes.PolicyInformer, + policyEngine policyengine.PolicyEngine, + violationBuilder policyviolation.Generator, + eventController event.Generator, + logger *log.Logger, + kubeClient *kubeClient.KubeClient) *PolicyController { + + controller := &PolicyController{ + kubeClient: kubeClient, + policyLister: policyInformer.Lister(), + policyInterface: policyInterface, + policySynced: policyInformer.Informer().HasSynced, + policyEngine: policyEngine, + violationBuilder: violationBuilder, + eventBuilder: eventController, + logger: logger, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName), + } + + policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.createPolicyHandler, + UpdateFunc: controller.updatePolicyHandler, + DeleteFunc: controller.deletePolicyHandler, + }) + return controller +} + +func (pc *PolicyController) createPolicyHandler(resource interface{}) { + pc.enqueuePolicy(resource) +} + +func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) { + newPolicy := newResource.(*types.Policy) + oldPolicy := oldResource.(*types.Policy) + if newPolicy.ResourceVersion == oldPolicy.ResourceVersion { + return + } + pc.enqueuePolicy(newResource) +} + +func (pc *PolicyController) deletePolicyHandler(resource interface{}) { + var object metav1.Object + var ok bool + if object, ok = resource.(metav1.Object); !ok { + utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return + } + pc.logger.Printf("policy deleted: %s", object.GetName()) +} + +func (pc *PolicyController) enqueuePolicy(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + pc.queue.Add(key) +} + +// Run is main controller thread +func (pc *PolicyController) Run(stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer pc.queue.ShutDown() + + pc.logger.Printf("starting policy controller") + + pc.logger.Printf("waiting for infomer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, pc.policySynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + pc.logger.Println("starting policy controller workers") + for i := 0; i < policyControllerWorkerCount; i++ { + go wait.Until(pc.runWorker, time.Second, stopCh) + } + + pc.logger.Println("started policy controller workers") + <-stopCh + pc.logger.Println("shutting down policy controller workers") + return nil +} + +func (pc *PolicyController) runWorker() { + for pc.processNextWorkItem() { + } +} + +func (pc *PolicyController) processNextWorkItem() bool { + obj, shutdown := pc.queue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer pc.queue.Done(obj) + err := pc.syncHandler(obj) + pc.handleErr(err, obj) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (pc *PolicyController) handleErr(err error, key interface{}) { + if err == nil { + pc.queue.Forget(key) + return + } + // This controller retries if something goes wrong. After that, it stops trying. + if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit { + pc.logger.Printf("Error syncing events %v: %v", key, err) + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + pc.queue.AddRateLimited(key) + return + } + pc.queue.Forget(key) + utilruntime.HandleError(err) + pc.logger.Printf("Dropping the key %q out of the queue: %v", key, err) +} + +func (pc *PolicyController) syncHandler(obj interface{}) error { + var key string + var ok bool + if key, ok = obj.(string); !ok { + return fmt.Errorf("expected string in workqueue but got %#v", obj) + } + // convert the namespace/name string into distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid policy key: %s", key)) + return nil + } + + // Get Policy resource with namespace/name + policy, err := pc.policyLister.Policies(namespace).Get(name) + if err != nil { + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("policy '%s' in work queue no longer exists", key)) + return nil + } + return err + } + // process policy on existing resource + // get the violations and pass to violation Builder + // get the events and pass to event Builder + fmt.Println(policy) + return nil +} diff --git a/controller/controller_test.go b/policycontroller/policycontroller_test.go similarity index 99% rename from controller/controller_test.go rename to policycontroller/policycontroller_test.go index d7eec6df8c..b4f513bc87 100644 --- a/controller/controller_test.go +++ b/policycontroller/policycontroller_test.go @@ -1,9 +1,10 @@ -package controller_test +package policycontroller import ( - "gotest.tools/assert" "testing" + "gotest.tools/assert" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) diff --git a/policycontroller/processPolicy.go b/policycontroller/processPolicy.go new file mode 100644 index 0000000000..de4950bbea --- /dev/null +++ b/policycontroller/processPolicy.go @@ -0,0 +1,122 @@ +package policycontroller + +import ( + "encoding/json" + "fmt" + + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" + "github.com/nirmata/kube-policy/pkg/policyengine/mutation" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" +) + +func (pc *PolicyController) runForPolicy(key string) { + + policy, err := pc.getPolicyByKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s, err: %v", key, err)) + return + } + + if policy == nil { + pc.logger.Printf("Counld not find policy by key %s", key) + return + } + + violations, events, err := pc.processPolicy(*policy) + if err != nil { + // add Error processing policy event + } + + pc.logger.Printf("%v, %v", violations, events) + // TODO: + // create violations + // pc.violationBuilder.Add() + // create events + // pc.eventBuilder.Add() + +} + +// processPolicy process the policy to all the matched resources +func (pc *PolicyController) processPolicy(policy types.Policy) ( + violations []policyviolation.Info, events []event.Info, err error) { + + for _, rule := range policy.Spec.Rules { + resources, err := pc.filterResourceByRule(rule) + if err != nil { + pc.logger.Printf("Failed to filter resources by rule %s, err: %v\n", rule.Name, err) + } + + for _, resource := range resources { + rawResource, err := json.Marshal(resource) + if err != nil { + pc.logger.Printf("Failed to marshal resources map to rule %s, err: %v\n", rule.Name, err) + continue + } + + violation, eventInfos, err := pc.policyEngine.ProcessExisting(policy, rawResource) + if err != nil { + pc.logger.Printf("Failed to process rule %s, err: %v\n", rule.Name, err) + continue + } + + violations = append(violations, violation...) + events = append(events, eventInfos...) + } + } + return violations, events, nil +} + +func (pc *PolicyController) filterResourceByRule(rule types.PolicyRule) ([]runtime.Object, error) { + var targetResources []runtime.Object + // TODO: make this namespace all + var namespace = "default" + if err := rule.Validate(); err != nil { + return nil, fmt.Errorf("invalid rule detected: %s, err: %v", rule.Name, err) + } + + // Get the resource list from kind + resources, err := pc.kubeClient.ListResource(rule.Resource.Kind, namespace) + if err != nil { + return nil, err + } + + for _, resource := range resources { + // TODO: + rawResource, err := json.Marshal(resource) + // objKind := resource.GetObjectKind() + // codecFactory := serializer.NewCodecFactory(runtime.NewScheme()) + // codecFactory.EncoderForVersion() + + if err != nil { + pc.logger.Printf("failed to marshal object %v", resource) + continue + } + + // filter the resource by name and label + if ok, _ := mutation.IsRuleApplicableToResource(rawResource, rule.Resource); ok { + targetResources = append(targetResources, resource) + } + } + return targetResources, nil +} + +func (pc *PolicyController) getPolicyByKey(key string) (*types.Policy, error) { + // Create nil Selector to grab all the policies + selector := labels.NewSelector() + cachedPolicies, err := pc.policyLister.List(selector) + if err != nil { + return nil, err + } + + for _, elem := range cachedPolicies { + if elem.Name == key { + return elem, nil + } + } + + return nil, nil +} diff --git a/policycontroller/utils.go b/policycontroller/utils.go new file mode 100644 index 0000000000..22f11696a1 --- /dev/null +++ b/policycontroller/utils.go @@ -0,0 +1,7 @@ +package policycontroller + +const policyWorkQueueName = "policyworkqueue" + +const policyWorkQueueRetryLimit = 5 + +const policyControllerWorkerCount = 2 diff --git a/scripts/deploy-controller.sh b/scripts/deploy-controller.sh index a9c4a31581..ff9bd3f0d5 100755 --- a/scripts/deploy-controller.sh +++ b/scripts/deploy-controller.sh @@ -34,11 +34,11 @@ if [ -z "${namespace}" ]; then # controller should be launched locally ${certsGenerator} "--service=${service_name}" "--serverIp=${serverIp}" || exit 2 echo "Applying webhook..." - kubectl delete -f crd/MutatingWebhookConfiguration_local.yaml - kubectl create -f crd/MutatingWebhookConfiguration_local.yaml || exit 3 + kubectl delete -f definitions/MutatingWebhookConfiguration_debug.yaml + kubectl create -f definitions/MutatingWebhookConfiguration_debug.yaml || exit 3 - kubectl delete -f crd/crd.yaml - kubectl create -f crd/crd.yaml || exit 3 + kubectl delete -f definitions/install.yaml + kubectl create -f definitions/install.yaml || exit 3 echo -e "\n### You can build and run kube-policy project locally.\n### To check its work, run it with parameters -cert, -key and -kubeconfig parameters (see paths of -cert and -key in the log above)." diff --git a/scripts/update-codegen.sh b/scripts/update-codegen.sh index 331be38ff7..ed6e2212ff 100755 --- a/scripts/update-codegen.sh +++ b/scripts/update-codegen.sh @@ -5,7 +5,7 @@ set -o pipefail # get nirmata root NIRMATA_DIR=$(dirname ${BASH_SOURCE})/.. -NIRMATA_ROOT=$(readlink -f ${NIRMATA_DIR}) +NIRMATA_ROOT=$(greadlink -f ${NIRMATA_DIR}) # get relative path to code generation script CODEGEN_PKG=${NIRMATA_DIR}/vendor/k8s.io/code-generator diff --git a/webhooks/admission.go b/webhooks/admission.go index 9dcca827ef..ba2913588f 100644 --- a/webhooks/admission.go +++ b/webhooks/admission.go @@ -1,35 +1,14 @@ package webhooks import ( - "github.com/minio/minio/pkg/wildcard" + kubeclient "github.com/nirmata/kube-policy/kubeclient" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + mutation "github.com/nirmata/kube-policy/pkg/policyengine/mutation" "k8s.io/api/admission/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var supportedKinds = [...]string{ - "ConfigMap", - "CronJob", - "DaemonSet", - "Deployment", - "Endpoints", - "HorizontalPodAutoscaler", - "Ingress", - "Job", - "LimitRange", - "Namespace", - "NetworkPolicy", - "PersistentVolumeClaim", - "PodDisruptionBudget", - "PodTemplate", - "ResourceQuota", - "Secret", - "Service", - "StatefulSet", -} - func kindIsSupported(kind string) bool { - for _, k := range supportedKinds { + for _, k := range kubeclient.GetSupportedKinds() { if k == kind { return true } @@ -45,41 +24,5 @@ func AdmissionIsRequired(request *v1beta1.AdmissionRequest) bool { // Checks requests kind, name and labels to fit the policy func IsRuleApplicableToRequest(policyResource types.PolicyResource, request *v1beta1.AdmissionRequest) (bool, error) { - return IsRuleApplicableToResource(request.Kind.Kind, request.Object.Raw, policyResource) -} - -// kind is the type of object being manipulated -// Checks requests kind, name and labels to fit the policy -func IsRuleApplicableToResource(kind string, resourceRaw []byte, policyResource types.PolicyResource) (bool, error) { - if policyResource.Kind != kind { - return false, nil - } - - if resourceRaw != nil { - meta := parseMetadataFromObject(resourceRaw) - name := parseNameFromObject(resourceRaw) - - if policyResource.Name != nil { - - if !wildcard.Match(*policyResource.Name, name) { - return false, nil - } - } - - if policyResource.Selector != nil { - selector, err := metav1.LabelSelectorAsSelector(policyResource.Selector) - - if err != nil { - return false, err - } - - labelMap := parseLabelsFromMetadata(meta) - - if !selector.Matches(labelMap) { - return false, nil - } - - } - } - return true, nil + return mutation.IsRuleApplicableToResource(request.Object.Raw, policyResource) } diff --git a/webhooks/mutation.go b/webhooks/mutation.go index cb12602bcc..1b74c35ae4 100644 --- a/webhooks/mutation.go +++ b/webhooks/mutation.go @@ -5,27 +5,44 @@ import ( "fmt" "log" "os" + "sort" - controller "github.com/nirmata/kube-policy/controller" kubeclient "github.com/nirmata/kube-policy/kubeclient" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" + policyengine "github.com/nirmata/kube-policy/pkg/policyengine" + mutation "github.com/nirmata/kube-policy/pkg/policyengine/mutation" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" v1beta1 "k8s.io/api/admission/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" rest "k8s.io/client-go/rest" ) // MutationWebhook is a data type that represents // business logic for resource mutation type MutationWebhook struct { - kubeclient *kubeclient.KubeClient - controller *controller.PolicyController - registration *MutationWebhookRegistration - logger *log.Logger + kubeclient *kubeclient.KubeClient + policyEngine policyengine.PolicyEngine + policyLister policylister.PolicyLister + registration *MutationWebhookRegistration + violationBuilder policyviolation.Generator + eventBuilder event.Generator + logger *log.Logger } // Registers mutation webhook in cluster and creates object for this webhook -func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.KubeClient, controller *controller.PolicyController, logger *log.Logger) (*MutationWebhook, error) { - if clientConfig == nil || kubeclient == nil || controller == nil { +func CreateMutationWebhook( + clientConfig *rest.Config, + kubeclient *kubeclient.KubeClient, + policyLister policylister.PolicyLister, + violationBuilder policyviolation.Generator, + eventController event.Generator, + logger *log.Logger) (*MutationWebhook, error) { + if clientConfig == nil || kubeclient == nil { return nil, errors.New("Some parameters are not set") } @@ -42,31 +59,60 @@ func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.Kub if logger == nil { logger = log.New(os.Stdout, "Mutation WebHook: ", log.LstdFlags|log.Lshortfile) } + policyengine := policyengine.NewPolicyEngine(kubeclient, logger) + return &MutationWebhook{ - kubeclient: kubeclient, - controller: controller, - registration: registration, - logger: logger, + kubeclient: kubeclient, + policyEngine: policyengine, + policyLister: policyLister, + registration: registration, + violationBuilder: violationBuilder, + eventBuilder: eventController, + logger: logger, }, nil } +func (mw *MutationWebhook) getPolicies() ([]types.Policy, error) { + selector := labels.NewSelector() + cachedPolicies, err := mw.policyLister.List(selector) + if err != nil { + mw.logger.Printf("Error: %v", err) + return nil, err + } + + var policies []types.Policy + for _, elem := range cachedPolicies { + policies = append(policies, *elem.DeepCopy()) + } + + sort.Slice(policies, func(i, j int) bool { + return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time) + }) + return policies, nil + +} + // Mutate applies admission to request func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse { mw.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo) - policies := mw.controller.GetPolicies() + policies, err := mw.getPolicies() + if err != nil { + utilruntime.HandleError(err) + return nil + } if len(policies) == 0 { return nil } - var allPatches []PatchBytes + var allPatches []mutation.PatchBytes for _, policy := range policies { mw.logger.Printf("Applying policy %s with %d rules", policy.ObjectMeta.Name, len(policy.Spec.Rules)) policyPatches, err := mw.applyPolicyRules(request, policy) if err != nil { - mw.controller.LogPolicyError(policy.Name, err.Error()) + //TODO Log Policy Error errStr := fmt.Sprintf("Unable to apply policy %s: %v", policy.Name, err) mw.logger.Printf("Denying the request because of error: %s", errStr) @@ -74,9 +120,9 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad } if len(policyPatches) > 0 { - namespace := parseNamespaceFromObject(request.Object.Raw) - name := parseNameFromObject(request.Object.Raw) - mw.controller.LogPolicyInfo(policy.Name, fmt.Sprintf("Applied to %s %s/%s", request.Kind.Kind, namespace, name)) + namespace := mutation.ParseNamespaceFromObject(request.Object.Raw) + name := mutation.ParseNameFromObject(request.Object.Raw) + //TODO Log Policy Info mw.logger.Printf("%s applied to %s %s/%s", policy.Name, request.Kind.Kind, namespace, name) allPatches = append(allPatches, policyPatches...) @@ -86,31 +132,22 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad patchType := v1beta1.PatchTypeJSONPatch return &v1beta1.AdmissionResponse{ Allowed: true, - Patch: JoinPatches(allPatches), + Patch: mutation.JoinPatches(allPatches), PatchType: &patchType, } } -func getPolicyPatchingSets(policy types.Policy) PatchingSets { - // failurePolicy property is the only available way for now to define behavior on patching error. - // TODO: define new failurePolicy values specific for patching and other policy features. - if policy.Spec.FailurePolicy != nil && *policy.Spec.FailurePolicy == "continueOnError" { - return PatchingSetsContinueAlways - } - return PatchingSetsDefault -} - // Applies all policy rules to the created object and returns list of processed JSON patches. // May return nil patches if it is not necessary to create patches for requested object. // Returns error ONLY in case when creation of resource should be denied. -func (mw *MutationWebhook) applyPolicyRules(request *v1beta1.AdmissionRequest, policy types.Policy) ([]PatchBytes, error) { - return mw.applyPolicyRulesOnResource(request.Kind.Kind, request.Object.Raw, policy) +func (mw *MutationWebhook) applyPolicyRules(request *v1beta1.AdmissionRequest, policy types.Policy) ([]mutation.PatchBytes, error) { + return mw.policyEngine.ProcessMutation(policy, request.Object.Raw) } -// kind is the type of object being manipulated -func (mw *MutationWebhook) applyPolicyRulesOnResource(kind string, rawResource []byte, policy types.Policy) ([]PatchBytes, error) { - patchingSets := getPolicyPatchingSets(policy) - var policyPatches []PatchBytes +// kind is the type of object being manipulated, e.g. request.Kind.kind +func (mw *MutationWebhook) applyPolicyRulesOnResource(kind string, rawResource []byte, policy types.Policy) ([]mutation.PatchBytes, error) { + patchingSets := mutation.GetPolicyPatchingSets(policy) + var policyPatches []mutation.PatchBytes for ruleIdx, rule := range policy.Spec.Rules { err := rule.Validate() @@ -119,7 +156,7 @@ func (mw *MutationWebhook) applyPolicyRulesOnResource(kind string, rawResource [ continue } - if ok, err := IsRuleApplicableToResource(kind, rawResource, rule.Resource); !ok { + if ok, err := mutation.IsRuleApplicableToResource(rawResource, rule.Resource); !ok { mw.logger.Printf("Rule %d of policy %s is not applicable to the request", ruleIdx, policy.Name) return nil, err } @@ -127,12 +164,12 @@ func (mw *MutationWebhook) applyPolicyRulesOnResource(kind string, rawResource [ // configMapGenerator and secretGenerator can be applied only to namespaces if kind == "Namespace" { err = mw.applyRuleGenerators(rawResource, rule) - if err != nil && patchingSets == PatchingSetsStopOnError { + if err != nil && patchingSets == mutation.PatchingSetsStopOnError { return nil, fmt.Errorf("Failed to apply generators from rule #%d: %s", ruleIdx, err) } } - rulePatchesProcessed, err := ProcessPatches(rule.Patches, rawResource, patchingSets) + rulePatchesProcessed, err := mutation.ProcessPatches(rule.Patches, rawResource, patchingSets) if err != nil { return nil, fmt.Errorf("Failed to process patches from rule #%d: %s", ruleIdx, err) } @@ -155,7 +192,7 @@ func (mw *MutationWebhook) applyPolicyRulesOnResource(kind string, rawResource [ // Applies "configMapGenerator" and "secretGenerator" described in PolicyRule func (mw *MutationWebhook) applyRuleGenerators(rawResource []byte, rule types.PolicyRule) error { - namespaceName := parseNameFromObject(rawResource) + namespaceName := mutation.ParseNameFromObject(rawResource) err := mw.applyConfigGenerator(rule.ConfigMapGenerator, namespaceName, "ConfigMap") if err == nil {