diff --git a/pkg/api/kyverno/v1alpha1/types.go b/pkg/api/kyverno/v1alpha1/types.go index ffad06ca80..54a1f7f093 100644 --- a/pkg/api/kyverno/v1alpha1/types.go +++ b/pkg/api/kyverno/v1alpha1/types.go @@ -109,8 +109,8 @@ type PolicyList struct { type PolicyViolation struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec PolicyViolationSpec `json:"spec"` - Status string `json:"status"` + Spec PolicyViolationSpec `json:"spec"` + Status PolicyViolationStatus `json:"status"` } // PolicyViolationSpec describes policy behavior by its rules @@ -127,12 +127,21 @@ type ResourceSpec struct { Name string `json:"name"` } +// ViolatedRule stores the information regarding the rule type ViolatedRule struct { Name string `json:"name"` Type string `json:"type"` Message string `json:"message"` } +//PolicyViolationStatus provides information regarding policyviolation status +// status: +// LastUpdateTime : the time the polivy violation was updated +type PolicyViolationStatus struct { + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + //TODO: having user information regarding the owner of resource can be helpful +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // PolicyViolationList is a list of Policy Violation diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index ad5f53360d..4022a8a9be 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -30,7 +30,7 @@ import ( ) const ( - // maxRetries is the number of times a Polict will be retried before it is dropped out of the queue. + // maxRetries is the number of times a Policy will be retried before it is dropped out of the queue. // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times // a deployment is going to be requeued: // @@ -44,12 +44,12 @@ var controllerKind = kyverno.SchemeGroupVersion.WithKind("Policy") // in the system with the corresponding policy violations type PolicyController struct { client *client.Client - kyvernoclient *kyvernoclient.Clientset + kyvernoClient *kyvernoclient.Clientset eventRecorder record.EventRecorder syncHandler func(pKey string) error enqueuePolicy func(policy *kyverno.Policy) - //pvControl is used for adoptin/releasing replica sets + //pvControl is used for adoptin/releasing policy violation pvControl PVControlInterface // Policys that need to be synced queue workqueue.RateLimitingInterface @@ -76,7 +76,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, client *client. pc := PolicyController{ client: client, - kyvernoclient: kyvernoClient, + kyvernoClient: kyvernoClient, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policy_controller"}), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"), } @@ -316,7 +316,7 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { glog.Info("Starting policy controller") defer glog.Info("Shutting down policy controller") - if !cache.WaitForCacheSync(stopCh, pc.pListerSynced) { + if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced) { return } for i := 0; i < workers; i++ { @@ -358,7 +358,7 @@ func (pc *PolicyController) handleErr(err error, key interface{}) { } utilruntime.HandleError(err) - glog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err) + glog.V(2).Infof("Dropping policy %q out of the queue: %v", key, err) pc.queue.Forget(key) } @@ -391,7 +391,9 @@ func (pc *PolicyController) syncPolicy(key string) error { return pc.syncStatusOnly(p, pvList) } -//TODO +//syncStatusOnly updates the policy status subresource +// status: +// - violations : (count of the resources that violate this policy ) func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error { newStatus := calculateStatus(pvList) if reflect.DeepEqual(newStatus, p.Status) { @@ -401,7 +403,7 @@ func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno. // update status newPolicy := p newPolicy.Status = newStatus - _, err := pc.kyvernoclient.KyvernoV1alpha1().Policies().UpdateStatus(newPolicy) + _, err := pc.kyvernoClient.KyvernoV1alpha1().Policies().UpdateStatus(newPolicy) return err } @@ -436,7 +438,7 @@ func (pc *PolicyController) getPolicyViolationsForPolicy(p *kyverno.Policy) ([]* } canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := pc.kyvernoclient.KyvernoV1alpha1().Policies().Get(p.Name, metav1.GetOptions{}) + fresh, err := pc.kyvernoClient.KyvernoV1alpha1().Policies().Get(p.Name, metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/pkg/policyviolation/controller.go b/pkg/policyviolation/controller.go new file mode 100644 index 0000000000..86922390c9 --- /dev/null +++ b/pkg/policyviolation/controller.go @@ -0,0 +1,261 @@ +package policyviolation + +import ( + "fmt" + "reflect" + "time" + + "github.com/golang/glog" + kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1" + kyvernoclient "github.com/nirmata/kyverno/pkg/clientNew/clientset/versioned" + "github.com/nirmata/kyverno/pkg/clientNew/clientset/versioned/scheme" + informer "github.com/nirmata/kyverno/pkg/clientNew/informers/externalversions/kyverno/v1alpha1" + lister "github.com/nirmata/kyverno/pkg/clientNew/listers/kyverno/v1alpha1" + client "github.com/nirmata/kyverno/pkg/dclient" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" +) + +const ( + // maxRetries is the number of times a PolicyViolation will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times + // a deployment is going to be requeued: + // + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 +) + +var controllerKind = kyverno.SchemeGroupVersion.WithKind("PolicyViolation") + +type PolicyViolationController struct { + kyvernoClient *kyvernoclient.Clientset + eventRecorder record.EventRecorder + syncHandler func(pKey string) error + enqueuePolicyViolation func(policy *kyverno.PolicyViolation) + // Policys that need to be synced + queue workqueue.RateLimitingInterface + // pvLister can list/get policy violation from the shared informer's store + pvLister lister.PolicyViolationLister + // pLister can list/get policy from the shared informer's store + pLister lister.PolicyLister + // pListerSynced returns true if the Policy store has been synced at least once + pListerSynced cache.InformerSynced + // pvListerSynced retrns true if the Policy store has been synced at least once + pvListerSynced cache.InformerSynced + //pvControl is used for updating status/cleanup policy violation + pvControl PVControlInterface +} + +//NewPolicyViolationController creates a new NewPolicyViolationController +func NewPolicyViolationController(client *client.Client, kyvernoClient *kyvernoclient.Clientset, pInformer informer.PolicyInformer, pvInformer informer.PolicyViolationInformer) (*PolicyViolationController, error) { + // Event broad caster + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventInterface, err := client.GetEventsInterface() + if err != nil { + return nil, err + } + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventInterface}) + + pvc := PolicyViolationController{ + kyvernoClient: kyvernoClient, + + eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policyviolation_controller"}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policyviolation"), + } + pvc.pvControl = RealPVControl{Client: kyvernoClient, Recorder: pvc.eventRecorder} + pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pvc.addPolicyViolation, + UpdateFunc: pvc.updatePolicyViolation, + DeleteFunc: pvc.deletePolicyViolation, + }) + + pvc.enqueuePolicyViolation = pvc.enqueue + pvc.syncHandler = pvc.syncPolicyViolation + + pvc.pLister = pInformer.Lister() + pvc.pvLister = pvInformer.Lister() + pvc.pListerSynced = pInformer.Informer().HasSynced + pvc.pvListerSynced = pInformer.Informer().HasSynced + + return &pvc, nil +} + +func (pvc *PolicyViolationController) addPolicyViolation(obj interface{}) { + pv := obj.(*kyverno.PolicyViolation) + glog.V(4).Infof("Adding PolicyViolation %s", pv.Name) + pvc.enqueuePolicyViolation(pv) +} + +func (pvc *PolicyViolationController) updatePolicyViolation(old, cur interface{}) { + oldPv := old.(*kyverno.PolicyViolation) + curPv := cur.(*kyverno.PolicyViolation) + glog.V(4).Infof("Updating Policy Violation %s", oldPv.Name) + if err := pvc.syncLastUpdateTimeStatus(curPv, oldPv); err != nil { + glog.Errorf("Failed to update lastUpdateTime in PolicyViolation %s status: %v", curPv.Name, err) + } + pvc.enqueuePolicyViolation(curPv) +} + +func (pvc *PolicyViolationController) deletePolicyViolation(obj interface{}) { + pv, ok := obj.(*kyverno.PolicyViolation) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Info(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + pv, ok = tombstone.Obj.(*kyverno.PolicyViolation) + if !ok { + glog.Info(fmt.Errorf("Tombstone contained object that is not a PolicyViolation %#v", obj)) + return + } + } + glog.V(4).Infof("Deleting PolicyViolation %s", pv.Name) + pvc.enqueuePolicyViolation(pv) +} + +func (pvc *PolicyViolationController) enqueue(policyViolation *kyverno.PolicyViolation) { + key, err := cache.MetaNamespaceKeyFunc(policyViolation) + if err != nil { + glog.Error(err) + return + } + pvc.queue.Add(key) +} + +// Run begins watching and syncing. +func (pvc *PolicyViolationController) Run(workers int, stopCh <-chan struct{}) { + + defer utilruntime.HandleCrash() + defer pvc.queue.ShutDown() + + glog.Info("Starting policyviolation controller") + defer glog.Info("Shutting down policyviolation controller") + + if !cache.WaitForCacheSync(stopCh, pvc.pListerSynced, pvc.pvListerSynced) { + return + } + for i := 0; i < workers; i++ { + go wait.Until(pvc.worker, time.Second, stopCh) + } + <-stopCh +} + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (pvc *PolicyViolationController) worker() { + for pvc.processNextWorkItem() { + } +} + +func (pvc *PolicyViolationController) processNextWorkItem() bool { + key, quit := pvc.queue.Get() + if quit { + return false + } + defer pvc.queue.Done(key) + + err := pvc.syncHandler(key.(string)) + pvc.handleErr(err, key) + + return true +} + +func (pvc *PolicyViolationController) handleErr(err error, key interface{}) { + if err == nil { + pvc.queue.Forget(key) + return + } + + if pvc.queue.NumRequeues(key) < maxRetries { + glog.V(2).Infof("Error syncing PolicyViolation %v: %v", key, err) + pvc.queue.AddRateLimited(key) + return + } + + utilruntime.HandleError(err) + glog.V(2).Infof("Dropping policyviolation %q out of the queue: %v", key, err) + pvc.queue.Forget(key) +} + +func (pvc *PolicyViolationController) syncPolicyViolation(key string) error { + startTime := time.Now() + glog.V(4).Infof("Started syncing policy violation %q (%v)", key, startTime) + defer func() { + glog.V(4).Infof("Finished syncing policy violation %q (%v)", key, time.Since(startTime)) + }() + policyViolation, err := pvc.pvLister.Get(key) + if errors.IsNotFound(err) { + glog.V(2).Infof("PolicyViolation %v has been deleted", key) + return nil + } + + if err != nil { + return err + } + + // Deep-copy otherwise we are mutating our cache. + // TODO: Deep-copy only when needed. + pv := policyViolation.DeepCopy() + // TODO: Update Status to update ObserverdGeneration + + return pvc.syncStatusOnly(pv) +} + +//syncStatusOnly updates the policyviolation status subresource +// status: +func (pvc *PolicyViolationController) syncStatusOnly(curPv *kyverno.PolicyViolation) error { + // newStatus := calculateStatus(pv) + return nil +} + +//TODO: think this through again +//syncLastUpdateTimeStatus updates the policyviolation lastUpdateTime if anything in ViolationSpec changed +// - lastUpdateTime : (time stamp when the policy violation changed) +func (pvc *PolicyViolationController) syncLastUpdateTimeStatus(curPv *kyverno.PolicyViolation, oldPv *kyverno.PolicyViolation) error { + // check if there is any change in policy violation information + if !updated(curPv, oldPv) { + return nil + } + // update the lastUpdateTime + newPolicyViolation := curPv + newPolicyViolation.Status = kyverno.PolicyViolationStatus{LastUpdateTime: metav1.Now()} + + return pvc.pvControl.UpdateStatusPolicyViolation(newPolicyViolation) +} + +func updated(curPv *kyverno.PolicyViolation, oldPv *kyverno.PolicyViolation) bool { + return !reflect.DeepEqual(curPv.Spec, oldPv.Spec) + //TODO check if owner reference changed, then should we update the lastUpdateTime as well ? +} + +type PVControlInterface interface { + UpdateStatusPolicyViolation(newPv *kyverno.PolicyViolation) error + RemovePolicyViolation(name string) error +} + +// RealPVControl is the default implementation of PVControlInterface. +type RealPVControl struct { + Client kyvernoclient.Interface + Recorder record.EventRecorder +} + +//UpdateStatusPolicyViolation updates the status for policy violation +func (r RealPVControl) UpdateStatusPolicyViolation(newPv *kyverno.PolicyViolation) error { + _, err := r.Client.KyvernoV1alpha1().PolicyViolations().UpdateStatus(newPv) + return err +} + +//RemovePolicyViolation removes the policy violation +func (r RealPVControl) RemovePolicyViolation(name string) error { + return nil +}