From 68855c2ca9a9d618f7e10d561cd8a7a4896604d5 Mon Sep 17 00:00:00 2001 From: evalsocket Date: Tue, 15 Sep 2020 06:59:05 -0700 Subject: [PATCH] improvment added in jobs sheduler --- cmd/kyverno/main.go | 55 +++-- pkg/constant/constant.go | 3 + pkg/jobs/controller.go | 93 +++++---- pkg/kyverno/report/allreports.go | 62 ++++++ pkg/kyverno/report/cluster.go | 7 +- pkg/kyverno/report/command.go | 1 + pkg/kyverno/report/common.go | 337 +++++++++++++++---------------- pkg/kyverno/report/helm.go | 30 +-- pkg/kyverno/report/namespace.go | 27 +-- pkg/policy/controller.go | 33 +-- pkg/policy/report.go | 12 +- pkg/policyreport/generator.go | 31 +-- pkg/policyviolation/generator.go | 13 -- pkg/webhooks/generation.go | 1 - pkg/webhooks/mutation.go | 10 + pkg/webhooks/server.go | 10 +- pkg/webhooks/validate_audit.go | 6 +- pkg/webhooks/validation.go | 13 +- 18 files changed, 429 insertions(+), 315 deletions(-) create mode 100644 pkg/kyverno/report/allreports.go diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 7e06c0bafc..eb0859c810 100755 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/nirmata/kyverno/pkg/common" + "github.com/nirmata/kyverno/pkg/policyreport" "net/http" _ "net/http/pprof" "os" @@ -190,17 +191,34 @@ func main() { // POLICY VIOLATION GENERATOR // -- generate policy violation - pvgen := policyviolation.NewPVGenerator(pclient, - client, - pInformer.Kyverno().V1().ClusterPolicyViolations(), - pInformer.Kyverno().V1().PolicyViolations(), - pInformer.Policy().V1alpha1().ClusterPolicyReports(), - pInformer.Policy().V1alpha1().PolicyReports(), - statusSync.Listener, - jobController, - log.Log.WithName("PolicyViolationGenerator"), - stopCh, - ) + var pvgen *policyviolation.Generator + if os.Getenv("POLICY-TYPE") == common.PolicyViolation { + pvgen = policyviolation.NewPVGenerator(pclient, + client, + pInformer.Kyverno().V1().ClusterPolicyViolations(), + pInformer.Kyverno().V1().PolicyViolations(), + pInformer.Policy().V1alpha1().ClusterPolicyReports(), + pInformer.Policy().V1alpha1().PolicyReports(), + statusSync.Listener, + jobController, + log.Log.WithName("PolicyViolationGenerator"), + stopCh, + ) + } + // POLICY Report GENERATOR + // -- generate policy report + var prgen *policyreport.Generator + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + prgen = policyreport.NewPRGenerator(pclient, + client, + pInformer.Policy().V1alpha1().ClusterPolicyReports(), + pInformer.Policy().V1alpha1().PolicyReports(), + statusSync.Listener, + jobController, + log.Log.WithName("PolicyReportGenerator"), + ) + } + // POLICY CONTROLLER // - reconciliation policy and policy violation // - process policy on existing resources @@ -215,6 +233,7 @@ func main() { configData, eventGenerator, pvgen, + prgen, rWebhookWatcher, kubeInformer.Core().V1().Namespaces(), jobController, @@ -265,6 +284,7 @@ func main() { eventGenerator, statusSync.Listener, pvgen, + prgen, kubeInformer.Rbac().V1().RoleBindings(), kubeInformer.Rbac().V1().ClusterRoleBindings(), log.Log.WithName("ValidateAuditHandler"), @@ -320,6 +340,7 @@ func main() { statusSync.Listener, configData, pvgen, + prgen, grgen, rWebhookWatcher, auditHandler, @@ -341,17 +362,21 @@ func main() { go grgen.Run(1) go rWebhookWatcher.Run(stopCh) go configData.Run(stopCh) + go policyCtrl.Run(2, stopCh) - go policyCtrl.Run(3, stopCh) - + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + go prgen.Run(2,stopCh) + }else{ + go pvgen.Run(2, stopCh) + } go eventGenerator.Run(3, stopCh) go grc.Run(1, stopCh) go grcc.Run(1, stopCh) - go pvgen.Run(1, stopCh) + go statusSync.Run(1, stopCh) go pCacheController.Run(1, stopCh) go auditHandler.Run(10, stopCh) - go jobController.Run(3, stopCh) + go jobController.Run(2, stopCh) openAPISync.Run(1, stopCh) // verifies if the admission control is enabled and active diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index c8ae0dc50a..a3dfc97e7e 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -9,4 +9,7 @@ const ( EventControllerResync = 15 * time.Minute GenerateControllerResync = 15 * time.Minute GenerateRequestControllerResync = 15 * time.Minute + + PolicyReportPolicyChangeResync = 120 * time.Second + PolicyReportResourceChangeResync = 120 * time.Second ) diff --git a/pkg/jobs/controller.go b/pkg/jobs/controller.go index b655d32566..f75695f968 100644 --- a/pkg/jobs/controller.go +++ b/pkg/jobs/controller.go @@ -96,17 +96,6 @@ func NewJobsJob(dclient *dclient.Client, configHandler: configHandler, log: log, } - go func(configHandler config.Interface) { - for k := range time.Tick(time.Duration(configHandler.GetBackgroundSync()) * time.Second) { - gen.log.V(2).Info("Background Sync sync at ", "time", k.String()) - var wg sync.WaitGroup - wg.Add(3) - go gen.syncKyverno(&wg, "Helm", "SYNC", "") - go gen.syncKyverno(&wg, "Namespace", "SYNC", "") - go gen.syncKyverno(&wg, "Cluster", "SYNC", "") - wg.Wait() - } - }(configHandler) return &gen } @@ -133,7 +122,15 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() logger.Info("start") defer logger.Info("shutting down") - + go func(configHandler config.Interface) { + for k := range time.Tick(time.Duration(configHandler.GetBackgroundSync()) * time.Second) { + j.log.V(2).Info("Background Sync sync at ", "time", k.String()) + var wg sync.WaitGroup + wg.Add(1) + go j.syncKyverno(&wg, "All", "SYNC", "") + wg.Wait() + } + }(j.configHandler) for i := 0; i < workers; i++ { go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh) } @@ -188,7 +185,6 @@ func (j *Job) processNextWorkItem() bool { // lookup data store info := j.dataStore.lookup(keyHash) - err := j.syncHandler(info) j.handleErr(err, obj) return nil @@ -209,10 +205,8 @@ func (j *Job) syncHandler(info JobInfo) error { j.mux.Lock() var wg sync.WaitGroup if info.JobType == "POLICYSYNC" { - wg.Add(3) - go j.syncKyverno(&wg, "Helm", "SYNC", info.JobData) - go j.syncKyverno(&wg, "Namespace", "SYNC", info.JobData) - go j.syncKyverno(&wg, "Cluster", "SYNC", info.JobData) + wg.Add(1) + go j.syncKyverno(&wg, "All", "SYNC", info.JobData) } else if info.JobType == "CONFIGMAP" { if info.JobData != "" { str := strings.Split(info.JobData, ",") @@ -256,13 +250,34 @@ func (j *Job) syncKyverno(wg *sync.WaitGroup, jobType, scope, data string) { fmt.Sprintf("--mode=%s", mode), } break + case "All": + args = []string{ + "report", + "all", + fmt.Sprintf("--mode=%s", mode), + } + break } if scope == "POLICYSYNC" && data != "" { args = append(args, fmt.Sprintf("-p=%s", data)) } - go j.CreateJob(args, jobType, scope, wg) - wg.Wait() + resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scope" : scope, + "type" : jobType, + }, + }) + if err != nil { + j.log.Error(err, "failed to get job") + } + if len(resourceList.Items) == 0 { + go j.CreateJob(args, jobType, scope, wg) + wg.Wait() + }else{ + wg.Done() + } + } // CreateJob will create Job template for background scan @@ -301,29 +316,29 @@ func (j *Job) CreateJob(args []string, jobType, scope string, wg *sync.WaitGroup j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName()) return } - deadline := time.Now().Add(100 * time.Second) + deadline := time.Now().Add(150 * time.Second) for { - time.Sleep(20 * time.Second) - resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName()) - if err != nil { - if apierrors.IsNotFound(err) { - j.log.Error(err, "job is already deleted", "job_name", job.GetName()) - break - } - continue - } - job := v1.Job{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil { - j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName()) - continue - } - if time.Now().After(deadline) { - if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil { - j.log.Error(err, "Error in deleting jobs", "job_name", job.GetName()) + time.Sleep(20 * time.Second) + resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName()) + if err != nil { + if apierrors.IsNotFound(err) { + j.log.Error(err, "job is already deleted", "job_name", job.GetName()) + break + } continue } - break - } + job := v1.Job{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil { + j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName()) + continue + } + if time.Now().After(deadline) { + if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil { + j.log.Error(err, "Error in deleting jobs", "job_name", job.GetName()) + continue + } + break + } } wg.Done() } diff --git a/pkg/kyverno/report/allreports.go b/pkg/kyverno/report/allreports.go new file mode 100644 index 0000000000..13be60a305 --- /dev/null +++ b/pkg/kyverno/report/allreports.go @@ -0,0 +1,62 @@ +package report + +import ( + "fmt" + "github.com/nirmata/kyverno/pkg/common" + "github.com/nirmata/kyverno/pkg/utils" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + "os" + log "sigs.k8s.io/controller-runtime/pkg/log" + "sync" + "time" +) + +func AllReportsCommand() *cobra.Command { + kubernetesConfig := genericclioptions.NewConfigFlags(true) + var namespace, policy string + cmd := &cobra.Command{ + Use: "all", + Short: "generate report", + Example: fmt.Sprintf("To create a namespace report from background scan:\nkyverno report namespace --namespace=defaults \n kyverno report namespace"), + RunE: func(cmd *cobra.Command, args []string) (err error) { + os.Setenv("POLICY-TYPE", common.PolicyReport) + logger := log.Log.WithName("Report") + restConfig, err := kubernetesConfig.ToRESTConfig() + if err != nil { + logger.Error(err, "failed to create rest config of kubernetes cluster ") + os.Exit(1) + } + const resyncPeriod = 1 * time.Second + kubeClient, err := utils.NewKubeClient(restConfig) + if err != nil { + log.Log.Error(err, "Failed to create kubernetes client") + os.Exit(1) + } + var stopCh <-chan struct{} + var wg sync.WaitGroup + if namespace != "" { + wg.Add(1) + go backgroundScan(namespace, All, policy, &wg, restConfig, logger) + wg.Wait() + } else { + ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) + if err != nil { + os.Exit(1) + } + wg.Add(len(ns.Items)) + for _, n := range ns.Items { + go backgroundScan(n.GetName(), All, policy, &wg, restConfig, logger) + } + wg.Wait() + } + os.Exit(0) + <-stopCh + return nil + }, + } + cmd.Flags().StringVarP(&namespace, "namespace", "n", "", "define specific namespace") + cmd.Flags().StringVarP(&policy, "policy", "p", "", "define specific policy") + return cmd +} diff --git a/pkg/kyverno/report/cluster.go b/pkg/kyverno/report/cluster.go index db787cb1db..256bd3238d 100644 --- a/pkg/kyverno/report/cluster.go +++ b/pkg/kyverno/report/cluster.go @@ -28,12 +28,13 @@ func ClusterCommand() *cobra.Command { var wg sync.WaitGroup wg.Add(1) if mode == "cli" { - go backgroundScan("", "Cluster", policy, &wg, restConfig, logger) + go backgroundScan("", Cluster, policy, &wg, restConfig, logger) wg.Wait() - return nil + os.Exit(0) } - go configmapScan("", "Cluster", &wg, restConfig, logger) + go configmapScan("", Cluster, &wg, restConfig, logger) wg.Wait() + os.Exit(0) return nil }, } diff --git a/pkg/kyverno/report/command.go b/pkg/kyverno/report/command.go index 465963b1d3..d7249c819a 100644 --- a/pkg/kyverno/report/command.go +++ b/pkg/kyverno/report/command.go @@ -28,5 +28,6 @@ func Command() *cobra.Command { cmd.AddCommand(HelmCommand()) cmd.AddCommand(NamespaceCommand()) cmd.AddCommand(ClusterCommand()) + cmd.AddCommand(AllReportsCommand()) return cmd } diff --git a/pkg/kyverno/report/common.go b/pkg/kyverno/report/common.go index 3ed50fdc79..ecd0b05248 100644 --- a/pkg/kyverno/report/common.go +++ b/pkg/kyverno/report/common.go @@ -41,13 +41,15 @@ const ( Helm string = "Helm" Namespace string = "Namespace" Cluster string = "Cluster" + All string = "All" ) func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfig *rest.Config, logger logr.Logger) { + lgr := logger.WithValues("namespace", n, "scope", scope, "policychange", policychange) defer func() { + lgr.Error(nil, "done") wg.Done() }() - lgr := logger.WithValues("namespace", n, "scope", scope, "policychange", policychange) dClient, err := client.NewClient(restConfig, 5*time.Minute, make(chan struct{}), lgr) if err != nil { lgr.Error(err, "Error in creating dcclient with provided rest config") @@ -66,7 +68,7 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi } pclient, err := kyvernoclient.NewForConfig(restConfig) if err != nil { - lgr.Error(err, "Error in creating kyverno client for polciy with provided rest config") + lgr.Error(err, "Error in creating kyverno client for policy with provided rest config") os.Exit(1) } var stopCh <-chan struct{} @@ -147,10 +149,11 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi cpolicies = append(cpolicies, cp) } } - // key uid - resourceMap := map[string]unstructured.Unstructured{} - var engineResponses []response.EngineResponse + resourceMap := map[string]map[string]unstructured.Unstructured{} + resourceMap[Cluster] = make(map[string]unstructured.Unstructured) + resourceMap[Helm] = make(map[string]unstructured.Unstructured) + resourceMap[Namespace] = make(map[string]unstructured.Unstructured) for _, p := range cpolicies { for _, rule := range p.Spec.Rules { for _, k := range rule.MatchResources.Kinds { @@ -159,10 +162,9 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi lgr.Error(err, "failed to find resource", "kind", k) continue } - - if !resourceSchema.Namespaced && scope == Cluster { + if !resourceSchema.Namespaced { rMap := policy.GetResourcesPerNamespace(k, dClient, "", rule, configData, log.Log) - policy.MergeResources(resourceMap, rMap) + policy.MergeResources(resourceMap[Cluster], rMap) } else if resourceSchema.Namespaced { namespaces := policy.GetNamespacesForRule(&rule, np.Lister(), log.Log) for _, ns := range namespaces { @@ -172,10 +174,10 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi labels := r.GetLabels() _, okChart := labels["app"] _, okRelease := labels["release"] - if okChart && okRelease && scope == Helm { - policy.MergeResources(resourceMap, rMap) - } else if scope == Namespace && r.GetNamespace() != "" { - policy.MergeResources(resourceMap, rMap) + if okChart && okRelease { + policy.MergeResources(resourceMap[Helm], rMap) + } else if r.GetNamespace() != "" { + policy.MergeResources(resourceMap[Namespace], rMap) } } } @@ -186,123 +188,161 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi } if p.HasAutoGenAnnotation() { - resourceMap = policy.ExcludePod(resourceMap, log.Log) + switch scope { + case Cluster: + resourceMap[Cluster] = policy.ExcludePod(resourceMap[Cluster], log.Log) + break + case Namespace: + resourceMap[Namespace] = policy.ExcludePod(resourceMap[Namespace], log.Log) + break + case Helm: + resourceMap[Helm] = policy.ExcludePod(resourceMap[Helm], log.Log) + break + case All: + resourceMap[Cluster] = policy.ExcludePod(resourceMap[Cluster], log.Log) + resourceMap[Namespace] = policy.ExcludePod(resourceMap[Namespace], log.Log) + resourceMap[Helm] = policy.ExcludePod(resourceMap[Helm], log.Log) + } } results := make(map[string][]policyreportv1alpha1.PolicyReportResult) - for _, resource := range resourceMap { - policyContext := engine.PolicyContext{ - NewResource: resource, - Context: context.NewContext(), - Policy: *p, - ExcludeGroupRole: configData.GetExcludeGroupRole(), - } - - engineResponse := engine.Validate(policyContext) - - if len(engineResponse.PolicyResponse.Rules) > 0 { - engineResponses = append(engineResponses, engineResponse) - } - - engineResponse = engine.Mutate(policyContext) - if len(engineResponse.PolicyResponse.Rules) > 0 { - engineResponses = append(engineResponses, engineResponse) - } - - pv := policyreport.GeneratePRsFromEngineResponse(engineResponses, log.Log) - - for _, v := range pv { - var appname string - switch scope { - case Helm: - resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName()) - if err != nil { - lgr.Error(err, "failed to get resource") - continue - } - labels := resource.GetLabels() - _, okChart := labels["app"] - _, okRelease := labels["release"] - if okChart && okRelease { - appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], policyContext.NewResource.GetNamespace()) - - } - break - case Namespace: - appname = fmt.Sprintf("kyverno-policyreport-%s", policyContext.NewResource.GetNamespace()) - break - case Cluster: - appname = fmt.Sprintf("kyverno-clusterpolicyreport") - break - } - builder := policyreport.NewPrBuilder() - pv := builder.Generate(v) - - for _, e := range pv.Spec.ViolatedRules { - result := &policyreportv1alpha1.PolicyReportResult{ - Policy: pv.Spec.Policy, - Rule: e.Name, - Message: e.Message, - Status: policyreportv1alpha1.PolicyStatus(e.Check), - Resource: &corev1.ObjectReference{ - Kind: pv.Spec.Kind, - Namespace: pv.Spec.Namespace, - APIVersion: pv.Spec.APIVersion, - Name: pv.Spec.Name, - }, - } - results[appname] = append(results[appname], *result) + for key, _ := range resourceMap { + for _, resource := range resourceMap[key] { + policyContext := engine.PolicyContext{ + NewResource: resource, + Context: context.NewContext(), + Policy: *p, + ExcludeGroupRole: configData.GetExcludeGroupRole(), } + results = createResults(policyContext, key, results) } } - for k, _ := range results { if k == "" { continue } - if scope == Helm || scope == Namespace { - availablepr, err := kclient.PolicyV1alpha1().PolicyReports(n).Get(k, metav1.GetOptions{}) - - if err != nil { - if apierrors.IsNotFound(err) { - availablepr = initPolicyReport(scope, n, k) - } - } - availablepr, action := mergeReport(availablepr, results[k], removePolicy) - if action == "Create" { - _, err := kclient.PolicyV1alpha1().PolicyReports(n).Create(availablepr) - if err != nil { - lgr.Error(err, "Error in Create polciy report", "appreport", k) - } - } else { - _, err := kclient.PolicyV1alpha1().PolicyReports(n).Update(availablepr) - if err != nil { - lgr.Error(err, "Error in update polciy report", "appreport", k) - } - } - } else { - availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - availablepr = initClusterPolicyReport(scope, k) - } - } - availablepr, action := mergeClusterReport(availablepr, results[k]) - if action == "Create" { - _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr) - if err != nil { - lgr.Error(err, "Error in Create polciy report", "appreport", k) - } - } else { - _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr) - if err != nil { - lgr.Error(err, "Error in update polciy report", "appreport", k) - } - } + err := createReport(kclient, k, n, results[k], lgr) + if err != nil { + continue } - } } - os.Exit(0) +} + +func createReport(kclient *kyvernoclient.Clientset, name, namespace string, results []policyreportv1alpha1.PolicyReportResult, lgr logr.Logger) error { + str := strings.Split(name, "-") + var scope string + if len(str) == 1 { + scope = Cluster + } else if strings.Contains(name, "policyreport-helm-") { + scope = Helm + } else { + scope = Cluster + } + if len(str) > 1 { + availablepr, err := kclient.PolicyV1alpha1().PolicyReports(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + availablepr = initPolicyReport(scope, namespace, name) + } else { + return err + } + } + + availablepr, action := mergeReport(availablepr, results, []string{}) + if action == "Create" { + availablepr.SetLabels(map[string]string{ + "policy-state": "state", + }) + _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Create(availablepr) + if err != nil { + lgr.Error(err, "Error in Create policy report", "appreport", name) + return err + } + } else { + _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Update(availablepr) + if err != nil { + lgr.Error(err, "Error in update policy report", "appreport", name) + return err + } + } + } else { + availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + availablepr = initClusterPolicyReport(scope, name) + } else { + return err + } + } + availablepr, action := mergeClusterReport(availablepr, results) + if action == "Create" { + _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr) + if err != nil { + lgr.Error(err, "Error in Create policy report", "appreport", name) + return err + } + } else { + _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr) + if err != nil { + lgr.Error(err, "Error in update policy report", "appreport", name) + return err + } + } + } + return nil +} + +func createResults(policyContext engine.PolicyContext, key string, results map[string][]policyreportv1alpha1.PolicyReportResult) map[string][]policyreportv1alpha1.PolicyReportResult { + + var engineResponses []response.EngineResponse + engineResponse := engine.Validate(policyContext) + + if len(engineResponse.PolicyResponse.Rules) > 0 { + engineResponses = append(engineResponses, engineResponse) + } + + engineResponse = engine.Mutate(policyContext) + if len(engineResponse.PolicyResponse.Rules) > 0 { + engineResponses = append(engineResponses, engineResponse) + } + + pv := policyreport.GeneratePRsFromEngineResponse(engineResponses, log.Log) + + for _, v := range pv { + var appname string + if key == Helm { + labels := policyContext.NewResource.GetLabels() + _, okChart := labels["app"] + _, okRelease := labels["release"] + if okChart && okRelease { + appname = fmt.Sprintf("policyreport-helm-%s-%s", labels["app"], policyContext.NewResource.GetNamespace()) + } + } else if key == Namespace { + appname = fmt.Sprintf("policyreport-%s", policyContext.NewResource.GetNamespace()) + } else { + appname = fmt.Sprintf("clusterpolicyreport") + } + + builder := policyreport.NewPrBuilder() + pv := builder.Generate(v) + + for _, e := range pv.Spec.ViolatedRules { + result := &policyreportv1alpha1.PolicyReportResult{ + Policy: pv.Spec.Policy, + Rule: e.Name, + Message: e.Message, + Status: policyreportv1alpha1.PolicyStatus(e.Check), + Resource: &corev1.ObjectReference{ + Kind: pv.Spec.Kind, + Namespace: pv.Spec.Namespace, + APIVersion: pv.Spec.APIVersion, + Name: pv.Spec.Name, + }, + } + results[appname] = append(results[appname], *result) + } + } + return results } func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config, logger logr.Logger) { @@ -371,7 +411,7 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config, var appname string // Increase Count if scope == Cluster { - appname = fmt.Sprintf("kyverno-clusterpolicyreport") + appname = fmt.Sprintf("clusterpolicyreport") } else if scope == Helm { resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName()) if err != nil { @@ -382,11 +422,11 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config, _, okChart := labels["app"] _, okRelease := labels["release"] if okChart && okRelease { - appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], v.Resource.GetNamespace()) + appname = fmt.Sprintf("policyreport-helm-%s-%s", labels["app"], v.Resource.GetNamespace()) } } else { - appname = fmt.Sprintf("kyverno-policyreport-%s", v.Resource.GetNamespace()) + appname = fmt.Sprintf("policyreport-%s", v.Resource.GetNamespace()) } results[appname] = append(results[appname], *result) } @@ -395,59 +435,14 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config, } for k := range results { - if scope == Helm || scope == Namespace { - availablepr, err := kclient.PolicyV1alpha1().PolicyReports(n).Get(k, metav1.GetOptions{}) - str := strings.Split(k, "-") - var namespace string - if len(str) == 2 { - namespace = str[1] - } else if len(str) == 3 { - namespace = str[2] - } - if err != nil { - if apierrors.IsNotFound(err) { - availablepr = initPolicyReport(scope, namespace, k) - } - } - - availablepr, action := mergeReport(availablepr, results[k], []string{}) - if action == "Create" { - availablepr.SetLabels(map[string]string{ - "policy-state": "state", - }) - _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Create(availablepr) - if err != nil { - lgr.Error(err, "Error in Create polciy report", "appreport", k) - } - } else { - _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Update(availablepr) - if err != nil { - lgr.Error(err, "Error in update polciy report", "appreport", k) - } - } - } else { - availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - availablepr = initClusterPolicyReport(scope, k) - } - } - availablepr, action := mergeClusterReport(availablepr, results[k]) - if action == "Create" { - _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr) - if err != nil { - lgr.Error(err, "Error in Create polciy report", "appreport", action) - } - } else { - _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr) - if err != nil { - lgr.Error(err, "Error in update polciy report", "appreport", action) - } - } + if k != "" { + continue + } + err := createReport(kclient, k, "", results[k], lgr) + if err != nil { + continue } - } - os.Exit(0) } func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1alpha1.PolicyReportResult, removePolicy []string) (*policyreportv1alpha1.PolicyReport, string) { diff --git a/pkg/kyverno/report/helm.go b/pkg/kyverno/report/helm.go index 9cc321ff45..f916be93dd 100644 --- a/pkg/kyverno/report/helm.go +++ b/pkg/kyverno/report/helm.go @@ -2,6 +2,7 @@ package report import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "sync" "time" @@ -9,10 +10,7 @@ import ( "github.com/nirmata/kyverno/pkg/common" "github.com/nirmata/kyverno/pkg/utils" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/labels" "k8s.io/cli-runtime/pkg/genericclioptions" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" log "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -38,40 +36,28 @@ func HelmCommand() *cobra.Command { os.Exit(1) } - var stopCh <-chan struct{} - - kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod) - np := kubeInformer.Core().V1().Namespaces() - - go np.Informer().Run(stopCh) - - nSynced := np.Informer().HasSynced - - if !cache.WaitForCacheSync(stopCh, nSynced) { - logger.Error(err, "Failed to create kubernetes client") - os.Exit(1) - } var wg sync.WaitGroup if mode == "cli" { if namespace != "" { wg.Add(1) - go backgroundScan(namespace, "Helm", policy, &wg, restConfig, logger) + go backgroundScan(namespace, Helm, policy, &wg, restConfig, logger) } else { - ns, err := np.Lister().List(labels.Everything()) + ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { logger.Error(err, "Failed to list all namespaces") os.Exit(1) } - wg.Add(len(ns)) - for _, n := range ns { - go backgroundScan(n.GetName(), "Helm", policy, &wg, restConfig, logger) + wg.Add(len(ns.Items)) + for _, n := range ns.Items { + go backgroundScan(n.GetName(), Helm, policy, &wg, restConfig, logger) } } } else { wg.Add(1) - go configmapScan("", "Helm", &wg, restConfig, logger) + go configmapScan("", Helm, &wg, restConfig, logger) } wg.Wait() + os.Exit(0) return nil }, } diff --git a/pkg/kyverno/report/namespace.go b/pkg/kyverno/report/namespace.go index e6824f25ad..dca58e0ec1 100644 --- a/pkg/kyverno/report/namespace.go +++ b/pkg/kyverno/report/namespace.go @@ -5,10 +5,8 @@ import ( "github.com/nirmata/kyverno/pkg/common" "github.com/nirmata/kyverno/pkg/utils" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/labels" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/cli-runtime/pkg/genericclioptions" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" "os" log "sigs.k8s.io/controller-runtime/pkg/log" "sync" @@ -38,36 +36,27 @@ func NamespaceCommand() *cobra.Command { } var stopCh <-chan struct{} - kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod) - np := kubeInformer.Core().V1().Namespaces() - - go np.Informer().Run(stopCh) - nSynced := np.Informer().HasSynced - nLister := np.Lister() - if !cache.WaitForCacheSync(stopCh, nSynced) { - log.Log.Error(err, "Failed to create kubernetes client") - os.Exit(1) - } var wg sync.WaitGroup if mode == "cli" { if namespace != "" { wg.Add(1) - go backgroundScan(namespace, "Namespace", policy, &wg, restConfig, logger) + go backgroundScan(namespace, Namespace, policy, &wg, restConfig, logger) } else { - ns, err := nLister.List(labels.Everything()) + ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { os.Exit(1) } - wg.Add(len(ns)) - for _, n := range ns { - go backgroundScan(n.GetName(), "Namespace", policy, &wg, restConfig, logger) + wg.Add(len(ns.Items)) + for _, n := range ns.Items { + go backgroundScan(n.GetName(), Namespace, policy, &wg, restConfig, logger) } } } else { wg.Add(1) - go configmapScan("", "Namespace", &wg, restConfig, logger) + go configmapScan("", Namespace, &wg, restConfig, logger) } wg.Wait() + os.Exit(0) <-stopCh return nil }, diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index df1f991a94..5ed56de3f5 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -3,6 +3,7 @@ package policy import ( "fmt" "github.com/nirmata/kyverno/pkg/common" + "github.com/nirmata/kyverno/pkg/policyreport" "k8s.io/apimachinery/pkg/labels" "math/rand" "os" @@ -103,6 +104,9 @@ type PolicyController struct { // policy violation generator pvGenerator policyviolation.GeneratorInterface + // policy violation generator + prGenerator policyreport.GeneratorInterface + // resourceWebhookWatcher queues the webhook creation request, creates the webhook resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister @@ -128,6 +132,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, grInformer kyvernoinformer.GenerateRequestInformer, configHandler config.Interface, eventGen event.Interface, pvGenerator policyviolation.GeneratorInterface, + prGenerator policyreport.GeneratorInterface, resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister, namespaces informers.NamespaceInformer, job *jobs.Job, @@ -150,6 +155,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"), configHandler: configHandler, pvGenerator: pvGenerator, + prGenerator: prGenerator, resourceWebhookWatcher: resourceWebhookWatcher, job: job, log: log, @@ -206,19 +212,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, // rebuild after 300 seconds/ 5 mins //TODO: pass the time in seconds instead of converting it internally pc.rm = NewResourceManager(30) - if os.Getenv("POLICY-TYPE") == common.PolicyReport { - go func(pc PolicyController) { - for k := range time.Tick(60 * time.Second) { - pc.log.V(2).Info("Policy Background sync at", "time", k.String()) - if len(pc.policySync.policy) > 0 { - pc.job.Add(jobs.JobInfo{ - JobType: "POLICYSYNC", - JobData: strings.Join(pc.policySync.policy, ","), - }) - } - } - }(pc) - } return &pc, nil } @@ -367,6 +360,20 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { } + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + go func(pc *PolicyController) { + for k := range time.Tick(constant.PolicyReportPolicyChangeResync) { + pc.log.V(2).Info("Policy Background sync at", "time", k.String()) + if len(pc.policySync.policy) > 0 { + pc.job.Add(jobs.JobInfo{ + JobType: "POLICYSYNC", + JobData: strings.Join(pc.policySync.policy, ","), + }) + } + } + }(pc) + } + for i := 0; i < workers; i++ { go wait.Until(pc.worker, constant.PolicyControllerResync, stopCh) } diff --git a/pkg/policy/report.go b/pkg/policy/report.go index 54f5142dc9..bde6ba5a91 100644 --- a/pkg/policy/report.go +++ b/pkg/policy/report.go @@ -2,6 +2,9 @@ package policy import ( "fmt" + "github.com/nirmata/kyverno/pkg/common" + "github.com/nirmata/kyverno/pkg/policyreport" + "os" "github.com/go-logr/logr" "github.com/nirmata/kyverno/pkg/engine/response" @@ -23,7 +26,14 @@ func (pc *PolicyController) cleanupAndReport(engineResponses []response.EngineRe pvInfos[i].FromSync = true } - pc.pvGenerator.Add(pvInfos...) + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + for _, v := range pvInfos { + pc.prGenerator.Add(policyreport.Info(v)) + } + } else { + pc.pvGenerator.Add(pvInfos...) + } + // cleanup existing violations if any // if there is any error in clean up, we dont re-queue the resource // it will be re-tried in the next controller cache resync diff --git a/pkg/policyreport/generator.go b/pkg/policyreport/generator.go index 4eddc5e015..3a2762daca 100755 --- a/pkg/policyreport/generator.go +++ b/pkg/policyreport/generator.go @@ -2,17 +2,16 @@ package policyreport import ( "encoding/json" + "github.com/nirmata/kyverno/pkg/config" + "github.com/nirmata/kyverno/pkg/jobs" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "reflect" "strconv" "strings" "sync" "time" - "github.com/nirmata/kyverno/pkg/config" - "github.com/nirmata/kyverno/pkg/jobs" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "github.com/go-logr/logr" kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" policyreportclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" @@ -118,6 +117,11 @@ type PVEvent struct { Cluster map[string][]Info } +//GeneratorInterface provides API to create PVs +type GeneratorInterface interface { + Add(infos ...Info) +} + // NewPRGenerator returns a new instance of policy violation generator func NewPRGenerator(client *policyreportclient.Clientset, dclient *dclient.Client, @@ -125,8 +129,7 @@ func NewPRGenerator(client *policyreportclient.Clientset, nsprInformer policyreportinformer.PolicyReportInformer, policyStatus policystatus.Listener, job *jobs.Job, - log logr.Logger, - stopChna <-chan struct{}) *Generator { + log logr.Logger) *Generator { gen := Generator{ policyreportInterface: client.PolicyV1alpha1(), dclient: dclient, @@ -178,11 +181,8 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { logger.Info("failed to sync informer cache") } - for i := 0; i < workers; i++ { - go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh) - } - go func() { - for k := range time.Tick(60 * time.Second) { + go func(gen *Generator) { + for k := range time.Tick(constant.PolicyReportResourceChangeResync) { gen.log.V(2).Info("Configmap sync at ", "time", k.String()) err := gen.createConfigmap() scops := []string{} @@ -208,7 +208,12 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { Cluster: make(map[string][]Info), } } - }() + }(gen) + + for i := 0; i < workers; i++ { + go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh) + } + <-stopCh } diff --git a/pkg/policyviolation/generator.go b/pkg/policyviolation/generator.go index 46a76005d9..eff7eebb0c 100755 --- a/pkg/policyviolation/generator.go +++ b/pkg/policyviolation/generator.go @@ -134,19 +134,6 @@ func NewPVGenerator(client *kyvernoclient.Clientset, job: job, policyStatusListener: policyStatus, } - if os.Getenv("POLICY-TYPE") == common.PolicyReport { - gen.prgen = policyreport.NewPRGenerator(client, - dclient, - prInformer, - nsprInformer, - policyStatus, - job, - log, - stopChna, - ) - go gen.prgen.Run(3, stopChna) - - } return &gen } diff --git a/pkg/webhooks/generation.go b/pkg/webhooks/generation.go index fecb3c1f5c..76f21b50e3 100644 --- a/pkg/webhooks/generation.go +++ b/pkg/webhooks/generation.go @@ -36,7 +36,6 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic logger.Error(err, "failed to convert RAR resource to unstructured format") return } - // CREATE resources, do not have name, assigned in admission-request policyContext := engine.PolicyContext{ diff --git a/pkg/webhooks/mutation.go b/pkg/webhooks/mutation.go index 275e819bcb..5be1d069b0 100644 --- a/pkg/webhooks/mutation.go +++ b/pkg/webhooks/mutation.go @@ -1,6 +1,9 @@ package webhooks import ( + "github.com/nirmata/kyverno/pkg/common" + "github.com/nirmata/kyverno/pkg/policyreport" + "os" "reflect" "sort" "time" @@ -89,6 +92,13 @@ func (ws *WebhookServer) HandleMutation( // generate violation when response fails pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses, logger) ws.pvGenerator.Add(pvInfos...) + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + for _, v := range pvInfos { + ws.prGenerator.Add(policyreport.Info(v)) + } + } else { + ws.pvGenerator.Add(pvInfos...) + } // REPORTING EVENTS // Scenario 1: diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index b072ae1af7..a3d5026512 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/nirmata/kyverno/pkg/policyreport" "io/ioutil" "net/http" "time" @@ -100,6 +101,9 @@ type WebhookServer struct { // policy violation generator pvGenerator policyviolation.GeneratorInterface + // policy report generator + prGenerator policyreport.GeneratorInterface + // generate request generator grGenerator *generate.Generator @@ -130,6 +134,7 @@ func NewWebhookServer( statusSync policystatus.Listener, configHandler config.Interface, pvGenerator policyviolation.GeneratorInterface, + prGenerator policyreport.GeneratorInterface, grGenerator *generate.Generator, resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister, auditHandler AuditHandler, @@ -172,6 +177,7 @@ func NewWebhookServer( cleanUp: cleanUp, lastReqTime: resourceWebhookWatcher.LastReqTime, pvGenerator: pvGenerator, + prGenerator: prGenerator, grGenerator: grGenerator, resourceWebhookWatcher: resourceWebhookWatcher, auditHandler: auditHandler, @@ -347,7 +353,7 @@ func (ws *WebhookServer) ResourceMutation(request *v1beta1.AdmissionRequest) *v1 ws.auditHandler.Add(request.DeepCopy()) // VALIDATION - ok, msg := HandleValidation(request, validatePolicies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.log, ws.configHandler) + ok, msg := HandleValidation(request, validatePolicies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.prGenerator, ws.log, ws.configHandler) if !ok { logger.Info("admission request denied") return &v1beta1.AdmissionResponse{ @@ -473,7 +479,7 @@ func (ws *WebhookServer) resourceValidation(request *v1beta1.AdmissionRequest) * logger.Error(err, "failed to load service account in context") } - ok, msg := HandleValidation(request, policies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.log, ws.configHandler) + ok, msg := HandleValidation(request, policies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.prGenerator, ws.log, ws.configHandler) if !ok { logger.Info("admission request denied") return &v1beta1.AdmissionResponse{ diff --git a/pkg/webhooks/validate_audit.go b/pkg/webhooks/validate_audit.go index cf3867d73e..d29daffaa7 100644 --- a/pkg/webhooks/validate_audit.go +++ b/pkg/webhooks/validate_audit.go @@ -10,6 +10,7 @@ import ( enginectx "github.com/nirmata/kyverno/pkg/engine/context" "github.com/nirmata/kyverno/pkg/event" "github.com/nirmata/kyverno/pkg/policycache" + "github.com/nirmata/kyverno/pkg/policyreport" "github.com/nirmata/kyverno/pkg/policystatus" "github.com/nirmata/kyverno/pkg/policyviolation" "github.com/nirmata/kyverno/pkg/userinfo" @@ -44,6 +45,7 @@ type auditHandler struct { eventGen event.Interface statusListener policystatus.Listener pvGenerator policyviolation.GeneratorInterface + prGenerator policyreport.GeneratorInterface rbLister rbaclister.RoleBindingLister rbSynced cache.InformerSynced @@ -59,6 +61,7 @@ func NewValidateAuditHandler(pCache policycache.Interface, eventGen event.Interface, statusListener policystatus.Listener, pvGenerator policyviolation.GeneratorInterface, + prGenerator policyreport.GeneratorInterface, rbInformer rbacinformer.RoleBindingInformer, crbInformer rbacinformer.ClusterRoleBindingInformer, log logr.Logger, @@ -75,6 +78,7 @@ func NewValidateAuditHandler(pCache policycache.Interface, crbLister: crbInformer.Lister(), crbSynced: crbInformer.Informer().HasSynced, log: log, + prGenerator: prGenerator, configHandler: dynamicConfig, } } @@ -167,7 +171,7 @@ func (h *auditHandler) process(request *v1beta1.AdmissionRequest) error { return errors.Wrap(err, "failed to load service account in context") } - HandleValidation(request, policies, nil, ctx, userRequestInfo, h.statusListener, h.eventGen, h.pvGenerator, logger, h.configHandler) + HandleValidation(request, policies, nil, ctx, userRequestInfo, h.statusListener, h.eventGen, h.pvGenerator, h.prGenerator, logger, h.configHandler) return nil } diff --git a/pkg/webhooks/validation.go b/pkg/webhooks/validation.go index 6426aab1b5..478726def5 100644 --- a/pkg/webhooks/validation.go +++ b/pkg/webhooks/validation.go @@ -1,7 +1,10 @@ package webhooks import ( + "github.com/nirmata/kyverno/pkg/common" "github.com/nirmata/kyverno/pkg/config" + "github.com/nirmata/kyverno/pkg/policyreport" + "os" "reflect" "sort" "time" @@ -34,6 +37,7 @@ func HandleValidation( statusListener policystatus.Listener, eventGen event.Interface, pvGenerator policyviolation.GeneratorInterface, + prGenerator policyreport.GeneratorInterface, log logr.Logger, dynamicConfig config.Interface) (bool, string) { @@ -121,8 +125,13 @@ func HandleValidation( // ADD POLICY VIOLATIONS // violations are created with resource on "audit" pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses, logger) - pvGenerator.Add(pvInfos...) - + if os.Getenv("POLICY-TYPE") == common.PolicyReport { + for _, v := range pvInfos { + prGenerator.Add(policyreport.Info(v)) + } + } else { + pvGenerator.Add(pvInfos...) + } return true, "" }