1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-15 17:51:20 +00:00

Added Kyverno specific SharedInformerFactory (#2987)

* Added Kyverno specific SharedInformerFactory

Signed-off-by: Kumar Mallikarjuna <kumar@nirmata.com>

* Replace ToUnstructured()

Signed-off-by: Kumar Mallikarjuna <kumar@nirmata.com>

* Add GVK to returned resource

Signed-off-by: Kumar Mallikarjuna <kumar@nirmata.com>

Co-authored-by: shuting <shutting06@gmail.com>
This commit is contained in:
Kumar Mallikarjuna 2022-01-18 21:22:48 +05:30 committed by GitHub
parent 421e6d9622
commit 771d62b735
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 21 deletions

View file

@ -167,6 +167,7 @@ func main() {
}
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace))
kubedynamicInformer := client.NewDynamicSharedInformerFactory(resyncPeriod)
rCache, err := resourcecache.NewResourceCache(client, kubedynamicInformer, log.Log.WithName("resourcecache"))
@ -238,6 +239,8 @@ func main() {
client,
pclient,
rCache,
kubeKyvernoInformer.Apps().V1().Deployments(),
kubeInformer.Core().V1().Namespaces(),
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().Policies(),
serverIP,
@ -259,7 +262,7 @@ func main() {
// if the configMap is update, the configuration will be updated :D
configData := config.NewConfigData(
kubeClient,
kubeInformer.Core().V1().ConfigMaps(),
kubeKyvernoInformer.Core().V1().ConfigMaps(),
filterK8sResources,
excludeGroupRole,
excludeUsername,
@ -379,7 +382,7 @@ func main() {
certRenewer := ktls.NewCertRenewer(client, clientConfig, ktls.CertRenewalInterval, ktls.CertValidityDuration, serverIP, log.Log.WithName("CertRenewer"))
certManager, err := webhookconfig.NewCertManager(
kubeInformer.Core().V1().Secrets(),
kubeKyvernoInformer.Core().V1().Secrets(),
kubeClient,
certRenewer,
log.Log.WithName("CertManager"),
@ -394,6 +397,7 @@ func main() {
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, setupLog)
registerWebhookConfigurations := func() {
certManager.InitTLSPemPair()
webhookCfg.Start()
// validate the ConfigMap format
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace, configData.GetInitConfigMapName()); err != nil {
@ -527,6 +531,7 @@ func main() {
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
kubeKyvernoInformer.Start(stopCh)
kubedynamicInformer.Start(stopCh)
// verifies if the admission control is enabled and active

View file

@ -1,6 +1,7 @@
package webhookconfig
import (
"encoding/json"
"errors"
"io/ioutil"
"path/filepath"
@ -13,7 +14,7 @@ import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
rest "k8s.io/client-go/rest"
)
@ -92,16 +93,21 @@ func (wrc *Register) GetKubePolicyClusterRoleName() (*unstructured.Unstructured,
// GetKubePolicyDeployment gets Kyverno deployment using the resource cache
// it does not initialize any client call
func (wrc *Register) GetKubePolicyDeployment() (*apps.Deployment, *unstructured.Unstructured, error) {
lister, _ := wrc.resCache.GetGVRCache("Deployment")
kubePolicyDeployment, err := lister.NamespacedLister(config.KyvernoNamespace).Get(config.KyvernoDeploymentName)
deploy, err := wrc.kDeplLister.Deployments(config.KyvernoNamespace).Get(config.KyvernoDeploymentName)
if err != nil {
return nil, nil, err
}
deploy := apps.Deployment{}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(kubePolicyDeployment.UnstructuredContent(), &deploy); err != nil {
return nil, kubePolicyDeployment, err
deploy.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "apps/v1", Kind: "Deployment"})
kubePolicyDeployment := unstructured.Unstructured{}
rawDepl, err := json.Marshal(deploy)
if err != nil {
return nil, nil, err
}
return &deploy, kubePolicyDeployment, nil
err = json.Unmarshal(rawDepl, &kubePolicyDeployment.Object)
if err != nil {
return deploy, nil, err
}
return deploy, &kubePolicyDeployment, nil
}
// debug mutating webhook

View file

@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
informers "k8s.io/client-go/informers/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -75,6 +77,9 @@ type webhookConfigManager struct {
stopCh <-chan struct{}
log logr.Logger
nsLister listers.NamespaceLister
nsListerSynced func() bool
}
type manage interface {
@ -87,6 +92,7 @@ func newWebhookConfigManager(
pInformer kyvernoinformer.ClusterPolicyInformer,
npInformer kyvernoinformer.PolicyInformer,
resCache resourcecache.ResourceCache,
nsInformer informers.NamespaceInformer,
serverIP string,
autoUpdateWebhooks bool,
createDefaultWebhook chan<- string,
@ -111,6 +117,9 @@ func newWebhookConfigManager(
m.pLister = pInformer.Lister()
m.npLister = npInformer.Lister()
m.nsLister = nsInformer.Lister()
m.nsListerSynced = nsInformer.Informer().HasSynced
m.pListerSynced = pInformer.Informer().HasSynced
m.npListerSynced = npInformer.Informer().HasSynced
@ -265,16 +274,7 @@ func (m *webhookConfigManager) enqueueAllPolicies() {
logger.V(4).Info("added CLusterPolicy to the queue", "name", cpol.GetName())
}
nsCache, ok := m.resCache.GetGVRCache("Namespace")
if !ok {
nsCache, err = m.resCache.CreateGVKInformer("Namespace")
if err != nil {
logger.Error(err, "unabled to create Namespace listser")
return
}
}
namespaces, err := nsCache.Lister().List(labels.Everything())
namespaces, err := m.nsLister.List(labels.Everything())
if err != nil {
logger.Error(err, "unabled to list namespaces")
return
@ -311,7 +311,7 @@ func (m *webhookConfigManager) start() {
m.log.Info("starting")
defer m.log.Info("shutting down")
if !cache.WaitForCacheSync(m.stopCh, m.pListerSynced, m.npListerSynced, m.mutateInformerSynced, m.validateInformerSynced) {
if !cache.WaitForCacheSync(m.stopCh, m.pListerSynced, m.npListerSynced, m.mutateInformerSynced, m.validateInformerSynced, m.nsListerSynced) {
m.log.Info("failed to sync informer cache")
return
}

View file

@ -22,7 +22,11 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
informers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
listers "k8s.io/client-go/listers/apps/v1"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const (
@ -45,10 +49,14 @@ type Register struct {
log logr.Logger
debug bool
autoUpdateWebhooks bool
stopCh <-chan struct{}
UpdateWebhookChan chan bool
createDefaultWebhook chan string
kDeplLister listers.DeploymentLister
kDeplListerSynced func() bool
// manage implements methods to manage webhook configurations
manage
}
@ -59,6 +67,8 @@ func NewRegister(
client *client.Client,
kyvernoClient *kyvernoclient.Clientset,
resCache resourcecache.ResourceCache,
kDeplInformer informers.DeploymentInformer,
nsInformer coreinformers.NamespaceInformer,
pInformer kyvernoinformer.ClusterPolicyInformer,
npInformer kyvernoinformer.PolicyInformer,
serverIP string,
@ -78,9 +88,12 @@ func NewRegister(
autoUpdateWebhooks: autoUpdateWebhooks,
UpdateWebhookChan: make(chan bool),
createDefaultWebhook: make(chan string),
kDeplLister: kDeplInformer.Lister(),
kDeplListerSynced: kDeplInformer.Informer().HasSynced,
stopCh: stopCh,
}
register.manage = newWebhookConfigManager(client, kyvernoClient, pInformer, npInformer, resCache, serverIP, register.autoUpdateWebhooks, register.createDefaultWebhook, stopCh, log.WithName("WebhookConfigManager"))
register.manage = newWebhookConfigManager(client, kyvernoClient, pInformer, npInformer, resCache, nsInformer, serverIP, register.autoUpdateWebhooks, register.createDefaultWebhook, stopCh, log.WithName("WebhookConfigManager"))
return register
}
@ -132,6 +145,12 @@ func (wrc *Register) Register() error {
return nil
}
func (wrc *Register) Start() {
if !cache.WaitForCacheSync(wrc.stopCh, wrc.kDeplListerSynced) {
wrc.log.Info("failed to sync kyverno deployment informer cache")
}
}
// Check returns an error if any of the webhooks are not configured
func (wrc *Register) Check() error {
mutatingCache, _ := wrc.resCache.GetGVRCache(kindMutating)