diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index ae9b53ce57..ee6644e91c 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -200,6 +200,8 @@ func main() { glog.Fatalf("Failed registering Admission Webhooks: %v\n", err) } + statusSync := policy.NewStatusSync(pclient, stopCh, policyMetaStore) + // WEBHOOOK // - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration // - reports the results based on the response from the policy engine: @@ -215,7 +217,7 @@ func main() { kubeInformer.Rbac().V1().ClusterRoleBindings(), egen, webhookRegistrationClient, - policy.NewStatusSync(pclient, stopCh, policyMetaStore), + statusSync, configData, policyMetaStore, pvgen, @@ -238,6 +240,7 @@ func main() { go grc.Run(1, stopCh) go grcc.Run(1, stopCh) go pvgen.Run(1, stopCh) + go statusSync.Run() // verifys if the admission control is enabled and active // resync: 60 seconds diff --git a/pkg/api/kyverno/v1/types.go b/pkg/api/kyverno/v1/types.go index 1f020492f5..fa29e4be83 100644 --- a/pkg/api/kyverno/v1/types.go +++ b/pkg/api/kyverno/v1/types.go @@ -232,15 +232,15 @@ type PolicyStatus struct { // average time required to process the policy rules on a resource AvgExecutionTime string `json:"averageExecutionTime"` // Count of rules that failed - ViolationCount int `json:"violationCount"` + ViolationCount int `json:"violationCount,omitempty"` // Count of rules that were applied - RulesAppliedCount int `json:"rulesAppliedCount"` + RulesAppliedCount int `json:"rulesAppliedCount,omitempty"` // Count of resources that were blocked for failing a validate, across all rules - ResourcesBlockedCount int `json:"resourcesBlockedCount"` + ResourcesBlockedCount int `json:"resourcesBlockedCount,omitempty"` // Count of resources that were successfully mutated, across all rules - ResourcesMutatedCount int `json:"resourcesMutatedCount"` + ResourcesMutatedCount int `json:"resourcesMutatedCount,omitempty"` - Rules []RuleStats `json:"ruleStatus"` + Rules []RuleStats `json:"ruleStatus,omitempty"` } //RuleStats provides status per rule @@ -248,15 +248,15 @@ type RuleStats struct { // Rule name Name string `json:"ruleName"` // average time require to process the rule - ExecutionTime string `json:"averageExecutionTime"` + ExecutionTime string `json:"averageExecutionTime,omitempty"` // Count of rules that failed - ViolationCount int `json:"violationCount"` + ViolationCount int `json:"violationCount,omitempty"` // Count of rules that were applied - AppliedCount int `json:"appliedCount"` + AppliedCount int `json:"appliedCount,omitempty"` // Count of resources for whom update/create api requests were blocked as the resource did not satisfy the policy rules - ResourcesBlockedCount int `json:"resourcesBlockedCount"` + ResourcesBlockedCount int `json:"resourcesBlockedCount,omitempty"` // Count of resources that were successfully mutated - ResourcesMutatedCount int `json:"resourcesMutatedCount"` + ResourcesMutatedCount int `json:"resourcesMutatedCount,omitempty"` } // PolicyList is a list of Policy resources diff --git a/pkg/policy/apply.go b/pkg/policy/apply.go index 5b79cc4bec..32d654346c 100644 --- a/pkg/policy/apply.go +++ b/pkg/policy/apply.go @@ -19,7 +19,7 @@ import ( // applyPolicy applies policy on a resource //TODO: generation rules -func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface) (responses []response.EngineResponse) { +func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructured) (responses []response.EngineResponse) { startTime := time.Now() glog.V(4).Infof("Started apply policy %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), startTime) @@ -35,7 +35,7 @@ func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructure ctx.AddResource(transformResource(resource)) //MUTATION - engineResponse, err = mutation(policy, resource, policyStatus, ctx) + engineResponse, err = mutation(policy, resource, ctx) engineResponses = append(engineResponses, engineResponse) if err != nil { glog.Errorf("unable to process mutation rules: %v", err) @@ -48,7 +48,7 @@ func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructure //TODO: GENERATION return engineResponses } -func mutation(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface, ctx context.EvalInterface) (response.EngineResponse, error) { +func mutation(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, ctx context.EvalInterface) (response.EngineResponse, error) { engineResponse := engine.Mutate(engine.PolicyContext{Policy: policy, NewResource: resource, Context: ctx}) if !engineResponse.IsSuccesful() { diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index 1b62db5671..2f643f2745 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -2,7 +2,6 @@ package policy import ( "fmt" - "reflect" "time" "github.com/golang/glog" @@ -68,8 +67,6 @@ type PolicyController struct { rm resourceManager // helpers to validate against current loaded configuration configHandler config.Interface - // receives stats and aggregates details - statusAggregator *PolicyStatusAggregator // store to hold policy meta data for faster lookup pMetaStore policystore.UpdateInterface // policy violation generator @@ -145,10 +142,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, //TODO: pass the time in seconds instead of converting it internally pc.rm = NewResourceManager(30) - // aggregator - // pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer) - pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient) - return &pc, nil } @@ -265,9 +258,6 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(pc.worker, time.Second, stopCh) } - // policy status aggregator - //TODO: workers required for aggergation - pc.statusAggregator.Run(1, stopCh) <-stopCh } @@ -329,8 +319,6 @@ func (pc *PolicyController) syncPolicy(key string) error { if err := pc.deleteNamespacedPolicyViolations(key); err != nil { return err } - // remove the recorded stats for the policy - pc.statusAggregator.RemovePolicyStats(key) // remove webhook configurations if there are no policies if err := pc.removeResourceWebhookConfiguration(); err != nil { @@ -346,23 +334,12 @@ func (pc *PolicyController) syncPolicy(key string) error { pc.resourceWebhookWatcher.RegisterResourceWebhook() - // cluster policy violations - cpvList, err := pc.getClusterPolicyViolationForPolicy(policy.Name) - if err != nil { - return err - } - // namespaced policy violation - nspvList, err := pc.getNamespacedPolicyViolationForPolicy(policy.Name) - if err != nil { - return err - } - // process policies on existing resources engineResponses := pc.processExistingResources(*policy) // report errors pc.cleanupAndReport(engineResponses) - // sync active - return pc.syncStatusOnly(policy, cpvList, nspvList) + + return nil } func (pc *PolicyController) deleteClusterPolicyViolations(policy string) error { @@ -391,39 +368,6 @@ func (pc *PolicyController) deleteNamespacedPolicyViolations(policy string) erro return nil } -//syncStatusOnly updates the policy status subresource -func (pc *PolicyController) syncStatusOnly(p *kyverno.ClusterPolicy, pvList []*kyverno.ClusterPolicyViolation, nspvList []*kyverno.PolicyViolation) error { - newStatus := pc.calculateStatus(p.Name, pvList, nspvList) - if reflect.DeepEqual(newStatus, p.Status) { - // no update to status - return nil - } - // update status - newPolicy := p - newPolicy.Status = newStatus - _, err := pc.kyvernoClient.KyvernoV1().ClusterPolicies().UpdateStatus(newPolicy) - return err -} - -func (pc *PolicyController) calculateStatus(policyName string, pvList []*kyverno.ClusterPolicyViolation, nspvList []*kyverno.PolicyViolation) kyverno.PolicyStatus { - violationCount := len(pvList) + len(nspvList) - status := kyverno.PolicyStatus{ - ViolationCount: violationCount, - } - // get stats - stats := pc.statusAggregator.GetPolicyStats(policyName) - if !reflect.DeepEqual(stats, (PolicyStatInfo{})) { - status.RulesAppliedCount = stats.RulesAppliedCount - status.ResourcesBlockedCount = stats.ResourceBlocked - status.AvgExecutionTimeMutation = stats.MutationExecutionTime.String() - status.AvgExecutionTimeValidation = stats.ValidationExecutionTime.String() - status.AvgExecutionTimeGeneration = stats.GenerationExecutionTime.String() - // update rule stats - status.Rules = convertRules(stats.Rules) - } - return status -} - func (pc *PolicyController) getNamespacedPolicyViolationForPolicy(policy string) ([]*kyverno.PolicyViolation, error) { policySelector, err := buildPolicyLabel(policy) if err != nil { @@ -459,19 +403,3 @@ func (r RealPVControl) DeleteClusterPolicyViolation(name string) error { func (r RealPVControl) DeleteNamespacedPolicyViolation(ns, name string) error { return r.Client.KyvernoV1().PolicyViolations(ns).Delete(name, &metav1.DeleteOptions{}) } - -// convertRules converts the internal rule stats to one used in policy.stats struct -func convertRules(rules []RuleStatinfo) []kyverno.RuleStats { - var stats []kyverno.RuleStats - for _, r := range rules { - stat := kyverno.RuleStats{ - Name: r.RuleName, - ExecutionTime: r.ExecutionTime.String(), - AppliedCount: r.RuleAppliedCount, - ViolationCount: r.RulesFailedCount, - MutationCount: r.MutationCount, - } - stats = append(stats, stat) - } - return stats -} diff --git a/pkg/policy/existing.go b/pkg/policy/existing.go index 9165978ca4..97c09affac 100644 --- a/pkg/policy/existing.go +++ b/pkg/policy/existing.go @@ -39,7 +39,7 @@ func (pc *PolicyController) processExistingResources(policy kyverno.ClusterPolic // apply the policy on each glog.V(4).Infof("apply policy %s with resource version %s on resource %s/%s/%s with resource version %s", policy.Name, policy.ResourceVersion, resource.GetKind(), resource.GetNamespace(), resource.GetName(), resource.GetResourceVersion()) - engineResponse := applyPolicy(policy, resource, pc.statusAggregator) + engineResponse := applyPolicy(policy, resource) // get engine response for mutation & validation independently engineResponses = append(engineResponses, engineResponse...) // post-processing, register the resource as processed diff --git a/pkg/policy/status.go b/pkg/policy/status.go index 8d4beb4f02..2f55fd252b 100644 --- a/pkg/policy/status.go +++ b/pkg/policy/status.go @@ -1,210 +1,249 @@ package policy import ( + "log" "sync" "time" - "github.com/golang/glog" - kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "github.com/nirmata/kyverno/pkg/policystore" + + "github.com/nirmata/kyverno/pkg/engine/response" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/nirmata/kyverno/pkg/client/clientset/versioned" + + v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1" ) -//PolicyStatusAggregator stores information abt aggregation -type PolicyStatusAggregator struct { - // time since we start aggregating the stats - startTime time.Time - // channel to receive stats - ch chan PolicyStat - //TODO: lock based on key, possibly sync.Map ? - //sync RW for policyData - mux sync.RWMutex - // stores aggregated stats for policy - policyData map[string]PolicyStatInfo +type statusCache struct { + mu sync.RWMutex + data map[string]v1.PolicyStatus } -//NewPolicyStatAggregator returns a new policy status -func NewPolicyStatAggregator(client *kyvernoclient.Clientset) *PolicyStatusAggregator { - psa := PolicyStatusAggregator{ - startTime: time.Now(), - ch: make(chan PolicyStat), - policyData: map[string]PolicyStatInfo{}, - } - return &psa +type StatSync struct { + cache *statusCache + stop <-chan struct{} + client *versioned.Clientset + policyStore *policystore.PolicyStore } -//Run begins aggregator -func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - glog.V(4).Info("Started aggregator for policy status stats") - defer func() { - glog.V(4).Info("Shutting down aggregator for policy status stats") - }() - for i := 0; i < workers; i++ { - go wait.Until(psa.process, time.Second, stopCh) - } - <-stopCh -} - -func (psa *PolicyStatusAggregator) process() { - // As mutation and validation are handled separately - // ideally we need to combine the execution time from both for a policy - // but its tricky to detect here the type of rules policy contains - // so we dont combine the results, but instead compute the execution time for - // mutation & validation rules separately - for r := range psa.ch { - glog.V(4).Infof("received policy stats %v", r) - psa.aggregate(r) +func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}, pMetaStore *policystore.PolicyStore) *StatSync { + return &StatSync{ + cache: &statusCache{ + mu: sync.RWMutex{}, + data: make(map[string]v1.PolicyStatus), + }, + stop: stopCh, + client: client, + policyStore: pMetaStore, } } -func (psa *PolicyStatusAggregator) aggregate(ps PolicyStat) { - func() { - glog.V(4).Infof("write lock update policy %s", ps.PolicyName) - psa.mux.Lock() - }() - defer func() { - glog.V(4).Infof("write Unlock update policy %s", ps.PolicyName) - psa.mux.Unlock() - }() - - if len(ps.Stats.Rules) == 0 { - glog.V(4).Infof("ignoring stats, as no rule was applied") - return - } - - info, ok := psa.policyData[ps.PolicyName] - if !ok { - psa.policyData[ps.PolicyName] = ps.Stats - glog.V(4).Infof("added stats for policy %s", ps.PolicyName) - return - } - // aggregate policy information - info.RulesAppliedCount = info.RulesAppliedCount + ps.Stats.RulesAppliedCount - if ps.Stats.ResourceBlocked == 1 { - info.ResourceBlocked++ - } - var zeroDuration time.Duration - if info.MutationExecutionTime != zeroDuration { - info.MutationExecutionTime = (info.MutationExecutionTime + ps.Stats.MutationExecutionTime) / 2 - glog.V(4).Infof("updated avg mutation time %v", info.MutationExecutionTime) - } else { - info.MutationExecutionTime = ps.Stats.MutationExecutionTime - } - if info.ValidationExecutionTime != zeroDuration { - info.ValidationExecutionTime = (info.ValidationExecutionTime + ps.Stats.ValidationExecutionTime) / 2 - glog.V(4).Infof("updated avg validation time %v", info.ValidationExecutionTime) - } else { - info.ValidationExecutionTime = ps.Stats.ValidationExecutionTime - } - if info.GenerationExecutionTime != zeroDuration { - info.GenerationExecutionTime = (info.GenerationExecutionTime + ps.Stats.GenerationExecutionTime) / 2 - glog.V(4).Infof("updated avg generation time %v", info.GenerationExecutionTime) - } else { - info.GenerationExecutionTime = ps.Stats.GenerationExecutionTime - } - // aggregate rule details - info.Rules = aggregateRules(info.Rules, ps.Stats.Rules) - // update - psa.policyData[ps.PolicyName] = info - glog.V(4).Infof("updated stats for policy %s", ps.PolicyName) +func (s *StatSync) Run() { + // update policy status every 10 seconds - waits for previous updateStatus to complete + wait.Until(s.updateStats, 1*time.Second, s.stop) + <-s.stop + s.updateStats() } -func aggregateRules(old []RuleStatinfo, update []RuleStatinfo) []RuleStatinfo { - var zeroDuration time.Duration - searchRule := func(list []RuleStatinfo, key string) *RuleStatinfo { - for _, v := range list { - if v.RuleName == key { - return &v - } +func (s *StatSync) updateStats() { + s.cache.mu.Lock() + var nameToStatus = make(map[string]v1.PolicyStatus, len(s.cache.data)) + for k, v := range s.cache.data { + nameToStatus[k] = v + } + s.cache.mu.Unlock() + + for policyName, status := range nameToStatus { + var policy = &v1.ClusterPolicy{} + policy, err := s.policyStore.Get(policyName) + if err != nil { + continue } - return nil - } - newRules := []RuleStatinfo{} - // search for new rules in old rules and update it - for _, updateR := range update { - if updateR.ExecutionTime != zeroDuration { - if rule := searchRule(old, updateR.RuleName); rule != nil { - rule.ExecutionTime = (rule.ExecutionTime + updateR.ExecutionTime) / 2 - rule.RuleAppliedCount = rule.RuleAppliedCount + updateR.RuleAppliedCount - rule.RulesFailedCount = rule.RulesFailedCount + updateR.RulesFailedCount - rule.MutationCount = rule.MutationCount + updateR.MutationCount - newRules = append(newRules, *rule) - } else { - newRules = append(newRules, updateR) - } + policy.Status = status + _, err = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy) + if err != nil { + log.Println(err) } } - return newRules } -//GetPolicyStats returns the policy stats -func (psa *PolicyStatusAggregator) GetPolicyStats(policyName string) PolicyStatInfo { - func() { - glog.V(4).Infof("read lock update policy %s", policyName) - psa.mux.RLock() - }() - defer func() { - glog.V(4).Infof("read Unlock update policy %s", policyName) - psa.mux.RUnlock() - }() - glog.V(4).Infof("read stats for policy %s", policyName) - return psa.policyData[policyName] +func (s *StatSync) UpdateStatusWithMutateStats(response response.EngineResponse) { + s.cache.mu.Lock() + var policyStatus v1.PolicyStatus + policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] + if !exist { + policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) + if policy != nil { + policyStatus = policy.Status + } + } + + var nameToRule = make(map[string]v1.RuleStats, 0) + for _, rule := range policyStatus.Rules { + nameToRule[rule.Name] = rule + } + + for _, rule := range response.PolicyResponse.Rules { + ruleStat := nameToRule[rule.Name] + ruleStat.Name = rule.Name + + averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) + ruleStat.ExecutionTime = updateAverageTime( + rule.ProcessingTime, + ruleStat.ExecutionTime, + averageOver).String() + + if rule.Success { + policyStatus.RulesAppliedCount++ + policyStatus.ResourcesMutatedCount++ + ruleStat.AppliedCount++ + ruleStat.ResourcesMutatedCount++ + } else { + policyStatus.ViolationCount++ + ruleStat.ViolationCount++ + } + + nameToRule[rule.Name] = ruleStat + } + + var policyAverageExecutionTime time.Duration + var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) + for _, ruleStat := range nameToRule { + executionTime, err := time.ParseDuration(ruleStat.ExecutionTime) + if err == nil { + policyAverageExecutionTime += executionTime + } + ruleStats = append(ruleStats, ruleStat) + } + + policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() + policyStatus.Rules = ruleStats + + s.cache.data[response.PolicyResponse.Policy] = policyStatus + s.cache.mu.Unlock() } -//RemovePolicyStats rmves policy stats records -func (psa *PolicyStatusAggregator) RemovePolicyStats(policyName string) { - func() { - glog.V(4).Infof("write lock update policy %s", policyName) - psa.mux.Lock() - }() - defer func() { - glog.V(4).Infof("write Unlock update policy %s", policyName) - psa.mux.Unlock() - }() - glog.V(4).Infof("removing stats for policy %s", policyName) - delete(psa.policyData, policyName) +func (s *StatSync) UpdateStatusWithValidateStats(response response.EngineResponse) { + s.cache.mu.Lock() + var policyStatus v1.PolicyStatus + policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] + if !exist { + policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) + if policy != nil { + policyStatus = policy.Status + } + } + + var nameToRule = make(map[string]v1.RuleStats, 0) + for _, rule := range policyStatus.Rules { + nameToRule[rule.Name] = rule + } + + for _, rule := range response.PolicyResponse.Rules { + ruleStat := nameToRule[rule.Name] + ruleStat.Name = rule.Name + + averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) + ruleStat.ExecutionTime = updateAverageTime( + rule.ProcessingTime, + ruleStat.ExecutionTime, + averageOver).String() + + if rule.Success { + policyStatus.RulesAppliedCount++ + ruleStat.AppliedCount++ + if response.PolicyResponse.ValidationFailureAction == "enforce" { + policyStatus.ResourcesBlockedCount++ + ruleStat.ResourcesBlockedCount++ + } + } else { + policyStatus.ViolationCount++ + ruleStat.ViolationCount++ + } + + nameToRule[rule.Name] = ruleStat + } + + var policyAverageExecutionTime time.Duration + var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) + for _, ruleStat := range nameToRule { + executionTime, err := time.ParseDuration(ruleStat.ExecutionTime) + if err == nil { + policyAverageExecutionTime += executionTime + } + ruleStats = append(ruleStats, ruleStat) + } + + policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() + policyStatus.Rules = ruleStats + + s.cache.data[response.PolicyResponse.Policy] = policyStatus + s.cache.mu.Unlock() } -//PolicyStatusInterface provides methods to modify policyStatus -type PolicyStatusInterface interface { - SendStat(stat PolicyStat) - // UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error +func (s *StatSync) UpdateStatusWithGenerateStats(response response.EngineResponse) { + s.cache.mu.Lock() + var policyStatus v1.PolicyStatus + policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] + if !exist { + policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) + if policy != nil { + policyStatus = policy.Status + } + } + + var nameToRule = make(map[string]v1.RuleStats, 0) + for _, rule := range policyStatus.Rules { + nameToRule[rule.Name] = rule + } + + for _, rule := range response.PolicyResponse.Rules { + ruleStat := nameToRule[rule.Name] + ruleStat.Name = rule.Name + + averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) + ruleStat.ExecutionTime = updateAverageTime( + rule.ProcessingTime, + ruleStat.ExecutionTime, + averageOver).String() + + if rule.Success { + policyStatus.RulesAppliedCount++ + ruleStat.AppliedCount++ + } else { + policyStatus.ViolationCount++ + ruleStat.ViolationCount++ + } + + nameToRule[rule.Name] = ruleStat + } + + var policyAverageExecutionTime time.Duration + var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) + for _, ruleStat := range nameToRule { + executionTime, err := time.ParseDuration(ruleStat.ExecutionTime) + if err == nil { + policyAverageExecutionTime += executionTime + } + ruleStats = append(ruleStats, ruleStat) + } + + policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() + policyStatus.Rules = ruleStats + + s.cache.data[response.PolicyResponse.Policy] = policyStatus + s.cache.mu.Unlock() } -//PolicyStat stored stats for policy -type PolicyStat struct { - PolicyName string - Stats PolicyStatInfo -} - -//PolicyStatInfo provides statistics for policy -type PolicyStatInfo struct { - MutationExecutionTime time.Duration - ValidationExecutionTime time.Duration - GenerationExecutionTime time.Duration - RulesAppliedCount int - ResourceBlocked int - Rules []RuleStatinfo -} - -//RuleStatinfo provides statistics for rule -type RuleStatinfo struct { - RuleName string - ExecutionTime time.Duration - RuleAppliedCount int - RulesFailedCount int - MutationCount int -} - -//SendStat sends the stat information for aggregation -func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) { - glog.V(4).Infof("sending policy stats: %v", stat) - // Send over channel - psa.ch <- stat -} - -//GetPolicyStatusAggregator returns interface to send policy status stats -func (pc *PolicyController) GetPolicyStatusAggregator() PolicyStatusInterface { - return pc.statusAggregator +func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration { + if averageOver == 0 { + return newTime + } + oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString) + numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds() + denominator := averageOver + 1 + newAverageTimeInNanoSeconds := numerator / denominator + return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond } diff --git a/pkg/policy/status2.go b/pkg/policy/status2.go deleted file mode 100644 index 41b4c0a90a..0000000000 --- a/pkg/policy/status2.go +++ /dev/null @@ -1,230 +0,0 @@ -package policy - -import ( - "sync" - "time" - - "github.com/nirmata/kyverno/pkg/policystore" - - "github.com/nirmata/kyverno/pkg/engine/response" - - "k8s.io/apimachinery/pkg/util/wait" - - "github.com/nirmata/kyverno/pkg/client/clientset/versioned" - - v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1" -) - -type statusCache struct { - mu sync.RWMutex - data map[string]v1.PolicyStatus -} - -type StatSync struct { - cache *statusCache - stop <-chan struct{} - client *versioned.Clientset - policyStore *policystore.PolicyStore -} - -func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}, pMetaStore *policystore.PolicyStore) *StatSync { - return &StatSync{ - cache: &statusCache{ - mu: sync.RWMutex{}, - data: make(map[string]v1.PolicyStatus), - }, - stop: stopCh, - client: client, - policyStore: pMetaStore, - } -} - -func (s *StatSync) Start() { - // update policy status every 10 seconds - waits for previous updateStatus to complete - wait.Until(s.updateStats, 1*time.Second, s.stop) - <-s.stop - s.updateStats() -} - -func (s *StatSync) updateStats() { - s.cache.mu.Lock() - for policyName, status := range s.cache.data { - var policy = &v1.ClusterPolicy{} - policy.Name = policyName - policy.Status = status - _, _ = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy) - } - s.cache.data = make(map[string]v1.PolicyStatus) - s.cache.mu.Unlock() -} - -func (s *StatSync) UpdateStatusWithMutateStats(response response.EngineResponse) { - s.cache.mu.Lock() - var policyStatus v1.PolicyStatus - policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] - if !exist { - policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) - if policy != nil { - policyStatus = policy.Status - } - } - - var nameToRule = make(map[string]v1.RuleStats, 0) - for _, rule := range policyStatus.Rules { - nameToRule[rule.Name] = rule - } - - var policyAverageExecutionTime time.Duration - for _, rule := range response.PolicyResponse.Rules { - ruleStat := nameToRule[rule.Name] - ruleStat.Name = rule.Name - - averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) - newAverageExecutionTime := updateAverageTime( - rule.ProcessingTime, - ruleStat.ExecutionTime, - averageOver) - policyAverageExecutionTime += newAverageExecutionTime - ruleStat.ExecutionTime = newAverageExecutionTime.String() - - if rule.Success { - policyStatus.RulesAppliedCount++ - policyStatus.ResourcesMutatedCount++ - ruleStat.AppliedCount++ - ruleStat.ResourcesMutatedCount++ - } else { - policyStatus.ViolationCount++ - ruleStat.ViolationCount++ - } - - nameToRule[rule.Name] = ruleStat - } - - var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) - for _, ruleStat := range nameToRule { - ruleStats = append(ruleStats, ruleStat) - } - - policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() - policyStatus.Rules = ruleStats - - s.cache.data[response.PolicyResponse.Policy] = policyStatus - s.cache.mu.Unlock() -} - -func (s *StatSync) UpdateStatusWithValidateStats(response response.EngineResponse) { - s.cache.mu.Lock() - var policyStatus v1.PolicyStatus - policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] - if !exist { - policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) - if policy != nil { - policyStatus = policy.Status - } - } - - var nameToRule = make(map[string]v1.RuleStats, 0) - for _, rule := range policyStatus.Rules { - nameToRule[rule.Name] = rule - } - - var policyAverageExecutionTime time.Duration - for _, rule := range response.PolicyResponse.Rules { - ruleStat := nameToRule[rule.Name] - ruleStat.Name = rule.Name - - averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) - newAverageExecutionTime := updateAverageTime( - rule.ProcessingTime, - ruleStat.ExecutionTime, - averageOver) - policyAverageExecutionTime += newAverageExecutionTime - ruleStat.ExecutionTime = newAverageExecutionTime.String() - - if rule.Success { - policyStatus.RulesAppliedCount++ - policyStatus.ResourcesBlockedCount++ - ruleStat.AppliedCount++ - ruleStat.ResourcesBlockedCount++ - } else { - policyStatus.ViolationCount++ - ruleStat.ViolationCount++ - } - - nameToRule[rule.Name] = ruleStat - } - - var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) - for _, ruleStat := range nameToRule { - ruleStats = append(ruleStats, ruleStat) - } - - policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() - policyStatus.Rules = ruleStats - - s.cache.data[response.PolicyResponse.Policy] = policyStatus - s.cache.mu.Unlock() -} - -func (s *StatSync) UpdateStatusWithGenerateStats(response response.EngineResponse) { - s.cache.mu.Lock() - var policyStatus v1.PolicyStatus - policyStatus, exist := s.cache.data[response.PolicyResponse.Policy] - if !exist { - policy, _ := s.policyStore.Get(response.PolicyResponse.Policy) - if policy != nil { - policyStatus = policy.Status - } - } - - var nameToRule = make(map[string]v1.RuleStats, 0) - for _, rule := range policyStatus.Rules { - nameToRule[rule.Name] = rule - } - - var policyAverageExecutionTime time.Duration - for _, rule := range response.PolicyResponse.Rules { - ruleStat := nameToRule[rule.Name] - ruleStat.Name = rule.Name - - averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount) - newAverageExecutionTime := updateAverageTime( - rule.ProcessingTime, - ruleStat.ExecutionTime, - averageOver) - policyAverageExecutionTime += newAverageExecutionTime - ruleStat.ExecutionTime = newAverageExecutionTime.String() - - if rule.Success { - policyStatus.RulesAppliedCount++ - ruleStat.AppliedCount++ - } else { - policyStatus.ViolationCount++ - ruleStat.ViolationCount++ - } - - nameToRule[rule.Name] = ruleStat - } - - var ruleStats = make([]v1.RuleStats, 0, len(nameToRule)) - for _, ruleStat := range nameToRule { - ruleStats = append(ruleStats, ruleStat) - } - - policyStatus.AvgExecutionTime = policyAverageExecutionTime.String() - policyStatus.Rules = ruleStats - - s.cache.data[response.PolicyResponse.Policy] = policyStatus - s.cache.mu.Unlock() -} - -func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration { - if averageOver == 0 { - return newTime - } - oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString) - numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds() - denominator := averageOver + 1 - newAverageTimeInNanoSeconds := numerator / denominator - return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond -} diff --git a/pkg/webhooks/generation.go b/pkg/webhooks/generation.go index 2dddf71e1c..4f8a087977 100644 --- a/pkg/webhooks/generation.go +++ b/pkg/webhooks/generation.go @@ -58,6 +58,7 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic for _, policy := range policies { policyContext.Policy = policy engineResponse := engine.Generate(policyContext) + go ws.status.UpdateStatusWithGenerateStats(engineResponse) if len(engineResponse.PolicyResponse.Rules) > 0 { // some generate rules do apply to the resource engineResponses = append(engineResponses, engineResponse) diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index dbdb8552c7..90bb14ccb9 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -1,6 +1,7 @@ package webhooks import ( + "log" "time" "github.com/golang/glog" @@ -56,13 +57,16 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou glog.V(2).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation) policyContext.Policy = policy + if resource.GetKind() == "Pod" { + log.Println("some") + } engineResponse := engine.Mutate(policyContext) - go ws.status.UpdateStatusWithMutateStats(engineResponse) engineResponses = append(engineResponses, engineResponse) if !engineResponse.IsSuccesful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName()) continue } + go ws.status.UpdateStatusWithMutateStats(engineResponse) // gather patches patches = append(patches, engineResponse.GetPatches()...) glog.V(4).Infof("Mutation from policy %s has applied successfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName()) diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 27dbff78ed..6e51fd692b 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -56,7 +56,7 @@ type WebhookServer struct { // webhook registration client webhookRegistrationClient *webhookconfig.WebhookRegistrationClient // API to send policy stats for aggregation - status policy.StatSync + status *policy.StatSync // helpers to validate against current loaded configuration configHandler config.Interface // channel for cleanup notification @@ -83,7 +83,7 @@ func NewWebhookServer( crbInformer rbacinformer.ClusterRoleBindingInformer, eventGen event.Interface, webhookRegistrationClient *webhookconfig.WebhookRegistrationClient, - status *policy.StatSync, + statusSync *policy.StatSync, configHandler config.Interface, pMetaStore policystore.LookupInterface, pvGenerator policyviolation.GeneratorInterface, @@ -113,7 +113,7 @@ func NewWebhookServer( crbSynced: crbInformer.Informer().HasSynced, eventGen: eventGen, webhookRegistrationClient: webhookRegistrationClient, - status: status, + status: statusSync, configHandler: configHandler, cleanUp: cleanUp, lastReqTime: resourceWebhookWatcher.LastReqTime, diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index c67a012ea2..af397d2c17 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -9,9 +9,7 @@ import ( "github.com/nirmata/kyverno/pkg/engine" "github.com/nirmata/kyverno/pkg/engine/context" "github.com/nirmata/kyverno/pkg/engine/response" - policyctr "github.com/nirmata/kyverno/pkg/policy" "github.com/nirmata/kyverno/pkg/policyviolation" - "github.com/nirmata/kyverno/pkg/utils" v1beta1 "k8s.io/api/admission/v1beta1" ) @@ -22,36 +20,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol glog.V(4).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) - var policyStats []policyctr.PolicyStat evalTime := time.Now() - // gather stats from the engine response - gatherStat := func(policyName string, policyResponse response.PolicyResponse) { - ps := policyctr.PolicyStat{} - ps.PolicyName = policyName - ps.Stats.ValidationExecutionTime = policyResponse.ProcessingTime - ps.Stats.RulesAppliedCount = policyResponse.RulesAppliedCount - // capture rule level stats - for _, rule := range policyResponse.Rules { - rs := policyctr.RuleStatinfo{} - rs.RuleName = rule.Name - rs.ExecutionTime = rule.RuleStats.ProcessingTime - if rule.Success { - rs.RuleAppliedCount++ - } else { - rs.RulesFailedCount++ - } - ps.Stats.Rules = append(ps.Stats.Rules, rs) - } - policyStats = append(policyStats, ps) - } - // send stats for aggregation - sendStat := func(blocked bool) { - for _, stat := range policyStats { - stat.Stats.ResourceBlocked = utils.Btoi(blocked) - //SEND - ws.policyStatus.SendStat(stat) - } - } // Get new and old resource newR, oldR, err := extractResources(patchedResource, request) @@ -100,12 +69,11 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol continue } engineResponses = append(engineResponses, engineResponse) - // Gather policy application statistics - gatherStat(policy.Name, engineResponse.PolicyResponse) if !engineResponse.IsSuccesful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, newR.GetNamespace(), newR.GetName()) continue } + go ws.status.UpdateStatusWithValidateStats(engineResponse) } glog.V(4).Infof("eval: %v %s/%s/%s ", time.Since(evalTime), request.Kind, request.Namespace, request.Name) // report time @@ -117,7 +85,6 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol blocked := toBlockResource(engineResponses) if blocked { glog.V(4).Infof("resource %s/%s/%s is blocked\n", newR.GetKind(), newR.GetNamespace(), newR.GetName()) - sendStat(true) return false, getEnforceFailureErrorMsg(engineResponses) } @@ -128,7 +95,6 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol // ADD EVENTS events := generateEvents(engineResponses, (request.Operation == v1beta1.Update)) ws.eventGen.Add(events...) - sendStat(false) // report time end glog.V(4).Infof("report: %v %s/%s/%s", time.Since(reportTime), request.Kind, request.Namespace, request.Name) return true, ""