mirror of https://github.com/kyverno/kyverno.git

bug fixes

This commit is contained in:
evalsocket 2020-09-10 05:10:29 -07:00
parent 64d9879141
commit 37f96c5722
11 changed files with 311 additions and 249 deletions

View file

@@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/nirmata/kyverno/pkg/jobs"
"net/http"
_ "net/http/pprof"
"os"
"time"
"github.com/nirmata/kyverno/pkg/jobs"
"github.com/nirmata/kyverno/pkg/openapi"
"github.com/nirmata/kyverno/pkg/policycache"
@@ -42,6 +43,7 @@ var (
kubeconfig string
serverIP string
webhookTimeout int
backgroundSync int
runValidationInMutatingWebhook string
profile bool
//TODO: this was added for backward compatibility with existing command line arguments
@@ -63,6 +65,7 @@ func main() {
flag.StringVar(&excludeGroupRole, "excludeGroupRole", "", "")
flag.StringVar(&excludeUsername, "excludeUsername", "", "")
flag.IntVar(&webhookTimeout, "webhooktimeout", 3, "timeout for webhook configurations")
flag.IntVar(&backgroundSync, "backgroundsync", 100, "background sync interval (in seconds) for policy reports")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.StringVar(&runValidationInMutatingWebhook, "runValidationInMutatingWebhook", "", "Validation will also be done using the mutation webhook, set to 'true' to enable. Older kubernetes versions do not work properly when a validation webhook is registered.")
@@ -164,6 +167,7 @@ func main() {
filterK8Resources,
excludeGroupRole,
excludeUsername,
backgroundSync,
log.Log.WithName("ConfigData"),
)
@@ -182,7 +186,7 @@ func main() {
// Job Controller
// - Create Jobs for report
jobController := jobs.NewJobsJob(client, log.Log.WithName("jobController"))
jobController := jobs.NewJobsJob(client, configData, log.Log.WithName("jobController"))
// POLICY VIOLATION GENERATOR
// -- generate policy violation
@@ -197,28 +201,24 @@ func main() {
log.Log.WithName("PolicyViolationGenerator"),
stopCh,
)
var policyCtrl *policy.PolicyController
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
// - status aggregator: receives stats when a policy is applied & updates the policy status
policyCtrl, err = policy.NewPolicyController(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().Policies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
configData,
eventGenerator,
pvgen,
rWebhookWatcher,
kubeInformer.Core().V1().Namespaces(),
jobController,
log.Log.WithName("PolicyController"),
)
}
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
// - status aggregator: receives stats when a policy is applied & updates the policy status
policyCtrl, err := policy.NewPolicyController(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().Policies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
configData,
eventGenerator,
pvgen,
rWebhookWatcher,
kubeInformer.Core().V1().Namespaces(),
jobController,
log.Log.WithName("PolicyController"),
)
if err != nil {
setupLog.Error(err, "Failed to create policy controller")
@@ -342,6 +342,8 @@ func main() {
go configData.Run(stopCh)
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
go policyCtrl.Run(3, stopCh)
} else {
go policyCtrl.Run(1, stopCh)
}
go eventGenerator.Run(3, stopCh)
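
For reference, the POLICYREPORT gate above can be collapsed into a single call site; a sketch, where the single-worker rationale (serializing report updates) is an assumption, not something the commit states:

package main

import (
	"fmt"
	"os"
)

func main() {
	workers := 3
	if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
		workers = 1 // assumption: one worker serializes policy report updates
	}
	fmt.Println("policy controller workers:", workers) // stands in for policyCtrl.Run(workers, stopCh)
}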

View file

@@ -59,7 +59,7 @@ var (
// KubePolicyDeploymentName define the default deployment namespace
KubePolicyDeploymentName = "kyverno"
// Kyverno CLI Image
KyvernoCliImage = "nirmata/kyverno-cli:latest"
KyvernoCliImage = "evalsocket/kyverno-cli:latest"
//WebhookServiceName default kyverno webhook service name
WebhookServiceName = getWebhookServiceName()

View file

@@ -41,7 +41,10 @@ type ConfigData struct {
restrictDevelopmentUsername []string
// cmSycned reports whether the ConfigMap informer cache has synced
cmSycned cache.InformerSynced
log logr.Logger
// backgroundSync is the policy report background sync interval, in seconds
backgroundSync int
log logr.Logger
}
// ToFilter checks if the given resource is set to be filtered in the configuration
@@ -77,25 +80,34 @@ func (cd *ConfigData) GetExcludeUsername() []string {
return cd.excludeUsername
}
// GetBackgroundSync returns the background sync interval
func (cd *ConfigData) GetBackgroundSync() int {
cd.mux.RLock()
defer cd.mux.RUnlock()
return cd.backgroundSync
}
// Interface to be used by consumers to check filters
type Interface interface {
ToFilter(kind, namespace, name string) bool
GetExcludeGroupRole() []string
GetExcludeUsername() []string
RestrictDevelopmentUsername() []string
GetBackgroundSync() int
}
// NewConfigData ...
func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapInformer, filterK8Resources, excludeGroupRole, excludeUsername string, log logr.Logger) *ConfigData {
func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapInformer, filterK8Resources, excludeGroupRole, excludeUsername string, backgroundSync int, log logr.Logger) *ConfigData {
// environment var is read at start only
if cmNameEnv == "" {
log.Info("ConfigMap name not defined in env:INIT_CONFIG: loading no default configuration")
}
cd := ConfigData{
client: rclient,
cmName: os.Getenv(cmNameEnv),
cmSycned: cmInformer.Informer().HasSynced,
log: log,
client: rclient,
cmName: os.Getenv(cmNameEnv),
cmSycned: cmInformer.Informer().HasSynced,
backgroundSync: backgroundSync,
log: log,
}
cd.restrictDevelopmentUsername = []string{"minikube-user", "kubernetes-admin"}
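
The new GetBackgroundSync accessor follows the same read-locked pattern as the other ConfigData getters. A minimal standalone sketch of that pattern (the type and names below are illustrative, not Kyverno's):

package main

import (
	"fmt"
	"sync"
)

type settings struct {
	mux            sync.RWMutex
	backgroundSync int
}

// readers share the lock, so concurrent reads never block each other
func (s *settings) GetBackgroundSync() int {
	s.mux.RLock()
	defer s.mux.RUnlock()
	return s.backgroundSync
}

// the writer (the ConfigMap watcher, in Kyverno's case) takes the exclusive lock
func (s *settings) setBackgroundSync(v int) {
	s.mux.Lock()
	defer s.mux.Unlock()
	s.backgroundSync = v
}

func main() {
	s := &settings{backgroundSync: 100}
	s.setBackgroundSync(120)
	fmt.Println(s.GetBackgroundSync()) // 120
}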

View file

@ -1,10 +1,7 @@
package jobs
import (
"context"
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"time"
@@ -30,20 +27,22 @@ const workQueueRetryLimit = 3
// Job creates policy report jobs
type Job struct {
dclient *dclient.Client
log logr.Logger
queue workqueue.RateLimitingInterface
dataStore *dataStore
mux sync.Mutex
dclient *dclient.Client
log logr.Logger
queue workqueue.RateLimitingInterface
dataStore *dataStore
configHandler config.Interface
mux sync.Mutex
}
// JobInfo defines the job type and payload
type JobInfo struct {
JobType string
JobData string
}
func (i JobInfo) toKey() string {
return fmt.Sprintf("kyverno-%v", rand.Int63n(1000))
return fmt.Sprintf("kyverno-%v", i.JobType)
}
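// Note: toKey previously returned a random key ("kyverno-<rand>"), so every Add
// enqueued a distinct work item; keying by JobType lets the rate-limiting
// workqueue collapse repeated requests of the same type while one is pending.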
//NewDataStore returns an instance of data store
@@ -87,13 +86,26 @@ type JobsInterface interface {
// NewJobsJob returns a new instance of the policy report jobs controller
func NewJobsJob(dclient *dclient.Client,
configHandler config.Interface,
log logr.Logger) *Job {
gen := Job{
dclient: dclient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
log: log,
dclient: dclient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
configHandler: configHandler,
log: log,
}
go func(configHandler config.Interface) {
for k := range time.Tick(time.Duration(configHandler.GetBackgroundSync()) * time.Second) {
gen.log.V(2).Info("Background Sync sync at ", "time", k.String())
var wg sync.WaitGroup
wg.Add(3)
go gen.syncKyverno(&wg, "Helm", "SYNC","")
go gen.syncKyverno(&wg, "Namespace", "SYNC","")
go gen.syncKyverno(&wg, "Cluster", "SYNC","")
wg.Wait()
}
}(configHandler)
return &gen
}
@@ -124,26 +136,6 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
go func() {
ctx := context.Background()
ticker := time.NewTicker(100 * time.Second)
for {
select {
case <-ticker.C:
var wg sync.WaitGroup
wg.Add(3)
go j.syncNamespace(&wg, "Helm", "SYNC")
go j.syncNamespace(&wg, "Namespace", "SYNC")
go j.syncNamespace(&wg, "Cluster", "SYNC")
wg.Wait()
case <-ctx.Done():
break
// Create Jobs
}
}
}()
<-stopCh
}
@@ -195,12 +187,6 @@ func (j *Job) processNextWorkItem() bool {
// lookup data store
info := j.dataStore.lookup(keyHash)
if reflect.DeepEqual(info, JobInfo{}) {
// empty key
j.queue.Forget(obj)
logger.Info("empty key")
return nil
}
err := j.syncHandler(info)
j.handleErr(err, obj)
@@ -220,28 +206,34 @@ func (j *Job) syncHandler(info JobInfo) error {
j.mux.Unlock()
}()
j.mux.Lock()
if info.JobType == "POLICYSYNC" {
var wg sync.WaitGroup
wg.Add(3)
go j.syncKyverno(&wg, "Helm", "SYNC",info.JobData)
go j.syncKyverno(&wg, "Namespace", "SYNC",info.JobData)
go j.syncKyverno(&wg, "Cluster", "SYNC",info.JobData)
wg.Wait()
return nil
}
var wg sync.WaitGroup
wg.Add(3)
go j.syncNamespace(&wg, "Helm", "CONFIGMAP")
go j.syncNamespace(&wg, "Namespace", "CONFIGMAP")
go j.syncNamespace(&wg, "Cluster", "CONFIGMAP")
go j.syncKyverno(&wg, "Helm", "CONFIGMAP","")
go j.syncKyverno(&wg, "Namespace", "CONFIGMAP","")
go j.syncKyverno(&wg, "Cluster", "CONFIGMAP","")
wg.Wait()
return nil
}
func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope string) {
defer func() {
wg.Done()
}()
func (j *Job) syncKyverno(wg *sync.WaitGroup, jobType, scope, data string) {
var args []string
var mode string
if scope == "SYNC" {
if scope == "SYNC" || scope == "POLICYSYNC" {
mode = "cli"
} else {
mode = "configmap"
}
var job *v1.Job
switch jobType {
case "Helm":
args = []string{
@@ -265,38 +257,22 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope string) {
}
break
}
job = CreateJob(args, jobType, scope)
_, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false)
if err != nil {
return
}
deadline := time.Now().Add(80 * time.Second)
for {
resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName())
if err != nil {
continue
}
job := v1.Job{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
continue
}
if time.Now().After(deadline) {
break
}
}
err = j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false)
if err != nil {
return
}
return
if scope == "POLICYSYNC" && data != "" {
args = append(args, fmt.Sprintf("-p=%s", data))
}
go j.CreateJob(args, jobType, scope, wg)
}
// CreateJob creates a Job for the background scan and deletes it after the deadline
func CreateJob(args []string, jobType, scope string) *v1.Job {
func (j *Job) CreateJob(args []string, jobType, scope string, wg *sync.WaitGroup) {
defer wg.Done() // release the WaitGroup on every return path, including early errors
job := &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: config.KubePolicyNamespace,
Labels: map[string]string{
"scope": scope,
"type": jobType,
},
},
Spec: v1.JobSpec{
Template: apiv1.PodTemplateSpec{
@@ -316,5 +292,26 @@ func CreateJob(args []string, jobType, scope string) *v1.Job {
},
}
job.SetGenerateName("kyverno-policyreport-")
return job
_, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false)
if err != nil {
return
}
deadline := time.Now().Add(30 * time.Second)
for {
time.Sleep(2 * time.Second) // poll interval; avoids a busy loop against the API server
resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName())
if err != nil {
continue
}
job := v1.Job{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
continue
}
if time.Now().After(deadline) {
if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil {
continue
}
break
}
}
}
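
The periodic scan now lives in the NewJobsJob constructor instead of Run. A standalone sketch of the tick-and-fan-out pattern it uses (interval and scope names are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

func syncScope(wg *sync.WaitGroup, scope string) {
	defer wg.Done() // always release the WaitGroup, even on early returns
	fmt.Println("syncing", scope)
}

func main() {
	go func() {
		for range time.Tick(2 * time.Second) {
			var wg sync.WaitGroup
			wg.Add(3)
			for _, scope := range []string{"Helm", "Namespace", "Cluster"} {
				go syncScope(&wg, scope)
			}
			wg.Wait() // one batch finishes before the next tick's work starts
		}
	}()
	time.Sleep(5 * time.Second) // let a couple of ticks fire for the demo
}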

View file

@@ -10,7 +10,7 @@ import (
func ClusterCommand() *cobra.Command {
kubernetesConfig := genericclioptions.NewConfigFlags(true)
var mode string
var mode, policy string
cmd := &cobra.Command{
Use: "cluster",
Short: "generate report",
@@ -24,7 +24,7 @@ func ClusterCommand() *cobra.Command {
var wg sync.WaitGroup
wg.Add(1)
if mode == "cli" {
go backgroundScan("", "Cluster", &wg, restConfig)
go backgroundScan("", "Cluster",policy, &wg, restConfig)
wg.Wait()
return nil
}
@@ -34,6 +34,7 @@ func ClusterCommand() *cobra.Command {
},
}
cmd.Flags().StringVarP(&mode, "mode", "m", "cli", "mode of cli")
cmd.Flags().StringVarP(&policy, "policy", "p", "", "define specific policy")
return cmd
}
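
With the new flag, a report scan can be limited to specific policies. Assuming the subcommands are mounted under a `report` parent command of the Kyverno CLI (the wiring is not shown in this diff) and `require-labels` is a hypothetical policy name, usage would look like:

kyverno report cluster --mode=cli --policy=require-labels

Comma-separated policy names are accepted, since backgroundScan splits the value on commas.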

View file

@@ -42,7 +42,7 @@ const (
Cluster string = "Cluster"
)
func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config) {
func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfig *rest.Config) {
defer func() {
wg.Done()
}()
@@ -96,22 +96,54 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config
"",
"",
"",
600000,
log.Log.WithName("ConfigData"),
)
var cpolicies []*kyvernov1.ClusterPolicy
cpolicies, err = cpi.Lister().List(labels.Everything())
if err != nil {
os.Exit(1)
}
policies, err := pi.Lister().List(labels.Everything())
if err != nil {
os.Exit(1)
var removePolicy []string
if policychange != "" {
policySelector := strings.Split(policychange, ",")
for _, v := range policySelector {
cpolicy, err := cpi.Lister().Get(v)
if err != nil {
if apierrors.IsNotFound(err) {
// the policy no longer exists; schedule its results for removal from the report
removePolicy = append(removePolicy, v)
}
} else {
cpolicies = append(cpolicies, cpolicy)
}
// the changed name may also belong to a namespaced policy
policies, err := pi.Lister().List(labels.Everything())
if err == nil {
for _, p := range policies {
if v == p.GetName() {
cp := policy.ConvertPolicyToClusterPolicy(p)
cpolicies = append(cpolicies, cp)
}
}
}
}
} else {
cpolicies, err = cpi.Lister().List(labels.Everything())
if err != nil {
os.Exit(1)
}
policies, err := pi.Lister().List(labels.Everything())
if err != nil {
os.Exit(1)
}
for _, p := range policies {
cp := policy.ConvertPolicyToClusterPolicy(p)
cpolicies = append(cpolicies, cp)
}
}
for _, p := range policies {
cp := policy.ConvertPolicyToClusterPolicy(p)
cpolicies = append(cpolicies, cp)
}
// key uid
resourceMap := map[string]unstructured.Unstructured{}
@@ -229,29 +261,10 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = &policyreportv1alpha1.PolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
Namespace: n,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(k)
availablepr.SetNamespace(n)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "PolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
availablepr = initPolicyReport(scope, n, k)
}
}
availablepr, action := mergeReport(availablepr, results[k])
availablepr, action := mergeReport(availablepr, results[k], removePolicy)
if action == "Create" {
_, err := kclient.PolicyV1alpha1().PolicyReports(n).Create(availablepr)
if err != nil {
@@ -267,24 +280,7 @@ func backgroundScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config
availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = &policyreportv1alpha1.ClusterPolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(k)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "ClusterPolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
availablepr = initClusterPolicyReport(scope, k)
}
}
availablepr, action := mergeClusterReport(availablepr, results[k])
@@ -404,30 +400,11 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config)
}
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = &policyreportv1alpha1.PolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
Namespace: namespace,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(k)
availablepr.SetNamespace(namespace)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "PolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
availablepr = initPolicyReport(scope, namespace, k)
}
}
availablepr, action := mergeReport(availablepr, results[k])
availablepr, action := mergeReport(availablepr, results[k], []string{})
if action == "Create" {
availablepr.SetLabels(map[string]string{
"policy-state": "state",
@@ -446,24 +423,7 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config)
availablepr, err := kclient.PolicyV1alpha1().ClusterPolicyReports().Get(k, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
availablepr = &policyreportv1alpha1.ClusterPolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(k)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "ClusterPolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
availablepr = initClusterPolicyReport(scope, k)
}
}
availablepr, action := mergeClusterReport(availablepr, results[k])
@@ -483,7 +443,7 @@ func configmapScan(n, scope string, wg *sync.WaitGroup, restConfig *rest.Config)
}
}
func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1alpha1.PolicyReportResult) (*policyreportv1alpha1.PolicyReport, string) {
func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1alpha1.PolicyReportResult, removePolicy []string) (*policyreportv1alpha1.PolicyReport, string) {
labels := pr.GetLabels()
var action string
if labels["policy-state"] == "init" {
@@ -508,10 +468,7 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1
uniqueResponse = append(uniqueResponse, &r)
}
}
if len(pr.Results) == 0 {
pr.Results = append(pr.Results, uniqueResponse...)
return pr, action
}
for _, r := range uniqueResponse {
var isExist = false
for _, v := range pr.Results {
@@ -529,6 +486,16 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1
pr.Results = append(pr.Results, r)
}
}
if len(removePolicy) > 0 {
for _, v := range removePolicy {
// rebuild the slice instead of deleting while ranging, which would skip entries
filtered := pr.Results[:0]
for _, r := range pr.Results {
if r.Policy != v {
filtered = append(filtered, r)
}
}
pr.Results = filtered
}
}
return pr, action
}
@@ -640,3 +607,49 @@ func changeClusterReportCount(status, oldStatus string, report *policyreportv1al
}
return report
}
func initPolicyReport(scope, namespace, name string) *policyreportv1alpha1.PolicyReport {
availablepr := &policyreportv1alpha1.PolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
Namespace: namespace,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(name)
availablepr.SetNamespace(namespace)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "PolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
return availablepr
}
func initClusterPolicyReport(scope, name string) *policyreportv1alpha1.ClusterPolicyReport {
availablepr := &policyreportv1alpha1.ClusterPolicyReport{
Scope: &corev1.ObjectReference{
Kind: scope,
},
Summary: policyreportv1alpha1.PolicyReportSummary{},
Results: []*policyreportv1alpha1.PolicyReportResult{},
}
labelMap := map[string]string{
"policy-scope": scope,
"policy-state": "init",
}
availablepr.SetName(name)
availablepr.SetLabels(labelMap)
availablepr.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "PolicyReport",
Version: "v1alpha1",
Group: "policy.kubernetes.io",
})
return availablepr
}
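
mergeReport now prunes results for deleted policies via removePolicy. A standalone sketch of that pruning that stays linear by using a set (types and names are illustrative, not Kyverno's):

package main

import "fmt"

type result struct{ Policy string }

// filterResults drops entries whose Policy is in remove, in one pass
func filterResults(results []result, remove []string) []result {
	drop := make(map[string]bool, len(remove))
	for _, name := range remove {
		drop[name] = true
	}
	filtered := results[:0]
	for _, r := range results {
		if !drop[r.Policy] {
			filtered = append(filtered, r)
		}
	}
	return filtered
}

func main() {
	rs := []result{{"a"}, {"b"}, {"a"}}
	fmt.Println(filterResults(rs, []string{"a"})) // [{b}]
}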

View file

@@ -2,21 +2,22 @@ package report
import (
"fmt"
"os"
"sync"
"time"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/cli-runtime/pkg/genericclioptions"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"os"
log "sigs.k8s.io/controller-runtime/pkg/log"
"sync"
"time"
)
func HelmCommand() *cobra.Command {
kubernetesConfig := genericclioptions.NewConfigFlags(true)
var mode, namespace string
var mode, policy, namespace string
cmd := &cobra.Command{
Use: "helm",
Short: "generate report",
@@ -36,7 +37,7 @@ func HelmCommand() *cobra.Command {
if mode == "cli" && namespace != "" {
var wg sync.WaitGroup
wg.Add(1)
go backgroundScan(namespace, "Helm", &wg, restConfig)
go backgroundScan(namespace, "Helm",policy,&wg,restConfig)
wg.Wait()
return nil
} else if namespace != "" {
@@ -67,7 +68,7 @@ func HelmCommand() *cobra.Command {
var wg sync.WaitGroup
wg.Add(len(ns))
for _, n := range ns {
go backgroundScan(n.GetName(), "Helm", &wg, restConfig)
go backgroundScan(n.GetName(), "Helm",policy, &wg, restConfig)
}
wg.Wait()
} else {
@@ -81,6 +82,7 @@ func HelmCommand() *cobra.Command {
},
}
cmd.Flags().StringVarP(&namespace, "namespace", "n", "", "define specific namespace")
cmd.Flags().StringVarP(&policy, "policy", "p", "", "define specific policy")
cmd.Flags().StringVarP(&mode, "mode", "m", "cli", "mode")
return cmd
}

View file

@@ -16,7 +16,7 @@ import (
func NamespaceCommand() *cobra.Command {
kubernetesConfig := genericclioptions.NewConfigFlags(true)
var mode, namespace string
var mode, namespace, policy string
cmd := &cobra.Command{
Use: "namespace",
Short: "generate report",
@@ -36,7 +36,7 @@ func NamespaceCommand() *cobra.Command {
if mode == "cli" && namespace != "" {
var wg sync.WaitGroup
wg.Add(1)
go backgroundScan(namespace, "Namespace", &wg, restConfig)
go backgroundScan(namespace,"Namespace",policy, &wg, restConfig)
wg.Wait()
return nil
} else if namespace != "" {
@@ -52,11 +52,8 @@ func NamespaceCommand() *cobra.Command {
np := kubeInformer.Core().V1().Namespaces()
go np.Informer().Run(stopCh)
nSynced := np.Informer().HasSynced
nLister := np.Lister()
if !cache.WaitForCacheSync(stopCh, nSynced) {
log.Log.Error(err, "Failed to create kubernetes client")
os.Exit(1)
@@ -69,7 +66,7 @@ func NamespaceCommand() *cobra.Command {
var wg sync.WaitGroup
wg.Add(len(ns))
for _, n := range ns {
go backgroundScan(n.GetName(), "Namespace", &wg, restConfig)
go backgroundScan(n.GetName(), "Namespace",policy, &wg, restConfig)
}
wg.Wait()
} else {
@@ -84,6 +81,7 @@ func NamespaceCommand() *cobra.Command {
},
}
cmd.Flags().StringVarP(&namespace, "namespace", "n", "", "define specific namespace")
cmd.Flags().StringVarP(&policy, "policy", "p", "", "define specific policy")
cmd.Flags().StringVarP(&mode, "mode", "m", "cli", "mode")
return cmd
}

View file

@@ -1,6 +1,11 @@
package policy
import (
"os"
"strings"
"sync"
"time"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@@ -25,7 +30,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"time"
)
const (
@@ -94,7 +98,15 @@ type PolicyController struct {
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister
job *jobs.Job
log logr.Logger
policySync *PolicySync
log logr.Logger
}
// PolicySync accumulates policy keys for the policy report sync job
type PolicySync struct {
mux sync.Mutex
policy []string
}
// NewPolicyController create a new PolicyController
@@ -135,6 +147,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pc.pvControl = RealPVControl{Client: kyvernoClient, Recorder: pc.eventRecorder}
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
cpvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addClusterPolicyViolation,
UpdateFunc: pc.updateClusterPolicyViolation,
@@ -150,6 +163,11 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pc.cpvListerSynced = cpvInformer.Informer().HasSynced
pc.nspvLister = nspvInformer.Lister()
pc.nspvListerSynced = nspvInformer.Informer().HasSynced
} else {
pc.policySync = &PolicySync{
policy: make([]string, 0),
}
}
pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
@@ -177,6 +195,17 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
// rebuild after 300 seconds/ 5 mins
//TODO: pass the time in seconds instead of converting it internally
pc.rm = NewResourceManager(30)
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
go func() {
for k := range time.Tick(60 * time.Second) {
pc.log.V(2).Info("policy background sync", "time", k.String())
pc.policySync.mux.Lock()
if len(pc.policySync.policy) > 0 {
pc.job.Add(jobs.JobInfo{
JobType: "POLICYSYNC",
JobData: strings.Join(pc.policySync.policy, ","),
})
// drain the batch so each change is enqueued once; policySync is only set in report mode
pc.policySync.policy = nil
}
pc.policySync.mux.Unlock()
}
}()
}
return &pc, nil
}
@@ -312,17 +341,22 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.nsListerSynced) {
logger.Info("failed to sync informer cache")
return
}
} else {
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.cpvListerSynced, pc.nspvListerSynced, pc.nsListerSynced) {
logger.Info("failed to sync informer cache")
return
}
}
for i := 0; i < workers; i++ {
go wait.Until(pc.worker, constant.PolicyControllerResync, stopCh)
}
<- stopCh
<-stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -387,14 +421,18 @@ func (pc *PolicyController) syncPolicy(key string) error {
policy = ConvertPolicyToClusterPolicy(nspolicy)
}
if errors.IsNotFound(err) {
go pc.deletePolicyViolations(key)
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
pc.policySync.mux.Lock()
pc.policySync.policy = append(pc.policySync.policy, key)
pc.policySync.mux.Unlock()
return nil
}
go pc.deletePolicyViolations(key)
// remove webhook configurations if there are no policies
if err := pc.removeResourceWebhookConfiguration(); err != nil {
logger.Error(err, "failed to remove resource webhook configurations")
}
return nil
}
@@ -402,11 +440,15 @@ func (pc *PolicyController) syncPolicy(key string) error {
return err
}
pc.resourceWebhookWatcher.RegisterResourceWebhook()
engineResponses := pc.processExistingResources(policy)
pc.cleanupAndReport(engineResponses)
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
pc.policySync.mux.Lock()
pc.policySync.policy = append(pc.policySync.policy, key)
pc.policySync.mux.Unlock()
} else {
pc.resourceWebhookWatcher.RegisterResourceWebhook()
engineResponses := pc.processExistingResources(policy)
pc.cleanupAndReport(engineResponses)
}
return nil
}
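
syncPolicy only records the policy key under the mutex; the 60-second ticker added in NewPolicyController flushes the batch. A standalone sketch of that accumulate-and-drain pattern (types and names are illustrative, not Kyverno's):

package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

type batch struct {
	mux  sync.Mutex
	keys []string
}

func (b *batch) add(k string) {
	b.mux.Lock()
	defer b.mux.Unlock()
	b.keys = append(b.keys, k)
}

// drain returns the accumulated keys and resets the batch under the same lock
func (b *batch) drain() []string {
	b.mux.Lock()
	defer b.mux.Unlock()
	out := b.keys
	b.keys = nil
	return out
}

func main() {
	b := &batch{}
	b.add("policy-a")
	b.add("policy-b")
	go func() {
		for range time.Tick(time.Second) {
			if keys := b.drain(); len(keys) > 0 {
				fmt.Println("flush:", strings.Join(keys, ","))
			}
		}
	}()
	time.Sleep(2 * time.Second)
}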

View file

@@ -1,7 +1,6 @@
package policyreport
import (
"context"
"encoding/json"
"reflect"
"strconv"
@@ -147,7 +146,18 @@ func NewPRGenerator(client *policyreportclient.Clientset,
},
job: job,
}
go func() {
for k := range time.Tick(60 * time.Second) {
gen.log.V(2).Info("configmap sync", "time", k.String())
if err := gen.createConfigmap(); err != nil {
gen.log.Error(err, "failed to create configmap")
continue // skip enqueueing a job when the configmap update failed
}
gen.job.Add(jobs.JobInfo{
JobType: "CONFIGMAP",
})
}
}()
return &gen
}
@@ -182,22 +192,7 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
ticker := time.NewTicker(100 * time.Second)
ctx := context.Background()
for {
select {
case <-ticker.C:
err := gen.createConfigmap()
gen.job.Add(jobs.JobInfo{})
if err != nil {
logger.Error(err, "configmap error")
}
case <-ctx.Done():
break
// Create Jobs
}
}
//<-stopCh
<-stopCh
}
func (gen *Generator) runWorker() {
@@ -302,7 +297,6 @@ func (gen *Generator) createConfigmap() error {
func (gen *Generator) syncHandler(info Info) error {
logger := gen.log
defer func() {
logger.Error(nil, "DEBUG", "Key", gen.inMemoryConfigMap)
gen.mux.Unlock()
}()
gen.mux.Lock()

View file

@@ -2,15 +2,16 @@ package policyviolation
import (
"errors"
policyreportinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha1"
"github.com/nirmata/kyverno/pkg/jobs"
"github.com/nirmata/kyverno/pkg/policyreport"
"os"
"reflect"
"strconv"
"strings"
"sync"
policyreportinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha1"
"github.com/nirmata/kyverno/pkg/jobs"
"github.com/nirmata/kyverno/pkg/policyreport"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"