1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

improvement added in jobs scheduler

This commit is contained in:
evalsocket 2020-09-15 06:59:05 -07:00
parent 5a69b489a6
commit 68855c2ca9
18 changed files with 429 additions and 315 deletions

View file

@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/policyreport"
"net/http"
_ "net/http/pprof"
"os"
@ -190,17 +191,34 @@ func main() {
// POLICY VIOLATION GENERATOR
// -- generate policy violation
pvgen := policyviolation.NewPVGenerator(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
pInformer.Policy().V1alpha1().ClusterPolicyReports(),
pInformer.Policy().V1alpha1().PolicyReports(),
statusSync.Listener,
jobController,
log.Log.WithName("PolicyViolationGenerator"),
stopCh,
)
var pvgen *policyviolation.Generator
if os.Getenv("POLICY-TYPE") == common.PolicyViolation {
pvgen = policyviolation.NewPVGenerator(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
pInformer.Policy().V1alpha1().ClusterPolicyReports(),
pInformer.Policy().V1alpha1().PolicyReports(),
statusSync.Listener,
jobController,
log.Log.WithName("PolicyViolationGenerator"),
stopCh,
)
}
// POLICY Report GENERATOR
// -- generate policy report
var prgen *policyreport.Generator
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
prgen = policyreport.NewPRGenerator(pclient,
client,
pInformer.Policy().V1alpha1().ClusterPolicyReports(),
pInformer.Policy().V1alpha1().PolicyReports(),
statusSync.Listener,
jobController,
log.Log.WithName("PolicyReportGenerator"),
)
}
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
@ -215,6 +233,7 @@ func main() {
configData,
eventGenerator,
pvgen,
prgen,
rWebhookWatcher,
kubeInformer.Core().V1().Namespaces(),
jobController,
@ -265,6 +284,7 @@ func main() {
eventGenerator,
statusSync.Listener,
pvgen,
prgen,
kubeInformer.Rbac().V1().RoleBindings(),
kubeInformer.Rbac().V1().ClusterRoleBindings(),
log.Log.WithName("ValidateAuditHandler"),
@ -320,6 +340,7 @@ func main() {
statusSync.Listener,
configData,
pvgen,
prgen,
grgen,
rWebhookWatcher,
auditHandler,
@ -341,17 +362,21 @@ func main() {
go grgen.Run(1)
go rWebhookWatcher.Run(stopCh)
go configData.Run(stopCh)
go policyCtrl.Run(2, stopCh)
go policyCtrl.Run(3, stopCh)
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
go prgen.Run(2,stopCh)
}else{
go pvgen.Run(2, stopCh)
}
go eventGenerator.Run(3, stopCh)
go grc.Run(1, stopCh)
go grcc.Run(1, stopCh)
go pvgen.Run(1, stopCh)
go statusSync.Run(1, stopCh)
go pCacheController.Run(1, stopCh)
go auditHandler.Run(10, stopCh)
go jobController.Run(3, stopCh)
go jobController.Run(2, stopCh)
openAPISync.Run(1, stopCh)
// verifies if the admission control is enabled and active

View file

@ -9,4 +9,7 @@ const (
EventControllerResync = 15 * time.Minute
GenerateControllerResync = 15 * time.Minute
GenerateRequestControllerResync = 15 * time.Minute
PolicyReportPolicyChangeResync = 120 * time.Second
PolicyReportResourceChangeResync = 120 * time.Second
)

View file

@ -96,17 +96,6 @@ func NewJobsJob(dclient *dclient.Client,
configHandler: configHandler,
log: log,
}
go func(configHandler config.Interface) {
for k := range time.Tick(time.Duration(configHandler.GetBackgroundSync()) * time.Second) {
gen.log.V(2).Info("Background Sync sync at ", "time", k.String())
var wg sync.WaitGroup
wg.Add(3)
go gen.syncKyverno(&wg, "Helm", "SYNC", "")
go gen.syncKyverno(&wg, "Namespace", "SYNC", "")
go gen.syncKyverno(&wg, "Cluster", "SYNC", "")
wg.Wait()
}
}(configHandler)
return &gen
}
@ -133,7 +122,15 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
logger.Info("start")
defer logger.Info("shutting down")
go func(configHandler config.Interface) {
for k := range time.Tick(time.Duration(configHandler.GetBackgroundSync()) * time.Second) {
j.log.V(2).Info("Background Sync sync at ", "time", k.String())
var wg sync.WaitGroup
wg.Add(1)
go j.syncKyverno(&wg, "All", "SYNC", "")
wg.Wait()
}
}(j.configHandler)
for i := 0; i < workers; i++ {
go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
@ -188,7 +185,6 @@ func (j *Job) processNextWorkItem() bool {
// lookup data store
info := j.dataStore.lookup(keyHash)
err := j.syncHandler(info)
j.handleErr(err, obj)
return nil
@ -209,10 +205,8 @@ func (j *Job) syncHandler(info JobInfo) error {
j.mux.Lock()
var wg sync.WaitGroup
if info.JobType == "POLICYSYNC" {
wg.Add(3)
go j.syncKyverno(&wg, "Helm", "SYNC", info.JobData)
go j.syncKyverno(&wg, "Namespace", "SYNC", info.JobData)
go j.syncKyverno(&wg, "Cluster", "SYNC", info.JobData)
wg.Add(1)
go j.syncKyverno(&wg, "All", "SYNC", info.JobData)
} else if info.JobType == "CONFIGMAP" {
if info.JobData != "" {
str := strings.Split(info.JobData, ",")
@ -256,13 +250,34 @@ func (j *Job) syncKyverno(wg *sync.WaitGroup, jobType, scope, data string) {
fmt.Sprintf("--mode=%s", mode),
}
break
case "All":
args = []string{
"report",
"all",
fmt.Sprintf("--mode=%s", mode),
}
break
}
if scope == "POLICYSYNC" && data != "" {
args = append(args, fmt.Sprintf("-p=%s", data))
}
go j.CreateJob(args, jobType, scope, wg)
wg.Wait()
resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, &metav1.LabelSelector{
MatchLabels: map[string]string{
"scope" : scope,
"type" : jobType,
},
})
if err != nil {
j.log.Error(err, "failed to get job")
}
if len(resourceList.Items) == 0 {
go j.CreateJob(args, jobType, scope, wg)
wg.Wait()
}else{
wg.Done()
}
}
// CreateJob will create Job template for background scan
@ -301,29 +316,29 @@ func (j *Job) CreateJob(args []string, jobType, scope string, wg *sync.WaitGroup
j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName())
return
}
deadline := time.Now().Add(100 * time.Second)
deadline := time.Now().Add(150 * time.Second)
for {
time.Sleep(20 * time.Second)
resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName())
if err != nil {
if apierrors.IsNotFound(err) {
j.log.Error(err, "job is already deleted", "job_name", job.GetName())
break
}
continue
}
job := v1.Job{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName())
continue
}
if time.Now().After(deadline) {
if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil {
j.log.Error(err, "Error in deleting jobs", "job_name", job.GetName())
time.Sleep(20 * time.Second)
resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName())
if err != nil {
if apierrors.IsNotFound(err) {
j.log.Error(err, "job is already deleted", "job_name", job.GetName())
break
}
continue
}
break
}
job := v1.Job{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName())
continue
}
if time.Now().After(deadline) {
if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil {
j.log.Error(err, "Error in deleting jobs", "job_name", job.GetName())
continue
}
break
}
}
wg.Done()
}

View file

@ -0,0 +1,62 @@
package report
import (
"fmt"
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"os"
log "sigs.k8s.io/controller-runtime/pkg/log"
"sync"
"time"
)
// AllReportsCommand returns the `report all` subcommand, which runs a
// background scan over every namespace in the cluster (or a single
// namespace when --namespace is set) and generates policy reports.
// The command terminates the process via os.Exit on both success and failure.
func AllReportsCommand() *cobra.Command {
	kubernetesConfig := genericclioptions.NewConfigFlags(true)
	var namespace, policy string
	cmd := &cobra.Command{
		Use:   "all",
		Short: "generate report",
		// BUG FIX: the example text was copy-pasted from the `namespace`
		// subcommand; it now shows the `all` subcommand it belongs to.
		Example: fmt.Sprintf("To create a report from background scan:\nkyverno report all --namespace=defaults \n kyverno report all"),
		RunE: func(cmd *cobra.Command, args []string) (err error) {
			os.Setenv("POLICY-TYPE", common.PolicyReport)
			logger := log.Log.WithName("Report")
			restConfig, err := kubernetesConfig.ToRESTConfig()
			if err != nil {
				logger.Error(err, "failed to create rest config of kubernetes cluster ")
				os.Exit(1)
			}
			// resyncPeriod is currently unused by this command; kept so the
			// file's `time` import stays referenced. TODO: remove together
			// with the import once confirmed nothing else in this file needs it.
			const resyncPeriod = 1 * time.Second
			_ = resyncPeriod
			kubeClient, err := utils.NewKubeClient(restConfig)
			if err != nil {
				log.Log.Error(err, "Failed to create kubernetes client")
				os.Exit(1)
			}
			var wg sync.WaitGroup
			if namespace != "" {
				// Scan only the requested namespace.
				wg.Add(1)
				go backgroundScan(namespace, All, policy, &wg, restConfig, logger)
				wg.Wait()
			} else {
				// Scan every namespace concurrently, one goroutine each.
				ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{})
				if err != nil {
					// BUG FIX: previously exited without reporting why.
					logger.Error(err, "Failed to list namespaces")
					os.Exit(1)
				}
				wg.Add(len(ns.Items))
				for _, n := range ns.Items {
					go backgroundScan(n.GetName(), All, policy, &wg, restConfig, logger)
				}
				wg.Wait()
			}
			// BUG FIX: removed the unreachable `<-stopCh` that followed
			// os.Exit(0) (and the nil stopCh channel it read from).
			os.Exit(0)
			return nil
		},
	}
	cmd.Flags().StringVarP(&namespace, "namespace", "n", "", "define specific namespace")
	cmd.Flags().StringVarP(&policy, "policy", "p", "", "define specific policy")
	return cmd
}

View file

@ -28,12 +28,13 @@ func ClusterCommand() *cobra.Command {
var wg sync.WaitGroup
wg.Add(1)
if mode == "cli" {
go backgroundScan("", "Cluster", policy, &wg, restConfig, logger)
go backgroundScan("", Cluster, policy, &wg, restConfig, logger)
wg.Wait()
return nil
os.Exit(0)
}
go configmapScan("", "Cluster", &wg, restConfig, logger)
go configmapScan("", Cluster, &wg, restConfig, logger)
wg.Wait()
os.Exit(0)
return nil
},
}

View file

@ -28,5 +28,6 @@ func Command() *cobra.Command {
cmd.AddCommand(HelmCommand())
cmd.AddCommand(NamespaceCommand())
cmd.AddCommand(ClusterCommand())
cmd.AddCommand(AllReportsCommand())
return cmd
}

View file

@ -41,13 +41,15 @@ const (
Helm string = "Helm"
Namespace string = "Namespace"
Cluster string = "Cluster"
All string = "All"
)
func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfig *rest.Config, logger logr.Logger) {
lgr := logger.WithValues("namespace", n, "scope", scope, "policychange", policychange)
defer func() {
lgr.Error(nil, "done")
wg.Done()
}()
lgr := logger.WithValues("namespace", n, "scope", scope, "policychange", policychange)
dClient, err := client.NewClient(restConfig, 5*time.Minute, make(chan struct{}), lgr)
if err != nil {
lgr.Error(err, "Error in creating dcclient with provided rest config")
@ -66,7 +68,7 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
}
pclient, err := kyvernoclient.NewForConfig(restConfig)
if err != nil {
lgr.Error(err, "Error in creating kyverno client for polciy with provided rest config")
lgr.Error(err, "Error in creating kyverno client for policy with provided rest config")
os.Exit(1)
}
var stopCh <-chan struct{}
@ -147,10 +149,11 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
cpolicies = append(cpolicies, cp)
}
}
// key uid
resourceMap := map[string]unstructured.Unstructured{}
var engineResponses []response.EngineResponse
resourceMap := map[string]map[string]unstructured.Unstructured{}
resourceMap[Cluster] = make(map[string]unstructured.Unstructured)
resourceMap[Helm] = make(map[string]unstructured.Unstructured)
resourceMap[Namespace] = make(map[string]unstructured.Unstructured)
for _, p := range cpolicies {
for _, rule := range p.Spec.Rules {
for _, k := range rule.MatchResources.Kinds {
@ -159,10 +162,9 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
lgr.Error(err, "failed to find resource", "kind", k)
continue
}
if !resourceSchema.Namespaced && scope == Cluster {
if !resourceSchema.Namespaced {
rMap := policy.GetResourcesPerNamespace(k, dClient, "", rule, configData, log.Log)
policy.MergeResources(resourceMap, rMap)
policy.MergeResources(resourceMap[Cluster], rMap)
} else if resourceSchema.Namespaced {
namespaces := policy.GetNamespacesForRule(&rule, np.Lister(), log.Log)
for _, ns := range namespaces {
@ -172,10 +174,10 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
labels := r.GetLabels()
_, okChart := labels["app"]
_, okRelease := labels["release"]
if okChart && okRelease && scope == Helm {
policy.MergeResources(resourceMap, rMap)
} else if scope == Namespace && r.GetNamespace() != "" {
policy.MergeResources(resourceMap, rMap)
if okChart && okRelease {
policy.MergeResources(resourceMap[Helm], rMap)
} else if r.GetNamespace() != "" {
policy.MergeResources(resourceMap[Namespace], rMap)
}
}
}
@ -186,123 +188,161 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
}
if p.HasAutoGenAnnotation() {
resourceMap = policy.ExcludePod(resourceMap, log.Log)
switch scope {
case Cluster:
resourceMap[Cluster] = policy.ExcludePod(resourceMap[Cluster], log.Log)
break
case Namespace:
resourceMap[Namespace] = policy.ExcludePod(resourceMap[Namespace], log.Log)
break
case Helm:
resourceMap[Helm] = policy.ExcludePod(resourceMap[Helm], log.Log)
break
case All:
resourceMap[Cluster] = policy.ExcludePod(resourceMap[Cluster], log.Log)
resourceMap[Namespace] = policy.ExcludePod(resourceMap[Namespace], log.Log)
resourceMap[Helm] = policy.ExcludePod(resourceMap[Helm], log.Log)
}
}
results := make(map[string][]policyreportv1alpha1.PolicyReportResult)
for _, resource := range resourceMap {
policyContext := engine.PolicyContext{
NewResource: resource,
Context: context.NewContext(),
Policy: *p,
ExcludeGroupRole: configData.GetExcludeGroupRole(),
}
engineResponse := engine.Validate(policyContext)
if len(engineResponse.PolicyResponse.Rules) > 0 {
engineResponses = append(engineResponses, engineResponse)
}
engineResponse = engine.Mutate(policyContext)
if len(engineResponse.PolicyResponse.Rules) > 0 {
engineResponses = append(engineResponses, engineResponse)
}
pv := policyreport.GeneratePRsFromEngineResponse(engineResponses, log.Log)
for _, v := range pv {
var appname string
switch scope {
case Helm:
resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName())
if err != nil {
lgr.Error(err, "failed to get resource")
continue
}
labels := resource.GetLabels()
_, okChart := labels["app"]
_, okRelease := labels["release"]
if okChart && okRelease {
appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], policyContext.NewResource.GetNamespace())
}
break
case Namespace:
appname = fmt.Sprintf("kyverno-policyreport-%s", policyContext.NewResource.GetNamespace())
break
case Cluster:
appname = fmt.Sprintf("kyverno-clusterpolicyreport")
break
}
builder := policyreport.NewPrBuilder()
pv := builder.Generate(v)
for _, e := range pv.Spec.ViolatedRules {
result := &policyreportv1alpha1.PolicyReportResult{
Policy: pv.Spec.Policy,
Rule: e.Name,
Message: e.Message,
Status: policyreportv1alpha1.PolicyStatus(e.Check),
Resource: &corev1.ObjectReference{
Kind: pv.Spec.Kind,
Namespace: pv.Spec.Namespace,
APIVersion: pv.Spec.APIVersion,
Name: pv.Spec.Name,
},
}
results[appname] = append(results[appname], *result)
for key, _ := range resourceMap {
for _, resource := range resourceMap[key] {
policyContext := engine.PolicyContext{
NewResource: resource,
Context: context.NewContext(),
Policy: *p,
ExcludeGroupRole: configData.GetExcludeGroupRole(),
}
results = createResults(policyContext, key, results)
}
}
for k, _ := range results {
if k == "" {
continue
}
if scope == Helm || scope == Namespace {
availablepr, err := kclient.PolicyV1alpha1().PolicyReports(n).Get(k, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = initPolicyReport(scope, n, k)
}
}
availablepr, action := mergeReport(availablepr, results[k], removePolicy)
if action == "Create" {
_, err := kclient.PolicyV1alpha1().PolicyReports(n).Create(availablepr)
if err != nil {
lgr.Error(err, "Error in Create polciy report", "appreport", k)
}
} else {
_, err := kclient.PolicyV1alpha1().PolicyReports(n).Update(availablepr)
if err != nil {
lgr.Error(err, "Error in update polciy report", "appreport", k)
}
}
} else {
availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = initClusterPolicyReport(scope, k)
}
}
availablepr, action := mergeClusterReport(availablepr, results[k])
if action == "Create" {
_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr)
if err != nil {
lgr.Error(err, "Error in Create polciy report", "appreport", k)
}
} else {
_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr)
if err != nil {
lgr.Error(err, "Error in update polciy report", "appreport", k)
}
}
err := createReport(kclient, k, n, results[k], lgr)
if err != nil {
continue
}
}
}
os.Exit(0)
}
// createReport creates or updates the policy report identified by name,
// merging in the supplied results. Report names follow the scheme produced
// by createResults:
//   - "clusterpolicyreport"                 -> cluster-scoped ClusterPolicyReport
//   - "policyreport-helm-<app>-<namespace>" -> namespaced PolicyReport (Helm scope)
//   - "policyreport-<namespace>"            -> namespaced PolicyReport (Namespace scope)
//
// Returns the first API error encountered; errors are also logged via lgr.
func createReport(kclient *kyvernoclient.Clientset, name, namespace string, results []policyreportv1alpha1.PolicyReportResult, lgr logr.Logger) error {
	str := strings.Split(name, "-")
	var scope string
	if len(str) == 1 {
		scope = Cluster
	} else if strings.Contains(name, "policyreport-helm-") {
		scope = Helm
	} else {
		// BUG FIX: names of the form "policyreport-<namespace>" are
		// namespace-scoped; the previous fallback wrongly used Cluster here.
		scope = Namespace
	}
	if len(str) > 1 {
		// Namespaced PolicyReport (Helm or Namespace scope).
		availablepr, err := kclient.PolicyV1alpha1().PolicyReports(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				// No existing report: start from an empty skeleton.
				availablepr = initPolicyReport(scope, namespace, name)
			} else {
				return err
			}
		}
		availablepr, action := mergeReport(availablepr, results, []string{})
		if action == "Create" {
			availablepr.SetLabels(map[string]string{
				"policy-state": "state",
			})
			_, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Create(availablepr)
			if err != nil {
				lgr.Error(err, "Error in Create policy report", "appreport", name)
				return err
			}
		} else {
			_, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Update(availablepr)
			if err != nil {
				lgr.Error(err, "Error in update policy report", "appreport", name)
				return err
			}
		}
	} else {
		// Cluster-scoped ClusterPolicyReport.
		availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(name, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				availablepr = initClusterPolicyReport(scope, name)
			} else {
				return err
			}
		}
		availablepr, action := mergeClusterReport(availablepr, results)
		if action == "Create" {
			_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr)
			if err != nil {
				lgr.Error(err, "Error in Create policy report", "appreport", name)
				return err
			}
		} else {
			_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr)
			if err != nil {
				lgr.Error(err, "Error in update policy report", "appreport", name)
				return err
			}
		}
	}
	return nil
}
// createResults runs the validate and mutate engines against the resource in
// policyContext, converts every violated rule into a PolicyReportResult and
// appends it to results under the target report name, which is then returned.
//
// key selects the report-naming scheme:
//   - Helm:      "policyreport-helm-<app>-<namespace>"; stays "" when the
//     resource lacks the app/release labels — callers are expected to skip
//     the empty key.
//   - Namespace: "policyreport-<namespace>"
//   - otherwise: "clusterpolicyreport"
func createResults(policyContext engine.PolicyContext, key string, results map[string][]policyreportv1alpha1.PolicyReportResult) map[string][]policyreportv1alpha1.PolicyReportResult {
	var engineResponses []response.EngineResponse
	engineResponse := engine.Validate(policyContext)
	if len(engineResponse.PolicyResponse.Rules) > 0 {
		engineResponses = append(engineResponses, engineResponse)
	}
	engineResponse = engine.Mutate(policyContext)
	if len(engineResponse.PolicyResponse.Rules) > 0 {
		engineResponses = append(engineResponses, engineResponse)
	}
	violations := policyreport.GeneratePRsFromEngineResponse(engineResponses, log.Log)
	for _, v := range violations {
		var appname string
		switch key {
		case Helm:
			labels := policyContext.NewResource.GetLabels()
			_, okChart := labels["app"]
			_, okRelease := labels["release"]
			if okChart && okRelease {
				appname = fmt.Sprintf("policyreport-helm-%s-%s", labels["app"], policyContext.NewResource.GetNamespace())
			}
		case Namespace:
			appname = fmt.Sprintf("policyreport-%s", policyContext.NewResource.GetNamespace())
		default:
			// Constant string: no fmt.Sprintf needed (staticcheck S1039).
			appname = "clusterpolicyreport"
		}
		builder := policyreport.NewPrBuilder()
		pr := builder.Generate(v)
		for _, rule := range pr.Spec.ViolatedRules {
			result := &policyreportv1alpha1.PolicyReportResult{
				Policy:  pr.Spec.Policy,
				Rule:    rule.Name,
				Message: rule.Message,
				Status:  policyreportv1alpha1.PolicyStatus(rule.Check),
				Resource: &corev1.ObjectReference{
					Kind:       pr.Spec.Kind,
					Namespace:  pr.Spec.Namespace,
					APIVersion: pr.Spec.APIVersion,
					Name:       pr.Spec.Name,
				},
			}
			results[appname] = append(results[appname], *result)
		}
	}
	return results
}
func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config, logger logr.Logger) {
@ -371,7 +411,7 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config,
var appname string
// Increase Count
if scope == Cluster {
appname = fmt.Sprintf("kyverno-clusterpolicyreport")
appname = fmt.Sprintf("clusterpolicyreport")
} else if scope == Helm {
resource, err := dClient.GetResource(v.Resource.GetAPIVersion(), v.Resource.GetKind(), v.Resource.GetNamespace(), v.Resource.GetName())
if err != nil {
@ -382,11 +422,11 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config,
_, okChart := labels["app"]
_, okRelease := labels["release"]
if okChart && okRelease {
appname = fmt.Sprintf("kyverno-policyreport-%s-%s", labels["app"], v.Resource.GetNamespace())
appname = fmt.Sprintf("policyreport-helm-%s-%s", labels["app"], v.Resource.GetNamespace())
}
} else {
appname = fmt.Sprintf("kyverno-policyreport-%s", v.Resource.GetNamespace())
appname = fmt.Sprintf("policyreport-%s", v.Resource.GetNamespace())
}
results[appname] = append(results[appname], *result)
}
@ -395,59 +435,14 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config,
}
for k := range results {
if scope == Helm || scope == Namespace {
availablepr, err := kclient.PolicyV1alpha1().PolicyReports(n).Get(k, metav1.GetOptions{})
str := strings.Split(k, "-")
var namespace string
if len(str) == 2 {
namespace = str[1]
} else if len(str) == 3 {
namespace = str[2]
}
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = initPolicyReport(scope, namespace, k)
}
}
availablepr, action := mergeReport(availablepr, results[k], []string{})
if action == "Create" {
availablepr.SetLabels(map[string]string{
"policy-state": "state",
})
_, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Create(availablepr)
if err != nil {
lgr.Error(err, "Error in Create polciy report", "appreport", k)
}
} else {
_, err := kclient.PolicyV1alpha1().PolicyReports(availablepr.GetNamespace()).Update(availablepr)
if err != nil {
lgr.Error(err, "Error in update polciy report", "appreport", k)
}
}
} else {
availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = initClusterPolicyReport(scope, k)
}
}
availablepr, action := mergeClusterReport(availablepr, results[k])
if action == "Create" {
_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Create(availablepr)
if err != nil {
lgr.Error(err, "Error in Create polciy report", "appreport", action)
}
} else {
_, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Update(availablepr)
if err != nil {
lgr.Error(err, "Error in update polciy report", "appreport", action)
}
}
if k != "" {
continue
}
err := createReport(kclient, k, "", results[k], lgr)
if err != nil {
continue
}
}
os.Exit(0)
}
func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1alpha1.PolicyReportResult, removePolicy []string) (*policyreportv1alpha1.PolicyReport, string) {

View file

@ -2,6 +2,7 @@ package report
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"sync"
"time"
@ -9,10 +10,7 @@ import (
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/cli-runtime/pkg/genericclioptions"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
log "sigs.k8s.io/controller-runtime/pkg/log"
)
@ -38,40 +36,28 @@ func HelmCommand() *cobra.Command {
os.Exit(1)
}
var stopCh <-chan struct{}
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
np := kubeInformer.Core().V1().Namespaces()
go np.Informer().Run(stopCh)
nSynced := np.Informer().HasSynced
if !cache.WaitForCacheSync(stopCh, nSynced) {
logger.Error(err, "Failed to create kubernetes client")
os.Exit(1)
}
var wg sync.WaitGroup
if mode == "cli" {
if namespace != "" {
wg.Add(1)
go backgroundScan(namespace, "Helm", policy, &wg, restConfig, logger)
go backgroundScan(namespace, Helm, policy, &wg, restConfig, logger)
} else {
ns, err := np.Lister().List(labels.Everything())
ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
logger.Error(err, "Failed to list all namespaces")
os.Exit(1)
}
wg.Add(len(ns))
for _, n := range ns {
go backgroundScan(n.GetName(), "Helm", policy, &wg, restConfig, logger)
wg.Add(len(ns.Items))
for _, n := range ns.Items {
go backgroundScan(n.GetName(), Helm, policy, &wg, restConfig, logger)
}
}
} else {
wg.Add(1)
go configmapScan("", "Helm", &wg, restConfig, logger)
go configmapScan("", Helm, &wg, restConfig, logger)
}
wg.Wait()
os.Exit(0)
return nil
},
}

View file

@ -5,10 +5,8 @@ import (
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/labels"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"os"
log "sigs.k8s.io/controller-runtime/pkg/log"
"sync"
@ -38,36 +36,27 @@ func NamespaceCommand() *cobra.Command {
}
var stopCh <-chan struct{}
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
np := kubeInformer.Core().V1().Namespaces()
go np.Informer().Run(stopCh)
nSynced := np.Informer().HasSynced
nLister := np.Lister()
if !cache.WaitForCacheSync(stopCh, nSynced) {
log.Log.Error(err, "Failed to create kubernetes client")
os.Exit(1)
}
var wg sync.WaitGroup
if mode == "cli" {
if namespace != "" {
wg.Add(1)
go backgroundScan(namespace, "Namespace", policy, &wg, restConfig, logger)
go backgroundScan(namespace, Namespace, policy, &wg, restConfig, logger)
} else {
ns, err := nLister.List(labels.Everything())
ns, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
os.Exit(1)
}
wg.Add(len(ns))
for _, n := range ns {
go backgroundScan(n.GetName(), "Namespace", policy, &wg, restConfig, logger)
wg.Add(len(ns.Items))
for _, n := range ns.Items {
go backgroundScan(n.GetName(), Namespace, policy, &wg, restConfig, logger)
}
}
} else {
wg.Add(1)
go configmapScan("", "Namespace", &wg, restConfig, logger)
go configmapScan("", Namespace, &wg, restConfig, logger)
}
wg.Wait()
os.Exit(0)
<-stopCh
return nil
},

View file

@ -3,6 +3,7 @@ package policy
import (
"fmt"
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/policyreport"
"k8s.io/apimachinery/pkg/labels"
"math/rand"
"os"
@ -103,6 +104,9 @@ type PolicyController struct {
// policy violation generator
pvGenerator policyviolation.GeneratorInterface
// policy violation generator
prGenerator policyreport.GeneratorInterface
// resourceWebhookWatcher queues the webhook creation request, creates the webhook
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister
@ -128,6 +132,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
grInformer kyvernoinformer.GenerateRequestInformer,
configHandler config.Interface, eventGen event.Interface,
pvGenerator policyviolation.GeneratorInterface,
prGenerator policyreport.GeneratorInterface,
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister,
namespaces informers.NamespaceInformer,
job *jobs.Job,
@ -150,6 +155,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"),
configHandler: configHandler,
pvGenerator: pvGenerator,
prGenerator: prGenerator,
resourceWebhookWatcher: resourceWebhookWatcher,
job: job,
log: log,
@ -206,19 +212,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
// rebuild after 300 seconds/ 5 mins
//TODO: pass the time in seconds instead of converting it internally
pc.rm = NewResourceManager(30)
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
go func(pc PolicyController) {
for k := range time.Tick(60 * time.Second) {
pc.log.V(2).Info("Policy Background sync at", "time", k.String())
if len(pc.policySync.policy) > 0 {
pc.job.Add(jobs.JobInfo{
JobType: "POLICYSYNC",
JobData: strings.Join(pc.policySync.policy, ","),
})
}
}
}(pc)
}
return &pc, nil
}
@ -367,6 +360,20 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
}
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
go func(pc *PolicyController) {
for k := range time.Tick(constant.PolicyReportPolicyChangeResync) {
pc.log.V(2).Info("Policy Background sync at", "time", k.String())
if len(pc.policySync.policy) > 0 {
pc.job.Add(jobs.JobInfo{
JobType: "POLICYSYNC",
JobData: strings.Join(pc.policySync.policy, ","),
})
}
}
}(pc)
}
for i := 0; i < workers; i++ {
go wait.Until(pc.worker, constant.PolicyControllerResync, stopCh)
}

View file

@ -2,6 +2,9 @@ package policy
import (
"fmt"
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/policyreport"
"os"
"github.com/go-logr/logr"
"github.com/nirmata/kyverno/pkg/engine/response"
@ -23,7 +26,14 @@ func (pc *PolicyController) cleanupAndReport(engineResponses []response.EngineRe
pvInfos[i].FromSync = true
}
pc.pvGenerator.Add(pvInfos...)
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
for _, v := range pvInfos {
pc.prGenerator.Add(policyreport.Info(v))
}
} else {
pc.pvGenerator.Add(pvInfos...)
}
// cleanup existing violations if any
// if there is any error in clean up, we dont re-queue the resource
// it will be re-tried in the next controller cache resync

View file

@ -2,17 +2,16 @@ package policyreport
import (
"encoding/json"
"github.com/nirmata/kyverno/pkg/config"
"github.com/nirmata/kyverno/pkg/jobs"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/nirmata/kyverno/pkg/config"
"github.com/nirmata/kyverno/pkg/jobs"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
policyreportclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@ -118,6 +117,11 @@ type PVEvent struct {
Cluster map[string][]Info
}
// GeneratorInterface is the queueing API of the policy report generator:
// Add enqueues one or more violation Info records for report generation.
// (The original comment mentioned PVs — presumably a copy-paste from the
// policyviolation package; this interface lives in policyreport.)
type GeneratorInterface interface {
	Add(infos ...Info)
}
// NewPRGenerator returns a new instance of policy violation generator
func NewPRGenerator(client *policyreportclient.Clientset,
dclient *dclient.Client,
@ -125,8 +129,7 @@ func NewPRGenerator(client *policyreportclient.Clientset,
nsprInformer policyreportinformer.PolicyReportInformer,
policyStatus policystatus.Listener,
job *jobs.Job,
log logr.Logger,
stopChna <-chan struct{}) *Generator {
log logr.Logger) *Generator {
gen := Generator{
policyreportInterface: client.PolicyV1alpha1(),
dclient: dclient,
@ -178,11 +181,8 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
go func() {
for k := range time.Tick(60 * time.Second) {
go func(gen *Generator) {
for k := range time.Tick(constant.PolicyReportResourceChangeResync) {
gen.log.V(2).Info("Configmap sync at ", "time", k.String())
err := gen.createConfigmap()
scops := []string{}
@ -208,7 +208,12 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
Cluster: make(map[string][]Info),
}
}
}()
}(gen)
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
<-stopCh
}

View file

@ -134,19 +134,6 @@ func NewPVGenerator(client *kyvernoclient.Clientset,
job: job,
policyStatusListener: policyStatus,
}
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
gen.prgen = policyreport.NewPRGenerator(client,
dclient,
prInformer,
nsprInformer,
policyStatus,
job,
log,
stopChna,
)
go gen.prgen.Run(3, stopChna)
}
return &gen
}

View file

@ -36,7 +36,6 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
logger.Error(err, "failed to convert RAR resource to unstructured format")
return
}
// CREATE resources, do not have name, assigned in admission-request
policyContext := engine.PolicyContext{

View file

@ -1,6 +1,9 @@
package webhooks
import (
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/policyreport"
"os"
"reflect"
"sort"
"time"
@ -89,6 +92,13 @@ func (ws *WebhookServer) HandleMutation(
// generate violation when response fails
pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses, logger)
ws.pvGenerator.Add(pvInfos...)
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
for _, v := range pvInfos {
ws.prGenerator.Add(policyreport.Info(v))
}
} else {
ws.pvGenerator.Add(pvInfos...)
}
// REPORTING EVENTS
// Scenario 1:

View file

@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/nirmata/kyverno/pkg/policyreport"
"io/ioutil"
"net/http"
"time"
@ -100,6 +101,9 @@ type WebhookServer struct {
// policy violation generator
pvGenerator policyviolation.GeneratorInterface
// policy report generator
prGenerator policyreport.GeneratorInterface
// generate request generator
grGenerator *generate.Generator
@ -130,6 +134,7 @@ func NewWebhookServer(
statusSync policystatus.Listener,
configHandler config.Interface,
pvGenerator policyviolation.GeneratorInterface,
prGenerator policyreport.GeneratorInterface,
grGenerator *generate.Generator,
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister,
auditHandler AuditHandler,
@ -172,6 +177,7 @@ func NewWebhookServer(
cleanUp: cleanUp,
lastReqTime: resourceWebhookWatcher.LastReqTime,
pvGenerator: pvGenerator,
prGenerator: prGenerator,
grGenerator: grGenerator,
resourceWebhookWatcher: resourceWebhookWatcher,
auditHandler: auditHandler,
@ -347,7 +353,7 @@ func (ws *WebhookServer) ResourceMutation(request *v1beta1.AdmissionRequest) *v1
ws.auditHandler.Add(request.DeepCopy())
// VALIDATION
ok, msg := HandleValidation(request, validatePolicies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.log, ws.configHandler)
ok, msg := HandleValidation(request, validatePolicies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.prGenerator, ws.log, ws.configHandler)
if !ok {
logger.Info("admission request denied")
return &v1beta1.AdmissionResponse{
@ -473,7 +479,7 @@ func (ws *WebhookServer) resourceValidation(request *v1beta1.AdmissionRequest) *
logger.Error(err, "failed to load service account in context")
}
ok, msg := HandleValidation(request, policies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.log, ws.configHandler)
ok, msg := HandleValidation(request, policies, nil, ctx, userRequestInfo, ws.statusListener, ws.eventGen, ws.pvGenerator, ws.prGenerator, ws.log, ws.configHandler)
if !ok {
logger.Info("admission request denied")
return &v1beta1.AdmissionResponse{

View file

@ -10,6 +10,7 @@ import (
enginectx "github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/policycache"
"github.com/nirmata/kyverno/pkg/policyreport"
"github.com/nirmata/kyverno/pkg/policystatus"
"github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/userinfo"
@ -44,6 +45,7 @@ type auditHandler struct {
eventGen event.Interface
statusListener policystatus.Listener
pvGenerator policyviolation.GeneratorInterface
prGenerator policyreport.GeneratorInterface
rbLister rbaclister.RoleBindingLister
rbSynced cache.InformerSynced
@ -59,6 +61,7 @@ func NewValidateAuditHandler(pCache policycache.Interface,
eventGen event.Interface,
statusListener policystatus.Listener,
pvGenerator policyviolation.GeneratorInterface,
prGenerator policyreport.GeneratorInterface,
rbInformer rbacinformer.RoleBindingInformer,
crbInformer rbacinformer.ClusterRoleBindingInformer,
log logr.Logger,
@ -75,6 +78,7 @@ func NewValidateAuditHandler(pCache policycache.Interface,
crbLister: crbInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
log: log,
prGenerator: prGenerator,
configHandler: dynamicConfig,
}
}
@ -167,7 +171,7 @@ func (h *auditHandler) process(request *v1beta1.AdmissionRequest) error {
return errors.Wrap(err, "failed to load service account in context")
}
HandleValidation(request, policies, nil, ctx, userRequestInfo, h.statusListener, h.eventGen, h.pvGenerator, logger, h.configHandler)
HandleValidation(request, policies, nil, ctx, userRequestInfo, h.statusListener, h.eventGen, h.pvGenerator, h.prGenerator, logger, h.configHandler)
return nil
}

View file

@ -1,7 +1,10 @@
package webhooks
import (
"github.com/nirmata/kyverno/pkg/common"
"github.com/nirmata/kyverno/pkg/config"
"github.com/nirmata/kyverno/pkg/policyreport"
"os"
"reflect"
"sort"
"time"
@ -34,6 +37,7 @@ func HandleValidation(
statusListener policystatus.Listener,
eventGen event.Interface,
pvGenerator policyviolation.GeneratorInterface,
prGenerator policyreport.GeneratorInterface,
log logr.Logger,
dynamicConfig config.Interface) (bool, string) {
@ -121,8 +125,13 @@ func HandleValidation(
// ADD POLICY VIOLATIONS
// violations are created with resource on "audit"
pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses, logger)
pvGenerator.Add(pvInfos...)
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
for _, v := range pvInfos {
prGenerator.Add(policyreport.Info(v))
}
} else {
pvGenerator.Add(pvInfos...)
}
return true, ""
}