1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

527 tested mutate needs further testing

This commit is contained in:
shravan 2020-02-22 23:35:02 +05:30
parent 3cfa70bbba
commit 592df74c57
11 changed files with 249 additions and 538 deletions

View file

@ -200,6 +200,8 @@ func main() {
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err) glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
} }
statusSync := policy.NewStatusSync(pclient, stopCh, policyMetaStore)
// WEBHOOOK // WEBHOOOK
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration // - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
// - reports the results based on the response from the policy engine: // - reports the results based on the response from the policy engine:
@ -215,7 +217,7 @@ func main() {
kubeInformer.Rbac().V1().ClusterRoleBindings(), kubeInformer.Rbac().V1().ClusterRoleBindings(),
egen, egen,
webhookRegistrationClient, webhookRegistrationClient,
policy.NewStatusSync(pclient, stopCh, policyMetaStore), statusSync,
configData, configData,
policyMetaStore, policyMetaStore,
pvgen, pvgen,
@ -238,6 +240,7 @@ func main() {
go grc.Run(1, stopCh) go grc.Run(1, stopCh)
go grcc.Run(1, stopCh) go grcc.Run(1, stopCh)
go pvgen.Run(1, stopCh) go pvgen.Run(1, stopCh)
go statusSync.Run()
// verifys if the admission control is enabled and active // verifys if the admission control is enabled and active
// resync: 60 seconds // resync: 60 seconds

View file

@ -232,15 +232,15 @@ type PolicyStatus struct {
// average time required to process the policy rules on a resource // average time required to process the policy rules on a resource
AvgExecutionTime string `json:"averageExecutionTime"` AvgExecutionTime string `json:"averageExecutionTime"`
// Count of rules that failed // Count of rules that failed
ViolationCount int `json:"violationCount"` ViolationCount int `json:"violationCount,omitempty"`
// Count of rules that were applied // Count of rules that were applied
RulesAppliedCount int `json:"rulesAppliedCount"` RulesAppliedCount int `json:"rulesAppliedCount,omitempty"`
// Count of resources that were blocked for failing a validate, across all rules // Count of resources that were blocked for failing a validate, across all rules
ResourcesBlockedCount int `json:"resourcesBlockedCount"` ResourcesBlockedCount int `json:"resourcesBlockedCount,omitempty"`
// Count of resources that were successfully mutated, across all rules // Count of resources that were successfully mutated, across all rules
ResourcesMutatedCount int `json:"resourcesMutatedCount"` ResourcesMutatedCount int `json:"resourcesMutatedCount,omitempty"`
Rules []RuleStats `json:"ruleStatus"` Rules []RuleStats `json:"ruleStatus,omitempty"`
} }
//RuleStats provides status per rule //RuleStats provides status per rule
@ -248,15 +248,15 @@ type RuleStats struct {
// Rule name // Rule name
Name string `json:"ruleName"` Name string `json:"ruleName"`
// average time require to process the rule // average time require to process the rule
ExecutionTime string `json:"averageExecutionTime"` ExecutionTime string `json:"averageExecutionTime,omitempty"`
// Count of rules that failed // Count of rules that failed
ViolationCount int `json:"violationCount"` ViolationCount int `json:"violationCount,omitempty"`
// Count of rules that were applied // Count of rules that were applied
AppliedCount int `json:"appliedCount"` AppliedCount int `json:"appliedCount,omitempty"`
// Count of resources for whom update/create api requests were blocked as the resource did not satisfy the policy rules // Count of resources for whom update/create api requests were blocked as the resource did not satisfy the policy rules
ResourcesBlockedCount int `json:"resourcesBlockedCount"` ResourcesBlockedCount int `json:"resourcesBlockedCount,omitempty"`
// Count of resources that were successfully mutated // Count of resources that were successfully mutated
ResourcesMutatedCount int `json:"resourcesMutatedCount"` ResourcesMutatedCount int `json:"resourcesMutatedCount,omitempty"`
} }
// PolicyList is a list of Policy resources // PolicyList is a list of Policy resources

View file

@ -19,7 +19,7 @@ import (
// applyPolicy applies policy on a resource // applyPolicy applies policy on a resource
//TODO: generation rules //TODO: generation rules
func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface) (responses []response.EngineResponse) { func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructured) (responses []response.EngineResponse) {
startTime := time.Now() startTime := time.Now()
glog.V(4).Infof("Started apply policy %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), startTime) glog.V(4).Infof("Started apply policy %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), startTime)
@ -35,7 +35,7 @@ func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructure
ctx.AddResource(transformResource(resource)) ctx.AddResource(transformResource(resource))
//MUTATION //MUTATION
engineResponse, err = mutation(policy, resource, policyStatus, ctx) engineResponse, err = mutation(policy, resource, ctx)
engineResponses = append(engineResponses, engineResponse) engineResponses = append(engineResponses, engineResponse)
if err != nil { if err != nil {
glog.Errorf("unable to process mutation rules: %v", err) glog.Errorf("unable to process mutation rules: %v", err)
@ -48,7 +48,7 @@ func applyPolicy(policy kyverno.ClusterPolicy, resource unstructured.Unstructure
//TODO: GENERATION //TODO: GENERATION
return engineResponses return engineResponses
} }
func mutation(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface, ctx context.EvalInterface) (response.EngineResponse, error) { func mutation(policy kyverno.ClusterPolicy, resource unstructured.Unstructured, ctx context.EvalInterface) (response.EngineResponse, error) {
engineResponse := engine.Mutate(engine.PolicyContext{Policy: policy, NewResource: resource, Context: ctx}) engineResponse := engine.Mutate(engine.PolicyContext{Policy: policy, NewResource: resource, Context: ctx})
if !engineResponse.IsSuccesful() { if !engineResponse.IsSuccesful() {

View file

@ -2,7 +2,6 @@ package policy
import ( import (
"fmt" "fmt"
"reflect"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -68,8 +67,6 @@ type PolicyController struct {
rm resourceManager rm resourceManager
// helpers to validate against current loaded configuration // helpers to validate against current loaded configuration
configHandler config.Interface configHandler config.Interface
// receives stats and aggregates details
statusAggregator *PolicyStatusAggregator
// store to hold policy meta data for faster lookup // store to hold policy meta data for faster lookup
pMetaStore policystore.UpdateInterface pMetaStore policystore.UpdateInterface
// policy violation generator // policy violation generator
@ -145,10 +142,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
//TODO: pass the time in seconds instead of converting it internally //TODO: pass the time in seconds instead of converting it internally
pc.rm = NewResourceManager(30) pc.rm = NewResourceManager(30)
// aggregator
// pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer)
pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient)
return &pc, nil return &pc, nil
} }
@ -265,9 +258,6 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(pc.worker, time.Second, stopCh) go wait.Until(pc.worker, time.Second, stopCh)
} }
// policy status aggregator
//TODO: workers required for aggergation
pc.statusAggregator.Run(1, stopCh)
<-stopCh <-stopCh
} }
@ -329,8 +319,6 @@ func (pc *PolicyController) syncPolicy(key string) error {
if err := pc.deleteNamespacedPolicyViolations(key); err != nil { if err := pc.deleteNamespacedPolicyViolations(key); err != nil {
return err return err
} }
// remove the recorded stats for the policy
pc.statusAggregator.RemovePolicyStats(key)
// remove webhook configurations if there are no policies // remove webhook configurations if there are no policies
if err := pc.removeResourceWebhookConfiguration(); err != nil { if err := pc.removeResourceWebhookConfiguration(); err != nil {
@ -346,23 +334,12 @@ func (pc *PolicyController) syncPolicy(key string) error {
pc.resourceWebhookWatcher.RegisterResourceWebhook() pc.resourceWebhookWatcher.RegisterResourceWebhook()
// cluster policy violations
cpvList, err := pc.getClusterPolicyViolationForPolicy(policy.Name)
if err != nil {
return err
}
// namespaced policy violation
nspvList, err := pc.getNamespacedPolicyViolationForPolicy(policy.Name)
if err != nil {
return err
}
// process policies on existing resources // process policies on existing resources
engineResponses := pc.processExistingResources(*policy) engineResponses := pc.processExistingResources(*policy)
// report errors // report errors
pc.cleanupAndReport(engineResponses) pc.cleanupAndReport(engineResponses)
// sync active
return pc.syncStatusOnly(policy, cpvList, nspvList) return nil
} }
func (pc *PolicyController) deleteClusterPolicyViolations(policy string) error { func (pc *PolicyController) deleteClusterPolicyViolations(policy string) error {
@ -391,39 +368,6 @@ func (pc *PolicyController) deleteNamespacedPolicyViolations(policy string) erro
return nil return nil
} }
//syncStatusOnly updates the policy status subresource
func (pc *PolicyController) syncStatusOnly(p *kyverno.ClusterPolicy, pvList []*kyverno.ClusterPolicyViolation, nspvList []*kyverno.PolicyViolation) error {
newStatus := pc.calculateStatus(p.Name, pvList, nspvList)
if reflect.DeepEqual(newStatus, p.Status) {
// no update to status
return nil
}
// update status
newPolicy := p
newPolicy.Status = newStatus
_, err := pc.kyvernoClient.KyvernoV1().ClusterPolicies().UpdateStatus(newPolicy)
return err
}
func (pc *PolicyController) calculateStatus(policyName string, pvList []*kyverno.ClusterPolicyViolation, nspvList []*kyverno.PolicyViolation) kyverno.PolicyStatus {
violationCount := len(pvList) + len(nspvList)
status := kyverno.PolicyStatus{
ViolationCount: violationCount,
}
// get stats
stats := pc.statusAggregator.GetPolicyStats(policyName)
if !reflect.DeepEqual(stats, (PolicyStatInfo{})) {
status.RulesAppliedCount = stats.RulesAppliedCount
status.ResourcesBlockedCount = stats.ResourceBlocked
status.AvgExecutionTimeMutation = stats.MutationExecutionTime.String()
status.AvgExecutionTimeValidation = stats.ValidationExecutionTime.String()
status.AvgExecutionTimeGeneration = stats.GenerationExecutionTime.String()
// update rule stats
status.Rules = convertRules(stats.Rules)
}
return status
}
func (pc *PolicyController) getNamespacedPolicyViolationForPolicy(policy string) ([]*kyverno.PolicyViolation, error) { func (pc *PolicyController) getNamespacedPolicyViolationForPolicy(policy string) ([]*kyverno.PolicyViolation, error) {
policySelector, err := buildPolicyLabel(policy) policySelector, err := buildPolicyLabel(policy)
if err != nil { if err != nil {
@ -459,19 +403,3 @@ func (r RealPVControl) DeleteClusterPolicyViolation(name string) error {
func (r RealPVControl) DeleteNamespacedPolicyViolation(ns, name string) error { func (r RealPVControl) DeleteNamespacedPolicyViolation(ns, name string) error {
return r.Client.KyvernoV1().PolicyViolations(ns).Delete(name, &metav1.DeleteOptions{}) return r.Client.KyvernoV1().PolicyViolations(ns).Delete(name, &metav1.DeleteOptions{})
} }
// convertRules converts the internal rule stats to one used in policy.stats struct
func convertRules(rules []RuleStatinfo) []kyverno.RuleStats {
var stats []kyverno.RuleStats
for _, r := range rules {
stat := kyverno.RuleStats{
Name: r.RuleName,
ExecutionTime: r.ExecutionTime.String(),
AppliedCount: r.RuleAppliedCount,
ViolationCount: r.RulesFailedCount,
MutationCount: r.MutationCount,
}
stats = append(stats, stat)
}
return stats
}

View file

@ -39,7 +39,7 @@ func (pc *PolicyController) processExistingResources(policy kyverno.ClusterPolic
// apply the policy on each // apply the policy on each
glog.V(4).Infof("apply policy %s with resource version %s on resource %s/%s/%s with resource version %s", policy.Name, policy.ResourceVersion, resource.GetKind(), resource.GetNamespace(), resource.GetName(), resource.GetResourceVersion()) glog.V(4).Infof("apply policy %s with resource version %s on resource %s/%s/%s with resource version %s", policy.Name, policy.ResourceVersion, resource.GetKind(), resource.GetNamespace(), resource.GetName(), resource.GetResourceVersion())
engineResponse := applyPolicy(policy, resource, pc.statusAggregator) engineResponse := applyPolicy(policy, resource)
// get engine response for mutation & validation independently // get engine response for mutation & validation independently
engineResponses = append(engineResponses, engineResponse...) engineResponses = append(engineResponses, engineResponse...)
// post-processing, register the resource as processed // post-processing, register the resource as processed

View file

@ -1,210 +1,249 @@
package policy package policy
import ( import (
"log"
"sync" "sync"
"time" "time"
"github.com/golang/glog" "github.com/nirmata/kyverno/pkg/policystore"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" "github.com/nirmata/kyverno/pkg/engine/response"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"github.com/nirmata/kyverno/pkg/client/clientset/versioned"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
) )
//PolicyStatusAggregator stores information abt aggregation type statusCache struct {
type PolicyStatusAggregator struct { mu sync.RWMutex
// time since we start aggregating the stats data map[string]v1.PolicyStatus
startTime time.Time
// channel to receive stats
ch chan PolicyStat
//TODO: lock based on key, possibly sync.Map ?
//sync RW for policyData
mux sync.RWMutex
// stores aggregated stats for policy
policyData map[string]PolicyStatInfo
} }
//NewPolicyStatAggregator returns a new policy status type StatSync struct {
func NewPolicyStatAggregator(client *kyvernoclient.Clientset) *PolicyStatusAggregator { cache *statusCache
psa := PolicyStatusAggregator{ stop <-chan struct{}
startTime: time.Now(), client *versioned.Clientset
ch: make(chan PolicyStat), policyStore *policystore.PolicyStore
policyData: map[string]PolicyStatInfo{},
}
return &psa
} }
//Run begins aggregator func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}, pMetaStore *policystore.PolicyStore) *StatSync {
func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) { return &StatSync{
defer utilruntime.HandleCrash() cache: &statusCache{
glog.V(4).Info("Started aggregator for policy status stats") mu: sync.RWMutex{},
defer func() { data: make(map[string]v1.PolicyStatus),
glog.V(4).Info("Shutting down aggregator for policy status stats") },
}() stop: stopCh,
for i := 0; i < workers; i++ { client: client,
go wait.Until(psa.process, time.Second, stopCh) policyStore: pMetaStore,
}
<-stopCh
}
func (psa *PolicyStatusAggregator) process() {
// As mutation and validation are handled separately
// ideally we need to combine the execution time from both for a policy
// but its tricky to detect here the type of rules policy contains
// so we dont combine the results, but instead compute the execution time for
// mutation & validation rules separately
for r := range psa.ch {
glog.V(4).Infof("received policy stats %v", r)
psa.aggregate(r)
} }
} }
func (psa *PolicyStatusAggregator) aggregate(ps PolicyStat) { func (s *StatSync) Run() {
func() { // update policy status every 10 seconds - waits for previous updateStatus to complete
glog.V(4).Infof("write lock update policy %s", ps.PolicyName) wait.Until(s.updateStats, 1*time.Second, s.stop)
psa.mux.Lock() <-s.stop
}() s.updateStats()
defer func() {
glog.V(4).Infof("write Unlock update policy %s", ps.PolicyName)
psa.mux.Unlock()
}()
if len(ps.Stats.Rules) == 0 {
glog.V(4).Infof("ignoring stats, as no rule was applied")
return
}
info, ok := psa.policyData[ps.PolicyName]
if !ok {
psa.policyData[ps.PolicyName] = ps.Stats
glog.V(4).Infof("added stats for policy %s", ps.PolicyName)
return
}
// aggregate policy information
info.RulesAppliedCount = info.RulesAppliedCount + ps.Stats.RulesAppliedCount
if ps.Stats.ResourceBlocked == 1 {
info.ResourceBlocked++
}
var zeroDuration time.Duration
if info.MutationExecutionTime != zeroDuration {
info.MutationExecutionTime = (info.MutationExecutionTime + ps.Stats.MutationExecutionTime) / 2
glog.V(4).Infof("updated avg mutation time %v", info.MutationExecutionTime)
} else {
info.MutationExecutionTime = ps.Stats.MutationExecutionTime
}
if info.ValidationExecutionTime != zeroDuration {
info.ValidationExecutionTime = (info.ValidationExecutionTime + ps.Stats.ValidationExecutionTime) / 2
glog.V(4).Infof("updated avg validation time %v", info.ValidationExecutionTime)
} else {
info.ValidationExecutionTime = ps.Stats.ValidationExecutionTime
}
if info.GenerationExecutionTime != zeroDuration {
info.GenerationExecutionTime = (info.GenerationExecutionTime + ps.Stats.GenerationExecutionTime) / 2
glog.V(4).Infof("updated avg generation time %v", info.GenerationExecutionTime)
} else {
info.GenerationExecutionTime = ps.Stats.GenerationExecutionTime
}
// aggregate rule details
info.Rules = aggregateRules(info.Rules, ps.Stats.Rules)
// update
psa.policyData[ps.PolicyName] = info
glog.V(4).Infof("updated stats for policy %s", ps.PolicyName)
} }
func aggregateRules(old []RuleStatinfo, update []RuleStatinfo) []RuleStatinfo { func (s *StatSync) updateStats() {
var zeroDuration time.Duration s.cache.mu.Lock()
searchRule := func(list []RuleStatinfo, key string) *RuleStatinfo { var nameToStatus = make(map[string]v1.PolicyStatus, len(s.cache.data))
for _, v := range list { for k, v := range s.cache.data {
if v.RuleName == key { nameToStatus[k] = v
return &v }
} s.cache.mu.Unlock()
for policyName, status := range nameToStatus {
var policy = &v1.ClusterPolicy{}
policy, err := s.policyStore.Get(policyName)
if err != nil {
continue
} }
return nil policy.Status = status
} _, err = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy)
newRules := []RuleStatinfo{} if err != nil {
// search for new rules in old rules and update it log.Println(err)
for _, updateR := range update {
if updateR.ExecutionTime != zeroDuration {
if rule := searchRule(old, updateR.RuleName); rule != nil {
rule.ExecutionTime = (rule.ExecutionTime + updateR.ExecutionTime) / 2
rule.RuleAppliedCount = rule.RuleAppliedCount + updateR.RuleAppliedCount
rule.RulesFailedCount = rule.RulesFailedCount + updateR.RulesFailedCount
rule.MutationCount = rule.MutationCount + updateR.MutationCount
newRules = append(newRules, *rule)
} else {
newRules = append(newRules, updateR)
}
} }
} }
return newRules
} }
//GetPolicyStats returns the policy stats func (s *StatSync) UpdateStatusWithMutateStats(response response.EngineResponse) {
func (psa *PolicyStatusAggregator) GetPolicyStats(policyName string) PolicyStatInfo { s.cache.mu.Lock()
func() { var policyStatus v1.PolicyStatus
glog.V(4).Infof("read lock update policy %s", policyName) policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
psa.mux.RLock() if !exist {
}() policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
defer func() { if policy != nil {
glog.V(4).Infof("read Unlock update policy %s", policyName) policyStatus = policy.Status
psa.mux.RUnlock() }
}() }
glog.V(4).Infof("read stats for policy %s", policyName)
return psa.policyData[policyName] var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
policyStatus.ResourcesMutatedCount++
ruleStat.AppliedCount++
ruleStat.ResourcesMutatedCount++
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
} }
//RemovePolicyStats rmves policy stats records func (s *StatSync) UpdateStatusWithValidateStats(response response.EngineResponse) {
func (psa *PolicyStatusAggregator) RemovePolicyStats(policyName string) { s.cache.mu.Lock()
func() { var policyStatus v1.PolicyStatus
glog.V(4).Infof("write lock update policy %s", policyName) policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
psa.mux.Lock() if !exist {
}() policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
defer func() { if policy != nil {
glog.V(4).Infof("write Unlock update policy %s", policyName) policyStatus = policy.Status
psa.mux.Unlock() }
}() }
glog.V(4).Infof("removing stats for policy %s", policyName)
delete(psa.policyData, policyName) var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
ruleStat.AppliedCount++
if response.PolicyResponse.ValidationFailureAction == "enforce" {
policyStatus.ResourcesBlockedCount++
ruleStat.ResourcesBlockedCount++
}
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
} }
//PolicyStatusInterface provides methods to modify policyStatus func (s *StatSync) UpdateStatusWithGenerateStats(response response.EngineResponse) {
type PolicyStatusInterface interface { s.cache.mu.Lock()
SendStat(stat PolicyStat) var policyStatus v1.PolicyStatus
// UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
if !exist {
policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
} }
//PolicyStat stored stats for policy func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration {
type PolicyStat struct { if averageOver == 0 {
PolicyName string return newTime
Stats PolicyStatInfo }
} oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString)
numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds()
//PolicyStatInfo provides statistics for policy denominator := averageOver + 1
type PolicyStatInfo struct { newAverageTimeInNanoSeconds := numerator / denominator
MutationExecutionTime time.Duration return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond
ValidationExecutionTime time.Duration
GenerationExecutionTime time.Duration
RulesAppliedCount int
ResourceBlocked int
Rules []RuleStatinfo
}
//RuleStatinfo provides statistics for rule
type RuleStatinfo struct {
RuleName string
ExecutionTime time.Duration
RuleAppliedCount int
RulesFailedCount int
MutationCount int
}
//SendStat sends the stat information for aggregation
func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) {
glog.V(4).Infof("sending policy stats: %v", stat)
// Send over channel
psa.ch <- stat
}
//GetPolicyStatusAggregator returns interface to send policy status stats
func (pc *PolicyController) GetPolicyStatusAggregator() PolicyStatusInterface {
return pc.statusAggregator
} }

View file

@ -1,230 +0,0 @@
package policy
import (
"sync"
"time"
"github.com/nirmata/kyverno/pkg/policystore"
"github.com/nirmata/kyverno/pkg/engine/response"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/nirmata/kyverno/pkg/client/clientset/versioned"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
)
type statusCache struct {
mu sync.RWMutex
data map[string]v1.PolicyStatus
}
type StatSync struct {
cache *statusCache
stop <-chan struct{}
client *versioned.Clientset
policyStore *policystore.PolicyStore
}
func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}, pMetaStore *policystore.PolicyStore) *StatSync {
return &StatSync{
cache: &statusCache{
mu: sync.RWMutex{},
data: make(map[string]v1.PolicyStatus),
},
stop: stopCh,
client: client,
policyStore: pMetaStore,
}
}
func (s *StatSync) Start() {
// update policy status every 10 seconds - waits for previous updateStatus to complete
wait.Until(s.updateStats, 1*time.Second, s.stop)
<-s.stop
s.updateStats()
}
func (s *StatSync) updateStats() {
s.cache.mu.Lock()
for policyName, status := range s.cache.data {
var policy = &v1.ClusterPolicy{}
policy.Name = policyName
policy.Status = status
_, _ = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy)
}
s.cache.data = make(map[string]v1.PolicyStatus)
s.cache.mu.Unlock()
}
func (s *StatSync) UpdateStatusWithMutateStats(response response.EngineResponse) {
s.cache.mu.Lock()
var policyStatus v1.PolicyStatus
policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
if !exist {
policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
var policyAverageExecutionTime time.Duration
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
newAverageExecutionTime := updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver)
policyAverageExecutionTime += newAverageExecutionTime
ruleStat.ExecutionTime = newAverageExecutionTime.String()
if rule.Success {
policyStatus.RulesAppliedCount++
policyStatus.ResourcesMutatedCount++
ruleStat.AppliedCount++
ruleStat.ResourcesMutatedCount++
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
}
func (s *StatSync) UpdateStatusWithValidateStats(response response.EngineResponse) {
s.cache.mu.Lock()
var policyStatus v1.PolicyStatus
policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
if !exist {
policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
var policyAverageExecutionTime time.Duration
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
newAverageExecutionTime := updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver)
policyAverageExecutionTime += newAverageExecutionTime
ruleStat.ExecutionTime = newAverageExecutionTime.String()
if rule.Success {
policyStatus.RulesAppliedCount++
policyStatus.ResourcesBlockedCount++
ruleStat.AppliedCount++
ruleStat.ResourcesBlockedCount++
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
}
func (s *StatSync) UpdateStatusWithGenerateStats(response response.EngineResponse) {
s.cache.mu.Lock()
var policyStatus v1.PolicyStatus
policyStatus, exist := s.cache.data[response.PolicyResponse.Policy]
if !exist {
policy, _ := s.policyStore.Get(response.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
var nameToRule = make(map[string]v1.RuleStats, 0)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
var policyAverageExecutionTime time.Duration
for _, rule := range response.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.ViolationCount)
newAverageExecutionTime := updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver)
policyAverageExecutionTime += newAverageExecutionTime
ruleStat.ExecutionTime = newAverageExecutionTime.String()
if rule.Success {
policyStatus.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
policyStatus.ViolationCount++
ruleStat.ViolationCount++
}
nameToRule[rule.Name] = ruleStat
}
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
ruleStats = append(ruleStats, ruleStat)
}
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
s.cache.data[response.PolicyResponse.Policy] = policyStatus
s.cache.mu.Unlock()
}
func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration {
if averageOver == 0 {
return newTime
}
oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString)
numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds()
denominator := averageOver + 1
newAverageTimeInNanoSeconds := numerator / denominator
return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond
}

View file

@ -58,6 +58,7 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
for _, policy := range policies { for _, policy := range policies {
policyContext.Policy = policy policyContext.Policy = policy
engineResponse := engine.Generate(policyContext) engineResponse := engine.Generate(policyContext)
go ws.status.UpdateStatusWithGenerateStats(engineResponse)
if len(engineResponse.PolicyResponse.Rules) > 0 { if len(engineResponse.PolicyResponse.Rules) > 0 {
// some generate rules do apply to the resource // some generate rules do apply to the resource
engineResponses = append(engineResponses, engineResponse) engineResponses = append(engineResponses, engineResponse)

View file

@ -1,6 +1,7 @@
package webhooks package webhooks
import ( import (
"log"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -56,13 +57,16 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
glog.V(2).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", glog.V(2).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation) resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation)
policyContext.Policy = policy policyContext.Policy = policy
if resource.GetKind() == "Pod" {
log.Println("some")
}
engineResponse := engine.Mutate(policyContext) engineResponse := engine.Mutate(policyContext)
go ws.status.UpdateStatusWithMutateStats(engineResponse)
engineResponses = append(engineResponses, engineResponse) engineResponses = append(engineResponses, engineResponse)
if !engineResponse.IsSuccesful() { if !engineResponse.IsSuccesful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName()) glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName())
continue continue
} }
go ws.status.UpdateStatusWithMutateStats(engineResponse)
// gather patches // gather patches
patches = append(patches, engineResponse.GetPatches()...) patches = append(patches, engineResponse.GetPatches()...)
glog.V(4).Infof("Mutation from policy %s has applied successfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName()) glog.V(4).Infof("Mutation from policy %s has applied successfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName())

View file

@ -56,7 +56,7 @@ type WebhookServer struct {
// webhook registration client // webhook registration client
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
// API to send policy stats for aggregation // API to send policy stats for aggregation
status policy.StatSync status *policy.StatSync
// helpers to validate against current loaded configuration // helpers to validate against current loaded configuration
configHandler config.Interface configHandler config.Interface
// channel for cleanup notification // channel for cleanup notification
@ -83,7 +83,7 @@ func NewWebhookServer(
crbInformer rbacinformer.ClusterRoleBindingInformer, crbInformer rbacinformer.ClusterRoleBindingInformer,
eventGen event.Interface, eventGen event.Interface,
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient, webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
status *policy.StatSync, statusSync *policy.StatSync,
configHandler config.Interface, configHandler config.Interface,
pMetaStore policystore.LookupInterface, pMetaStore policystore.LookupInterface,
pvGenerator policyviolation.GeneratorInterface, pvGenerator policyviolation.GeneratorInterface,
@ -113,7 +113,7 @@ func NewWebhookServer(
crbSynced: crbInformer.Informer().HasSynced, crbSynced: crbInformer.Informer().HasSynced,
eventGen: eventGen, eventGen: eventGen,
webhookRegistrationClient: webhookRegistrationClient, webhookRegistrationClient: webhookRegistrationClient,
status: status, status: statusSync,
configHandler: configHandler, configHandler: configHandler,
cleanUp: cleanUp, cleanUp: cleanUp,
lastReqTime: resourceWebhookWatcher.LastReqTime, lastReqTime: resourceWebhookWatcher.LastReqTime,

View file

@ -9,9 +9,7 @@ import (
"github.com/nirmata/kyverno/pkg/engine" "github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/engine/context" "github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/engine/response" "github.com/nirmata/kyverno/pkg/engine/response"
policyctr "github.com/nirmata/kyverno/pkg/policy"
"github.com/nirmata/kyverno/pkg/policyviolation" "github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/utils"
v1beta1 "k8s.io/api/admission/v1beta1" v1beta1 "k8s.io/api/admission/v1beta1"
) )
@ -22,36 +20,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
glog.V(4).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", glog.V(4).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation)
var policyStats []policyctr.PolicyStat
evalTime := time.Now() evalTime := time.Now()
// gather stats from the engine response
gatherStat := func(policyName string, policyResponse response.PolicyResponse) {
ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.ValidationExecutionTime = policyResponse.ProcessingTime
ps.Stats.RulesAppliedCount = policyResponse.RulesAppliedCount
// capture rule level stats
for _, rule := range policyResponse.Rules {
rs := policyctr.RuleStatinfo{}
rs.RuleName = rule.Name
rs.ExecutionTime = rule.RuleStats.ProcessingTime
if rule.Success {
rs.RuleAppliedCount++
} else {
rs.RulesFailedCount++
}
ps.Stats.Rules = append(ps.Stats.Rules, rs)
}
policyStats = append(policyStats, ps)
}
// send stats for aggregation
sendStat := func(blocked bool) {
for _, stat := range policyStats {
stat.Stats.ResourceBlocked = utils.Btoi(blocked)
//SEND
ws.policyStatus.SendStat(stat)
}
}
// Get new and old resource // Get new and old resource
newR, oldR, err := extractResources(patchedResource, request) newR, oldR, err := extractResources(patchedResource, request)
@ -100,12 +69,11 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
continue continue
} }
engineResponses = append(engineResponses, engineResponse) engineResponses = append(engineResponses, engineResponse)
// Gather policy application statistics
gatherStat(policy.Name, engineResponse.PolicyResponse)
if !engineResponse.IsSuccesful() { if !engineResponse.IsSuccesful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, newR.GetNamespace(), newR.GetName()) glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, newR.GetNamespace(), newR.GetName())
continue continue
} }
go ws.status.UpdateStatusWithValidateStats(engineResponse)
} }
glog.V(4).Infof("eval: %v %s/%s/%s ", time.Since(evalTime), request.Kind, request.Namespace, request.Name) glog.V(4).Infof("eval: %v %s/%s/%s ", time.Since(evalTime), request.Kind, request.Namespace, request.Name)
// report time // report time
@ -117,7 +85,6 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
blocked := toBlockResource(engineResponses) blocked := toBlockResource(engineResponses)
if blocked { if blocked {
glog.V(4).Infof("resource %s/%s/%s is blocked\n", newR.GetKind(), newR.GetNamespace(), newR.GetName()) glog.V(4).Infof("resource %s/%s/%s is blocked\n", newR.GetKind(), newR.GetNamespace(), newR.GetName())
sendStat(true)
return false, getEnforceFailureErrorMsg(engineResponses) return false, getEnforceFailureErrorMsg(engineResponses)
} }
@ -128,7 +95,6 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
// ADD EVENTS // ADD EVENTS
events := generateEvents(engineResponses, (request.Operation == v1beta1.Update)) events := generateEvents(engineResponses, (request.Operation == v1beta1.Update))
ws.eventGen.Add(events...) ws.eventGen.Add(events...)
sendStat(false)
// report time end // report time end
glog.V(4).Infof("report: %v %s/%s/%s", time.Since(reportTime), request.Kind, request.Namespace, request.Name) glog.V(4).Infof("report: %v %s/%s/%s", time.Since(reportTime), request.Kind, request.Namespace, request.Name)
return true, "" return true, ""