From 38b92a0d34f6b61286a7fc0b3cb532910ccb6bbc Mon Sep 17 00:00:00 2001 From: shravan Date: Wed, 4 Mar 2020 13:35:49 +0530 Subject: [PATCH] 527 making status listner into a buffered channel instead of go routines --- pkg/generate/generate.go | 10 ++++------ pkg/policyStatus/main.go | 2 +- pkg/policyviolation/clusterpv.go | 8 ++------ pkg/policyviolation/namespacedpv.go | 8 ++------ pkg/webhooks/generation.go | 4 +--- pkg/webhooks/mutation.go | 4 +--- pkg/webhooks/validation.go | 4 +--- 7 files changed, 12 insertions(+), 28 deletions(-) diff --git a/pkg/generate/generate.go b/pkg/generate/generate.go index 48d53cae4c..8391b37978 100644 --- a/pkg/generate/generate.go +++ b/pkg/generate/generate.go @@ -127,12 +127,10 @@ func (c *Controller) applyGeneratePolicy(policyContext engine.PolicyContext, gr } if gr.Status.State == "" { - go func() { - c.policyStatus.Listener <- &generateSyncStats{ - policyName: policy.Name, - ruleNameToProcessingTime: ruleNameToProcessingTime, - } - }() + c.policyStatus.Listener <- &generateSyncStats{ + policyName: policy.Name, + ruleNameToProcessingTime: ruleNameToProcessingTime, + } } return genResources, nil diff --git a/pkg/policyStatus/main.go b/pkg/policyStatus/main.go index a1507cac81..7e85ada691 100644 --- a/pkg/policyStatus/main.go +++ b/pkg/policyStatus/main.go @@ -41,7 +41,7 @@ func NewSync(c *versioned.Clientset, p policyStore) *Sync { }, client: c, PolicyStore: p, - Listener: make(chan statusUpdater), + Listener: make(chan statusUpdater, 20), } } diff --git a/pkg/policyviolation/clusterpv.go b/pkg/policyviolation/clusterpv.go index 3b57ff2597..4d42464ba8 100644 --- a/pkg/policyviolation/clusterpv.go +++ b/pkg/policyviolation/clusterpv.go @@ -100,9 +100,7 @@ func (cpv *clusterPV) createPV(newPv *kyverno.ClusterPolicyViolation) error { } if newPv.Annotations["fromSync"] != "true" { - go func() { - cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) - }() + cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) } glog.Infof("policy violation created for resource %v", newPv.Spec.ResourceSpec) @@ -128,9 +126,7 @@ func (cpv *clusterPV) updatePV(newPv, oldPv *kyverno.ClusterPolicyViolation) err glog.Infof("cluster policy violation updated for resource %v", newPv.Spec.ResourceSpec) if newPv.Annotations["fromSync"] != "true" { - go func() { - cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) - }() + cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) } return nil } diff --git a/pkg/policyviolation/namespacedpv.go b/pkg/policyviolation/namespacedpv.go index 10e92ef24a..acc5095572 100644 --- a/pkg/policyviolation/namespacedpv.go +++ b/pkg/policyviolation/namespacedpv.go @@ -99,9 +99,7 @@ func (nspv *namespacedPV) createPV(newPv *kyverno.PolicyViolation) error { } if newPv.Annotations["fromSync"] != "true" { - go func() { - nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) - }() + nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) } glog.Infof("policy violation created for resource %v", newPv.Spec.ResourceSpec) return nil @@ -124,9 +122,7 @@ func (nspv *namespacedPV) updatePV(newPv, oldPv *kyverno.PolicyViolation) error } if newPv.Annotations["fromSync"] != "true" { - go func() { - nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) - }() + nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules) } glog.Infof("namespaced policy violation updated for resource %v", newPv.Spec.ResourceSpec) return nil diff --git a/pkg/webhooks/generation.go b/pkg/webhooks/generation.go index b62de1bddb..5cd692de6e 100644 --- a/pkg/webhooks/generation.go +++ b/pkg/webhooks/generation.go @@ -68,9 +68,7 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic if len(engineResponse.PolicyResponse.Rules) > 0 { // some generate rules do apply to the resource engineResponses = append(engineResponses, engineResponse) - go func() { - ws.status.Listener <- updateStatusWithGenerateStats(engineResponse) - }() + ws.status.Listener <- updateStatusWithGenerateStats(engineResponse) } } // Adds Generate Request to a channel(queue size 1000) to generators diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index dfaa8c2fa7..fdf5862cb8 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -63,9 +63,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou policyContext.Policy = policy engineResponse := engine.Mutate(policyContext) engineResponses = append(engineResponses, engineResponse) - go func() { - ws.status.Listener <- updateStatusWithMutateStats(engineResponse) - }() + ws.status.Listener <- updateStatusWithMutateStats(engineResponse) if !engineResponse.IsSuccesful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName()) continue diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index b539bfef9d..2ab599793b 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -73,9 +73,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol continue } engineResponses = append(engineResponses, engineResponse) - go func() { - ws.status.Listener <- updateStatusWithValidateStats(engineResponse) - }() + ws.status.Listener <- updateStatusWithValidateStats(engineResponse) if !engineResponse.IsSuccesful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, newR.GetNamespace(), newR.GetName()) continue