diff --git a/main.go b/main.go index 4769d68846..3dae7e248e 100644 --- a/main.go +++ b/main.go @@ -74,11 +74,7 @@ func main() { } // WERBHOOK REGISTRATION CLIENT - webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient( - clientConfig, - client, - serverIP, - int32(webhookTimeout)) + webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient(clientConfig, client, serverIP, int32(webhookTimeout)) if err != nil { glog.Fatalf("Unable to register admission webhooks on cluster: %v\n", err) } @@ -88,58 +84,36 @@ func main() { // - Policy // - PolicyVolation // - cache resync time: 10 seconds - pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions( - pclient, - 10*time.Second) + pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, 10*time.Second) // KUBERNETES RESOURCES INFORMER // watches namespace resource // - cache resync time: 10 seconds - kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions( - kubeClient, - 10*time.Second) + kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Second) // Configuration Data // dyamically load the configuration from configMap // - resource filters // if the configMap is update, the configuration will be updated :D - configData := config.NewConfigData( - kubeClient, - kubeInformer.Core().V1().ConfigMaps(), - filterK8Resources) + configData := config.NewConfigData(kubeClient, kubeInformer.Core().V1().ConfigMaps(), filterK8Resources) // Policy meta-data store - policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies()) + policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies().Lister()) // EVENT GENERATOR // - generate event with retry mechanism - egen := event.NewEventGenerator( - client, - pInformer.Kyverno().V1().ClusterPolicies()) + egen := event.NewEventGenerator(client, pInformer.Kyverno().V1().ClusterPolicies()) // POLICY VIOLATION GENERATOR // -- generate policy violation - pvgen := policyviolation.NewPVGenerator(pclient, - client, - pInformer.Kyverno().V1().ClusterPolicyViolations(), - pInformer.Kyverno().V1().NamespacedPolicyViolations()) + pvgen := policyviolation.NewPVGenerator(pclient, client, pInformer.Kyverno().V1().ClusterPolicyViolations().Lister(), pInformer.Kyverno().V1().NamespacedPolicyViolations().Lister()) // POLICY CONTROLLER // - reconciliation policy and policy violation // - process policy on existing resources // - status aggregator: recieves stats when a policy is applied // & updates the policy status - pc, err := policy.NewPolicyController(pclient, - client, - pInformer.Kyverno().V1().ClusterPolicies(), - pInformer.Kyverno().V1().ClusterPolicyViolations(), - pInformer.Kyverno().V1().NamespacedPolicyViolations(), - egen, - kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), - webhookRegistrationClient, - configData, - pvgen, - policyMetaStore) + pc, err := policy.NewPolicyController(pclient, client, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(), egen, kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), webhookRegistrationClient, configData, pvgen, policyMetaStore) if err != nil { glog.Fatalf("error creating policy controller: %v\n", err) } @@ -147,36 +121,19 @@ func main() { // POLICY VIOLATION CONTROLLER // policy violation cleanup if the corresponding resource is deleted // status: lastUpdatTime - pvc, err := policyviolation.NewPolicyViolationController( - client, - pclient, - pInformer.Kyverno().V1().ClusterPolicies(), - pInformer.Kyverno().V1().ClusterPolicyViolations()) + pvc, err := policyviolation.NewPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations()) if err != nil { glog.Fatalf("error creating cluster policy violation controller: %v\n", err) } - nspvc, err := policyviolation.NewNamespacedPolicyViolationController( - client, - pclient, - pInformer.Kyverno().V1().ClusterPolicies(), - pInformer.Kyverno().V1().NamespacedPolicyViolations()) + nspvc, err := policyviolation.NewNamespacedPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().NamespacedPolicyViolations()) if err != nil { glog.Fatalf("error creating namespaced policy violation controller: %v\n", err) } // GENERATE CONTROLLER // - watches for Namespace resource and generates resource based on the policy generate rule - nsc := namespace.NewNamespaceController( - pclient, - client, - kubeInformer.Core().V1().Namespaces(), - pInformer.Kyverno().V1().ClusterPolicies(), - pc.GetPolicyStatusAggregator(), - egen, - configData, - pvgen, - policyMetaStore) + nsc := namespace.NewNamespaceController(pclient, client, kubeInformer.Core().V1().Namespaces(), pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pc.GetPolicyStatusAggregator(), egen, configData, pvgen, policyMetaStore) // CONFIGURE CERTIFICATES tlsPair, err := initTLSPemPair(clientConfig, client) @@ -199,29 +156,17 @@ func main() { // -- annotations on resources with update details on mutation JSON patches // -- generate policy violation resource // -- generate events on policy and resource - server, err := webhooks.NewWebhookServer( - pclient, - client, - tlsPair, - pInformer.Kyverno().V1().ClusterPolicies(), - kubeInformer.Rbac().V1().RoleBindings(), - kubeInformer.Rbac().V1().ClusterRoleBindings(), - egen, - webhookRegistrationClient, - pc.GetPolicyStatusAggregator(), - configData, - policyMetaStore, - pvgen, - cleanUp) + server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(), + kubeInformer.Rbac().V1().RoleBindings(), kubeInformer.Rbac().V1().ClusterRoleBindings(), egen, webhookRegistrationClient, pc.GetPolicyStatusAggregator(), configData, policyMetaStore, pvgen, cleanUp) if err != nil { glog.Fatalf("Unable to create webhook server: %v\n", err) } // Start the components pInformer.Start(stopCh) kubeInformer.Start(stopCh) - - go configData.Run(stopCh) - go policyMetaStore.Run(stopCh) + if err := configData.Run(stopCh); err != nil { + glog.Fatalf("Unable to load dynamic configuration: %v\n", err) + } go pc.Run(1, stopCh) go pvc.Run(1, stopCh) go nspvc.Run(1, stopCh) diff --git a/pkg/config/dynamicconfig.go b/pkg/config/dynamicconfig.go index c48a954554..5add5ebc2a 100644 --- a/pkg/config/dynamicconfig.go +++ b/pkg/config/dynamicconfig.go @@ -76,12 +76,12 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI return &cd } -//Run checks syncing -func (cd *ConfigData) Run(stopCh <-chan struct{}) { +func (cd *ConfigData) Run(stopCh <-chan struct{}) error { // wait for cache to populate first time if !cache.WaitForCacheSync(stopCh, cd.cmListerSycned) { - glog.Error("configuration: failed to sync informer cache") + return fmt.Errorf("Configuration: Failed to sync informer cache") } + return nil } func (cd *ConfigData) addCM(obj interface{}) { diff --git a/pkg/event/controller.go b/pkg/event/controller.go index 144a6f11d7..bdf664ff2c 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -14,18 +14,14 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) //Generator generate events type Generator struct { - client *client.Client - // list/get cluster policy - pLister kyvernolister.ClusterPolicyLister - // returns true if the cluster policy store has been synced at least once - pSynced cache.InformerSynced + client *client.Client + pLister kyvernolister.ClusterPolicyLister queue workqueue.RateLimitingInterface recorder record.EventRecorder } @@ -42,7 +38,6 @@ func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterP client: client, pLister: pInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), - pSynced: pInformer.Informer().HasSynced, recorder: initRecorder(client), } @@ -91,10 +86,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { glog.Info("Starting event generator") defer glog.Info("Shutting down event generator") - if !cache.WaitForCacheSync(stopCh, gen.pSynced) { - glog.Error("event generator: failed to sync informer cache") - } - for i := 0; i < workers; i++ { go wait.Until(gen.runWorker, time.Second, stopCh) } diff --git a/pkg/namespace/controller.go b/pkg/namespace/controller.go index 30292b0bbb..4e4075ed74 100644 --- a/pkg/namespace/controller.go +++ b/pkg/namespace/controller.go @@ -39,12 +39,16 @@ type NamespaceController struct { //nsLister provides expansion to the namespace lister to inject GVK for the resource nsLister NamespaceListerExpansion - // nsSynced returns true if the Namespace store has been synced at least once - nsSynced cache.InformerSynced + // nLsister can list/get namespaces from the shared informer's store + // nsLister v1CoreLister.NamespaceLister + // nsListerSynced returns true if the Namespace store has been synced at least once + nsListerSynced cache.InformerSynced // pvLister can list/get policy violation from the shared informer's store pLister kyvernolister.ClusterPolicyLister - // pSynced retrns true if the Policy store has been synced at least once - pSynced cache.InformerSynced + // pvListerSynced retrns true if the Policy store has been synced at least once + pvListerSynced cache.InformerSynced + // pvLister can list/get policy violation from the shared informer's store + pvLister kyvernolister.ClusterPolicyViolationLister // API to send policy stats for aggregation policyStatus policy.PolicyStatusInterface // eventGen provides interface to generate evenets @@ -66,6 +70,7 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset, client *client.Client, nsInformer v1Informer.NamespaceInformer, pInformer kyvernoinformer.ClusterPolicyInformer, + pvInformer kyvernoinformer.ClusterPolicyViolationInformer, policyStatus policy.PolicyStatusInterface, eventGen event.Interface, configHandler config.Interface, @@ -98,9 +103,10 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset, nsc.syncHandler = nsc.syncNamespace nsc.nsLister = NewNamespaceLister(nsInformer.Lister()) - nsc.nsSynced = nsInformer.Informer().HasSynced + nsc.nsListerSynced = nsInformer.Informer().HasSynced nsc.pLister = pInformer.Lister() - nsc.pSynced = pInformer.Informer().HasSynced + nsc.pvListerSynced = pInformer.Informer().HasSynced + nsc.pvLister = pvInformer.Lister() nsc.policyStatus = policyStatus // resource manager @@ -168,8 +174,7 @@ func (nsc *NamespaceController) Run(workers int, stopCh <-chan struct{}) { glog.Info("Starting namespace controller") defer glog.Info("Shutting down namespace controller") - if ok := cache.WaitForCacheSync(stopCh, nsc.nsSynced, nsc.pSynced); !ok { - glog.Error("namespace generator: failed to sync cache") + if ok := cache.WaitForCacheSync(stopCh, nsc.nsListerSynced); !ok { return } diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index eaedbe0b93..7f40735750 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -395,7 +395,6 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { defer glog.Info("Shutting down policy controller") if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced, pc.nspvListerSynced) { - glog.Info("failed to sync informer cache") return } for i := 0; i < workers; i++ { diff --git a/pkg/policystore/policystore.go b/pkg/policystore/policystore.go index 9a7b7dbfa3..aa16d28f8a 100644 --- a/pkg/policystore/policystore.go +++ b/pkg/policystore/policystore.go @@ -3,11 +3,8 @@ package policystore import ( "sync" - "github.com/golang/glog" kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" - kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1" - "k8s.io/client-go/tools/cache" ) type policyMap map[string]interface{} @@ -16,12 +13,9 @@ type kindMap map[string]namespaceMap //PolicyStore Store the meta-data information to faster lookup policies type PolicyStore struct { - data map[string]namespaceMap - mu sync.RWMutex - // list/get cluster policy + data map[string]namespaceMap + mu sync.RWMutex pLister kyvernolister.ClusterPolicyLister - // returns true if the cluster policy store has been synced at least once - pSynched cache.InformerSynced } //UpdateInterface provides api to update policies @@ -39,22 +33,14 @@ type LookupInterface interface { } // NewPolicyStore returns a new policy store -func NewPolicyStore(pInformer kyvernoinformer.ClusterPolicyInformer) *PolicyStore { +func NewPolicyStore(pLister kyvernolister.ClusterPolicyLister) *PolicyStore { ps := PolicyStore{ - data: make(kindMap), - pLister: pInformer.Lister(), - pSynched: pInformer.Informer().HasSynced, + data: make(kindMap), + pLister: pLister, } return &ps } -//Run checks syncing -func (ps *PolicyStore) Run(stopCh <-chan struct{}) { - if !cache.WaitForCacheSync(stopCh, ps.pSynched) { - glog.Error("policy meta store: failed to sync informer cache") - } -} - //Register a new policy func (ps *PolicyStore) Register(policy kyverno.ClusterPolicy) { ps.mu.Lock() diff --git a/pkg/policyviolation/generator.go b/pkg/policyviolation/generator.go index d398b70d89..cd9a715232 100644 --- a/pkg/policyviolation/generator.go +++ b/pkg/policyviolation/generator.go @@ -12,16 +12,13 @@ import ( kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" kyvernov1 "github.com/nirmata/kyverno/pkg/client/clientset/versioned/typed/kyverno/v1" - kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1" - client "github.com/nirmata/kyverno/pkg/dclient" dclient "github.com/nirmata/kyverno/pkg/dclient" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -32,20 +29,13 @@ const workQueueRetryLimit = 3 type Generator struct { dclient *dclient.Client pvInterface kyvernov1.KyvernoV1Interface - // get/list cluster policy violation - pvLister kyvernolister.ClusterPolicyViolationLister - // get/ist namespaced policy violation - nspvLister kyvernolister.NamespacedPolicyViolationLister - // returns true if the cluster policy store has been synced at least once - pvSynced cache.InformerSynced - // returns true if the namespaced cluster policy store has been synced at at least once - nspvSynced cache.InformerSynced - queue workqueue.RateLimitingInterface - dataStore *dataStore + pvLister kyvernolister.ClusterPolicyViolationLister + nspvLister kyvernolister.NamespacedPolicyViolationLister + queue workqueue.RateLimitingInterface + dataStore *dataStore } -//NewDataStore returns an instance of data store -func newDataStore() *dataStore { +func NewDataStore() *dataStore { ds := dataStore{ data: make(map[string]Info), } @@ -105,17 +95,15 @@ type GeneratorInterface interface { // NewPVGenerator returns a new instance of policy violation generator func NewPVGenerator(client *kyvernoclient.Clientset, dclient *client.Client, - pvInformer kyvernoinformer.ClusterPolicyViolationInformer, - nspvInformer kyvernoinformer.NamespacedPolicyViolationInformer) *Generator { + pvLister kyvernolister.ClusterPolicyViolationLister, + nspvLister kyvernolister.NamespacedPolicyViolationLister) *Generator { gen := Generator{ pvInterface: client.KyvernoV1(), dclient: dclient, - pvLister: pvInformer.Lister(), - pvSynced: pvInformer.Informer().HasSynced, - nspvLister: nspvInformer.Lister(), - nspvSynced: nspvInformer.Informer().HasSynced, + pvLister: pvLister, + nspvLister: nspvLister, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), - dataStore: newDataStore(), + dataStore: NewDataStore(), } return &gen } @@ -142,10 +130,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { glog.Info("Start policy violation generator") defer glog.Info("Shutting down policy violation generator") - if !cache.WaitForCacheSync(stopCh, gen.pvSynced, gen.nspvSynced) { - glog.Error("event generator: failed to sync informer cache") - } - for i := 0; i < workers; i++ { go wait.Until(gen.runWorker, time.Second, stopCh) } diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 15f9889218..3ddf27904a 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -34,24 +34,18 @@ import ( // WebhookServer contains configured TLS server with MutationWebhook. // MutationWebhook gets policies from policyController and takes control of the cluster with kubeclient. type WebhookServer struct { - server http.Server - client *client.Client - kyvernoClient *kyvernoclient.Clientset - // list/get cluster policy resource - pLister kyvernolister.ClusterPolicyLister - // returns true if the cluster policy store has synced atleast - pSynced cache.InformerSynced - // list/get role binding resource - rbLister rbaclister.RoleBindingLister - // return true if role bining store has synced atleast once - rbSynced cache.InformerSynced - // list/get cluster role binding resource - crbLister rbaclister.ClusterRoleBindingLister - // return true if cluster role binding store has synced atleast once - crbSynced cache.InformerSynced - // generate events - eventGen event.Interface - // webhook registration client + server http.Server + client *client.Client + kyvernoClient *kyvernoclient.Clientset + pLister kyvernolister.ClusterPolicyLister + pvLister kyvernolister.ClusterPolicyViolationLister + namespacepvLister kyvernolister.NamespacedPolicyViolationLister + pListerSynced cache.InformerSynced + pvListerSynced cache.InformerSynced + namespacepvListerSynced cache.InformerSynced + rbLister rbaclister.RoleBindingLister + crbLister rbaclister.ClusterRoleBindingLister + eventGen event.Interface webhookRegistrationClient *webhookconfig.WebhookRegistrationClient // API to send policy stats for aggregation policyStatus policy.PolicyStatusInterface @@ -74,6 +68,8 @@ func NewWebhookServer( client *client.Client, tlsPair *tlsutils.TlsPemPair, pInformer kyvernoinformer.ClusterPolicyInformer, + pvInformer kyvernoinformer.ClusterPolicyViolationInformer, + namespacepvInformer kyvernoinformer.NamespacedPolicyViolationInformer, rbInformer rbacinformer.RoleBindingInformer, crbInformer rbacinformer.ClusterRoleBindingInformer, eventGen event.Interface, @@ -100,15 +96,17 @@ func NewWebhookServer( client: client, kyvernoClient: kyvernoClient, pLister: pInformer.Lister(), - pSynced: pInformer.Informer().HasSynced, - rbLister: rbInformer.Lister(), - rbSynced: rbInformer.Informer().HasSynced, - crbLister: crbInformer.Lister(), - crbSynced: crbInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + namespacepvLister: namespacepvInformer.Lister(), + pListerSynced: pvInformer.Informer().HasSynced, + pvListerSynced: pInformer.Informer().HasSynced, + namespacepvListerSynced: namespacepvInformer.Informer().HasSynced, eventGen: eventGen, webhookRegistrationClient: webhookRegistrationClient, policyStatus: policyStatus, configHandler: configHandler, + rbLister: rbInformer.Lister(), + crbLister: crbInformer.Lister(), cleanUp: cleanUp, lastReqTime: checker.NewLastReqTime(), pvGenerator: pvGenerator, @@ -248,10 +246,6 @@ func (ws *WebhookServer) handleAdmissionRequest(request *v1beta1.AdmissionReques // RunAsync TLS server in separate thread and returns control immediately func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) { - if !cache.WaitForCacheSync(stopCh, ws.pSynced, ws.rbSynced, ws.crbSynced) { - glog.Error("webhook: failed to sync informer cache") - } - go func(ws *WebhookServer) { glog.V(3).Infof("serving on %s\n", ws.server.Addr) if err := ws.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed {