diff --git a/pkg/jobs/controller.go b/pkg/jobs/controller.go index 9cfc4cbb3c..bac6469277 100644 --- a/pkg/jobs/controller.go +++ b/pkg/jobs/controller.go @@ -1,17 +1,19 @@ package jobs import ( - "fmt" "context" + "fmt" + "math/rand" + "reflect" + "strings" + "sync" + "time" + "github.com/nirmata/kyverno/pkg/config" v1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "reflect" - "strings" - "sync" - "time" "github.com/go-logr/logr" @@ -38,14 +40,10 @@ type Job struct { // Job Info Define Job Type type JobInfo struct { JobType string - Policy string } func (i JobInfo) toKey() string { - if i.Policy != "" { - return fmt.Sprintf("%s-%s", i.JobType, i.Policy) - } - return fmt.Sprintf("%s", i.JobType) + return fmt.Sprintf("kyverno-%v", rand.Int63n(1000)) } //NewDataStore returns an instance of data store @@ -127,7 +125,7 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) { go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh) } - go func(){ + go func() { ctx := context.Background() ticker := time.NewTicker(100 * time.Second) for { @@ -135,9 +133,9 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) { case <-ticker.C: var wg sync.WaitGroup wg.Add(3) - go j.syncNamespace(&wg, "Helm", "CONFIGMAP") - go j.syncNamespace(&wg, "Namespace", "CONFIGMAP") - go j.syncNamespace(&wg, "Cluster", "CONFIGMAP") + go j.syncNamespace(&wg, "Helm", "SYNC") + go j.syncNamespace(&wg, "Namespace", "SYNC") + go j.syncNamespace(&wg, "Cluster", "SYNC") wg.Wait() case <-ctx.Done(): break @@ -224,9 +222,9 @@ func (j *Job) syncHandler(info JobInfo) error { j.mux.Lock() var wg sync.WaitGroup wg.Add(3) - go j.syncNamespace(&wg, "Helm", "SYNC") - go j.syncNamespace(&wg, "Namespace", "SYNC") - go j.syncNamespace(&wg, "Cluster", "SYNC") + go j.syncNamespace(&wg, "Helm", "CONFIGMAP") + go j.syncNamespace(&wg, "Namespace", "CONFIGMAP") + go j.syncNamespace(&wg, "Cluster", "CONFIGMAP") wg.Wait() return nil } @@ -272,8 +270,7 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope string) { if err != nil { return } - deadline := time.Now().Add(15 * time.Second) - var failure bool + deadline := time.Now().Add(80 * time.Second) for { resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName()) if err != nil { @@ -283,17 +280,15 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope string) { if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil { continue } - if job.Status.Active == 0 || time.Now().After(deadline) { - failure = true + if time.Now().After(deadline) { break } } - if failure { - err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false) - if err != nil { - return - } + err = j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false) + if err != nil { + return } + return } @@ -314,7 +309,8 @@ func CreateJob(args []string, jobType, scope string) *v1.Job { Args: args, }, }, - RestartPolicy: "OnFailure", + ServiceAccountName: "kyverno-service-account", + RestartPolicy: "OnFailure", }, }, }, diff --git a/pkg/kyverno/report/common.go b/pkg/kyverno/report/common.go index 0c99d53bbe..a124b5d3e4 100644 --- a/pkg/kyverno/report/common.go +++ b/pkg/kyverno/report/common.go @@ -3,6 +3,7 @@ package report import ( "encoding/json" "fmt" + kyvernov1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1" policyreportv1alpha1 "github.com/nirmata/kyverno/pkg/api/policyreport/v1alpha1" kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" @@ -22,16 +23,17 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" + "os" + "strings" + "sync" + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/rest" - "os" log "sigs.k8s.io/controller-runtime/pkg/log" - "strings" - "sync" - "time" ) const ( @@ -56,7 +58,7 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config } kubeClient, err := utils.NewKubeClient(restConfig) if err != nil { - log.Log.Error(err, "Failed to create kubernetes client") + log.Log.Error(err, "Failed to Create kubernetes client") os.Exit(1) } pclient, err := kyvernoclient.NewForConfig(restConfig) @@ -84,7 +86,7 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config piSynced := pi.Informer().HasSynced cpiSynced := cpi.Informer().HasSynced if !cache.WaitForCacheSync(stopCh, cSynced, piSynced, cpiSynced, nSynced) { - log.Log.Error(err, "Failed to create kubernetes client") + log.Log.Error(err, "Failed to Create kubernetes client") os.Exit(1) } @@ -123,15 +125,14 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config continue } - if !resourceSchema.Namespaced && scope == Cluster { rMap := policy.GetResourcesPerNamespace(k, dClient, "", rule, configData, log.Log) policy.MergeResources(resourceMap, rMap) } else if resourceSchema.Namespaced { - namespaces := policy.GetNamespacesForRule(&rule, np.Lister(), log.Log) + namespaces := policy.GetNamespacesForRule(&rule, np.Lister(), log.Log) for _, ns := range namespaces { if ns == n { - rMap := policy.GetResourcesPerNamespace(k, dClient, ns, rule, configData, log.Log) + rMap := policy.GetResourcesPerNamespace(k, dClient, ns, rule, configData, log.Log) for _, r := range rMap { labels := r.GetLabels() _, okChart := labels["app"] @@ -216,9 +217,7 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config } results[appname] = append(results[appname], *result) } - } - } for k, _ := range results { @@ -253,10 +252,10 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config } } availablepr, action := mergeReport(availablepr, results[k]) - if action == "CREATE" { + if action == "Create" { _, err := kclient.PolicyV1alpha1().PolicyReports(n).Create(availablepr) if err != nil { - log.Log.Error(err, "Error in create polciy report", "appreport", k) + log.Log.Error(err, "Error in Create polciy report", "appreport", k) } } else { _, err := kclient.PolicyV1alpha1().PolicyReports(n).Update(availablepr) @@ -292,7 +291,7 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config if action == "Create" { _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr) if err != nil { - log.Log.Error(err, "Error in create polciy report", "appreport", k) + log.Log.Error(err, "Error in Create polciy report", "appreport", k) } } else { _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr) @@ -332,78 +331,83 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config) os.Exit(1) } var response map[string][]policyreport.Info - var data []policyreport.Info if scope == Cluster { if err := json.Unmarshal([]byte(job.Data["Namespace"]), &response); err != nil { log.Log.Error(err, "") } - data = response["cluster"] } else if scope == Helm { if err := json.Unmarshal([]byte(job.Data["Helm"]), &response); err != nil { log.Log.Error(err, "") } - data = response[n] } else { if err := json.Unmarshal([]byte(job.Data["Namespace"]), &response); err != nil { log.Log.Error(err, "") } - data = response[n] } var results = make(map[string][]policyreportv1alpha1.PolicyReportResult) var ns []string - for _, v := range data { - for _, r := range v.Rules { - builder := policyreport.NewPrBuilder() - pv := builder.Generate(v) - result := &policyreportv1alpha1.PolicyReportResult{ - Policy: pv.Spec.Policy, - Rule: r.Name, - Message: r.Message, - Status: policyreportv1alpha1.PolicyStatus(r.Check), - Resource: &corev1.ObjectReference{ - Kind: pv.Spec.Kind, - Namespace: pv.Spec.Namespace, - APIVersion: pv.Spec.APIVersion, - Name: pv.Spec.Name, - }, - } - if !strings.Contains(strings.Join(ns, ","), v.Resource.GetNamespace()) { - ns = append(ns, v.Resource.GetNamespace()) - } - var appname string - // Increase Count - if scope == Cluster { - appname = fmt.Sprintf("kyverno-clusterpolicyreport") - } else if scope == Helm { - resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName()) - if err != nil { - log.Log.Error(err, "failed to get resource") - continue + for k := range response { + for _, v := range response[k] { + for _, r := range v.Rules { + builder := policyreport.NewPrBuilder() + pv := builder.Generate(v) + result := &policyreportv1alpha1.PolicyReportResult{ + Policy: pv.Spec.Policy, + Rule: r.Name, + Message: r.Message, + Status: policyreportv1alpha1.PolicyStatus(r.Check), + Resource: &corev1.ObjectReference{ + Kind: pv.Spec.Kind, + Namespace: pv.Spec.Namespace, + APIVersion: pv.Spec.APIVersion, + Name: pv.Spec.Name, + }, } - labels := resource.GetLabels() - _, okChart := labels["app"] - _, okRelease := labels["release"] - if okChart && okRelease { - appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], v.Resource.GetNamespace()) + if !strings.Contains(strings.Join(ns, ","), v.Resource.GetNamespace()) { + ns = append(ns, v.Resource.GetNamespace()) + } + var appname string + // Increase Count + if scope == Cluster { + appname = fmt.Sprintf("kyverno-clusterpolicyreport") + } else if scope == Helm { + resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName()) + if err != nil { + log.Log.Error(err, "failed to get resource") + continue + } + labels := resource.GetLabels() + _, okChart := labels["app"] + _, okRelease := labels["release"] + if okChart && okRelease { + appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], v.Resource.GetNamespace()) + } + } else { + appname = fmt.Sprintf("kyverno-policyreport-%s", v.Resource.GetNamespace()) } - } else { - appname = fmt.Sprintf("kyverno-policyreport-%s", v.Resource.GetNamespace()) + results[appname] = append(results[appname], *result) } - results[appname] = append(results[appname], *result) + } - } - for k, _ := range results { + for k := range results { if scope == Helm || scope == Namespace { availablepr, err := kclient.PolicyV1alpha1().PolicyReports(n).Get(k, metav1.GetOptions{}) + str := strings.Split(k, "-") + var namespace string + if len(str) == 2 { + namespace = str[1] + } else if len(str) == 3 { + namespace = str[2] + } if err != nil { if apierrors.IsNotFound(err) { availablepr = &policyreportv1alpha1.PolicyReport{ Scope: &corev1.ObjectReference{ Kind: scope, - Namespace: n, + Namespace: namespace, }, Summary: policyreportv1alpha1.PolicyReportSummary{}, Results: []*policyreportv1alpha1.PolicyReportResult{}, @@ -413,7 +417,7 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config) "policy-state": "init", } availablepr.SetName(k) - availablepr.SetNamespace(n) + availablepr.SetNamespace(namespace) availablepr.SetLabels(labelMap) availablepr.SetGroupVersionKind(schema.GroupVersionKind{ Kind: "PolicyReport", @@ -424,16 +428,16 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config) } availablepr, action := mergeReport(availablepr, results[k]) - if action == "CREATE" { + if action == "Create" { availablepr.SetLabels(map[string]string{ "policy-state": "state", }) - _, err := kclient.PolicyV1alpha1().PolicyReports(n).Create(availablepr) + _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Create(availablepr) if err != nil { - log.Log.Error(err, "Error in create polciy report", "appreport", k) + log.Log.Error(err, "Error in Create polciy report", "appreport", k) } } else { - _, err := kclient.PolicyV1alpha1().PolicyReports(n).Update(availablepr) + _, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Update(availablepr) if err != nil { log.Log.Error(err, "Error in update polciy report", "appreport", k) } @@ -463,10 +467,10 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config) } } availablepr, action := mergeClusterReport(availablepr, results[k]) - if action == "CREATE" { + if action == "Create" { _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr) if err != nil { - log.Log.Error(err, "Error in create polciy report", "appreport", action) + log.Log.Error(err, "Error in Create polciy report", "appreport", action) } } else { _, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr) @@ -490,18 +494,39 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1 } else { action = "Update" } + var uniqueResponse []*policyreportv1alpha1.PolicyReportResult for _, r := range results { - var isExist = true - for _, v := range pr.Results { + var isExist = false + for _, v := range uniqueResponse { if r.Policy == v.Policy && r.Rule == v.Rule && r.Resource.APIVersion == v.Resource.APIVersion && r.Resource.Kind == v.Resource.Kind && r.Resource.Namespace == v.Resource.Namespace && r.Resource.Name == v.Resource.Name { - r = *v - pr = changeClusterReportCount(string(r.Status), string(v.Status), pr) - isExist = false + v = &r + isExist = true + break } } - if isExist { - pr = changeClusterReportCount(string(r.Status), string(""), pr) - pr.Results = append(pr.Results, &r) + if !isExist { + uniqueResponse = append(uniqueResponse, &r) + } + } + if len(pr.Results) == 0 { + pr.Results = append(pr.Results, uniqueResponse...) + return pr, action + } + for _, r := range uniqueResponse { + var isExist = false + for _, v := range pr.Results { + if r.Policy == v.Policy && r.Rule == v.Rule && r.Resource.APIVersion == v.Resource.APIVersion && r.Resource.Kind == v.Resource.Kind && r.Resource.Namespace == v.Resource.Namespace && r.Resource.Name == v.Resource.Name { + v = r + isExist = true + if string(r.Status) != string(v.Status) { + pr = changeCount(string(r.Status), string(v.Status), pr) + } + break + } + } + if !isExist { + pr = changeCount(string(r.Status), string(""), pr) + pr.Results = append(pr.Results, r) } } return pr, action @@ -519,24 +544,48 @@ func mergeClusterReport(pr *policyreportv1alpha1.ClusterPolicyReport, results [] action = "Update" } + var uniqueResponse []*policyreportv1alpha1.PolicyReportResult for _, r := range results { - var isExist = true - for _, v := range pr.Results { + var isExist = false + for _, v := range uniqueResponse { if r.Policy == v.Policy && r.Rule == v.Rule && r.Resource.APIVersion == v.Resource.APIVersion && r.Resource.Kind == v.Resource.Kind && r.Resource.Namespace == v.Resource.Namespace && r.Resource.Name == v.Resource.Name { - r = *v - pr = changeCount(string(r.Status), string(v.Status), pr) - isExist = false + v = &r + isExist = true + break } } - if isExist { - pr = changeCount(string(r.Status), string(""), pr) - pr.Results = append(pr.Results, &r) + if !isExist { + uniqueResponse = append(uniqueResponse, &r) } } + + if len(pr.Results) == 0 { + pr.Results = append(pr.Results, uniqueResponse...) + return pr, action + } + + for _, r := range uniqueResponse { + var isExist = false + for _, v := range pr.Results { + if r.Policy == v.Policy && r.Rule == v.Rule && r.Resource.APIVersion == v.Resource.APIVersion && r.Resource.Kind == v.Resource.Kind && r.Resource.Namespace == v.Resource.Namespace && r.Resource.Name == v.Resource.Name { + v = r + isExist = true + if string(r.Status) != string(v.Status) { + pr = changeClusterReportCount(string(r.Status), string(v.Status), pr) + } + break + } + } + if !isExist { + pr = changeClusterReportCount(string(r.Status), string(""), pr) + pr.Results = append(pr.Results, r) + } + } + return pr, action } -func changeCount(status, oldStatus string, report *policyreportv1alpha1.ClusterPolicyReport) *policyreportv1alpha1.ClusterPolicyReport { +func changeCount(status, oldStatus string, report *policyreportv1alpha1.PolicyReport) *policyreportv1alpha1.PolicyReport { switch oldStatus { case "Pass": if report.Summary.Pass--; report.Summary.Pass < 0 { @@ -564,7 +613,7 @@ func changeCount(status, oldStatus string, report *policyreportv1alpha1.ClusterP return report } -func changeClusterReportCount(status, oldStatus string, report *policyreportv1alpha1.PolicyReport) *policyreportv1alpha1.PolicyReport { +func changeClusterReportCount(status, oldStatus string, report *policyreportv1alpha1.ClusterPolicyReport) *policyreportv1alpha1.ClusterPolicyReport { switch oldStatus { case "Pass": if report.Summary.Pass--; report.Summary.Pass < 0 { diff --git a/pkg/policyreport/generator.go b/pkg/policyreport/generator.go index 451afe2ad5..818b6a0c6f 100755 --- a/pkg/policyreport/generator.go +++ b/pkg/policyreport/generator.go @@ -3,16 +3,17 @@ package policyreport import ( "context" "encoding/json" - "github.com/nirmata/kyverno/pkg/config" - "github.com/nirmata/kyverno/pkg/jobs" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" "reflect" "strconv" "strings" "sync" "time" + "github.com/nirmata/kyverno/pkg/config" + "github.com/nirmata/kyverno/pkg/jobs" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "github.com/go-logr/logr" kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" policyreportclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" @@ -187,9 +188,7 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { select { case <-ticker.C: err := gen.createConfigmap() - gen.job.Add(jobs.JobInfo{ - JobType: "background", - }) + gen.job.Add(jobs.JobInfo{}) if err != nil { logger.Error(err, "configmap error") }