diff --git a/main.go b/main.go index 32e7ccf6c7..716d79f18b 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" - violation "github.com/nirmata/kube-policy/pkg/violation" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" event "github.com/nirmata/kube-policy/pkg/event" "k8s.io/sample-controller/pkg/signals" @@ -44,11 +44,12 @@ func main() { policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil) - violationBuilder := violation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil) + violationBuilder := policyviolation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil) policyController := policycontroller.NewPolicyController(policyClientset, policyInformer, violationBuilder, + eventController, nil, kubeclient) @@ -56,6 +57,7 @@ func main() { kubeclient, policyInformer.Lister(), violationBuilder, + eventController, nil) if err != nil { log.Fatalf("Error creating mutation webhook: %v\n", err) diff --git a/pkg/event/eventcontroller.go b/pkg/event/eventcontroller.go index 250712f5bd..6033d398b7 100644 --- a/pkg/event/eventcontroller.go +++ b/pkg/event/eventcontroller.go @@ -19,7 +19,7 @@ import ( "k8s.io/client-go/util/workqueue" ) -type eventController struct { +type controller struct { kubeClient *kubeClient.KubeClient policyLister policylister.PolicyLister queue workqueue.RateLimitingInterface @@ -27,19 +27,22 @@ type eventController struct { logger *log.Logger } -// EventGenertor to generate event -type EventGenerator interface { - Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) +//Generator to generate event +type Generator interface { + Add(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) } -type EventController interface { - EventGenerator + +//Controller api +type Controller interface { + Generator Run(stopCh <-chan struct{}) error } +//NewEventController to generate a new event controller func NewEventController(kubeClient *kubeClient.KubeClient, policyLister policylister.PolicyLister, - logger *log.Logger) EventController { - controller := &eventController{ + logger *log.Logger) Controller { + controller := &controller{ kubeClient: kubeClient, policyLister: policyLister, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), @@ -63,8 +66,8 @@ func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder { return recorder } -func (eb *eventController) Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) { - eb.queue.Add(eb.newEvent( +func (c *controller) Add(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) { + c.queue.Add(c.newEvent( kind, resource, reason, @@ -72,16 +75,15 @@ func (eb *eventController) Add(kind string, resource string, reason Reason, mess )) } -// Run : Initialize the worker routines to process the event creation -func (eb *eventController) Run(stopCh <-chan struct{}) error { +func (c *controller) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() - defer eb.queue.ShutDown() + defer c.queue.ShutDown() log.Println("starting eventbuilder controller") log.Println("Starting eventbuilder controller workers") for i := 0; i < eventWorkerThreadCount; i++ { - go wait.Until(eb.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, stopCh) } log.Println("Started eventbuilder controller workers") <-stopCh @@ -89,28 +91,28 @@ func (eb *eventController) Run(stopCh <-chan struct{}) error { return nil } -func (eb *eventController) runWorker() { - for eb.processNextWorkItem() { +func (c *controller) runWorker() { + for c.processNextWorkItem() { } } -func (eb *eventController) processNextWorkItem() bool { - obj, shutdown := eb.queue.Get() +func (c *controller) processNextWorkItem() bool { + obj, shutdown := c.queue.Get() if shutdown { return false } err := func(obj interface{}) error { - defer eb.queue.Done(obj) + defer c.queue.Done(obj) var key eventInfo var ok bool if key, ok = obj.(eventInfo); !ok { - eb.queue.Forget(obj) + c.queue.Forget(obj) log.Printf("Expecting type info by got %v", obj) return nil } // Run the syncHandler, passing the resource and the policy - if err := eb.SyncHandler(key); err != nil { - eb.queue.AddRateLimited(key) + if err := c.SyncHandler(key); err != nil { + c.queue.AddRateLimited(key) return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error()) } return nil @@ -122,7 +124,7 @@ func (eb *eventController) processNextWorkItem() bool { return true } -func (eb *eventController) SyncHandler(key eventInfo) error { +func (c *controller) SyncHandler(key eventInfo) error { var resource runtime.Object var err error switch key.Kind { @@ -132,30 +134,23 @@ func (eb *eventController) SyncHandler(key eventInfo) error { utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource)) return err } - resource, err = eb.policyLister.Policies(namespace).Get(name) + resource, err = c.policyLister.Policies(namespace).Get(name) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource)) return err } default: - resource, err = eb.kubeClient.GetResource(key.Kind, key.Resource) + resource, err = c.kubeClient.GetResource(key.Kind, key.Resource) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource)) return err } } - eb.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message) + c.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message) return nil } -type eventInfo struct { - Kind string - Resource string - Reason string - Message string -} - -func (eb *eventController) newEvent(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) eventInfo { +func (c *controller) newEvent(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) eventInfo { msgText, err := getEventMsg(message, args) if err != nil { utilruntime.HandleError(err) diff --git a/pkg/event/eventmsgbuilder.go b/pkg/event/eventmsgbuilder.go index 39d95d82df..1e06c3c5a4 100644 --- a/pkg/event/eventmsgbuilder.go +++ b/pkg/event/eventmsgbuilder.go @@ -5,20 +5,7 @@ import ( "regexp" ) -//Key to describe the event -type EventMsg int - -const ( - FResourcePolcy EventMsg = iota - FProcessRule - SPolicyApply - SRuleApply - FPolicyApplyBlockCreate - FPolicyApplyBlockUpdate - FPolicyApplyBlockUpdateRule -) - -func (k EventMsg) String() string { +func (k MsgKey) String() string { return [...]string{ "Failed to satisfy policy on resource %s.The following rules %s failed to apply. Created Policy Violation", "Failed to process rule %s of policy %s. Created Policy Violation %s", @@ -34,7 +21,7 @@ const argRegex = "%[s,d,v]" //GetEventMsg return the application message based on the message id and the arguments, // if the number of arguments passed to the message are incorrect generate an error -func getEventMsg(key EventMsg, args ...interface{}) (string, error) { +func getEventMsg(key MsgKey, args ...interface{}) (string, error) { // Verify the number of arguments re := regexp.MustCompile(argRegex) argsCount := len(re.FindAllString(key.String(), -1)) diff --git a/pkg/event/util.go b/pkg/event/util.go index 27470b71e6..b56bad7f67 100644 --- a/pkg/event/util.go +++ b/pkg/event/util.go @@ -5,3 +5,23 @@ const eventSource = "policy-controller" const eventWorkQueueName = "policy-controller-events" const eventWorkerThreadCount = 1 + +type eventInfo struct { + Kind string + Resource string + Reason string + Message string +} + +//MsgKey is an identified to determine the preset message formats +type MsgKey int + +const ( + FResourcePolcy MsgKey = iota + FProcessRule + SPolicyApply + SRuleApply + FPolicyApplyBlockCreate + FPolicyApplyBlockUpdate + FPolicyApplyBlockUpdateRule +) diff --git a/pkg/violation/builder.go b/pkg/policyviolation/builder.go similarity index 72% rename from pkg/violation/builder.go rename to pkg/policyviolation/builder.go index c872ce4702..043a87bee4 100644 --- a/pkg/violation/builder.go +++ b/pkg/policyviolation/builder.go @@ -1,4 +1,4 @@ -package violation +package policyviolation import ( "fmt" @@ -13,32 +13,35 @@ import ( "k8s.io/client-go/tools/cache" ) -type PolicyViolationGenerator interface { +//Generator to generate policy violation +type Generator interface { Add(info ViolationInfo) error } -type policyViolationBuilder struct { +type builder struct { kubeClient *kubeClient.KubeClient policyLister policylister.PolicyLister policyInterface policyclientset.Interface - eventBuilder event.EventGenerator + eventBuilder event.Generator logger *log.Logger } -type PolicyViolationBuilder interface { - PolicyViolationGenerator +//Builder is to build policy violations +type Builder interface { + Generator processViolation(info ViolationInfo) error isActive(kind string, resource string) (bool, error) } +//NewPolicyViolationBuilder returns new violation builder func NewPolicyViolationBuilder( kubeClient *kubeClient.KubeClient, policyLister policylister.PolicyLister, policyInterface policyclientset.Interface, - eventController event.EventGenerator, - logger *log.Logger) PolicyViolationBuilder { + eventController event.Generator, + logger *log.Logger) Builder { - builder := &policyViolationBuilder{ + builder := &builder{ kubeClient: kubeClient, policyLister: policyLister, policyInterface: policyInterface, @@ -48,18 +51,18 @@ func NewPolicyViolationBuilder( return builder } -func (pvb *policyViolationBuilder) Add(info ViolationInfo) error { - return pvb.processViolation(info) +func (b *builder) Add(info ViolationInfo) error { + return b.processViolation(info) } -func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error { +func (b *builder) processViolation(info ViolationInfo) error { // Get the policy namespace, name, err := cache.SplitMetaNamespaceKey(info.Policy) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", info.Policy)) return err } - policy, err := pvb.policyLister.Policies(namespace).Get(name) + policy, err := b.policyLister.Policies(namespace).Get(name) if err != nil { utilruntime.HandleError(err) return err @@ -77,13 +80,13 @@ func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error { } for _, violation := range modifiedPolicy.Status.Violations { - ok, err := pvb.isActive(info.Kind, violation.Resource) + ok, err := b.isActive(info.Kind, violation.Resource) if err != nil { utilruntime.HandleError(err) continue } if !ok { - pvb.logger.Printf("removed violation ") + b.logger.Printf("removed violation") } } // If violation already exists for this rule, we update the violation @@ -92,16 +95,16 @@ func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error { modifiedPolicy.Status.Violations = modifiedViolations // Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object - _, err = pvb.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy) + _, err = b.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy) if err != nil { return err } return nil } -func (pvb *policyViolationBuilder) isActive(kind string, resource string) (bool, error) { +func (b *builder) isActive(kind string, resource string) (bool, error) { // Generate Merge Patch - _, err := pvb.kubeClient.GetResource(kind, resource) + _, err := b.kubeClient.GetResource(kind, resource) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource)) return false, err diff --git a/pkg/violation/util.go b/pkg/policyviolation/util.go similarity index 83% rename from pkg/violation/util.go rename to pkg/policyviolation/util.go index 406a8a707d..8cfaaf99b3 100644 --- a/pkg/violation/util.go +++ b/pkg/policyviolation/util.go @@ -1,4 +1,4 @@ -package violation +package policyviolation import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" @@ -11,6 +11,7 @@ const workqueueViolationName = "Policy-Violations" // Event Reason const violationEventResrouce = "Violation" +//ViolationInfo describes the policyviolation details type ViolationInfo struct { Policy string policytype.Violation diff --git a/policycontroller/policycontroller.go b/policycontroller/policycontroller.go index c13c8755b9..921c88e76c 100644 --- a/policycontroller/policycontroller.go +++ b/policycontroller/policycontroller.go @@ -10,23 +10,25 @@ import ( policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" - violation "github.com/nirmata/kube-policy/pkg/violation" + event "github.com/nirmata/kube-policy/pkg/event" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) -//PolicyController for CRD +//PolicyController to manage Policy CRD type PolicyController struct { kubeClient *kubeClient.KubeClient policyLister lister.PolicyLister policyInterface policyclientset.Interface policySynced cache.InformerSynced - violationBuilder violation.PolicyViolationGenerator + violationBuilder policyviolation.Generator + eventBuilder event.Generator logger *log.Logger queue workqueue.RateLimitingInterface } @@ -34,7 +36,8 @@ type PolicyController struct { // NewPolicyController from cmd args func NewPolicyController(policyInterface policyclientset.Interface, policyInformer infomertypes.PolicyInformer, - violationBuilder violation.PolicyViolationGenerator, + violationBuilder policyviolation.Generator, + eventController event.Generator, logger *log.Logger, kubeClient *kubeClient.KubeClient) *PolicyController { @@ -44,9 +47,9 @@ func NewPolicyController(policyInterface policyclientset.Interface, policyInterface: policyInterface, policySynced: policyInformer.Informer().HasSynced, violationBuilder: violationBuilder, + eventBuilder: eventController, logger: logger, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName), - //TODO Event Builder, this will used to record events with policy cannot be processed, using eventBuilder as we can restrict the event types } policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -69,6 +72,7 @@ func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interfa } pc.enqueuePolicy(newResource) } + func (pc *PolicyController) deletePolicyHandler(resource interface{}) { var object metav1.Object var ok bool @@ -112,16 +116,11 @@ func (pc *PolicyController) Run(stopCh <-chan struct{}) error { return nil } -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. func (pc *PolicyController) runWorker() { for pc.processNextWorkItem() { } } -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. func (pc *PolicyController) processNextWorkItem() bool { obj, shutdown := pc.queue.Get() if shutdown { @@ -146,20 +145,15 @@ func (pc *PolicyController) handleErr(err error, key interface{}) { pc.queue.Forget(key) return } - - // This controller retries 5 times if something goes wrong. After that, it stops trying. + // This controller retries if something goes wrong. After that, it stops trying. if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit { - pc.logger.Printf("Error syncing events %v: %v", key, err) - // Re-enqueue the key rate limited. Based on the rate limiter on the // queue and the re-enqueue history, the key will be processed later again. pc.queue.AddRateLimited(key) return } - pc.queue.Forget(key) - // Report to an external entity that, even after several retries, we could not successfully process this key utilruntime.HandleError(err) pc.logger.Printf("Dropping the key %q out of the queue: %v", key, err) } @@ -173,7 +167,7 @@ func (pc *PolicyController) syncHandler(obj interface{}) error { // convert the namespace/name string into distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + utilruntime.HandleError(fmt.Errorf("invalid policy key: %s", key)) return nil } @@ -181,7 +175,7 @@ func (pc *PolicyController) syncHandler(obj interface{}) error { policy, err := pc.policyLister.Policies(namespace).Get(name) if err != nil { if errors.IsNotFound(err) { - utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key)) + utilruntime.HandleError(fmt.Errorf("policy '%s' in work queue no longer exists", key)) return nil } return err diff --git a/webhooks/mutation.go b/webhooks/mutation.go index ca7fca63c3..13f5f0ee93 100644 --- a/webhooks/mutation.go +++ b/webhooks/mutation.go @@ -10,8 +10,9 @@ import ( kubeclient "github.com/nirmata/kube-policy/kubeclient" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + event "github.com/nirmata/kube-policy/pkg/event" mutation "github.com/nirmata/kube-policy/pkg/mutation" - violation "github.com/nirmata/kube-policy/pkg/violation" + policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation" v1beta1 "k8s.io/api/admission/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,7 +27,8 @@ type MutationWebhook struct { kubeclient *kubeclient.KubeClient policyLister policylister.PolicyLister registration *MutationWebhookRegistration - violationBuilder violation.PolicyViolationGenerator + violationBuilder policyviolation.Generator + eventBuilder event.Generator logger *log.Logger } @@ -35,7 +37,8 @@ func CreateMutationWebhook( clientConfig *rest.Config, kubeclient *kubeclient.KubeClient, policyLister policylister.PolicyLister, - violationBuilder violation.PolicyViolationGenerator, + violationBuilder policyviolation.Generator, + eventController event.Generator, logger *log.Logger) (*MutationWebhook, error) { if clientConfig == nil || kubeclient == nil { return nil, errors.New("Some parameters are not set") @@ -59,6 +62,7 @@ func CreateMutationWebhook( policyLister: policyLister, registration: registration, violationBuilder: violationBuilder, + eventBuilder: eventController, logger: logger, }, nil }