diff --git a/main.go b/main.go index 3dae7e248e..4769d68846 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,11 @@ func main() { } // WERBHOOK REGISTRATION CLIENT - webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient(clientConfig, client, serverIP, int32(webhookTimeout)) + webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient( + clientConfig, + client, + serverIP, + int32(webhookTimeout)) if err != nil { glog.Fatalf("Unable to register admission webhooks on cluster: %v\n", err) } @@ -84,36 +88,58 @@ func main() { // - Policy // - PolicyVolation // - cache resync time: 10 seconds - pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, 10*time.Second) + pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions( + pclient, + 10*time.Second) // KUBERNETES RESOURCES INFORMER // watches namespace resource // - cache resync time: 10 seconds - kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Second) + kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions( + kubeClient, + 10*time.Second) // Configuration Data // dyamically load the configuration from configMap // - resource filters // if the configMap is update, the configuration will be updated :D - configData := config.NewConfigData(kubeClient, kubeInformer.Core().V1().ConfigMaps(), filterK8Resources) + configData := config.NewConfigData( + kubeClient, + kubeInformer.Core().V1().ConfigMaps(), + filterK8Resources) // Policy meta-data store - policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies().Lister()) + policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies()) // EVENT GENERATOR // - generate event with retry mechanism - egen := event.NewEventGenerator(client, pInformer.Kyverno().V1().ClusterPolicies()) + egen := event.NewEventGenerator( + client, + pInformer.Kyverno().V1().ClusterPolicies()) // POLICY VIOLATION GENERATOR // -- generate policy violation - pvgen := policyviolation.NewPVGenerator(pclient, client, pInformer.Kyverno().V1().ClusterPolicyViolations().Lister(), pInformer.Kyverno().V1().NamespacedPolicyViolations().Lister()) + pvgen := policyviolation.NewPVGenerator(pclient, + client, + pInformer.Kyverno().V1().ClusterPolicyViolations(), + pInformer.Kyverno().V1().NamespacedPolicyViolations()) // POLICY CONTROLLER // - reconciliation policy and policy violation // - process policy on existing resources // - status aggregator: recieves stats when a policy is applied // & updates the policy status - pc, err := policy.NewPolicyController(pclient, client, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(), egen, kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), webhookRegistrationClient, configData, pvgen, policyMetaStore) + pc, err := policy.NewPolicyController(pclient, + client, + pInformer.Kyverno().V1().ClusterPolicies(), + pInformer.Kyverno().V1().ClusterPolicyViolations(), + pInformer.Kyverno().V1().NamespacedPolicyViolations(), + egen, + kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), + webhookRegistrationClient, + configData, + pvgen, + policyMetaStore) if err != nil { glog.Fatalf("error creating policy controller: %v\n", err) } @@ -121,19 +147,36 @@ func main() { // POLICY VIOLATION CONTROLLER // policy violation cleanup if the corresponding resource is deleted // status: lastUpdatTime - pvc, err := policyviolation.NewPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations()) + pvc, err := policyviolation.NewPolicyViolationController( + client, + pclient, + pInformer.Kyverno().V1().ClusterPolicies(), + pInformer.Kyverno().V1().ClusterPolicyViolations()) if err != nil { glog.Fatalf("error creating cluster policy violation controller: %v\n", err) } - nspvc, err := policyviolation.NewNamespacedPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().NamespacedPolicyViolations()) + nspvc, err := policyviolation.NewNamespacedPolicyViolationController( + client, + pclient, + pInformer.Kyverno().V1().ClusterPolicies(), + pInformer.Kyverno().V1().NamespacedPolicyViolations()) if err != nil { glog.Fatalf("error creating namespaced policy violation controller: %v\n", err) } // GENERATE CONTROLLER // - watches for Namespace resource and generates resource based on the policy generate rule - nsc := namespace.NewNamespaceController(pclient, client, kubeInformer.Core().V1().Namespaces(), pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pc.GetPolicyStatusAggregator(), egen, configData, pvgen, policyMetaStore) + nsc := namespace.NewNamespaceController( + pclient, + client, + kubeInformer.Core().V1().Namespaces(), + pInformer.Kyverno().V1().ClusterPolicies(), + pc.GetPolicyStatusAggregator(), + egen, + configData, + pvgen, + policyMetaStore) // CONFIGURE CERTIFICATES tlsPair, err := initTLSPemPair(clientConfig, client) @@ -156,17 +199,29 @@ func main() { // -- annotations on resources with update details on mutation JSON patches // -- generate policy violation resource // -- generate events on policy and resource - server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(), - kubeInformer.Rbac().V1().RoleBindings(), kubeInformer.Rbac().V1().ClusterRoleBindings(), egen, webhookRegistrationClient, pc.GetPolicyStatusAggregator(), configData, policyMetaStore, pvgen, cleanUp) + server, err := webhooks.NewWebhookServer( + pclient, + client, + tlsPair, + pInformer.Kyverno().V1().ClusterPolicies(), + kubeInformer.Rbac().V1().RoleBindings(), + kubeInformer.Rbac().V1().ClusterRoleBindings(), + egen, + webhookRegistrationClient, + pc.GetPolicyStatusAggregator(), + configData, + policyMetaStore, + pvgen, + cleanUp) if err != nil { glog.Fatalf("Unable to create webhook server: %v\n", err) } // Start the components pInformer.Start(stopCh) kubeInformer.Start(stopCh) - if err := configData.Run(stopCh); err != nil { - glog.Fatalf("Unable to load dynamic configuration: %v\n", err) - } + + go configData.Run(stopCh) + go policyMetaStore.Run(stopCh) go pc.Run(1, stopCh) go pvc.Run(1, stopCh) go nspvc.Run(1, stopCh) diff --git a/pkg/config/dynamicconfig.go b/pkg/config/dynamicconfig.go index 5add5ebc2a..03b77926ec 100644 --- a/pkg/config/dynamicconfig.go +++ b/pkg/config/dynamicconfig.go @@ -30,7 +30,7 @@ type ConfigData struct { // configuration data filters []k8Resource // hasynced - cmListerSycned cache.InformerSynced + cmSycned cache.InformerSynced } // ToFilter checks if the given resource is set to be filtered in the configuration @@ -57,9 +57,9 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI glog.Info("ConfigMap name not defined in env:INIT_CONFIG: loading no default configuration") } cd := ConfigData{ - client: rclient, - cmName: os.Getenv(cmNameEnv), - cmListerSycned: cmInformer.Informer().HasSynced, + client: rclient, + cmName: os.Getenv(cmNameEnv), + cmSycned: cmInformer.Informer().HasSynced, } //TODO: this has been added to backward support command line arguments // will be removed in future and the configuration will be set only via configmaps @@ -76,12 +76,12 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI return &cd } -func (cd *ConfigData) Run(stopCh <-chan struct{}) error { +//Run checks syncing +func (cd *ConfigData) Run(stopCh <-chan struct{}) { // wait for cache to populate first time - if !cache.WaitForCacheSync(stopCh, cd.cmListerSycned) { - return fmt.Errorf("Configuration: Failed to sync informer cache") + if !cache.WaitForCacheSync(stopCh, cd.cmSycned) { + glog.Error("configuration: failed to sync informer cache") } - return nil } func (cd *ConfigData) addCM(obj interface{}) { diff --git a/pkg/event/controller.go b/pkg/event/controller.go index bdf664ff2c..144a6f11d7 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -14,14 +14,18 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) //Generator generate events type Generator struct { - client *client.Client - pLister kyvernolister.ClusterPolicyLister + client *client.Client + // list/get cluster policy + pLister kyvernolister.ClusterPolicyLister + // returns true if the cluster policy store has been synced at least once + pSynced cache.InformerSynced queue workqueue.RateLimitingInterface recorder record.EventRecorder } @@ -38,6 +42,7 @@ func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterP client: client, pLister: pInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), + pSynced: pInformer.Informer().HasSynced, recorder: initRecorder(client), } @@ -86,6 +91,10 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { glog.Info("Starting event generator") defer glog.Info("Shutting down event generator") + if !cache.WaitForCacheSync(stopCh, gen.pSynced) { + glog.Error("event generator: failed to sync informer cache") + } + for i := 0; i < workers; i++ { go wait.Until(gen.runWorker, time.Second, stopCh) } diff --git a/pkg/namespace/controller.go b/pkg/namespace/controller.go index 4e4075ed74..30292b0bbb 100644 --- a/pkg/namespace/controller.go +++ b/pkg/namespace/controller.go @@ -39,16 +39,12 @@ type NamespaceController struct { //nsLister provides expansion to the namespace lister to inject GVK for the resource nsLister NamespaceListerExpansion - // nLsister can list/get namespaces from the shared informer's store - // nsLister v1CoreLister.NamespaceLister - // nsListerSynced returns true if the Namespace store has been synced at least once - nsListerSynced cache.InformerSynced + // nsSynced returns true if the Namespace store has been synced at least once + nsSynced cache.InformerSynced // pvLister can list/get policy violation from the shared informer's store pLister kyvernolister.ClusterPolicyLister - // pvListerSynced retrns true if the Policy store has been synced at least once - pvListerSynced cache.InformerSynced - // pvLister can list/get policy violation from the shared informer's store - pvLister kyvernolister.ClusterPolicyViolationLister + // pSynced retrns true if the Policy store has been synced at least once + pSynced cache.InformerSynced // API to send policy stats for aggregation policyStatus policy.PolicyStatusInterface // eventGen provides interface to generate evenets @@ -70,7 +66,6 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset, client *client.Client, nsInformer v1Informer.NamespaceInformer, pInformer kyvernoinformer.ClusterPolicyInformer, - pvInformer kyvernoinformer.ClusterPolicyViolationInformer, policyStatus policy.PolicyStatusInterface, eventGen event.Interface, configHandler config.Interface, @@ -103,10 +98,9 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset, nsc.syncHandler = nsc.syncNamespace nsc.nsLister = NewNamespaceLister(nsInformer.Lister()) - nsc.nsListerSynced = nsInformer.Informer().HasSynced + nsc.nsSynced = nsInformer.Informer().HasSynced nsc.pLister = pInformer.Lister() - nsc.pvListerSynced = pInformer.Informer().HasSynced - nsc.pvLister = pvInformer.Lister() + nsc.pSynced = pInformer.Informer().HasSynced nsc.policyStatus = policyStatus // resource manager @@ -174,7 +168,8 @@ func (nsc *NamespaceController) Run(workers int, stopCh <-chan struct{}) { glog.Info("Starting namespace controller") defer glog.Info("Shutting down namespace controller") - if ok := cache.WaitForCacheSync(stopCh, nsc.nsListerSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, nsc.nsSynced, nsc.pSynced); !ok { + glog.Error("namespace generator: failed to sync cache") return } diff --git a/pkg/policy/controller.go b/pkg/policy/controller.go index 7f40735750..eaedbe0b93 100644 --- a/pkg/policy/controller.go +++ b/pkg/policy/controller.go @@ -395,6 +395,7 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) { defer glog.Info("Shutting down policy controller") if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced, pc.nspvListerSynced) { + glog.Info("failed to sync informer cache") return } for i := 0; i < workers; i++ { diff --git a/pkg/policystore/policystore.go b/pkg/policystore/policystore.go index aa16d28f8a..9a7b7dbfa3 100644 --- a/pkg/policystore/policystore.go +++ b/pkg/policystore/policystore.go @@ -3,8 +3,11 @@ package policystore import ( "sync" + "github.com/golang/glog" kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" + kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1" + "k8s.io/client-go/tools/cache" ) type policyMap map[string]interface{} @@ -13,9 +16,12 @@ type kindMap map[string]namespaceMap //PolicyStore Store the meta-data information to faster lookup policies type PolicyStore struct { - data map[string]namespaceMap - mu sync.RWMutex + data map[string]namespaceMap + mu sync.RWMutex + // list/get cluster policy pLister kyvernolister.ClusterPolicyLister + // returns true if the cluster policy store has been synced at least once + pSynched cache.InformerSynced } //UpdateInterface provides api to update policies @@ -33,14 +39,22 @@ type LookupInterface interface { } // NewPolicyStore returns a new policy store -func NewPolicyStore(pLister kyvernolister.ClusterPolicyLister) *PolicyStore { +func NewPolicyStore(pInformer kyvernoinformer.ClusterPolicyInformer) *PolicyStore { ps := PolicyStore{ - data: make(kindMap), - pLister: pLister, + data: make(kindMap), + pLister: pInformer.Lister(), + pSynched: pInformer.Informer().HasSynced, } return &ps } +//Run checks syncing +func (ps *PolicyStore) Run(stopCh <-chan struct{}) { + if !cache.WaitForCacheSync(stopCh, ps.pSynched) { + glog.Error("policy meta store: failed to sync informer cache") + } +} + //Register a new policy func (ps *PolicyStore) Register(policy kyverno.ClusterPolicy) { ps.mu.Lock() diff --git a/pkg/policyviolation/generator.go b/pkg/policyviolation/generator.go index cd9a715232..d398b70d89 100644 --- a/pkg/policyviolation/generator.go +++ b/pkg/policyviolation/generator.go @@ -12,13 +12,16 @@ import ( kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1" kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned" kyvernov1 "github.com/nirmata/kyverno/pkg/client/clientset/versioned/typed/kyverno/v1" + kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1" + client "github.com/nirmata/kyverno/pkg/dclient" dclient "github.com/nirmata/kyverno/pkg/dclient" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -29,13 +32,20 @@ const workQueueRetryLimit = 3 type Generator struct { dclient *dclient.Client pvInterface kyvernov1.KyvernoV1Interface - pvLister kyvernolister.ClusterPolicyViolationLister - nspvLister kyvernolister.NamespacedPolicyViolationLister - queue workqueue.RateLimitingInterface - dataStore *dataStore + // get/list cluster policy violation + pvLister kyvernolister.ClusterPolicyViolationLister + // get/ist namespaced policy violation + nspvLister kyvernolister.NamespacedPolicyViolationLister + // returns true if the cluster policy store has been synced at least once + pvSynced cache.InformerSynced + // returns true if the namespaced cluster policy store has been synced at at least once + nspvSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + dataStore *dataStore } -func NewDataStore() *dataStore { +//NewDataStore returns an instance of data store +func newDataStore() *dataStore { ds := dataStore{ data: make(map[string]Info), } @@ -95,15 +105,17 @@ type GeneratorInterface interface { // NewPVGenerator returns a new instance of policy violation generator func NewPVGenerator(client *kyvernoclient.Clientset, dclient *client.Client, - pvLister kyvernolister.ClusterPolicyViolationLister, - nspvLister kyvernolister.NamespacedPolicyViolationLister) *Generator { + pvInformer kyvernoinformer.ClusterPolicyViolationInformer, + nspvInformer kyvernoinformer.NamespacedPolicyViolationInformer) *Generator { gen := Generator{ pvInterface: client.KyvernoV1(), dclient: dclient, - pvLister: pvLister, - nspvLister: nspvLister, + pvLister: pvInformer.Lister(), + pvSynced: pvInformer.Informer().HasSynced, + nspvLister: nspvInformer.Lister(), + nspvSynced: nspvInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), - dataStore: NewDataStore(), + dataStore: newDataStore(), } return &gen } @@ -130,6 +142,10 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { glog.Info("Start policy violation generator") defer glog.Info("Shutting down policy violation generator") + if !cache.WaitForCacheSync(stopCh, gen.pvSynced, gen.nspvSynced) { + glog.Error("event generator: failed to sync informer cache") + } + for i := 0; i < workers; i++ { go wait.Until(gen.runWorker, time.Second, stopCh) } diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 3ddf27904a..15f9889218 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -34,18 +34,24 @@ import ( // WebhookServer contains configured TLS server with MutationWebhook. // MutationWebhook gets policies from policyController and takes control of the cluster with kubeclient. type WebhookServer struct { - server http.Server - client *client.Client - kyvernoClient *kyvernoclient.Clientset - pLister kyvernolister.ClusterPolicyLister - pvLister kyvernolister.ClusterPolicyViolationLister - namespacepvLister kyvernolister.NamespacedPolicyViolationLister - pListerSynced cache.InformerSynced - pvListerSynced cache.InformerSynced - namespacepvListerSynced cache.InformerSynced - rbLister rbaclister.RoleBindingLister - crbLister rbaclister.ClusterRoleBindingLister - eventGen event.Interface + server http.Server + client *client.Client + kyvernoClient *kyvernoclient.Clientset + // list/get cluster policy resource + pLister kyvernolister.ClusterPolicyLister + // returns true if the cluster policy store has synced atleast + pSynced cache.InformerSynced + // list/get role binding resource + rbLister rbaclister.RoleBindingLister + // return true if role bining store has synced atleast once + rbSynced cache.InformerSynced + // list/get cluster role binding resource + crbLister rbaclister.ClusterRoleBindingLister + // return true if cluster role binding store has synced atleast once + crbSynced cache.InformerSynced + // generate events + eventGen event.Interface + // webhook registration client webhookRegistrationClient *webhookconfig.WebhookRegistrationClient // API to send policy stats for aggregation policyStatus policy.PolicyStatusInterface @@ -68,8 +74,6 @@ func NewWebhookServer( client *client.Client, tlsPair *tlsutils.TlsPemPair, pInformer kyvernoinformer.ClusterPolicyInformer, - pvInformer kyvernoinformer.ClusterPolicyViolationInformer, - namespacepvInformer kyvernoinformer.NamespacedPolicyViolationInformer, rbInformer rbacinformer.RoleBindingInformer, crbInformer rbacinformer.ClusterRoleBindingInformer, eventGen event.Interface, @@ -96,17 +100,15 @@ func NewWebhookServer( client: client, kyvernoClient: kyvernoClient, pLister: pInformer.Lister(), - pvLister: pvInformer.Lister(), - namespacepvLister: namespacepvInformer.Lister(), - pListerSynced: pvInformer.Informer().HasSynced, - pvListerSynced: pInformer.Informer().HasSynced, - namespacepvListerSynced: namespacepvInformer.Informer().HasSynced, + pSynced: pInformer.Informer().HasSynced, + rbLister: rbInformer.Lister(), + rbSynced: rbInformer.Informer().HasSynced, + crbLister: crbInformer.Lister(), + crbSynced: crbInformer.Informer().HasSynced, eventGen: eventGen, webhookRegistrationClient: webhookRegistrationClient, policyStatus: policyStatus, configHandler: configHandler, - rbLister: rbInformer.Lister(), - crbLister: crbInformer.Lister(), cleanUp: cleanUp, lastReqTime: checker.NewLastReqTime(), pvGenerator: pvGenerator, @@ -246,6 +248,10 @@ func (ws *WebhookServer) handleAdmissionRequest(request *v1beta1.AdmissionReques // RunAsync TLS server in separate thread and returns control immediately func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) { + if !cache.WaitForCacheSync(stopCh, ws.pSynced, ws.rbSynced, ws.crbSynced) { + glog.Error("webhook: failed to sync informer cache") + } + go func(ws *WebhookServer) { glog.V(3).Infof("serving on %s\n", ws.server.Addr) if err := ws.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed {