From 135f241a4aae7d832e3a37caa333d266f394ccc3 Mon Sep 17 00:00:00 2001 From: shivkumar dudhani Date: Fri, 9 Aug 2019 13:41:56 -0700 Subject: [PATCH] event generator cleanup --- main.go | 9 +++-- pkg/event/controller.go | 83 ++++++++++++++++++-------------------- pkg/webhooks/mutation.go | 7 +--- pkg/webhooks/server.go | 6 +-- pkg/webhooks/validation.go | 76 ++++++++++++---------------------- 5 files changed, 76 insertions(+), 105 deletions(-) diff --git a/main.go b/main.go index ee3a7745d0..45e6a026ad 100644 --- a/main.go +++ b/main.go @@ -62,7 +62,7 @@ func main() { glog.Fatalf("Error creating policy sharedinformer: %v\n", err) } kubeInformer := utils.NewKubeInformerFactory(clientConfig) - eventController := event.NewEventController(client, policyInformerFactory) + egen := event.NewEventGenerator(client, policyInformerFactory) // violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformerFactory, eventController) annotationsController := annotations.NewAnnotationControler(client) // policyController := controller.NewPolicyController( @@ -78,7 +78,7 @@ func main() { if err != nil { glog.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err) } - server, err := webhooks.NewWebhookServer(client, tlsPair, policyInformerFactory, eventController, nil, annotationsController, filterK8Resources) + server, err := webhooks.NewWebhookServer(client, tlsPair, policyInformerFactory, egen, nil, annotationsController, filterK8Resources) if err != nil { glog.Fatalf("Unable to create webhook server: %v\n", err) } @@ -98,11 +98,12 @@ func main() { pInformer.Start(stopCh) go pc.Run(1, stopCh) go pvc.Run(1, stopCh) + go egen.Run(1, stopCh) //TODO add WG for the go routine? //-------- policyInformerFactory.Run(stopCh) kubeInformer.Start(stopCh) - eventController.Run(stopCh) + // eventController.Run(stopCh) // genControler.Run(stopCh) annotationsController.Run(stopCh) // if err = policyController.Run(stopCh); err != nil { @@ -113,7 +114,7 @@ func main() { <-stopCh server.Stop() // genControler.Stop() - eventController.Stop() + // eventController.Stop() annotationsController.Stop() // policyController.Stop() } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index 5c5e9fca0b..bcbde5fb91 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -18,35 +18,31 @@ import ( "k8s.io/client-go/util/workqueue" ) -type controller struct { +//Generator generate events +type Generator struct { client *client.Client policyLister v1alpha1.PolicyLister queue workqueue.RateLimitingInterface recorder record.EventRecorder } -//Generator to generate event -type Generator interface { +//Interface to generate event +type Interface interface { Add(infoList ...*Info) } -//Controller api -type Controller interface { - Generator - Run(stopCh <-chan struct{}) - Stop() -} +//NewEventGenerator to generate a new event controller +func NewEventGenerator(client *client.Client, + shareInformer sharedinformer.PolicyInformer) *Generator { -//NewEventController to generate a new event controller -func NewEventController(client *client.Client, - shareInformer sharedinformer.PolicyInformer) Controller { - - return &controller{ + gen := Generator{ client: client, policyLister: shareInformer.GetLister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), recorder: initRecorder(client), } + + return &gen } func initRecorder(client *client.Client) record.EventRecorder { @@ -72,67 +68,67 @@ func initRecorder(client *client.Client) record.EventRecorder { return recorder } -func (c *controller) Add(infos ...*Info) { +//Add queues an event for generation +func (gen *Generator) Add(infos ...*Info) { for _, info := range infos { - c.queue.Add(*info) + gen.queue.Add(*info) } } -func (c *controller) Run(stopCh <-chan struct{}) { +// Run begins generator +func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + glog.Info("Starting event generator") + defer glog.Info("Shutting down event generator") - for i := 0; i < eventWorkerThreadCount; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + for i := 0; i < workers; i++ { + go wait.Until(gen.runWorker, time.Second, stopCh) } - glog.Info("Started eventbuilder controller workers") + <-stopCh + } -func (c *controller) Stop() { - defer c.queue.ShutDown() - glog.Info("Shutting down eventbuilder controller workers") -} - -func (c *controller) runWorker() { - for c.processNextWorkItem() { +func (gen *Generator) runWorker() { + for gen.processNextWorkItem() { } } -func (c *controller) handleErr(err error, key interface{}) { +func (gen *Generator) handleErr(err error, key interface{}) { if err == nil { - c.queue.Forget(key) + gen.queue.Forget(key) return } // This controller retries if something goes wrong. After that, it stops trying. - if c.queue.NumRequeues(key) < workQueueRetryLimit { + if gen.queue.NumRequeues(key) < workQueueRetryLimit { glog.Warningf("Error syncing events %v: %v", key, err) // Re-enqueue the key rate limited. Based on the rate limiter on the // queue and the re-enqueue history, the key will be processed later again. - c.queue.AddRateLimited(key) + gen.queue.AddRateLimited(key) return } - c.queue.Forget(key) + gen.queue.Forget(key) glog.Error(err) glog.Warningf("Dropping the key out of the queue: %v", err) } -func (c *controller) processNextWorkItem() bool { - obj, shutdown := c.queue.Get() +func (gen *Generator) processNextWorkItem() bool { + obj, shutdown := gen.queue.Get() if shutdown { return false } err := func(obj interface{}) error { - defer c.queue.Done(obj) + defer gen.queue.Done(obj) var key Info var ok bool if key, ok = obj.(Info); !ok { - c.queue.Forget(obj) + gen.queue.Forget(obj) glog.Warningf("Expecting type info by got %v\n", obj) return nil } - err := c.syncHandler(key) - c.handleErr(err, obj) + err := gen.syncHandler(key) + gen.handleErr(err, obj) return nil }(obj) if err != nil { @@ -142,20 +138,20 @@ func (c *controller) processNextWorkItem() bool { return true } -func (c *controller) syncHandler(key Info) error { +func (gen *Generator) syncHandler(key Info) error { var robj runtime.Object var err error switch key.Kind { case "Policy": //TODO: policy is clustered resource so wont need namespace - robj, err = c.policyLister.Get(key.Name) + robj, err = gen.policyLister.Get(key.Name) if err != nil { glog.Errorf("Error creating event: unable to get policy %s, will retry ", key.Name) return err } default: - robj, err = c.client.GetResource(key.Kind, key.Namespace, key.Name) + robj, err = gen.client.GetResource(key.Kind, key.Namespace, key.Name) if err != nil { glog.Errorf("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name) return err @@ -163,13 +159,14 @@ func (c *controller) syncHandler(key Info) error { } if key.Reason == PolicyApplied.String() { - c.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message) + gen.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message) } else { - c.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message) + gen.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message) } return nil } +//TODO: check if we need this ? //NewEvent returns a new event func NewEvent(rkind string, rnamespace string, rname string, reason Reason, message MsgKey, args ...interface{}) *Info { msgText, err := getEventMsg(message, args...) diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index db94cab820..d518206289 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -43,11 +43,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) *v1be if !StringInSlice(request.Kind.Kind, getApplicableKindsForPolicy(policy)) { continue } - policyInfo := info.NewPolicyInfo(policy.Name, - resource.GetKind(), - resource.GetName(), - resource.GetNamespace(), - policy.Spec.ValidationFailureAction) + policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction) glog.V(4).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation) @@ -72,6 +68,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) *v1be // ADD ANNOTATIONS // ADD EVENTS + // ADD POLICY VIOLATIONS ok, msg := isAdmSuccesful(policyInfos) if ok { diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 6c5ce351f7..8d4a339a1e 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -29,7 +29,7 @@ type WebhookServer struct { server http.Server client *client.Client policyLister v1alpha1.PolicyLister - eventController event.Generator + eventGen event.Interface violationBuilder violation.Generator annotationsController annotations.Controller filterK8Resources []utils.K8Resource @@ -41,7 +41,7 @@ func NewWebhookServer( client *client.Client, tlsPair *tlsutils.TlsPemPair, shareInformer sharedinformer.PolicyInformer, - eventController event.Generator, + eventGen event.Interface, violationBuilder violation.Generator, annotationsController annotations.Controller, filterK8Resources string) (*WebhookServer, error) { @@ -60,7 +60,7 @@ func NewWebhookServer( ws := &WebhookServer{ client: client, policyLister: shareInformer.GetLister(), - eventController: eventController, + eventGen: eventGen, violationBuilder: violationBuilder, annotationsController: annotationsController, filterK8Resources: utils.ParseKinds(filterK8Resources), diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index 93c13955cc..cbe6457ee3 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -7,18 +7,21 @@ import ( v1beta1 "k8s.io/api/admission/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" ) // HandleValidation handles validating webhook admission request // If there are no errors in validating rule we apply generation rules func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse { + // var patches [][]byte + var policyInfos []*info.PolicyInfo glog.V(4).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) - policyInfos := []*info.PolicyInfo{} policies, err := ws.policyLister.List(labels.NewSelector()) if err != nil { + //TODO check if the CRD is created ? // Unable to connect to policy Lister to access policies glog.Error("Unable to connect to policy controller to access policies. Validation Rules are NOT being applied") glog.Warning(err) @@ -27,36 +30,29 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1 } } - rname := engine.ParseNameFromObject(request.Object.Raw) - rns := engine.ParseNamespaceFromObject(request.Object.Raw) - rkind := request.Kind.Kind - if rkind == "" { - glog.Errorf("failed to parse KIND from request: Namespace=%s Name=%s UID=%s patchOperation=%s\n", request.Namespace, request.Name, request.UID, request.Operation) + resource, err := convertToUnstructured(request.Object.Raw) + if err != nil { + glog.Errorf("unable to convert raw resource to unstructured: %v", err) } + //TODO: check if resource gvk is available in raw resource, + // if not then set it from the api request + resource.SetGroupVersionKind(schema.GroupVersionKind{Group: request.Kind.Group, Version: request.Kind.Version, Kind: request.Kind.Kind}) + //TODO: check if the name and namespace is also passed right in the resource? + // all the patches to be applied on the resource for _, policy := range policies { if !StringInSlice(request.Kind.Kind, getApplicableKindsForPolicy(policy)) { continue } - //TODO: HACK Check if an update of annotations - if checkIfOnlyAnnotationsUpdate(request) { - // allow the update of resource to add annotations - return &v1beta1.AdmissionResponse{ - Allowed: true, - } - } - policyInfo := info.NewPolicyInfo(policy.Name, - rkind, - rname, - rns, - policy.Spec.ValidationFailureAction) + policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction) - glog.V(3).Infof("Handling validation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", - request.Kind.Kind, rns, rname, request.UID, request.Operation) + glog.V(4).Infof("Handling validation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", + resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation) + + glog.V(4).Infof("Applying policy %s with %d rules\n", policy.ObjectMeta.Name, len(policy.Spec.Rules)) - glog.Infof("Validating resource %s/%s/%s with policy %s with %d rules", rkind, rns, rname, policy.ObjectMeta.Name, len(policy.Spec.Rules)) ruleInfos, err := engine.Validate(*policy, request.Object.Raw, request.Kind) if err != nil { // This is not policy error @@ -68,42 +64,22 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1 policyInfo.AddRuleInfos(ruleInfos) if !policyInfo.IsSuccessful() { - glog.Infof("Failed to apply policy %s on resource %s/%s", policy.Name, rname, rns) + glog.Infof("Failed to apply policy %s on resource %s/%s", policy.Name, resource.GetNamespace(), resource.GetName()) + glog.V(4).Info("Failed rule details") for _, r := range ruleInfos { - glog.Warningf("%s: %s\n", r.Name, r.Msgs) - } - } else { - // CleanUp Violations if exists - err := ws.violationBuilder.RemoveInactiveViolation(policy.Name, request.Kind.Kind, rns, rname, info.Validation) - if err != nil { - glog.Info(err) - } - - if len(ruleInfos) > 0 { - glog.Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, rname, rns) + glog.V(4).Infof("%s: %s\n", r.Name, r.Msgs) } + continue + } + if len(ruleInfos) > 0 { + glog.V(4).Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName()) } policyInfos = append(policyInfos, policyInfo) - // annotations - annPatch := addAnnotationsToResource(request.Object.Raw, policyInfo, info.Validation) - if annPatch != nil { - ws.annotationsController.Add(rkind, rns, rname, annPatch) - } } - if len(policyInfos) > 0 && len(policyInfos[0].Rules) != 0 { - eventsInfo, violations := newEventInfoFromPolicyInfo(policyInfos, (request.Operation == v1beta1.Update), info.Validation) - // If the validationFailureAction flag is set "audit", - // then we dont block the request and report the violations - ws.violationBuilder.Add(violations...) - ws.eventController.Add(eventsInfo...) - } - // If Validation fails then reject the request + // ADD EVENTS + // ADD POLICY VIOLATIONS ok, msg := isAdmSuccesful(policyInfos) - // violations are created if "audit" flag is set - // and if there are any then we dont bock the resource creation - // Even if one the policy being applied - if !ok && toBlock(policyInfos) { return &v1beta1.AdmissionResponse{ Allowed: false,