diff --git a/main.go b/main.go index 6db33a255e..2c2a950354 100644 --- a/main.go +++ b/main.go @@ -110,7 +110,7 @@ func main() { if err = webhookRegistrationClient.Register(); err != nil { glog.Fatalf("Failed registering Admission Webhooks: %v\n", err) } - server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, webhookRegistrationClient, filterK8Resources) + server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, webhookRegistrationClient, pc.GetPolicyStatusAggregator(), filterK8Resources) if err != nil { glog.Fatalf("Unable to create webhook server: %v\n", err) } diff --git a/pkg/engine/generation.go b/pkg/engine/generation.go index 9578d39674..9b46aa755f 100644 --- a/pkg/engine/generation.go +++ b/pkg/engine/generation.go @@ -20,16 +20,14 @@ import ( //Generate apply generation rules on a resource func Generate(client *client.Client, policy kyverno.Policy, ns unstructured.Unstructured) EngineResponse { var response EngineResponse - var executionTime time.Duration startTime := time.Now() glog.V(4).Infof("started applying generation rules of policy %q (%v)", policy.Name, startTime) defer func() { - executionTime = time.Since(startTime) response.ExecutionTime = time.Since(startTime) glog.V(4).Infof("Finished applying generation rules policy %q (%v)", policy.Name, response.ExecutionTime) glog.V(4).Infof("Mutation Rules appplied succesfully count %q for policy %q", response.RulesAppliedCount, policy.Name) }() - succesfulRuleCount := func() { + incrementAppliedRuleCount := func() { // rules applied succesfully count response.RulesAppliedCount++ } @@ -49,9 +47,9 @@ func Generate(client *client.Client, policy kyverno.Policy, ns unstructured.Unst } else { ri.Addf("Generation succesfully.", rule.Name) glog.Infof("succesfully applied policy %s rule %s on resource %s/%s/%s", policy.Name, rule.Name, ns.GetKind(), ns.GetNamespace(), ns.GetName()) - succesfulRuleCount() } ris = append(ris, ri) + incrementAppliedRuleCount() } response.RuleInfos = ris return response diff --git a/pkg/engine/mutation.go b/pkg/engine/mutation.go index 88ae7d7644..fd453f4d16 100644 --- a/pkg/engine/mutation.go +++ b/pkg/engine/mutation.go @@ -12,22 +12,20 @@ import ( // Mutate performs mutation. Overlay first and then mutation patches -func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineResponse { - var response EngineResponse +func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) (response EngineResponse) { + // var response EngineResponse var allPatches, rulePatches [][]byte var err error var errs []error ris := []info.RuleInfo{} - var executionTime time.Duration startTime := time.Now() glog.V(4).Infof("started applying mutation rules of policy %q (%v)", policy.Name, startTime) defer func() { - executionTime = time.Since(startTime) response.ExecutionTime = time.Since(startTime) - glog.V(4).Infof("Finished applying mutation rules policy %q (%v)", policy.Name, response.ExecutionTime) - glog.V(4).Infof("Mutation Rules appplied succesfully count %q for policy %q", response.RulesAppliedCount, policy.Name) + glog.V(4).Infof("finished applying mutation rules policy %v (%v)", policy.Name, response.ExecutionTime) + glog.V(4).Infof("Mutation Rules appplied succesfully count %v for policy %q", response.RulesAppliedCount, policy.Name) }() - succesfulRuleCount := func() { + incrementAppliedRuleCount := func() { // rules applied succesfully count response.RulesAppliedCount++ } @@ -78,12 +76,12 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes allPatches = append(allPatches, rulePatches...) glog.V(4).Infof("overlay applied succesfully on resource %s/%s", resource.GetNamespace(), resource.GetName()) - succesfulRuleCount() } else { glog.V(4).Infof("failed to apply overlay: %v", err) ruleInfo.Fail() ruleInfo.Addf("failed to apply overlay: %v", err) } + incrementAppliedRuleCount() } // Process Patches @@ -101,8 +99,8 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes ruleInfo.Patches = rulePatches allPatches = append(allPatches, rulePatches...) - succesfulRuleCount() } + incrementAppliedRuleCount() } patchedDocument, err = ApplyPatches(patchedDocument, rulePatches) diff --git a/pkg/engine/validation.go b/pkg/engine/validation.go index 9dcaca2ae1..f43c59c2c1 100644 --- a/pkg/engine/validation.go +++ b/pkg/engine/validation.go @@ -18,18 +18,16 @@ import ( // Validate handles validating admission request // Checks the target resources for rules defined in the policy -func Validate(policy kyverno.Policy, resource unstructured.Unstructured) EngineResponse { - var response EngineResponse - var executionTime time.Duration +func Validate(policy kyverno.Policy, resource unstructured.Unstructured) (response EngineResponse) { + // var response EngineResponse startTime := time.Now() glog.V(4).Infof("started applying validation rules of policy %q (%v)", policy.Name, startTime) defer func() { - executionTime = time.Since(startTime) response.ExecutionTime = time.Since(startTime) - glog.V(4).Infof("Finished applying mutation rules policy %q (%v)", policy.Name, response.ExecutionTime) - glog.V(4).Infof("Mutation Rules appplied succesfully count %q for policy %q", response.RulesAppliedCount, policy.Name) + glog.V(4).Infof("Finished applying validation rules policy %v (%v)", policy.Name, response.ExecutionTime) + glog.V(4).Infof("Validation Rules appplied succesfully count %v for policy %q", response.RulesAppliedCount, policy.Name) }() - succesfulRuleCount := func() { + incrementAppliedRuleCount := func() { // rules applied succesfully count response.RulesAppliedCount++ } @@ -72,8 +70,8 @@ func Validate(policy kyverno.Policy, resource unstructured.Unstructured) EngineR } else { ruleInfo.Add("Pattern succesfully validated") glog.V(4).Infof("pattern validated succesfully on resource %s/%s", resource.GetNamespace(), resource.GetName()) - succesfulRuleCount() } + incrementAppliedRuleCount() ruleInfos = append(ruleInfos, ruleInfo) } response.RuleInfos = ruleInfos diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index 142f28594f..b7bd93cef8 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -68,6 +68,8 @@ type PolicyController struct { rm resourceManager // filter the resources defined in the list filterK8Resources []utils.K8Resource + // recieves stats and aggregates details + statusAggregator *PolicyStatusAggregator } // NewPolicyController create a new PolicyController @@ -116,6 +118,9 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, client *client. //TODO: pass the time in seconds instead of converting it internally pc.rm = NewResourceManager(30) + // aggregator + pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient) + return &pc, nil } @@ -335,6 +340,9 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(pc.worker, time.Second, stopCh) } + // policy status aggregator + //TODO: workers required for aggergation + pc.statusAggregator.Run(1, stopCh) <-stopCh } @@ -403,7 +411,8 @@ func (pc *PolicyController) syncPolicy(key string) error { policyInfos := pc.processExistingResources(*p) // report errors pc.report(policyInfos) - return pc.syncStatusOnly(p, pvList) + return pc.statusAggregator.UpdateViolationCount(p, pvList) + // return pc.syncStatusOnly(p, pvList) } //syncStatusOnly updates the policy status subresource @@ -422,13 +431,6 @@ func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno. return err } -func calculateStatus(pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus { - violationCount := len(pvList) - status := kyverno.PolicyStatus{ - Violations: violationCount, - } - return status -} func (pc *PolicyController) getPolicyViolationsForPolicy(p *kyverno.Policy) ([]*kyverno.PolicyViolation, error) { // List all PolicyViolation to find those we own but that no longer match our // selector. They will be orphaned by ClaimPolicyViolation(). diff --git a/pkg/policy/status.go b/pkg/policy/status.go index 7a49726b98..acc1ed3dbf 100644 --- a/pkg/policy/status.go +++ b/pkg/policy/status.go @@ -1,6 +1,15 @@ package policy -import "time" +import ( + "reflect" + "time" + + "github.com/golang/glog" + kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1" + kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" +) type PolicyStatus struct { // average time required to process the policy rules on a resource @@ -12,3 +21,101 @@ type PolicyStatus struct { // Count of the resource for whom the mutation rules were applied succesfully resourcesMutatedCount int } + +type PolicyStatusAggregator struct { + // time since we start aggregating the stats + startTime time.Time + // channel to recieve stats + ch chan PolicyStat + // update polict status + psControl PStatusControlInterface +} + +//NewPolicyStatAggregator returns a new policy status +func NewPolicyStatAggregator(client *kyvernoclient.Clientset) *PolicyStatusAggregator { + psa := PolicyStatusAggregator{ + startTime: time.Now(), + ch: make(chan PolicyStat), + } + psa.psControl = PSControl{Client: client} + //TODO: add WaitGroup + return &psa +} + +func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + glog.V(4).Info("Started aggregator for policy status stats") + defer func() { + glog.V(4).Info("Shutting down aggregator for policy status stats") + }() + for i := 0; i < workers; i++ { + go wait.Until(psa.aggregate, time.Second, stopCh) + } +} +func (psa *PolicyStatusAggregator) aggregate() { + for r := range psa.ch { + glog.V(4).Infof("recieved policy stats %v", r) + } +} + +type PolicyStatusInterface interface { + SendStat(stat PolicyStat) + UpdateViolationCount(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error +} + +type PolicyStat struct { + PolicyName string + MutationExecutionTime time.Duration + ValidationExecutionTime time.Duration + RulesAppliedCount int + ResourceBlocked bool +} + +//SendStat sends the stat information for aggregation +func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) { + glog.V(4).Infof("sending policy stats: %v", stat) + // Send over channel + psa.ch <- stat +} + +//UpdateViolationCount updates the active violation count +func (psa *PolicyStatusAggregator) UpdateViolationCount(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error { + newStatus := calculateStatus(pvList) + if reflect.DeepEqual(newStatus, p.Status) { + // no update to status + return nil + } + // update status + newPolicy := p + newPolicy.Status = newStatus + + return psa.psControl.UpdatePolicyStatus(newPolicy) +} + +func calculateStatus(pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus { + violationCount := len(pvList) + status := kyverno.PolicyStatus{ + Violations: violationCount, + } + return status +} + +//GetPolicyStatusAggregator returns interface to send policy status stats +func (pc *PolicyController) GetPolicyStatusAggregator() PolicyStatusInterface { + return pc.statusAggregator +} + +//PStatusControlInterface Provides interface to operate on policy status +type PStatusControlInterface interface { + UpdatePolicyStatus(newPolicy *kyverno.Policy) error +} + +type PSControl struct { + Client kyvernoclient.Interface +} + +//UpdatePolicyStatus update policy status +func (c PSControl) UpdatePolicyStatus(newPolicy *kyverno.Policy) error { + _, err := c.Client.KyvernoV1alpha1().Policies().UpdateStatus(newPolicy) + return err +} diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index d1e9247af6..f706020627 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -4,6 +4,7 @@ import ( "github.com/golang/glog" engine "github.com/nirmata/kyverno/pkg/engine" "github.com/nirmata/kyverno/pkg/info" + policyctr "github.com/nirmata/kyverno/pkg/policy" "github.com/nirmata/kyverno/pkg/utils" v1beta1 "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -14,6 +15,23 @@ import ( func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool, engine.EngineResponse) { var patches [][]byte var policyInfos []info.PolicyInfo + var policyStats []policyctr.PolicyStat + // gather stats from the engine response + gatherStat := func(policyName string, er engine.EngineResponse) { + ps := policyctr.PolicyStat{} + ps.PolicyName = policyName + ps.MutationExecutionTime = er.ExecutionTime + ps.RulesAppliedCount = er.RulesAppliedCount + policyStats = append(policyStats, ps) + } + // send stats for aggregation + sendStat := func(blocked bool) { + for _, stat := range policyStats { + stat.ResourceBlocked = blocked + //SEND + ws.policyStatus.SendStat(stat) + } + } glog.V(5).Infof("Receive request in mutating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) @@ -54,6 +72,10 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool engineResponse = engine.Mutate(*policy, *resource) policyInfo.AddRuleInfos(engineResponse.RuleInfos) + // Gather policy application statistics + gatherStat(policy.Name, engineResponse) + + // ps := policyctr.NewPolicyStat(policy.Name, engineResponse.ExecutionTime, nil, engineResponse.RulesAppliedCount) if !policyInfo.IsSuccessful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName()) @@ -67,6 +89,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool patches = append(patches, engineResponse.Patches...) policyInfos = append(policyInfos, policyInfo) glog.V(4).Infof("Mutation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName()) + } // ADD ANNOTATIONS @@ -80,11 +103,14 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool } ok, msg := isAdmSuccesful(policyInfos) + // Send policy engine Stats if ok { + sendStat(false) engineResponse.Patches = patches return true, engineResponse } + sendStat(true) glog.Errorf("Failed to mutate the resource: %s\n", msg) return false, engineResponse } diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 2bfb048c45..af6ae8a880 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -19,6 +19,7 @@ import ( "github.com/nirmata/kyverno/pkg/config" client "github.com/nirmata/kyverno/pkg/dclient" "github.com/nirmata/kyverno/pkg/event" + "github.com/nirmata/kyverno/pkg/policy" tlsutils "github.com/nirmata/kyverno/pkg/tls" "github.com/nirmata/kyverno/pkg/utils" v1beta1 "k8s.io/api/admission/v1beta1" @@ -37,7 +38,9 @@ type WebhookServer struct { pvListerSynced cache.InformerSynced eventGen event.Interface webhookRegistrationClient *WebhookRegistrationClient - filterK8Resources []utils.K8Resource + // API to send policy stats for aggregation + policyStatus policy.PolicyStatusInterface + filterK8Resources []utils.K8Resource } // NewWebhookServer creates new instance of WebhookServer accordingly to given configuration @@ -50,6 +53,7 @@ func NewWebhookServer( pvInormer kyvernoinformer.PolicyViolationInformer, eventGen event.Interface, webhookRegistrationClient *WebhookRegistrationClient, + policyStatus policy.PolicyStatusInterface, filterK8Resources string) (*WebhookServer, error) { if tlsPair == nil { @@ -73,6 +77,7 @@ func NewWebhookServer( pvListerSynced: pInformer.Informer().HasSynced, eventGen: eventGen, webhookRegistrationClient: webhookRegistrationClient, + policyStatus: policyStatus, filterK8Resources: utils.ParseKinds(filterK8Resources), } mux := http.NewServeMux() diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index 6e2953b324..088694bd67 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -4,6 +4,7 @@ import ( "github.com/golang/glog" engine "github.com/nirmata/kyverno/pkg/engine" "github.com/nirmata/kyverno/pkg/info" + policyctr "github.com/nirmata/kyverno/pkg/policy" "github.com/nirmata/kyverno/pkg/policyviolation" "github.com/nirmata/kyverno/pkg/utils" v1beta1 "k8s.io/api/admission/v1beta1" @@ -17,6 +18,23 @@ import ( // If there are no errors in validating rule we apply generation rules func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, resource unstructured.Unstructured) *v1beta1.AdmissionResponse { var policyInfos []info.PolicyInfo + var policyStats []policyctr.PolicyStat + // gather stats from the engine response + gatherStat := func(policyName string, er engine.EngineResponse) { + ps := policyctr.PolicyStat{} + ps.PolicyName = policyName + ps.ValidationExecutionTime = er.ExecutionTime + ps.RulesAppliedCount = er.RulesAppliedCount + policyStats = append(policyStats, ps) + } + // send stats for aggregation + sendStat := func(blocked bool) { + for _, stat := range policyStats { + stat.ResourceBlocked = blocked + //SEND + ws.policyStatus.SendStat(stat) + } + } glog.V(5).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) @@ -55,6 +73,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res if len(engineResponse.RuleInfos) == 0 { continue } + gatherStat(policy.Name, engineResponse) if len(engineResponse.RuleInfos) > 0 { glog.V(4).Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName()) @@ -87,6 +106,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res // Even if one the policy being applied ok, msg := isAdmSuccesful(policyInfos) if !ok && toBlock(policyInfos) { + sendStat(true) return &v1beta1.AdmissionResponse{ Allowed: false, Result: &metav1.Status{ @@ -98,6 +118,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res // ADD POLICY VIOLATIONS policyviolation.GeneratePolicyViolations(ws.pvListerSynced, ws.pvLister, ws.kyvernoClient, policyInfos) + sendStat(false) return &v1beta1.AdmissionResponse{ Allowed: true, }