2019-08-08 13:59:50 -07:00
package policyviolation
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1"
kyvernoclient "github.com/nirmata/kyverno/pkg/clientNew/clientset/versioned"
"github.com/nirmata/kyverno/pkg/clientNew/clientset/versioned/scheme"
informer "github.com/nirmata/kyverno/pkg/clientNew/informers/externalversions/kyverno/v1alpha1"
lister "github.com/nirmata/kyverno/pkg/clientNew/listers/kyverno/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
const (
// maxRetries is the number of times a PolicyViolation will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// a deployment is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)
var controllerKind = kyverno . SchemeGroupVersion . WithKind ( "PolicyViolation" )
2019-08-12 10:52:02 -07:00
// PolicyViolationController manages the policy violation resource
// - sync the lastupdate time
// - check if the resource is active
2019-08-08 13:59:50 -07:00
type PolicyViolationController struct {
2019-08-12 10:52:02 -07:00
client * client . Client
2019-08-08 13:59:50 -07:00
kyvernoClient * kyvernoclient . Clientset
eventRecorder record . EventRecorder
syncHandler func ( pKey string ) error
enqueuePolicyViolation func ( policy * kyverno . PolicyViolation )
// Policys that need to be synced
queue workqueue . RateLimitingInterface
// pvLister can list/get policy violation from the shared informer's store
pvLister lister . PolicyViolationLister
// pLister can list/get policy from the shared informer's store
pLister lister . PolicyLister
// pListerSynced returns true if the Policy store has been synced at least once
pListerSynced cache . InformerSynced
// pvListerSynced retrns true if the Policy store has been synced at least once
pvListerSynced cache . InformerSynced
//pvControl is used for updating status/cleanup policy violation
pvControl PVControlInterface
}
//NewPolicyViolationController creates a new NewPolicyViolationController
func NewPolicyViolationController ( client * client . Client , kyvernoClient * kyvernoclient . Clientset , pInformer informer . PolicyInformer , pvInformer informer . PolicyViolationInformer ) ( * PolicyViolationController , error ) {
// Event broad caster
eventBroadcaster := record . NewBroadcaster ( )
eventBroadcaster . StartLogging ( glog . Infof )
eventInterface , err := client . GetEventsInterface ( )
if err != nil {
return nil , err
}
eventBroadcaster . StartRecordingToSink ( & typedcorev1 . EventSinkImpl { Interface : eventInterface } )
pvc := PolicyViolationController {
kyvernoClient : kyvernoClient ,
2019-08-12 10:52:02 -07:00
client : client ,
2019-08-08 13:59:50 -07:00
eventRecorder : eventBroadcaster . NewRecorder ( scheme . Scheme , v1 . EventSource { Component : "policyviolation_controller" } ) ,
queue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "policyviolation" ) ,
}
pvc . pvControl = RealPVControl { Client : kyvernoClient , Recorder : pvc . eventRecorder }
pvInformer . Informer ( ) . AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc : pvc . addPolicyViolation ,
UpdateFunc : pvc . updatePolicyViolation ,
DeleteFunc : pvc . deletePolicyViolation ,
} )
pvc . enqueuePolicyViolation = pvc . enqueue
pvc . syncHandler = pvc . syncPolicyViolation
pvc . pLister = pInformer . Lister ( )
pvc . pvLister = pvInformer . Lister ( )
pvc . pListerSynced = pInformer . Informer ( ) . HasSynced
2019-08-12 10:02:07 -07:00
pvc . pvListerSynced = pvInformer . Informer ( ) . HasSynced
2019-08-08 13:59:50 -07:00
return & pvc , nil
}
func ( pvc * PolicyViolationController ) addPolicyViolation ( obj interface { } ) {
pv := obj . ( * kyverno . PolicyViolation )
glog . V ( 4 ) . Infof ( "Adding PolicyViolation %s" , pv . Name )
pvc . enqueuePolicyViolation ( pv )
}
func ( pvc * PolicyViolationController ) updatePolicyViolation ( old , cur interface { } ) {
oldPv := old . ( * kyverno . PolicyViolation )
curPv := cur . ( * kyverno . PolicyViolation )
glog . V ( 4 ) . Infof ( "Updating Policy Violation %s" , oldPv . Name )
if err := pvc . syncLastUpdateTimeStatus ( curPv , oldPv ) ; err != nil {
glog . Errorf ( "Failed to update lastUpdateTime in PolicyViolation %s status: %v" , curPv . Name , err )
}
pvc . enqueuePolicyViolation ( curPv )
}
func ( pvc * PolicyViolationController ) deletePolicyViolation ( obj interface { } ) {
pv , ok := obj . ( * kyverno . PolicyViolation )
if ! ok {
tombstone , ok := obj . ( cache . DeletedFinalStateUnknown )
if ! ok {
glog . Info ( fmt . Errorf ( "Couldn't get object from tombstone %#v" , obj ) )
return
}
pv , ok = tombstone . Obj . ( * kyverno . PolicyViolation )
if ! ok {
glog . Info ( fmt . Errorf ( "Tombstone contained object that is not a PolicyViolation %#v" , obj ) )
return
}
}
glog . V ( 4 ) . Infof ( "Deleting PolicyViolation %s" , pv . Name )
pvc . enqueuePolicyViolation ( pv )
}
func ( pvc * PolicyViolationController ) enqueue ( policyViolation * kyverno . PolicyViolation ) {
key , err := cache . MetaNamespaceKeyFunc ( policyViolation )
if err != nil {
glog . Error ( err )
return
}
pvc . queue . Add ( key )
}
// Run begins watching and syncing.
func ( pvc * PolicyViolationController ) Run ( workers int , stopCh <- chan struct { } ) {
defer utilruntime . HandleCrash ( )
defer pvc . queue . ShutDown ( )
glog . Info ( "Starting policyviolation controller" )
defer glog . Info ( "Shutting down policyviolation controller" )
if ! cache . WaitForCacheSync ( stopCh , pvc . pListerSynced , pvc . pvListerSynced ) {
return
}
for i := 0 ; i < workers ; i ++ {
go wait . Until ( pvc . worker , time . Second , stopCh )
}
<- stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func ( pvc * PolicyViolationController ) worker ( ) {
for pvc . processNextWorkItem ( ) {
}
}
func ( pvc * PolicyViolationController ) processNextWorkItem ( ) bool {
key , quit := pvc . queue . Get ( )
if quit {
return false
}
defer pvc . queue . Done ( key )
err := pvc . syncHandler ( key . ( string ) )
pvc . handleErr ( err , key )
return true
}
func ( pvc * PolicyViolationController ) handleErr ( err error , key interface { } ) {
if err == nil {
pvc . queue . Forget ( key )
return
}
if pvc . queue . NumRequeues ( key ) < maxRetries {
glog . V ( 2 ) . Infof ( "Error syncing PolicyViolation %v: %v" , key , err )
pvc . queue . AddRateLimited ( key )
return
}
utilruntime . HandleError ( err )
glog . V ( 2 ) . Infof ( "Dropping policyviolation %q out of the queue: %v" , key , err )
pvc . queue . Forget ( key )
}
func ( pvc * PolicyViolationController ) syncPolicyViolation ( key string ) error {
startTime := time . Now ( )
glog . V ( 4 ) . Infof ( "Started syncing policy violation %q (%v)" , key , startTime )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing policy violation %q (%v)" , key , time . Since ( startTime ) )
} ( )
policyViolation , err := pvc . pvLister . Get ( key )
if errors . IsNotFound ( err ) {
glog . V ( 2 ) . Infof ( "PolicyViolation %v has been deleted" , key )
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
pv := policyViolation . DeepCopy ( )
// TODO: Update Status to update ObserverdGeneration
2019-08-12 10:52:02 -07:00
// TODO: check if the policy violation refers to a resource thats active ?
// TODO: additional check on deleted webhook for a resource, to delete a policy violation it has a policy violation
// list the resource with label selectors, but this can be expensive for each delete request of a resource
if err := pvc . syncActiveResource ( pv ) ; err != nil {
glog . V ( 4 ) . Infof ( "not syncing policy violation status" )
return err
}
2019-08-08 13:59:50 -07:00
return pvc . syncStatusOnly ( pv )
}
2019-08-12 10:52:02 -07:00
func ( pvc * PolicyViolationController ) syncActiveResource ( curPv * kyverno . PolicyViolation ) error {
// check if the resource is active or not ?
rspec := curPv . Spec . ResourceSpec
// get resource
_ , err := pvc . client . GetResource ( rspec . Kind , rspec . Namespace , rspec . Name )
if errors . IsNotFound ( err ) {
// TODO: does it help to retry?
// resource is not found
// remove the violation
if err := pvc . pvControl . RemovePolicyViolation ( curPv . Name ) ; err != nil {
glog . Infof ( "unable to delete the policy violation %s: %v" , curPv . Name , err )
return err
}
glog . V ( 4 ) . Infof ( "removing policy violation %s as the corresponding resource %s/%s/%s does not exist anymore" , curPv . Name , rspec . Kind , rspec . Namespace , rspec . Name )
}
if err != nil {
glog . V ( 4 ) . Infof ( "error while retrieved resource %s/%s/%s: %v" , rspec . Kind , rspec . Namespace , rspec . Name , err )
return err
}
return nil
}
2019-08-08 13:59:50 -07:00
//syncStatusOnly updates the policyviolation status subresource
// status:
func ( pvc * PolicyViolationController ) syncStatusOnly ( curPv * kyverno . PolicyViolation ) error {
// newStatus := calculateStatus(pv)
return nil
}
//TODO: think this through again
//syncLastUpdateTimeStatus updates the policyviolation lastUpdateTime if anything in ViolationSpec changed
// - lastUpdateTime : (time stamp when the policy violation changed)
func ( pvc * PolicyViolationController ) syncLastUpdateTimeStatus ( curPv * kyverno . PolicyViolation , oldPv * kyverno . PolicyViolation ) error {
// check if there is any change in policy violation information
if ! updated ( curPv , oldPv ) {
return nil
}
// update the lastUpdateTime
newPolicyViolation := curPv
newPolicyViolation . Status = kyverno . PolicyViolationStatus { LastUpdateTime : metav1 . Now ( ) }
return pvc . pvControl . UpdateStatusPolicyViolation ( newPolicyViolation )
}
func updated ( curPv * kyverno . PolicyViolation , oldPv * kyverno . PolicyViolation ) bool {
return ! reflect . DeepEqual ( curPv . Spec , oldPv . Spec )
//TODO check if owner reference changed, then should we update the lastUpdateTime as well ?
}
type PVControlInterface interface {
UpdateStatusPolicyViolation ( newPv * kyverno . PolicyViolation ) error
RemovePolicyViolation ( name string ) error
}
// RealPVControl is the default implementation of PVControlInterface.
type RealPVControl struct {
Client kyvernoclient . Interface
Recorder record . EventRecorder
}
//UpdateStatusPolicyViolation updates the status for policy violation
func ( r RealPVControl ) UpdateStatusPolicyViolation ( newPv * kyverno . PolicyViolation ) error {
_ , err := r . Client . KyvernoV1alpha1 ( ) . PolicyViolations ( ) . UpdateStatus ( newPv )
return err
}
//RemovePolicyViolation removes the policy violation
func ( r RealPVControl ) RemovePolicyViolation ( name string ) error {
2019-08-12 10:52:02 -07:00
return r . Client . KyvernoV1alpha1 ( ) . PolicyViolations ( ) . Delete ( name , & metav1 . DeleteOptions { } )
2019-08-08 13:59:50 -07:00
}