diff --git a/pkg/api/kyverno/v1alpha1/types.go b/pkg/api/kyverno/v1alpha1/types.go index 691a0c1f11..44769bd446 100644 --- a/pkg/api/kyverno/v1alpha1/types.go +++ b/pkg/api/kyverno/v1alpha1/types.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -97,9 +95,9 @@ type PolicyStatus struct { // Count of resources for whom update/create api requests were blocked as the resoruce did not satisfy the policy rules ResourcesBlockedCount int `json:"resourcesBlockedCount"` // average time required to process the policy Mutation rules on a resource - AvgExecutionTimeMutation time.Duration `json:"averageMutationExecutionTime"` + AvgExecutionTimeMutation string `json:"averageMutationExecutionTime"` // average time required to process the policy Validation rules on a resource - AvgExecutionTimeValidation time.Duration `json:"averageValidationExecutionTime"` + AvgExecutionTimeValidation string `json:"averageValidationExecutionTime"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index 02cc1c072e..fccbed13c5 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -119,7 +119,8 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, client *client. 
pc.rm = NewResourceManager(30) // aggregator - pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer) + // pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer) + pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient) return &pc, nil } @@ -392,6 +393,8 @@ func (pc *PolicyController) syncPolicy(key string) error { policy, err := pc.pLister.Get(key) if errors.IsNotFound(err) { glog.V(2).Infof("Policy %v has been deleted", key) + // remove the recorded stats for the policy + pc.statusAggregator.RemovePolicyStats(key) return nil } @@ -412,15 +415,15 @@ func (pc *PolicyController) syncPolicy(key string) error { // report errors pc.report(policyInfos) // fetch the policy again via the aggreagator to remain consistent - return pc.statusAggregator.UpdateViolationCount(p.Name, pvList) - // return pc.syncStatusOnly(p, pvList) + // return pc.statusAggregator.UpdateViolationCount(p.Name, pvList) + return pc.syncStatusOnly(p, pvList) } //syncStatusOnly updates the policy status subresource // status: // - violations : (count of the resources that violate this policy ) func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error { - newStatus := calculateStatus(pvList) + newStatus := pc.calculateStatus(p.Name, pvList) if reflect.DeepEqual(newStatus, p.Status) { // no update to status return nil @@ -432,6 +435,21 @@ func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno. 
return err } +func (pc *PolicyController) calculateStatus(policyName string, pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus { + violationCount := len(pvList) + status := kyverno.PolicyStatus{ + ViolationCount: violationCount, + } + // get stats + stats := pc.statusAggregator.GetPolicyStats(policyName) + if stats != (PolicyStatInfo{}) { + status.RulesAppliedCount = stats.RulesAppliedCount + status.ResourcesBlockedCount = stats.ResourceBlocked + status.AvgExecutionTimeMutation = stats.MutationExecutionTime.String() + status.AvgExecutionTimeValidation = stats.ValidationExecutionTime.String() + } + return status +} func (pc *PolicyController) getPolicyViolationsForPolicy(p *kyverno.Policy) ([]*kyverno.PolicyViolation, error) { // List all PolicyViolation to find those we own but that no longer match our // selector. They will be orphaned by ClaimPolicyViolation(). diff --git a/pkg/policy/status.go b/pkg/policy/status.go index f19961dba5..60a6a6dbf0 100644 --- a/pkg/policy/status.go +++ b/pkg/policy/status.go @@ -1,59 +1,38 @@ package policy import ( - "fmt" - "reflect" "sync" "time" "github.com/golang/glog" - kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1" kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" - kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1" - kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1alpha1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" ) -// type PolicyStatus struct { -// // average time required to process the policy rules on a resource -// avgExecutionTime time.Duration -// // Count of rules that were applied succesfully -// rulesAppliedCount int -// // Count of resources for whom update/create api requests were blocked as the resoruce did not satisfy the policy rules -// resourcesBlockedCount int -// // Count of the resource for whom the mutation rules 
were applied succesfully -// resourcesMutatedCount int -// } - //PolicyStatusAggregator stores information abt aggregation type PolicyStatusAggregator struct { // time since we start aggregating the stats startTime time.Time // channel to recieve stats ch chan PolicyStat - // update policy status - psControl PStatusControlInterface - // pLister can list/get policy from the shared informer's store - pLister kyvernolister.PolicyLister - // pListerSynced returns true if the Policy store has been synced at least once - pListerSynced cache.InformerSynced - // UpdateViolationCount and SendStat can update same policy status - // we need to sync the updates using policyName - policyUpdateData sync.Map + //TODO: lock based on key, possibly sync.Map ? + //sync RW for policyData + mux sync.RWMutex + // stores aggregated stats for policy + policyData map[string]PolicyStatInfo } //NewPolicyStatAggregator returns a new policy status -func NewPolicyStatAggregator(client *kyvernoclient.Clientset, pInformer kyvernoinformer.PolicyInformer) *PolicyStatusAggregator { +func NewPolicyStatAggregator(client *kyvernoclient.Clientset, + +// pInformer kyvernoinformer.PolicyInformer +) *PolicyStatusAggregator { psa := PolicyStatusAggregator{ - startTime: time.Now(), - ch: make(chan PolicyStat), + startTime: time.Now(), + ch: make(chan PolicyStat), + policyData: map[string]PolicyStatInfo{}, } - psa.pLister = pInformer.Lister() - psa.pListerSynced = pInformer.Informer().HasSynced - psa.psControl = PSControl{Client: client} - //TODO: add WaitGroup return &psa } @@ -65,10 +44,11 @@ func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) { glog.V(4).Info("Shutting down aggregator for policy status stats") }() for i := 0; i < workers; i++ { - go wait.Until(psa.aggregate, time.Second, stopCh) + go wait.Until(psa.process, time.Second, stopCh) } } -func (psa *PolicyStatusAggregator) aggregate() { + +func (psa *PolicyStatusAggregator) process() { // As mutation and validation are 
handled seperately // ideally we need to combine the exection time from both for a policy // but its tricky to detect here the type of rules policy contains @@ -76,74 +56,96 @@ func (psa *PolicyStatusAggregator) aggregate() { // mutation & validation rules seperately for r := range psa.ch { glog.V(4).Infof("recieved policy stats %v", r) - if err := psa.updateStats(r); err != nil { - glog.Infof("Failed to update stats for policy %s: %v", r.PolicyName, err) - } + // if err := psa.updateStats(r); err != nil { + // glog.Infof("Failed to update stats for policy %s: %v", r.PolicyName, err) + // } + psa.aggregate(r) } - } -func (psa *PolicyStatusAggregator) updateStats(stats PolicyStat) error { +func (psa *PolicyStatusAggregator) aggregate(ps PolicyStat) { func() { - glog.V(4).Infof("lock updates for policy %s", stats.PolicyName) - // Lock the update for policy - psa.policyUpdateData.Store(stats.PolicyName, struct{}{}) + glog.V(4).Infof("write lock update policy %s", ps.PolicyName) + psa.mux.Lock() }() defer func() { - glog.V(4).Infof("Unlock updates for policy %s", stats.PolicyName) - psa.policyUpdateData.Delete(stats.PolicyName) + glog.V(4).Infof("write Unlock update policy %s", ps.PolicyName) + psa.mux.Unlock() }() - - // //wait for cache sync - // if !cache.WaitForCacheSync(nil, psa.pListerSynced) { - // glog.Infof("unable to sync cache for policy informer") - // return nil - // } - // get policy - policy, err := psa.pLister.Get(stats.PolicyName) - if err != nil { - glog.V(4).Infof("failed to get policy %s. 
Unable to update violation count: %v", stats.PolicyName, err) - return err + info, ok := psa.policyData[ps.PolicyName] + if !ok { + psa.policyData[ps.PolicyName] = ps.Stats + glog.V(4).Infof("added stats for policy %s", ps.PolicyName) + return } - newpolicy := policy - fmt.Println(newpolicy.ResourceVersion) - newpolicy.Status = kyverno.PolicyStatus{} - glog.V(4).Infof("updating stats for policy %s", policy.Name) - // rules applied count - newpolicy.Status.RulesAppliedCount = newpolicy.Status.RulesAppliedCount + stats.RulesAppliedCount - // resource blocked count - if stats.ResourceBlocked { - policy.Status.ResourcesBlockedCount++ + // aggregate + info.RulesAppliedCount = info.RulesAppliedCount + ps.Stats.RulesAppliedCount + if ps.Stats.ResourceBlocked == 1 { + info.ResourceBlocked++ } var zeroDuration time.Duration - if newpolicy.Status.AvgExecutionTimeMutation != zeroDuration { - // avg execution time for mutation rules - newpolicy.Status.AvgExecutionTimeMutation = (newpolicy.Status.AvgExecutionTimeMutation + stats.MutationExecutionTime) / 2 + if info.MutationExecutionTime != zeroDuration { + info.MutationExecutionTime = (info.MutationExecutionTime + ps.Stats.MutationExecutionTime) / 2 + glog.V(4).Infof("updated avg mutation time %v", info.MutationExecutionTime) } else { - newpolicy.Status.AvgExecutionTimeMutation = stats.MutationExecutionTime + info.MutationExecutionTime = ps.Stats.MutationExecutionTime } - if policy.Status.AvgExecutionTimeValidation != zeroDuration { - // avg execution time for validation rules - newpolicy.Status.AvgExecutionTimeValidation = (newpolicy.Status.AvgExecutionTimeValidation + stats.ValidationExecutionTime) / 2 + if info.ValidationExecutionTime != zeroDuration { + info.ValidationExecutionTime = (info.ValidationExecutionTime + ps.Stats.ValidationExecutionTime) / 2 + glog.V(4).Infof("updated avg validation time %v", info.ValidationExecutionTime) } else { - newpolicy.Status.AvgExecutionTimeValidation = stats.ValidationExecutionTime + 
info.ValidationExecutionTime = ps.Stats.ValidationExecutionTime } - return psa.psControl.UpdatePolicyStatus(newpolicy) + // update + psa.policyData[ps.PolicyName] = info + glog.V(4).Infof("updated stats for policy %s", ps.PolicyName) +} + +//GetPolicyStats returns the policy stats +func (psa *PolicyStatusAggregator) GetPolicyStats(policyName string) PolicyStatInfo { + func() { + glog.V(4).Infof("read lock update policy %s", policyName) + psa.mux.RLock() + }() + defer func() { + glog.V(4).Infof("read Unlock update policy %s", policyName) + psa.mux.RUnlock() + }() + glog.V(4).Infof("read stats for policy %s", policyName) + return psa.policyData[policyName] +} + +//RemovePolicyStats removes policy stats records +func (psa *PolicyStatusAggregator) RemovePolicyStats(policyName string) { + func() { + glog.V(4).Infof("write lock update policy %s", policyName) + psa.mux.Lock() + }() + defer func() { + glog.V(4).Infof("write Unlock update policy %s", policyName) + psa.mux.Unlock() + }() + glog.V(4).Infof("removing stats for policy %s", policyName) + delete(psa.policyData, policyName) } //PolicyStatusInterface provides methods to modify policyStatus type PolicyStatusInterface interface { SendStat(stat PolicyStat) - UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error + // UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error } //PolicyStat stored stats for policy type PolicyStat struct { - PolicyName string + PolicyName string + Stats PolicyStatInfo +} + +type PolicyStatInfo struct { MutationExecutionTime time.Duration ValidationExecutionTime time.Duration RulesAppliedCount int - ResourceBlocked bool + ResourceBlocked int } //SendStat sends the stat information for aggregation @@ -153,62 +155,7 @@ func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) { psa.ch <- stat } -//UpdateViolationCount updates the active violation count -func (psa *PolicyStatusAggregator) UpdateViolationCount(policyName string, pvList 
[]*kyverno.PolicyViolation) error { - func() { - glog.V(4).Infof("lock updates for policy %s", policyName) - // Lock the update for policy - psa.policyUpdateData.Store(policyName, struct{}{}) - }() - defer func() { - glog.V(4).Infof("Unlock updates for policy %s", policyName) - psa.policyUpdateData.Delete(policyName) - }() - // get policy - policy, err := psa.pLister.Get(policyName) - if err != nil { - glog.V(4).Infof("failed to get policy %s. Unable to update violation count: %v", policyName, err) - return err - } - - newStatus := calculateStatus(pvList) - if reflect.DeepEqual(newStatus, policy.Status) { - // no update to status - glog.V(4).Infof("no changes in policy violation count for policy %s", policy.Name) - return nil - } - // update status - newPolicy := policy - newPolicy.Status = newStatus - - return psa.psControl.UpdatePolicyStatus(newPolicy) -} - -func calculateStatus(pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus { - violationCount := len(pvList) - status := kyverno.PolicyStatus{ - ViolationCount: violationCount, - } - return status -} - //GetPolicyStatusAggregator returns interface to send policy status stats func (pc *PolicyController) GetPolicyStatusAggregator() PolicyStatusInterface { return pc.statusAggregator } - -//PStatusControlInterface Provides interface to operate on policy status -type PStatusControlInterface interface { - UpdatePolicyStatus(newPolicy *kyverno.Policy) error -} - -//PSControl allows update for policy status -type PSControl struct { - Client kyvernoclient.Interface -} - -//UpdatePolicyStatus update policy status -func (c PSControl) UpdatePolicyStatus(newPolicy *kyverno.Policy) error { - _, err := c.Client.KyvernoV1alpha1().Policies().UpdateStatus(newPolicy) - return err -} diff --git a/pkg/utils/util.go b/pkg/utils/util.go index e77e1a55db..eb548c328d 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -86,3 +86,11 @@ func NewKubeClient(config *rest.Config) (kubernetes.Interface, error) { } return kclient, nil 
} + +//Btoi converts boolean to int +func Btoi(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index f706020627..620ab3242c 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -20,14 +20,14 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool gatherStat := func(policyName string, er engine.EngineResponse) { ps := policyctr.PolicyStat{} ps.PolicyName = policyName - ps.MutationExecutionTime = er.ExecutionTime - ps.RulesAppliedCount = er.RulesAppliedCount + ps.Stats.MutationExecutionTime = er.ExecutionTime + ps.Stats.RulesAppliedCount = er.RulesAppliedCount policyStats = append(policyStats, ps) } // send stats for aggregation sendStat := func(blocked bool) { for _, stat := range policyStats { - stat.ResourceBlocked = blocked + stat.Stats.ResourceBlocked = utils.Btoi(blocked) //SEND ws.policyStatus.SendStat(stat) } diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index 088694bd67..14105ed0f3 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -23,14 +23,14 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res gatherStat := func(policyName string, er engine.EngineResponse) { ps := policyctr.PolicyStat{} ps.PolicyName = policyName - ps.ValidationExecutionTime = er.ExecutionTime - ps.RulesAppliedCount = er.RulesAppliedCount + ps.Stats.ValidationExecutionTime = er.ExecutionTime + ps.Stats.RulesAppliedCount = er.RulesAppliedCount policyStats = append(policyStats, ps) } // send stats for aggregation sendStat := func(blocked bool) { for _, stat := range policyStats { - stat.ResourceBlocked = blocked + stat.Stats.ResourceBlocked = utils.Btoi(blocked) //SEND ws.policyStatus.SendStat(stat) }