diff --git a/controller/controller.go b/controller/controller.go deleted file mode 100644 index 3ccde81b97..0000000000 --- a/controller/controller.go +++ /dev/null @@ -1,209 +0,0 @@ -package controller - -import ( - "errors" - "fmt" - "log" - "os" - "sort" - "time" - - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" - kubeClient "github.com/nirmata/kube-policy/kubeclient" - types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - clientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" - policies "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/typed/policy/v1alpha1" - informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" - lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" - event "github.com/nirmata/kube-policy/pkg/event" - eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces" - eventutils "github.com/nirmata/kube-policy/pkg/event/utils" - violation "github.com/nirmata/kube-policy/pkg/violation" - violationinterfaces "github.com/nirmata/kube-policy/pkg/violation/interfaces" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - mergetypes "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -// PolicyController API -type PolicyController interface { - controllerinterfaces.PolicyGetter - controllerinterfaces.PolicyHandlers - Run(stopCh <-chan struct{}) -} - -//policyController for CRD -type policyController struct { - policyInformerFactory informers.SharedInformerFactory - policyLister lister.PolicyLister - policiesInterface policies.PolicyInterface - logger *log.Logger - violationBuilder violationinterfaces.ViolationGenerator - eventBuilder eventinterfaces.BuilderInternal -} - -// NewPolicyController from cmd args -func NewPolicyController(config *rest.Config, logger *log.Logger, kubeClient *kubeClient.KubeClient) (PolicyController, error) { - if logger == nil { - logger = log.New(os.Stdout, "Policy Controller: ", log.LstdFlags|log.Lshortfile) - } - - if config == nil { - return nil, errors.New("Client Config should be set for controller") - } - - policyClientset, err := clientset.NewForConfig(config) - if err != nil { - return nil, err - } - - policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) - policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() - - // generate Event builder - eventBuilder, err := event.NewEventBuilder(kubeClient, logger) - if err != nil { - return nil, err - } - - // generate Violation builer - violationBuilder, err := violation.NewViolationBuilder(kubeClient, eventBuilder, logger) - - controller := &policyController{ - policyInformerFactory: policyInformerFactory, - policyLister: policyInformer.Lister(), - policiesInterface: policyClientset.NirmataV1alpha1().Policies("default"), - logger: logger, - violationBuilder: violationBuilder, - eventBuilder: eventBuilder, - } - policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.CreatePolicyHandler, - UpdateFunc: controller.UpdatePolicyHandler, - DeleteFunc: controller.DeletePolicyHandler, - }) - // Set the controller - eventBuilder.SetController(controller) - violationBuilder.SetController(controller) - return controller, nil -} - -func (c *policyController) GetCacheInformerSync() cache.InformerSynced { - return c.policyInformerFactory.Nirmata().V1alpha1().Policies().Informer().HasSynced -} - -// Run is main controller thread -func (c *policyController) Run(stopCh <-chan struct{}) { - c.policyInformerFactory.Start(stopCh) - c.eventBuilder.Run(eventutils.EventWorkerThreadCount, stopCh) -} - -func (c *policyController) GetPolicies() ([]types.Policy, error) { - // Create nil Selector to grab all the policies - selector := labels.NewSelector() - cachedPolicies, err := c.policyLister.List(selector) - if err != nil { - c.logger.Printf("Error: %v", err) - return nil, err - } - - var policies []types.Policy - for _, elem := range cachedPolicies { - policies = append(policies, *elem.DeepCopy()) - } - - sort.Slice(policies, func(i, j int) bool { - return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time) - }) - return policies, nil -} - -// Writes error message to the policy logs in status section -func (c *policyController) LogPolicyError(name, text string) { - c.addPolicyLog(name, "[ERROR] "+text) -} - -// Writes info message to the policy logs in status section -func (c *policyController) LogPolicyInfo(name, text string) { - c.addPolicyLog(name, "[ INFO] "+text) -} - -// This is the maximum number of records that can be written to the log object of the policy. -// If this number is exceeded, the older entries will be deleted. -const policyLogMaxRecords int = 50 - -// Appends given log text to the status/logs array. -func (c *policyController) addPolicyLog(name, text string) { - getOptions := metav1.GetOptions{ - ResourceVersion: "1", - IncludeUninitialized: true, - } - policy, err := c.policiesInterface.Get(name, getOptions) - if err != nil { - c.logger.Printf("Unable to get policy %s: %s", name, err) - return - } - - // Add new log record - text = time.Now().Format("2006 Jan 02 15:04:05.999 ") + text - policy.Status.Logs = append(policy.Status.Logs, text) - // Pop front extra log records - logsCount := len(policy.Status.Logs) - if logsCount > policyLogMaxRecords { - policy.Status.Logs = policy.Status.Logs[logsCount-policyLogMaxRecords:] - } - // Save logs to policy object - _, err = c.policiesInterface.UpdateStatus(policy) - if err != nil { - c.logger.Printf("Unable to update logs for policy %s: %s", name, err) - } -} - -func (c *policyController) CreatePolicyHandler(resource interface{}) { - key := c.GetResourceKey(resource) - c.logger.Printf("Policy created: %s", key) -} - -func (c *policyController) UpdatePolicyHandler(oldResource, newResource interface{}) { - oldKey := c.GetResourceKey(oldResource) - newKey := c.GetResourceKey(newResource) - c.logger.Printf("Policy %s updated to %s", oldKey, newKey) -} - -func (c *policyController) DeletePolicyHandler(resource interface{}) { - key := c.GetResourceKey(resource) - c.logger.Printf("Policy deleted: %s", key) -} - -func (c *policyController) GetResourceKey(resource interface{}) string { - if key, err := cache.MetaNamespaceKeyFunc(resource); err != nil { - c.logger.Fatalf("Error retrieving policy key: %v", err) - } else { - return key - } - return "" -} -func (c *policyController) GetPolicy(name string) (*types.Policy, error) { - policyNamespace, policyName, err := cache.SplitMetaNamespaceKey(name) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", name)) - return nil, err - } - return c.getPolicyInterface(policyNamespace).Get(policyName) -} - -func (c *policyController) getPolicyInterface(namespace string) lister.PolicyNamespaceLister { - return c.policyLister.Policies(namespace) -} - -func (c *policyController) PatchPolicy(policy string, pt mergetypes.PatchType, data []byte) (*types.Policy, error) { - return c.policiesInterface.Patch(policy, pt, data) -} - -func (c *policyController) UpdatePolicyViolations(updatedPolicy *types.Policy) error { - _, err := c.policiesInterface.UpdateStatus(updatedPolicy) - return err -} diff --git a/controller/interfaces/controller_interfaces.go b/controller/interfaces/controller_interfaces.go deleted file mode 100755 index 8b3911bab2..0000000000 --- a/controller/interfaces/controller_interfaces.go +++ /dev/null @@ -1,24 +0,0 @@ -package interfaces - -import ( - policytypes "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - types "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" -) - -type PolicyGetter interface { - GetPolicies() ([]policytypes.Policy, error) - GetPolicy(name string) (*policytypes.Policy, error) - GetCacheInformerSync() cache.InformerSynced - PatchPolicy(policy string, pt types.PatchType, data []byte) (*policytypes.Policy, error) - UpdatePolicyViolations(updatedPolicy *policytypes.Policy) error - LogPolicyError(name, text string) - LogPolicyInfo(name, text string) -} - -type PolicyHandlers interface { - CreatePolicyHandler(resource interface{}) - UpdatePolicyHandler(oldResource, newResource interface{}) - DeletePolicyHandler(resource interface{}) - GetResourceKey(resource interface{}) string -} diff --git a/main.go b/main.go index 6059b5019f..32e7ccf6c7 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,17 @@ import ( "flag" "log" - "github.com/nirmata/kube-policy/controller" "github.com/nirmata/kube-policy/kubeclient" + "github.com/nirmata/kube-policy/policycontroller" "github.com/nirmata/kube-policy/server" "github.com/nirmata/kube-policy/webhooks" - signals "k8s.io/sample-controller/pkg/signals" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" + violation "github.com/nirmata/kube-policy/pkg/violation" + + event "github.com/nirmata/kube-policy/pkg/event" + "k8s.io/sample-controller/pkg/signals" ) var ( @@ -29,12 +34,29 @@ func main() { log.Fatalf("Error creating kubeclient: %v\n", err) } - controller, err := controller.NewPolicyController(clientConfig, nil, kubeclient) + policyClientset, err := policyclientset.NewForConfig(clientConfig) if err != nil { - log.Fatalf("Error creating PolicyController: %s\n", err) + log.Fatalf("Error creating policyClient: %v\n", err) } - mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig, kubeclient, controller, nil) + //TODO wrap the policyInformer inside a factory + policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) + policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() + + eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil) + violationBuilder := violation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil) + + policyController := policycontroller.NewPolicyController(policyClientset, + policyInformer, + violationBuilder, + nil, + kubeclient) + + mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig, + kubeclient, + policyInformer.Lister(), + violationBuilder, + nil) if err != nil { log.Fatalf("Error creating mutation webhook: %v\n", err) } @@ -51,17 +73,17 @@ func main() { server.RunAsync() stopCh := signals.SetupSignalHandler() - controller.Run(stopCh) - - if err != nil { - log.Fatalf("Error running PolicyController: %s\n", err) + policyInformerFactory.Start(stopCh) + if err = eventController.Run(stopCh); err != nil { + log.Fatalf("Error running EventController: %v\n", err) + } + + if err = policyController.Run(stopCh); err != nil { + log.Fatalf("Error running PolicyController: %v\n", err) } - log.Println("Policy Controller has started") <-stopCh - server.Stop() - log.Println("Policy Controller has stopped") } func init() { diff --git a/pkg/event/builder.go b/pkg/event/builder.go deleted file mode 100644 index 93db5795ec..0000000000 --- a/pkg/event/builder.go +++ /dev/null @@ -1,161 +0,0 @@ -package event - -import ( - "errors" - "fmt" - "log" - "time" - - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" - kubeClient "github.com/nirmata/kube-policy/kubeclient" - "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" - policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" - eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces" - utils "github.com/nirmata/kube-policy/pkg/event/utils" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" -) - -type builder struct { - kubeClient *kubeClient.KubeClient - controller controllerinterfaces.PolicyGetter - workqueue workqueue.RateLimitingInterface - recorder record.EventRecorder - logger *log.Logger - policySynced cache.InformerSynced -} - -type Builder interface { - eventinterfaces.BuilderInternal - SyncHandler(key utils.EventInfo) error - ProcessNextWorkItem() bool - RunWorker() -} - -func NewEventBuilder(kubeClient *kubeClient.KubeClient, - logger *log.Logger, -) (Builder, error) { - builder := &builder{ - kubeClient: kubeClient, - workqueue: initWorkqueue(), - recorder: initRecorder(kubeClient), - logger: logger, - } - - return builder, nil -} - -func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder { - // Initliaze Event Broadcaster - policyscheme.AddToScheme(scheme.Scheme) - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(log.Printf) - eventBroadcaster.StartRecordingToSink( - &typedcorev1.EventSinkImpl{ - - Interface: kubeClient.GetEventsInterface("")}) - recorder := eventBroadcaster.NewRecorder( - scheme.Scheme, - v1.EventSource{Component: utils.EventSource}) - return recorder -} - -func initWorkqueue() workqueue.RateLimitingInterface { - return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), utils.EventWorkQueueName) -} - -func (b *builder) SetController(controller controllerinterfaces.PolicyGetter) { - b.controller = controller - b.policySynced = controller.GetCacheInformerSync() -} - -func (b *builder) AddEvent(info utils.EventInfo) { - b.workqueue.Add(info) -} - -// Run : Initialize the worker routines to process the event creation -func (b *builder) Run(threadiness int, stopCh <-chan struct{}) error { - if b.controller == nil { - return errors.New("Controller has not be set") - } - defer utilruntime.HandleCrash() - defer b.workqueue.ShutDown() - log.Println("Starting violation builder") - - fmt.Println(("Wait for informer cache to sync")) - if ok := cache.WaitForCacheSync(stopCh, b.policySynced); !ok { - fmt.Println("Unable to sync the cache") - } - log.Println("Starting workers") - - for i := 0; i < threadiness; i++ { - go wait.Until(b.RunWorker, time.Second, stopCh) - } - log.Println("Started workers") - <-stopCh - log.Println("Shutting down workers") - return nil - -} - -func (b *builder) RunWorker() { - for b.ProcessNextWorkItem() { - } -} - -func (b *builder) ProcessNextWorkItem() bool { - obj, shutdown := b.workqueue.Get() - if shutdown { - return false - } - err := func(obj interface{}) error { - defer b.workqueue.Done(obj) - var key utils.EventInfo - var ok bool - if key, ok = obj.(utils.EventInfo); !ok { - b.workqueue.Forget(obj) - log.Printf("Expecting type info by got %v", obj) - return nil - } - - // Run the syncHandler, passing the resource and the policy - if err := b.SyncHandler(key); err != nil { - b.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error()) - } - - return nil - }(obj) - - if err != nil { - log.Println((err)) - } - return true -} - -func (b *builder) SyncHandler(key utils.EventInfo) error { - var resource runtime.Object - var err error - switch key.Kind { - case "Policy": - resource, err = b.controller.GetPolicy(key.Resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource)) - return err - } - default: - resource, err = b.kubeClient.GetResource(key.Kind, key.Resource) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) - return err - } - } - b.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message) - return nil -} diff --git a/pkg/event/eventcontroller.go b/pkg/event/eventcontroller.go new file mode 100644 index 0000000000..250712f5bd --- /dev/null +++ b/pkg/event/eventcontroller.go @@ -0,0 +1,169 @@ +package event + +import ( + "fmt" + "log" + "time" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" + policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" +) + +type eventController struct { + kubeClient *kubeClient.KubeClient + policyLister policylister.PolicyLister + queue workqueue.RateLimitingInterface + recorder record.EventRecorder + logger *log.Logger +} + +// EventGenertor to generate event +type EventGenerator interface { + Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) +} +type EventController interface { + EventGenerator + Run(stopCh <-chan struct{}) error +} + +func NewEventController(kubeClient *kubeClient.KubeClient, + policyLister policylister.PolicyLister, + logger *log.Logger) EventController { + controller := &eventController{ + kubeClient: kubeClient, + policyLister: policyLister, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), + recorder: initRecorder(kubeClient), + logger: logger, + } + return controller +} + +func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder { + // Initliaze Event Broadcaster + policyscheme.AddToScheme(scheme.Scheme) + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(log.Printf) + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{ + Interface: kubeClient.GetEventsInterface("")}) + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, + v1.EventSource{Component: eventSource}) + return recorder +} + +func (eb *eventController) Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) { + eb.queue.Add(eb.newEvent( + kind, + resource, + reason, + message, + )) +} + +// Run : Initialize the worker routines to process the event creation +func (eb *eventController) Run(stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer eb.queue.ShutDown() + + log.Println("starting eventbuilder controller") + + log.Println("Starting eventbuilder controller workers") + for i := 0; i < eventWorkerThreadCount; i++ { + go wait.Until(eb.runWorker, time.Second, stopCh) + } + log.Println("Started eventbuilder controller workers") + <-stopCh + log.Println("Shutting down eventbuilder controller workers") + return nil +} + +func (eb *eventController) runWorker() { + for eb.processNextWorkItem() { + } +} + +func (eb *eventController) processNextWorkItem() bool { + obj, shutdown := eb.queue.Get() + if shutdown { + return false + } + err := func(obj interface{}) error { + defer eb.queue.Done(obj) + var key eventInfo + var ok bool + if key, ok = obj.(eventInfo); !ok { + eb.queue.Forget(obj) + log.Printf("Expecting type info by got %v", obj) + return nil + } + // Run the syncHandler, passing the resource and the policy + if err := eb.SyncHandler(key); err != nil { + eb.queue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error()) + } + return nil + }(obj) + + if err != nil { + log.Println((err)) + } + return true +} + +func (eb *eventController) SyncHandler(key eventInfo) error { + var resource runtime.Object + var err error + switch key.Kind { + case "Policy": + namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource)) + return err + } + resource, err = eb.policyLister.Policies(namespace).Get(name) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource)) + return err + } + default: + resource, err = eb.kubeClient.GetResource(key.Kind, key.Resource) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) + return err + } + } + eb.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message) + return nil +} + +type eventInfo struct { + Kind string + Resource string + Reason string + Message string +} + +func (eb *eventController) newEvent(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) eventInfo { + msgText, err := getEventMsg(message, args) + if err != nil { + utilruntime.HandleError(err) + } + return eventInfo{ + Kind: kind, + Resource: resource, + Reason: reason.String(), + Message: msgText, + } +} diff --git a/pkg/event/eventmsgbuilder.go b/pkg/event/eventmsgbuilder.go new file mode 100644 index 0000000000..39d95d82df --- /dev/null +++ b/pkg/event/eventmsgbuilder.go @@ -0,0 +1,45 @@ +package event + +import ( + "fmt" + "regexp" +) + +//Key to describe the event +type EventMsg int + +const ( + FResourcePolcy EventMsg = iota + FProcessRule + SPolicyApply + SRuleApply + FPolicyApplyBlockCreate + FPolicyApplyBlockUpdate + FPolicyApplyBlockUpdateRule +) + +func (k EventMsg) String() string { + return [...]string{ + "Failed to satisfy policy on resource %s.The following rules %s failed to apply. Created Policy Violation", + "Failed to process rule %s of policy %s. Created Policy Violation %s", + "Policy applied successfully on the resource %s", + "Rule %s of Policy %s applied successfull", + "Failed to apply policy, blocked creation of resource %s. The following rules %s failed to apply", + "Failed to apply rule %s of policy %s Blocked update of the resource", + "Failed to apply policy on resource %s.Blocked update of the resource. The following rules %s failed to apply", + }[k] +} + +const argRegex = "%[s,d,v]" + +//GetEventMsg return the application message based on the message id and the arguments, +// if the number of arguments passed to the message are incorrect generate an error +func getEventMsg(key EventMsg, args ...interface{}) (string, error) { + // Verify the number of arguments + re := regexp.MustCompile(argRegex) + argsCount := len(re.FindAllString(key.String(), -1)) + if argsCount != len(args) { + return "", fmt.Errorf("message expects %d arguments, but %d arguments passed", argsCount, len(args)) + } + return fmt.Sprintf(key.String(), args...), nil +} diff --git a/pkg/event/eventmsgbuilder_test.go b/pkg/event/eventmsgbuilder_test.go new file mode 100644 index 0000000000..dcedd1e377 --- /dev/null +++ b/pkg/event/eventmsgbuilder_test.go @@ -0,0 +1,23 @@ +package event + +import ( + "fmt" + "testing" + + "gotest.tools/assert" +) + +func TestPositive(t *testing.T) { + resourceName := "test_resource" + expectedMsg := fmt.Sprintf("Policy applied successfully on the resource %s", resourceName) + msg, err := getEventMsg(SPolicyApply, resourceName) + assert.NilError(t, err) + assert.Equal(t, expectedMsg, msg) +} + +// passing incorrect args +func TestIncorrectArgs(t *testing.T) { + resourceName := "test_resource" + _, err := getEventMsg(SPolicyApply, resourceName, "extra_args") + assert.Error(t, err, "message expects 1 arguments, but 2 arguments passed") +} diff --git a/pkg/event/interfaces/builder_interfaces.go b/pkg/event/interfaces/builder_interfaces.go deleted file mode 100644 index 94a685f719..0000000000 --- a/pkg/event/interfaces/builder_interfaces.go +++ /dev/null @@ -1,12 +0,0 @@ -package interfaces - -import ( - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" - utils "github.com/nirmata/kube-policy/pkg/event/utils" -) - -type BuilderInternal interface { - SetController(controller controllerinterfaces.PolicyGetter) - Run(threadiness int, stopCh <-chan struct{}) error - AddEvent(info utils.EventInfo) -} diff --git a/pkg/event/reason.go b/pkg/event/reason.go new file mode 100644 index 0000000000..ceac4cb0d9 --- /dev/null +++ b/pkg/event/reason.go @@ -0,0 +1,21 @@ +package event + +//Reason types of Event Reasons +type Reason int + +const ( + //PolicyViolation there is a violation of policy + PolicyViolation Reason = iota + //PolicyApplied policy applied + PolicyApplied + //RequestBlocked the request to create/update the resource was blocked( generated from admission-controller) + RequestBlocked +) + +func (r Reason) String() string { + return [...]string{ + "PolicyViolation", + "PolicyApplied", + "RequestBlocked", + }[r] +} diff --git a/pkg/event/util.go b/pkg/event/util.go new file mode 100644 index 0000000000..27470b71e6 --- /dev/null +++ b/pkg/event/util.go @@ -0,0 +1,7 @@ +package event + +const eventSource = "policy-controller" + +const eventWorkQueueName = "policy-controller-events" + +const eventWorkerThreadCount = 1 diff --git a/pkg/event/utils/util.go b/pkg/event/utils/util.go deleted file mode 100644 index de21252a3f..0000000000 --- a/pkg/event/utils/util.go +++ /dev/null @@ -1,15 +0,0 @@ -package utils - -const EventSource = "policy-controller" - -const EventWorkQueueName = "policy-controller-events" - -type EventInfo struct { - Kind string - Resource string - Rule string - Reason string - Message string -} - -const EventWorkerThreadCount = 1 diff --git a/pkg/violation/builder.go b/pkg/violation/builder.go index 9011aa6b1b..c872ce4702 100644 --- a/pkg/violation/builder.go +++ b/pkg/violation/builder.go @@ -1,60 +1,65 @@ package violation import ( - "encoding/json" "fmt" "log" - jsonpatch "github.com/evanphx/json-patch" - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" kubeClient "github.com/nirmata/kube-policy/kubeclient" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces" - eventutils "github.com/nirmata/kube-policy/pkg/event/utils" - violationinterfaces "github.com/nirmata/kube-policy/pkg/violation/interfaces" - utils "github.com/nirmata/kube-policy/pkg/violation/utils" - mergetypes "k8s.io/apimachinery/pkg/types" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" ) -type builder struct { - kubeClient *kubeClient.KubeClient - controller controllerinterfaces.PolicyGetter - eventBuilder eventinterfaces.BuilderInternal - logger *log.Logger +type PolicyViolationGenerator interface { + Add(info ViolationInfo) error } -type Builder interface { - violationinterfaces.ViolationGenerator - ProcessViolation(info utils.ViolationInfo) error - Patch(policy *types.Policy, updatedPolicy *types.Policy) error - IsActive(kind string, resource string) (bool, error) +type policyViolationBuilder struct { + kubeClient *kubeClient.KubeClient + policyLister policylister.PolicyLister + policyInterface policyclientset.Interface + eventBuilder event.EventGenerator + logger *log.Logger } -func NewViolationBuilder( +type PolicyViolationBuilder interface { + PolicyViolationGenerator + processViolation(info ViolationInfo) error + isActive(kind string, resource string) (bool, error) +} + +func NewPolicyViolationBuilder( kubeClient *kubeClient.KubeClient, - eventBuilder eventinterfaces.BuilderInternal, - logger *log.Logger) (Builder, error) { + policyLister policylister.PolicyLister, + policyInterface policyclientset.Interface, + eventController event.EventGenerator, + logger *log.Logger) PolicyViolationBuilder { - builder := &builder{ - kubeClient: kubeClient, - eventBuilder: eventBuilder, - logger: logger, + builder := &policyViolationBuilder{ + kubeClient: kubeClient, + policyLister: policyLister, + policyInterface: policyInterface, + eventBuilder: eventController, + logger: logger, } - return builder, nil + return builder } -func (b *builder) Create(info utils.ViolationInfo) error { - return b.ProcessViolation(info) +func (pvb *policyViolationBuilder) Add(info ViolationInfo) error { + return pvb.processViolation(info) } -func (b *builder) SetController(controller controllerinterfaces.PolicyGetter) { - b.controller = controller -} - -func (b *builder) ProcessViolation(info utils.ViolationInfo) error { +func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error { // Get the policy - policy, err := b.controller.GetPolicy(info.Policy) + namespace, name, err := cache.SplitMetaNamespaceKey(info.Policy) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", info.Policy)) + return err + } + policy, err := pvb.policyLister.Policies(namespace).Get(name) if err != nil { utilruntime.HandleError(err) return err @@ -72,63 +77,34 @@ func (b *builder) ProcessViolation(info utils.ViolationInfo) error { } for _, violation := range modifiedPolicy.Status.Violations { - ok, err := b.IsActive(info.Kind, violation.Resource) + ok, err := pvb.isActive(info.Kind, violation.Resource) if err != nil { utilruntime.HandleError(err) continue } if !ok { - // Remove the violation - // Create a removal event - b.eventBuilder.AddEvent(eventutils.EventInfo{ - Kind: "Policy", - Resource: info.Policy, - Rule: info.Rule, - Reason: info.Reason, - Message: info.Message, - }) - continue + pvb.logger.Printf("removed violation ") } - // If violation already exists for this rule, we update the violation - //TODO: update violation, instead of re-creating one every time } + // If violation already exists for this rule, we update the violation + //TODO: update violation, instead of re-creating one every time modifiedViolations = append(modifiedViolations, newViolation) modifiedPolicy.Status.Violations = modifiedViolations - // return b.Patch(policy, modifiedPolicy) // Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object - return b.controller.UpdatePolicyViolations(modifiedPolicy) + _, err = pvb.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy) + if err != nil { + return err + } + return nil } -func (b *builder) IsActive(kind string, resource string) (bool, error) { +func (pvb *policyViolationBuilder) isActive(kind string, resource string) (bool, error) { // Generate Merge Patch - _, err := b.kubeClient.GetResource(kind, resource) + _, err := pvb.kubeClient.GetResource(kind, resource) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource)) return false, err } return true, nil } - -func (b *builder) Patch(policy *types.Policy, updatedPolicy *types.Policy) error { - originalData, err := json.Marshal(policy) - if err != nil { - return err - } - modifiedData, err := json.Marshal(updatedPolicy) - if err != nil { - return err - } - // generate merge patch - patchBytes, err := jsonpatch.CreateMergePatch(originalData, modifiedData) - if err != nil { - return err - } - _, err = b.controller.PatchPolicy(policy.Name, mergetypes.MergePatchType, patchBytes) - if err != nil { - - // Unable to patch - return err - } - return nil -} diff --git a/pkg/violation/interfaces/violation_interfaces.go b/pkg/violation/interfaces/violation_interfaces.go deleted file mode 100644 index f74cd28c6f..0000000000 --- a/pkg/violation/interfaces/violation_interfaces.go +++ /dev/null @@ -1,11 +0,0 @@ -package interfaces - -import ( - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" - utils "github.com/nirmata/kube-policy/pkg/violation/utils" -) - -type ViolationGenerator interface { - SetController(controller controllerinterfaces.PolicyGetter) - Create(info utils.ViolationInfo) error -} diff --git a/pkg/violation/util.go b/pkg/violation/util.go index 00ad937cfa..406a8a707d 100644 --- a/pkg/violation/util.go +++ b/pkg/violation/util.go @@ -1,5 +1,7 @@ package violation +import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + // Source for the events recorder const violationEventSource = "policy-controller" @@ -9,11 +11,7 @@ const workqueueViolationName = "Policy-Violations" // Event Reason const violationEventResrouce = "Violation" -// Info input details -type Info struct { - Kind string - Resource string - Policy string - RuleName string - Reason string +type ViolationInfo struct { + Policy string + policytype.Violation } diff --git a/pkg/violation/utils/util.go b/pkg/violation/utils/util.go deleted file mode 100644 index 1d3db344f4..0000000000 --- a/pkg/violation/utils/util.go +++ /dev/null @@ -1,8 +0,0 @@ -package utils - -import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - -type ViolationInfo struct { - Policy string - policytype.Violation -} diff --git a/policycontroller/policycontroller.go b/policycontroller/policycontroller.go new file mode 100644 index 0000000000..c13c8755b9 --- /dev/null +++ b/policycontroller/policycontroller.go @@ -0,0 +1,194 @@ +package policycontroller + +import ( + "fmt" + "log" + "time" + + kubeClient "github.com/nirmata/kube-policy/kubeclient" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" + lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + violation "github.com/nirmata/kube-policy/pkg/violation" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +//PolicyController for CRD +type PolicyController struct { + kubeClient *kubeClient.KubeClient + policyLister lister.PolicyLister + policyInterface policyclientset.Interface + policySynced cache.InformerSynced + violationBuilder violation.PolicyViolationGenerator + logger *log.Logger + queue workqueue.RateLimitingInterface +} + +// NewPolicyController from cmd args +func NewPolicyController(policyInterface policyclientset.Interface, + policyInformer infomertypes.PolicyInformer, + violationBuilder violation.PolicyViolationGenerator, + logger *log.Logger, + kubeClient *kubeClient.KubeClient) *PolicyController { + + controller := &PolicyController{ + kubeClient: kubeClient, + policyLister: policyInformer.Lister(), + policyInterface: policyInterface, + policySynced: policyInformer.Informer().HasSynced, + violationBuilder: violationBuilder, + logger: logger, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName), + //TODO Event Builder, this will used to record events with policy cannot be processed, using eventBuilder as we can restrict the event types + } + + policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.createPolicyHandler, + UpdateFunc: controller.updatePolicyHandler, + DeleteFunc: controller.deletePolicyHandler, + }) + return controller +} + +func (pc *PolicyController) createPolicyHandler(resource interface{}) { + pc.enqueuePolicy(resource) +} + +func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) { + newPolicy := newResource.(*types.Policy) + oldPolicy := oldResource.(*types.Policy) + if newPolicy.ResourceVersion == oldPolicy.ResourceVersion { + return + } + pc.enqueuePolicy(newResource) +} +func (pc *PolicyController) deletePolicyHandler(resource interface{}) { + var object metav1.Object + var ok bool + if object, ok = resource.(metav1.Object); !ok { + utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return + } + pc.logger.Printf("policy deleted: %s", object.GetName()) +} + +func (pc *PolicyController) enqueuePolicy(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + pc.queue.Add(key) +} + +// Run is main controller thread +func (pc *PolicyController) Run(stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer pc.queue.ShutDown() + + pc.logger.Printf("starting policy controller") + + pc.logger.Printf("waiting for infomer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, pc.policySynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + pc.logger.Println("starting policy controller workers") + for i := 0; i < policyControllerWorkerCount; i++ { + go wait.Until(pc.runWorker, time.Second, stopCh) + } + + pc.logger.Println("started policy controller workers") + <-stopCh + pc.logger.Println("shutting down policy controller workers") + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (pc *PolicyController) runWorker() { + for pc.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (pc *PolicyController) processNextWorkItem() bool { + obj, shutdown := pc.queue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer pc.queue.Done(obj) + err := pc.syncHandler(obj) + pc.handleErr(err, obj) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (pc *PolicyController) handleErr(err error, key interface{}) { + if err == nil { + pc.queue.Forget(key) + return + } + + // This controller retries 5 times if something goes wrong. After that, it stops trying. + if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit { + + pc.logger.Printf("Error syncing events %v: %v", key, err) + + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + pc.queue.AddRateLimited(key) + return + } + + pc.queue.Forget(key) + // Report to an external entity that, even after several retries, we could not successfully process this key + utilruntime.HandleError(err) + pc.logger.Printf("Dropping the key %q out of the queue: %v", key, err) +} + +func (pc *PolicyController) syncHandler(obj interface{}) error { + var key string + var ok bool + if key, ok = obj.(string); !ok { + return fmt.Errorf("expected string in workqueue but got %#v", obj) + } + // convert the namespace/name string into distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + // Get Policy resource with namespace/name + policy, err := pc.policyLister.Policies(namespace).Get(name) + if err != nil { + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key)) + return nil + } + return err + } + // process policy on existing resource + // get the violations and pass to violation Builder + // get the events and pass to event Builder + fmt.Println(policy) + return nil +} diff --git a/controller/controller_test.go b/policycontroller/policycontroller_test.go similarity index 99% rename from controller/controller_test.go rename to policycontroller/policycontroller_test.go index d7eec6df8c..b4f513bc87 100644 --- a/controller/controller_test.go +++ b/policycontroller/policycontroller_test.go @@ -1,9 +1,10 @@ -package controller_test +package policycontroller import ( - "gotest.tools/assert" "testing" + "gotest.tools/assert" + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) diff --git a/policycontroller/utils.go b/policycontroller/utils.go new file mode 100644 index 0000000000..22f11696a1 --- /dev/null +++ b/policycontroller/utils.go @@ -0,0 +1,7 @@ +package policycontroller + +const policyWorkQueueName = "policyworkqueue" + +const policyWorkQueueRetryLimit = 5 + +const policyControllerWorkerCount = 2 diff --git a/webhooks/mutation.go b/webhooks/mutation.go index ef0b4c6b72..ca7fca63c3 100644 --- a/webhooks/mutation.go +++ b/webhooks/mutation.go @@ -5,13 +5,17 @@ import ( "fmt" "log" "os" + "sort" - controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces" kubeclient "github.com/nirmata/kube-policy/kubeclient" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" mutation "github.com/nirmata/kube-policy/pkg/mutation" + violation "github.com/nirmata/kube-policy/pkg/violation" v1beta1 "k8s.io/api/admission/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" rest "k8s.io/client-go/rest" ) @@ -19,15 +23,21 @@ import ( // MutationWebhook is a data type that represents // business logic for resource mutation type MutationWebhook struct { - kubeclient *kubeclient.KubeClient - controller controllerinterfaces.PolicyGetter - registration *MutationWebhookRegistration - logger *log.Logger + kubeclient *kubeclient.KubeClient + policyLister policylister.PolicyLister + registration *MutationWebhookRegistration + violationBuilder violation.PolicyViolationGenerator + logger *log.Logger } // Registers mutation webhook in cluster and creates object for this webhook -func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.KubeClient, controller controllerinterfaces.PolicyGetter, logger *log.Logger) (*MutationWebhook, error) { - if clientConfig == nil || kubeclient == nil || controller == nil { +func CreateMutationWebhook( + clientConfig *rest.Config, + kubeclient *kubeclient.KubeClient, + policyLister policylister.PolicyLister, + violationBuilder violation.PolicyViolationGenerator, + logger *log.Logger) (*MutationWebhook, error) { + if clientConfig == nil || kubeclient == nil { return nil, errors.New("Some parameters are not set") } @@ -45,19 +55,40 @@ func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.Kub logger = log.New(os.Stdout, "Mutation WebHook: ", log.LstdFlags|log.Lshortfile) } return &MutationWebhook{ - kubeclient: kubeclient, - controller: controller, - registration: registration, - logger: logger, + kubeclient: kubeclient, + policyLister: policyLister, + registration: registration, + violationBuilder: violationBuilder, + logger: logger, }, nil } +func (mw *MutationWebhook) getPolicies() ([]types.Policy, error) { + selector := labels.NewSelector() + cachedPolicies, err := mw.policyLister.List(selector) + if err != nil { + mw.logger.Printf("Error: %v", err) + return nil, err + } + + var policies []types.Policy + for _, elem := range cachedPolicies { + policies = append(policies, *elem.DeepCopy()) + } + + sort.Slice(policies, func(i, j int) bool { + return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time) + }) + return policies, nil + +} + // Mutate applies admission to request func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse { mw.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo) - policies, err := mw.controller.GetPolicies() + policies, err := mw.getPolicies() if err != nil { utilruntime.HandleError(err) return nil @@ -72,7 +103,7 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad policyPatches, err := mw.applyPolicyRules(request, policy) if err != nil { - mw.controller.LogPolicyError(policy.Name, err.Error()) + //TODO Log Policy Error errStr := fmt.Sprintf("Unable to apply policy %s: %v", policy.Name, err) mw.logger.Printf("Denying the request because of error: %s", errStr) @@ -82,7 +113,7 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad if len(policyPatches) > 0 { namespace := mutation.ParseNamespaceFromObject(request.Object.Raw) name := mutation.ParseNameFromObject(request.Object.Raw) - mw.controller.LogPolicyInfo(policy.Name, fmt.Sprintf("Applied to %s %s/%s", request.Kind.Kind, namespace, name)) + //TODO Log Policy Info mw.logger.Printf("%s applied to %s %s/%s", policy.Name, request.Kind.Kind, namespace, name) allPatches = append(allPatches, policyPatches...)