From b5e5f3eeda1e764281309ba65cc95959a703d70d Mon Sep 17 00:00:00 2001 From: shravan Date: Sat, 15 Feb 2020 16:38:59 +0530 Subject: [PATCH] 527 save commit --- cmd/kyverno/main.go | 2 +- pkg/policy/status2.go | 107 +++++++++++++++++++++++++++++++++++++++ pkg/webhooks/mutation.go | 49 +++--------------- pkg/webhooks/server.go | 9 ++-- 4 files changed, 121 insertions(+), 46 deletions(-) create mode 100644 pkg/policy/status2.go diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 62cefceeb9..b002ce0f76 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -215,7 +215,7 @@ func main() { kubeInformer.Rbac().V1().ClusterRoleBindings(), egen, webhookRegistrationClient, - pc.GetPolicyStatusAggregator(), + policy.NewStatusSync(pclient, stopCh), configData, policyMetaStore, pvgen, diff --git a/pkg/policy/status2.go b/pkg/policy/status2.go new file mode 100644 index 0000000000..7161f2015c --- /dev/null +++ b/pkg/policy/status2.go @@ -0,0 +1,107 @@ +package policy + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/nirmata/kyverno/pkg/client/clientset/versioned" + + v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1" +) + +type statusCache struct { + mu sync.RWMutex + data map[string]v1.PolicyStatus +} + +func (c *statusCache) Get(key string) v1.PolicyStatus { + c.mu.RLock() + status := c.data[key] + c.mu.RUnlock() + return status + +} + +func (c *statusCache) GetAll() map[string]v1.PolicyStatus { + c.mu.RLock() + mapCopy := make(map[string]v1.PolicyStatus, len(c.data)) + for k, v := range c.data { + mapCopy[k] = v + } + c.mu.RUnlock() + return mapCopy + +} +func (c *statusCache) Set(key string, status v1.PolicyStatus) { + c.mu.Lock() + c.data[key] = status + c.mu.Unlock() +} +func (c *statusCache) Clear() { + c.mu.Lock() + c.data = make(map[string]v1.PolicyStatus) + c.mu.Unlock() +} + +func newStatusCache() *statusCache { + return &statusCache{ + mu: sync.RWMutex{}, + data: make(map[string]v1.PolicyStatus), + } +} + +func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}) *StatusSync { + return &StatusSync{ + statusReceiver: make(chan map[string]v1.PolicyStatus), + cache: newStatusCache(), + stop: stopCh, + client: client, + } +} + +type StatusSync struct { + statusReceiver chan map[string]v1.PolicyStatus + cache *statusCache + stop <-chan struct{} + client *versioned.Clientset +} + +func (s *StatusSync) Cache() *statusCache { + return s.cache +} + +func (s *StatusSync) Receiver() chan<- map[string]v1.PolicyStatus { + return s.statusReceiver +} + +func (s *StatusSync) Start() { + // receive status and store it in cache + go func() { + for { + select { + case nameToStatus := <-s.statusReceiver: + for policyName, status := range nameToStatus { + s.cache.Set(policyName, status) + } + case <-s.stop: + return + } + } + }() + + // update policy status every 10 seconds - waits for previous updateStatus to complete + wait.Until(s.updateStatus, 10*time.Second, s.stop) + <-s.stop +} + +func (s *StatusSync) updateStatus() { + for policyName, status := range s.cache.GetAll() { + var policy = &v1.ClusterPolicy{} + policy.Name = policyName + policy.Status = status + _, _ = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy) + } + s.cache.Clear() +} diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index c664c8a03f..612db1e40f 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -3,15 +3,15 @@ package webhooks import ( "time" + "github.com/nirmata/kyverno/pkg/policy" + "github.com/golang/glog" kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" "github.com/nirmata/kyverno/pkg/engine" "github.com/nirmata/kyverno/pkg/engine/context" "github.com/nirmata/kyverno/pkg/engine/response" engineutils "github.com/nirmata/kyverno/pkg/engine/utils" - policyctr "github.com/nirmata/kyverno/pkg/policy" "github.com/nirmata/kyverno/pkg/policyviolation" - "github.com/nirmata/kyverno/pkg/utils" v1beta1 "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -23,40 +23,6 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation) var patches [][]byte - var policyStats []policyctr.PolicyStat - - // gather stats from the engine response - gatherStat := func(policyName string, policyResponse response.PolicyResponse) { - ps := policyctr.PolicyStat{} - ps.PolicyName = policyName - ps.Stats.MutationExecutionTime = policyResponse.ProcessingTime - ps.Stats.RulesAppliedCount = policyResponse.RulesAppliedCount - // capture rule level stats - for _, rule := range policyResponse.Rules { - rs := policyctr.RuleStatinfo{} - rs.RuleName = rule.Name - rs.ExecutionTime = rule.RuleStats.ProcessingTime - if rule.Success { - rs.RuleAppliedCount++ - } else { - rs.RulesFailedCount++ - } - if rule.Patches != nil { - rs.MutationCount++ - } - ps.Stats.Rules = append(ps.Stats.Rules, rs) - } - policyStats = append(policyStats, ps) - } - // send stats for aggregation - sendStat := func(blocked bool) { - for _, stat := range policyStats { - stat.Stats.ResourceBlocked = utils.Btoi(blocked) - //SEND - ws.policyStatus.SendStat(stat) - } - } - var engineResponses []response.EngineResponse userRequestInfo := kyverno.RequestInfo{ @@ -91,12 +57,10 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou for _, policy := range policies { glog.V(2).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s", resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation) - policyContext.Policy = policy engineResponse := engine.Mutate(policyContext) engineResponses = append(engineResponses, engineResponse) - // Gather policy application statistics - gatherStat(policy.Name, engineResponse.PolicyResponse) + updateStatusWithMutate(ws.status, policy, engineResponse) if !engineResponse.IsSuccesful() { glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName()) continue @@ -125,8 +89,6 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou events := generateEvents(engineResponses, (request.Operation == v1beta1.Update)) ws.eventGen.Add(events...) - sendStat(false) - // debug info func() { if len(patches) != 0 { @@ -146,3 +108,8 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou // patches holds all the successful patches, if no patch is created, it returns nil return engineutils.JoinPatches(patches) } + +func updateStatusWithMutate(statusSync *policy.StatusSync, policy kyverno.ClusterPolicy, response response.EngineResponse) { + status := statusSync.Cache().Get(policy.Name) + status +} diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index d401cdc449..f4a962dda2 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -10,6 +10,8 @@ import ( "net/http" "time" + "github.com/nirmata/kyverno/pkg/policy" + "github.com/golang/glog" "github.com/nirmata/kyverno/pkg/checker" kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" @@ -18,7 +20,6 @@ import ( "github.com/nirmata/kyverno/pkg/config" client "github.com/nirmata/kyverno/pkg/dclient" "github.com/nirmata/kyverno/pkg/event" - "github.com/nirmata/kyverno/pkg/policy" "github.com/nirmata/kyverno/pkg/policystore" "github.com/nirmata/kyverno/pkg/policyviolation" tlsutils "github.com/nirmata/kyverno/pkg/tls" @@ -55,7 +56,7 @@ type WebhookServer struct { // webhook registration client webhookRegistrationClient *webhookconfig.WebhookRegistrationClient // API to send policy stats for aggregation - policyStatus policy.PolicyStatusInterface + status policy.StatusSync // helpers to validate against current loaded configuration configHandler config.Interface // channel for cleanup notification @@ -82,7 +83,7 @@ func NewWebhookServer( crbInformer rbacinformer.ClusterRoleBindingInformer, eventGen event.Interface, webhookRegistrationClient *webhookconfig.WebhookRegistrationClient, - policyStatus policy.PolicyStatusInterface, + status *policy.StatusSync, configHandler config.Interface, pMetaStore policystore.LookupInterface, pvGenerator policyviolation.GeneratorInterface, @@ -112,7 +113,7 @@ func NewWebhookServer( crbSynced: crbInformer.Informer().HasSynced, eventGen: eventGen, webhookRegistrationClient: webhookRegistrationClient, - policyStatus: policyStatus, + status: status, configHandler: configHandler, cleanUp: cleanUp, lastReqTime: resourceWebhookWatcher.LastReqTime,