* Re-implement #4155

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Address https://github.com/kyverno/kyverno/pull/4162 comments

Signed-off-by: ShutingZhao <shuting@nirmata.com>
shuting 2022-06-28 17:27:34 +08:00 committed by GitHub
parent 4ba30ee140
commit 1ca2f3ce1d
12 changed files with 129 additions and 48 deletions

View file

@@ -42,6 +42,7 @@ import (
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
+"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -338,7 +339,12 @@ func main() {
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", setupLog)
registerWebhookConfigurations := func() {
certManager.InitTLSPemPair()
waitForCacheSync(stopCh, kyvernoInformer, kubeInformer, kubeKyvernoInformer)
+// wait for cache to be synced before use it
+cache.WaitForCacheSync(stopCh,
+kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced,
+kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced,
+)
// validate the ConfigMap format
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace, config.KyvernoConfigMapName); err != nil {
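For reference, the hunk above relies on cache.WaitForCacheSync from k8s.io/client-go/tools/cache: it blocks until every supplied InformerSynced function reports true, or returns false if the stop channel closes first, so the webhook configurations are only validated against a fully populated store. A minimal standalone sketch of that pattern (the factory wiring, resync interval, and error handling here are illustrative, not Kyverno's actual setup):

package main

import (
	"fmt"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	stopCh := make(chan struct{})
	defer close(stopCh)

	factory := kubeinformers.NewSharedInformerFactory(client, 15*time.Minute)
	// Request the informers before Start so the factory knows to run them.
	mwc := factory.Admissionregistration().V1().MutatingWebhookConfigurations().Informer()
	vwc := factory.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer()

	factory.Start(stopCh)

	// Block until both caches have completed their initial list;
	// returns false if stopCh closes before that happens.
	if !cache.WaitForCacheSync(stopCh, mwc.HasSynced, vwc.HasSynced) {
		panic("failed to wait for caches to sync")
	}
	fmt.Println("caches synced; listers are now safe to use")
}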

View file

@@ -59,6 +59,8 @@ type Controller struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister corelister.NamespaceLister
+informersSynced []cache.InformerSynced
// logger
log logr.Logger
}
@@ -90,6 +92,8 @@ func NewController(
c.urLister = urInformer.Lister().UpdateRequests(config.KyvernoNamespace)
c.nsLister = namespaceInformer.Lister()
+c.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced}
return &c, nil
}
@@ -192,6 +196,10 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("update-request-cleanup", stopCh, c.informersSynced...) {
+return
+}
c.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.deletePolicy, // we only cleanup if the policy is delete
})

View file

@@ -52,6 +52,8 @@ type controller struct {
nsLister corev1listers.NamespaceLister
podLister corev1listers.PodLister
+informersSynced []cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
@@ -81,7 +83,7 @@ func NewController(
urLister: urLister,
nsLister: namespaceInformer.Lister(),
podLister: podInformer.Lister(),
-queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request"),
+queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "update-request"),
eventGen: eventGen,
configuration: dynamicConfig,
}
@@ -98,6 +100,9 @@ func NewController(
UpdateFunc: c.updatePolicy,
DeleteFunc: c.deletePolicy,
})
+c.informersSynced = []cache.InformerSynced{cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced, podInformer.Informer().HasSynced}
return &c
}
@@ -109,6 +114,10 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) {
+return
+}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
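The same guard recurs in every controller this commit touches: the constructor collects each informer's HasSynced into an informersSynced slice, and Run calls cache.WaitForNamedCacheSync (which logs waiting/synced messages under the given controller name) before starting any workers, so listers are never consulted against a half-filled store. A condensed sketch of that shape, with placeholder informers and a no-op worker standing in for Kyverno's real ones:

package background

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	corev1informers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type controller struct {
	informersSynced []cache.InformerSynced
	queue           workqueue.RateLimitingInterface
}

// NewController records each informer's HasSynced so that Run can gate
// worker startup on an initial cache sync.
func NewController(nsInformer corev1informers.NamespaceInformer, podInformer corev1informers.PodInformer) *controller {
	c := &controller{
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "update-request"),
	}
	c.informersSynced = []cache.InformerSynced{
		nsInformer.Informer().HasSynced,
		podInformer.Informer().HasSynced,
	}
	return c
}

func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	defer c.queue.ShutDown()
	// Bail out early if the caches never sync (e.g. on shutdown).
	if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) {
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}
	<-stopCh
}

func (c *controller) worker() {
	// Real controllers drain c.queue here.
}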

View file

@@ -28,16 +28,14 @@ type Controller interface {
}
type controller struct {
-renewer *tls.CertRenewer
-secretInformer informerv1.SecretInformer
-secretQueue chan bool
+renewer *tls.CertRenewer
+secretQueue chan bool
}
func NewController(secretInformer informerv1.SecretInformer, kubeClient kubernetes.Interface, certRenewer *tls.CertRenewer) (Controller, error) {
manager := &controller{
-renewer: certRenewer,
-secretInformer: secretInformer,
-secretQueue: make(chan bool, 1),
+renewer: certRenewer,
+secretQueue: make(chan bool, 1),
}
secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: manager.addSecretFunc,

View file

@@ -26,6 +26,9 @@ type controller struct {
// listers
configmapLister corev1listers.ConfigMapLister
+// configmapSynced returns true if the configmap shared informer has synced at least once
+configmapSynced cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
}
@@ -41,6 +44,9 @@ func NewController(configmapInformer corev1informers.ConfigMapInformer, configur
UpdateFunc: c.update,
DeleteFunc: c.delete,
})
+c.configmapSynced = configmapInformer.Informer().HasSynced
return &c
}
@@ -102,6 +108,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
logger.Info("start")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("config-controller", stopCh, c.configmapSynced) {
+return
+}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}

View file

@@ -79,6 +79,8 @@ type PolicyController struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister listerv1.NamespaceLister
+informersSynced []cache.InformerSynced
// Resource manager, manages the mapping for already processed resource
rm resourceManager
@@ -146,6 +148,8 @@ func NewPolicyController(
pc.nsLister = namespaces.Lister()
pc.urLister = urInformer.Lister()
+pc.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaces.Informer().HasSynced}
// resource manager
// rebuild after 300 seconds/ 5 mins
pc.rm = NewResourceManager(30)
@@ -416,6 +420,10 @@ func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, cleanupCha
logger.Info("starting")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("PolicyController", stopCh, pc.informersSynced...) {
+return
+}
pc.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
UpdateFunc: pc.updatePolicy,

View file

@@ -44,8 +44,12 @@ func (dl dummyNsLister) Get(name string) (*kyverno.Policy, error) {
return nil, fmt.Errorf("not implemented")
}
+func fakeHasSynced() bool {
+return true
+}
func Test_All(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newPolicy(t)
//add
pCache.add(policy)
@@ -77,7 +81,7 @@ func Test_All(t *testing.T) {
}
func Test_Add_Duplicate_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newPolicy(t)
pCache.add(policy)
pCache.add(policy)
@@ -103,7 +107,7 @@ func Test_Add_Duplicate_Policy(t *testing.T) {
}
func Test_Add_Validate_Audit(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newPolicy(t)
pCache.add(policy)
pCache.add(policy)
@@ -128,7 +132,7 @@ func Test_Add_Validate_Audit(t *testing.T) {
}
func Test_Add_Remove(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newPolicy(t)
kind := "Pod"
pCache.add(policy)
@@ -156,7 +160,7 @@ func Test_Add_Remove(t *testing.T) {
}
func Test_Add_Remove_Any(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newAnyPolicy(t)
kind := "Pod"
pCache.add(policy)
@@ -184,7 +188,7 @@ func Test_Add_Remove_Any(t *testing.T) {
}
func Test_Remove_From_Empty_Cache(t *testing.T) {
-pCache := newPolicyCache(nil, nil)
+pCache := newPolicyCache(nil, nil, fakeHasSynced, fakeHasSynced)
policy := newPolicy(t)
pCache.remove(policy)
@@ -925,7 +929,7 @@ func newValidateEnforcePolicy(t *testing.T) *kyverno.ClusterPolicy {
}
func Test_Ns_All(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newNsPolicy(t)
//add
pCache.add(policy)
@@ -957,7 +961,7 @@ func Test_Ns_All(t *testing.T) {
}
func Test_Ns_Add_Duplicate_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newNsPolicy(t)
pCache.add(policy)
pCache.add(policy)
@@ -984,7 +988,7 @@ func Test_Ns_Add_Duplicate_Policy(t *testing.T) {
}
func Test_Ns_Add_Validate_Audit(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newNsPolicy(t)
pCache.add(policy)
pCache.add(policy)
@@ -1009,7 +1013,7 @@ func Test_Ns_Add_Validate_Audit(t *testing.T) {
}
func Test_Ns_Add_Remove(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newNsPolicy(t)
nspace := policy.GetNamespace()
kind := "Pod"
@@ -1027,7 +1031,7 @@ func Test_Ns_Add_Remove(t *testing.T) {
}
func Test_GVk_Cache(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newGVKPolicy(t)
//add
pCache.add(policy)
@@ -1043,7 +1047,7 @@ func Test_GVk_Cache(t *testing.T) {
}
func Test_GVK_Add_Remove(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newGVKPolicy(t)
kind := "ClusterRole"
pCache.add(policy)
@@ -1060,7 +1064,7 @@ func Test_GVK_Add_Remove(t *testing.T) {
}
func Test_Add_Validate_Enforce(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newUserTestPolicy(t)
nspace := policy.GetNamespace()
//add
@@ -1076,7 +1080,7 @@ func Test_Add_Validate_Enforce(t *testing.T) {
}
func Test_Ns_Add_Remove_User(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newUserTestPolicy(t)
nspace := policy.GetNamespace()
kind := "Deployment"
@@ -1094,7 +1098,7 @@ func Test_Ns_Add_Remove_User(t *testing.T) {
}
func Test_Mutate_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newMutatePolicy(t)
//add
pCache.add(policy)
@@ -1113,7 +1117,7 @@ func Test_Mutate_Policy(t *testing.T) {
}
func Test_Generate_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newgenratePolicy(t)
//add
pCache.add(policy)
@@ -1130,7 +1134,7 @@ func Test_Generate_Policy(t *testing.T) {
}
func Test_NsMutate_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy := newMutatePolicy(t)
nspolicy := newNsMutatePolicy(t)
//add
@@ -1155,7 +1159,7 @@ func Test_NsMutate_Policy(t *testing.T) {
}
func Test_Validate_Enforce_Policy(t *testing.T) {
-pCache := newPolicyCache(dummyLister{}, dummyNsLister{})
+pCache := newPolicyCache(dummyLister{}, dummyNsLister{}, fakeHasSynced, fakeHasSynced)
policy1 := newValidateAuditPolicy(t)
policy2 := newValidateEnforcePolicy(t)
pCache.add(policy1)
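The fakeHasSynced stub above works because cache.InformerSynced is simply func() bool, so tests can satisfy the new constructor parameters without wiring real informers and without ever blocking on a sync. An illustrative fragment (simplified types, not the real policyCache):

package policycache

import "k8s.io/client-go/tools/cache"

// fakeHasSynced pretends the informer cache is always synced, so unit
// tests never block waiting for one.
func fakeHasSynced() bool { return true }

type fakeCache struct {
	informersSynced []cache.InformerSynced
}

func newFakeCache(synced ...cache.InformerSynced) *fakeCache {
	return &fakeCache{informersSynced: synced}
}

func example() bool {
	c := newFakeCache(fakeHasSynced, fakeHasSynced)
	// Returns true on the first poll because every InformerSynced is true.
	return cache.WaitForCacheSync(make(chan struct{}), c.informersSynced...)
}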

View file

@@ -29,7 +29,7 @@ type Controller struct {
// NewPolicyCacheController create a new PolicyController
func NewPolicyCacheController(pInformer kyvernoinformer.ClusterPolicyInformer, nspInformer kyvernoinformer.PolicyInformer) *Controller {
pc := Controller{
-Cache: newPolicyCache(pInformer.Lister(), nspInformer.Lister()),
+Cache: newPolicyCache(pInformer.Lister(), nspInformer.Lister(), pInformer.Informer().HasSynced, nspInformer.Informer().HasSynced),
}
// ClusterPolicy Informer
@@ -106,6 +106,10 @@ func (c *Controller) deleteNsPolicy(obj interface{}) {
func (c *Controller) CheckPolicySync(stopCh <-chan struct{}) {
logger.Info("starting")
+if !cache.WaitForNamedCacheSync("config-controller", stopCh, c.Cache.informerHasSynced()...) {
+return
+}
policies := []kyverno.PolicyInterface{}
polList, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything())
if err != nil {

View file

@@ -5,6 +5,7 @@ import (
kyvernolister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
"github.com/kyverno/kyverno/pkg/policy"
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
+"k8s.io/client-go/tools/cache"
)
// Interface ...
@@ -24,6 +25,8 @@ type Interface interface {
update(kyverno.PolicyInterface, kyverno.PolicyInterface)
get(PolicyType, string, string) []string
+informerHasSynced() []cache.InformerSynced
}
// policyCache ...
@@ -35,10 +38,12 @@ type policyCache struct {
// npLister can list/get namespace policy from the shared informer's store
npLister kyvernolister.PolicyLister
+informersSynced []cache.InformerSynced
}
// newPolicyCache ...
-func newPolicyCache(pLister kyvernolister.ClusterPolicyLister, npLister kyvernolister.PolicyLister) Interface {
+func newPolicyCache(pLister kyvernolister.ClusterPolicyLister, npLister kyvernolister.PolicyLister, pSynced, npSynced cache.InformerSynced) Interface {
namesCache := map[PolicyType]map[string]bool{
Mutate: make(map[string]bool),
ValidateEnforce: make(map[string]bool),
@@ -55,6 +60,7 @@ func newPolicyCache(pLister kyvernolister.ClusterPolicyLister, npLister kyvernol
},
pLister,
npLister,
+[]cache.InformerSynced{pSynced, npSynced},
}
}
@@ -78,6 +84,10 @@ func (pc *policyCache) GetPolicies(pkey PolicyType, kind, nspace string) []kyver
return append(policies, nsPolicies...)
}
+func (pc *policyCache) informerHasSynced() []cache.InformerSynced {
+return pc.informersSynced
+}
// Remove a policy from cache
func (pc *policyCache) remove(p kyverno.PolicyInterface) {
pc.pMap.remove(p)

View file

@@ -10,11 +10,8 @@ import (
"github.com/go-logr/logr"
changerequest "github.com/kyverno/kyverno/api/kyverno/v1alpha2"
policyreportv1alpha2 "github.com/kyverno/kyverno/api/policyreport/v1alpha2"
-report "github.com/kyverno/kyverno/api/policyreport/v1alpha2"
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
kyvernov1alpha2informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha2"
-requestinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha2"
-policyreportinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha2"
policyreportv1alpha2informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha2"
requestlister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha2"
policyreport "github.com/kyverno/kyverno/pkg/client/listers/policyreport/v1alpha2"
@@ -60,10 +57,10 @@ type ReportGenerator struct {
pclient kyvernoclient.Interface
dclient dclient.Interface
-clusterReportInformer policyreportinformer.ClusterPolicyReportInformer
-reportInformer policyreportinformer.PolicyReportInformer
-reportReqInformer requestinformer.ReportChangeRequestInformer
-clusterReportReqInformer requestinformer.ClusterReportChangeRequestInformer
+clusterReportInformer policyreportv1alpha2informers.ClusterPolicyReportInformer
+reportInformer policyreportv1alpha2informers.PolicyReportInformer
+reportReqInformer kyvernov1alpha2informers.ReportChangeRequestInformer
+clusterReportReqInformer kyvernov1alpha2informers.ClusterReportChangeRequestInformer
reportLister policyreport.PolicyReportLister
clusterReportLister policyreport.ClusterPolicyReportLister
@@ -71,6 +68,8 @@ type ReportGenerator struct {
clusterReportChangeRequestLister requestlister.ClusterReportChangeRequestLister
nsLister listerv1.NamespaceLister
+informersSynced []cache.InformerSynced
queue workqueue.RateLimitingInterface
// ReconcileCh sends a signal to policy controller to force the reconciliation of policy report
@@ -113,6 +112,8 @@ func NewReportGenerator(
gen.reportChangeRequestLister = reportReqInformer.Lister()
gen.nsLister = namespace.Lister()
+gen.informersSynced = []cache.InformerSynced{clusterReportInformer.Informer().HasSynced, reportInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, clusterReportInformer.Informer().HasSynced, namespace.Informer().HasSynced}
return gen, nil
}
@@ -217,7 +218,7 @@ func (g *ReportGenerator) updateClusterReportChangeRequest(old interface{}, cur
}
func (g *ReportGenerator) deletePolicyReport(obj interface{}) {
-report, ok := kubeutils.GetObjectWithTombstone(obj).(*report.PolicyReport)
+report, ok := kubeutils.GetObjectWithTombstone(obj).(*policyreportv1alpha2.PolicyReport)
if ok {
g.log.V(2).Info("PolicyReport deleted", "name", report.GetName())
} else {
@@ -240,6 +241,10 @@ func (g *ReportGenerator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("PolicyReportGenerator", stopCh, g.informersSynced...) {
+return
+}
g.reportReqInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: g.addReportChangeRequest,
@@ -460,7 +465,7 @@ func (g *ReportGenerator) removeFromClusterPolicyReport(policyName, ruleName str
}
for _, cpolr := range cpolrs {
-newRes := []report.PolicyReportResult{}
+newRes := []policyreportv1alpha2.PolicyReportResult{}
for _, result := range cpolr.Results {
if ruleName != "" && result.Rule == ruleName && result.Policy == policyName {
continue
@@ -471,7 +476,7 @@ func (g *ReportGenerator) removeFromClusterPolicyReport(policyName, ruleName str
}
cpolr.Results = newRes
cpolr.Summary = calculateSummary(newRes)
-gv := report.SchemeGroupVersion
+gv := policyreportv1alpha2.SchemeGroupVersion
cpolr.SetGroupVersionKind(schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: "ClusterPolicyReport"})
if _, err := g.pclient.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), cpolr, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update clusterPolicyReport %s %v", cpolr.Name, err)
@@ -491,7 +496,7 @@ func (g *ReportGenerator) removeFromPolicyReport(policyName, ruleName string) er
g.log.Error(err, "failed to build labelSelector")
}
-policyReports := []*report.PolicyReport{}
+policyReports := []*policyreportv1alpha2.PolicyReport{}
for _, ns := range namespaces.Items {
reports, err := g.reportLister.PolicyReports(ns.GetName()).List(selector)
if err != nil {
@@ -501,7 +506,7 @@ func (g *ReportGenerator) removeFromPolicyReport(policyName, ruleName string) er
}
for _, r := range policyReports {
-newRes := []report.PolicyReportResult{}
+newRes := []policyreportv1alpha2.PolicyReportResult{}
for _, result := range r.Results {
if ruleName != "" && result.Rule == ruleName && result.Policy == policyName {
continue
@@ -513,7 +518,7 @@ func (g *ReportGenerator) removeFromPolicyReport(policyName, ruleName string) er
r.Results = newRes
r.Summary = calculateSummary(newRes)
-gv := report.SchemeGroupVersion
+gv := policyreportv1alpha2.SchemeGroupVersion
gvk := schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: "PolicyReport"}
r.SetGroupVersionKind(gvk)
if _, err := g.pclient.Wgpolicyk8sV1alpha2().PolicyReports(r.GetNamespace()).Update(context.TODO(), r, metav1.UpdateOptions{}); err != nil {
@@ -570,7 +575,7 @@ func (g *ReportGenerator) aggregateReports(namespace string) (
}
func mergeRequests(ns, kyvernoNs *v1.Namespace, requestsGeneral interface{}) (*unstructured.Unstructured, interface{}, error) {
-results := []report.PolicyReportResult{}
+results := []policyreportv1alpha2.PolicyReportResult{}
if requests, ok := requestsGeneral.([]*changerequest.ClusterReportChangeRequest); ok {
aggregatedRequests := []*changerequest.ClusterReportChangeRequest{}
@@ -584,7 +589,7 @@ func mergeRequests(ns, kyvernoNs *v1.Namespace, requestsGeneral interface{}) (*u
aggregatedRequests = append(aggregatedRequests, request)
}
-report := &report.ClusterPolicyReport{
+report := &policyreportv1alpha2.ClusterPolicyReport{
Results: results,
Summary: calculateSummary(results),
}
@@ -611,7 +616,7 @@ func mergeRequests(ns, kyvernoNs *v1.Namespace, requestsGeneral interface{}) (*u
aggregatedRequests = append(aggregatedRequests, request)
}
-report := &report.PolicyReport{
+report := &policyreportv1alpha2.PolicyReport{
Results: results,
Summary: calculateSummary(results),
}
@@ -631,7 +636,7 @@ func mergeRequests(ns, kyvernoNs *v1.Namespace, requestsGeneral interface{}) (*u
}
func setReport(reportUnstructured *unstructured.Unstructured, ns, kyvernoNs *v1.Namespace) {
-reportUnstructured.SetAPIVersion(report.SchemeGroupVersion.String())
+reportUnstructured.SetAPIVersion(policyreportv1alpha2.SchemeGroupVersion.String())
reportUnstructured.SetLabels(LabelSelector.MatchLabels)
if kyvernoNs != nil {
@@ -668,7 +673,7 @@ func (g *ReportGenerator) updateReport(old interface{}, new *unstructured.Unstru
oldUnstructured := make(map[string]interface{})
-if oldTyped, ok := old.(*report.ClusterPolicyReport); ok {
+if oldTyped, ok := old.(*policyreportv1alpha2.ClusterPolicyReport); ok {
if oldTyped.GetDeletionTimestamp() != nil {
return g.pclient.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Delete(context.TODO(), oldTyped.Name, metav1.DeleteOptions{})
}
@@ -678,7 +683,7 @@ func (g *ReportGenerator) updateReport(old interface{}, new *unstructured.Unstru
}
new.SetUID(oldTyped.GetUID())
new.SetResourceVersion(oldTyped.GetResourceVersion())
-} else if oldTyped, ok := old.(*report.PolicyReport); ok {
+} else if oldTyped, ok := old.(*policyreportv1alpha2.PolicyReport); ok {
if oldTyped.GetDeletionTimestamp() != nil {
return g.pclient.Wgpolicyk8sV1alpha2().PolicyReports(oldTyped.Namespace).Delete(context.TODO(), oldTyped.Name, metav1.DeleteOptions{})
}

View file

@@ -20,6 +20,7 @@ import (
cmap "github.com/orcaman/concurrent-map"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
+"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@@ -43,6 +44,8 @@ type Generator struct {
// polLister can list/get namespace policy from the shared informer's store
polLister kyvernolister.PolicyLister
+informersSynced []cache.InformerSynced
queue workqueue.RateLimitingInterface
dataStore *dataStore
@@ -82,6 +85,8 @@ func NewReportChangeRequestGenerator(client policyreportclient.Interface,
log: log,
}
+gen.informersSynced = []cache.InformerSynced{clusterReportReqInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced}
return &gen
}
@@ -214,6 +219,10 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
+if !cache.WaitForNamedCacheSync("requestCreator", stopCh, gen.informersSynced...) {
+return
+}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}

View file

@@ -24,6 +24,7 @@ import (
rbacinformer "k8s.io/client-go/informers/rbac/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
rbaclister "k8s.io/client-go/listers/rbac/v1"
+"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@@ -52,6 +53,8 @@ type auditHandler struct {
crbLister rbaclister.ClusterRoleBindingLister
nsLister listerv1.NamespaceLister
+informersSynced []cache.InformerSynced
log logr.Logger
configHandler config.Configuration
promConfig *metrics.PromConfig
@@ -69,7 +72,7 @@ func NewValidateAuditHandler(pCache policycache.Interface,
client client.Interface,
promConfig *metrics.PromConfig) AuditHandler {
-return &auditHandler{
+a := &auditHandler{
pCache: pCache,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
eventGen: eventGen,
@@ -82,6 +85,8 @@ func NewValidateAuditHandler(pCache policycache.Interface,
client: client,
promConfig: promConfig,
}
+a.informersSynced = []cache.InformerSynced{rbInformer.Informer().HasSynced, crbInformer.Informer().HasSynced, namespaces.Informer().HasSynced}
+return a
}
func (h *auditHandler) Add(request *admissionv1.AdmissionRequest) {
@@ -97,6 +102,10 @@ func (h *auditHandler) Run(workers int, stopCh <-chan struct{}) {
h.log.V(4).Info("shutting down")
}()
+if !cache.WaitForNamedCacheSync("ValidateAuditHandler", stopCh, h.informersSynced...) {
+return
+}
for i := 0; i < workers; i++ {
go wait.Until(h.runWorker, time.Second, stopCh)
}