
refactor: wait for cache sync (#3765)

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
Charles-Edouard Brétéché 2022-05-02 19:41:39 +02:00 committed by GitHub
parent 05c5f1b340
commit 18af55ed49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 34 additions and 252 deletions
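
The refactor drops the per-controller cache.InformerSynced bookkeeping and the cache.WaitForCacheSync gates inside each controller's Run, and instead waits on the shared informer factories once in main before any controller starts. For reference, here is a minimal sketch of the per-controller pattern being removed, written against plain client-go with illustrative names (not Kyverno's actual types):

package removedpattern

import (
	"fmt"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runController is a sketch of the gate each controller used to carry: it
// stores HasSynced for every informer it reads from and blocks before work.
func runController(kubeClient kubernetes.Interface, stopCh <-chan struct{}) error {
	factory := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
	nsSynced := factory.Core().V1().Namespaces().Informer().HasSynced // cache.InformerSynced

	factory.Start(stopCh)

	// Before this refactor, each controller blocked here on its own informers.
	if !cache.WaitForCacheSync(stopCh, nsSynced) {
		return fmt.Errorf("failed to sync informer cache")
	}
	// ... register event handlers and start workers ...
	return nil
}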

View file

@@ -153,7 +153,6 @@ func main() {
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace))
kubedynamicInformer := client.NewDynamicSharedInformerFactory(resyncPeriod)
// load image registry secrets
secrets := strings.Split(imagePullSecrets, ",")
@@ -368,7 +367,9 @@ func main() {
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", setupLog)
registerWebhookConfigurations := func() {
certManager.InitTLSPemPair()
webhookCfg.Start()
pInformer.WaitForCacheSync(stopCh)
kubeInformer.WaitForCacheSync(stopCh)
kubeKyvernoInformer.WaitForCacheSync(stopCh)
// validate the ConfigMap format
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace, configData.GetInitConfigMapName()); err != nil {
@@ -483,10 +484,18 @@ func main() {
os.Exit(1)
}
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
kubeKyvernoInformer.Start(stopCh)
pInformer.WaitForCacheSync(stopCh)
kubeInformer.WaitForCacheSync(stopCh)
kubeKyvernoInformer.WaitForCacheSync(stopCh)
pCacheController.CheckPolicySync(stopCh)
// init events handlers
// start Kyverno controllers
go le.Run(ctx)
go reportReqGen.Run(2, stopCh)
go configData.Run(stopCh)
go eventGenerator.Run(3, stopCh)
@@ -495,12 +504,6 @@ func main() {
go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh)
}
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
kubeKyvernoInformer.Start(stopCh)
kubedynamicInformer.Start(stopCh)
pCacheController.CheckPolicySync(stopCh)
// verifies if the admission control is enabled and active
server.RunAsync(stopCh)
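
The factory-level wait used above behaves differently from the per-controller gate it replaces: SharedInformerFactory.WaitForCacheSync blocks until every informer started by that factory has synced and returns a map of per-type results. A rough sketch of the consolidated ordering follows (illustrative wiring, not the exact Kyverno startup code; unlike the change above, the sketch also checks the returned sync results):

package startup

import (
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)

// startEverything sketches the new ordering: start the factories, wait once,
// then launch controllers that can assume warm caches.
func startEverything(kubeClient kubernetes.Interface, stopCh <-chan struct{}) {
	kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 15*time.Minute)
	// ... controllers are built from the factory informers here ...

	kubeInformer.Start(stopCh)

	// One consolidated wait instead of a gate inside every controller's Run.
	for informerType, ok := range kubeInformer.WaitForCacheSync(stopCh) {
		if !ok {
			klog.Fatalf("failed to wait for cache sync: %v", informerType)
		}
	}

	// Controllers and the admission server start only after this point, e.g.:
	// go policyController.Run(2, stopCh)
	// server.RunAsync(stopCh)
}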

View file

@@ -60,18 +60,6 @@ type Controller struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister corelister.NamespaceLister
// pSynced returns true if the cluster policy has been synced at least once
pSynced cache.InformerSynced
// pSynced returns true if the Namespace policy has been synced at least once
npSynced cache.InformerSynced
// urSynced returns true if the update request store has been synced at least once
urSynced cache.InformerSynced
// nsListerSynced returns true if the namespace store has been synced at least once
nsListerSynced cache.InformerSynced
// logger
log logr.Logger
}
@@ -103,11 +91,6 @@ func NewController(
c.urLister = urInformer.Lister().UpdateRequests(config.KyvernoNamespace)
c.nsLister = namespaceInformer.Lister()
c.pSynced = pInformer.Informer().HasSynced
c.npSynced = npInformer.Informer().HasSynced
c.urSynced = urInformer.Informer().HasSynced
c.nsListerSynced = namespaceInformer.Informer().HasSynced
return &c, nil
}
@@ -234,11 +217,6 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, c.pSynced, c.urSynced, c.npSynced, c.nsListerSynced) {
logger.Info("failed to sync informer cache")
return
}
c.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.deletePolicy, // we only cleanup if the policy is delete
})
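
The remaining file diffs below repeat this same simplification: the cache.InformerSynced fields, their assignments in the constructor, and the sync gate at the top of Run all go away, leaving the listers, the event handlers, and the worker loop. Very roughly, a controller now has the following shape (a generic sketch with illustrative types, not Kyverno's exact code):

package controller

import (
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/util/workqueue"
)

type Controller struct {
	nsLister corelisters.NamespaceLister // no matching nsSynced field anymore
	queue    workqueue.RateLimitingInterface
}

func NewController(nsInformer coreinformers.NamespaceInformer) *Controller {
	return &Controller{
		nsLister: nsInformer.Lister(),
		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example"),
	}
}

// Run assumes the caller has already started the informer factories and
// waited for their caches to sync, so there is no gate here.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
}

func (c *Controller) runWorker() {
	// drain c.queue here
}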

View file

@@ -63,17 +63,6 @@ type Controller struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister corelister.NamespaceLister
// policySynced returns true if the Cluster policy store has been synced at least once
policySynced cache.InformerSynced
// policySynced returns true if the Namespace policy store has been synced at least once
npolicySynced cache.InformerSynced
// urSynced returns true if the Update Request store has been synced at least once
urSynced cache.InformerSynced
nsSynced cache.InformerSynced
log logr.Logger
Config config.Interface
@@ -104,12 +93,6 @@ func NewController(
}
c.statusControl = common.StatusControl{Client: kyvernoClient}
c.policySynced = policyInformer.Informer().HasSynced
c.npolicySynced = npolicyInformer.Informer().HasSynced
c.urSynced = urInformer.Informer().HasSynced
urInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addUR,
UpdateFunc: c.updateUR,
@@ -119,9 +102,7 @@ func NewController(
c.policyLister = policyInformer.Lister()
c.npolicyLister = npolicyInformer.Lister()
c.urLister = urInformer.Lister().UpdateRequests(config.KyvernoNamespace)
c.nsLister = namespaceInformer.Lister()
c.nsSynced = namespaceInformer.Informer().HasSynced
return &c, nil
}
@@ -132,11 +113,6 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer c.queue.ShutDown()
defer c.log.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, c.policySynced, c.urSynced, c.npolicySynced, c.nsSynced) {
c.log.Info("failed to sync informer cache")
return
}
c.policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: c.updatePolicy, // We only handle updates to policy
// Deletion of policy will be handled by cleanup controller

View file

@@ -39,7 +39,6 @@ type ConfigData struct {
restrictDevelopmentUsername []string
webhooks []WebhookConfig
generateSuccessEvents bool
cmSycned cache.InformerSynced
reconcilePolicyReport chan<- bool
updateWebhookConfigurations chan<- bool
}
@@ -137,7 +136,6 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI
cd := ConfigData{
client: rclient,
cmName: os.Getenv(cmNameEnv),
cmSycned: cmInformer.Informer().HasSynced,
reconcilePolicyReport: reconcilePolicyReport,
updateWebhookConfigurations: updateWebhookConfigurations,
}
@@ -172,10 +170,6 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI
// Run checks syncing
func (cd *ConfigData) Run(stopCh <-chan struct{}) {
// wait for cache to populate first time
if !cache.WaitForCacheSync(stopCh, cd.cmSycned) {
logger.Info("configuration: failed to sync informer cache")
}
}
func (cd *ConfigData) addCM(obj interface{}) {

View file

@@ -14,7 +14,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@@ -25,12 +24,8 @@ type Generator struct {
client *client.Client
// list/get cluster policy
cpLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has been synced at least once
cpSynced cache.InformerSynced
// list/get policy
pLister kyvernolister.PolicyLister
// returns true if the policy store has been synced at least once
pSynced cache.InformerSynced
// queue to store event generation requests
queue workqueue.RateLimitingInterface
// events generated at policy controller
@@ -55,9 +50,7 @@ func NewEventGenerator(client *client.Client, cpInformer kyvernoinformer.Cluster
gen := Generator{
client: client,
cpLister: cpInformer.Lister(),
cpSynced: cpInformer.Informer().HasSynced,
pLister: pInformer.Lister(),
pSynced: pInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(rateLimiter(), eventWorkQueueName),
policyCtrRecorder: initRecorder(client, PolicyController, log),
admissionCtrRecorder: initRecorder(client, AdmissionController, log),
@@ -122,10 +115,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, gen.cpSynced, gen.pSynced) {
logger.Info("failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}
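
The worker loops above rely on wait.Until from k8s.io/apimachinery, which keeps invoking the given function, with the stated period between invocations, until the stop channel is closed. A tiny standalone illustration (not Kyverno code):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Runs the closure immediately, then roughly once per second.
	go wait.Until(func() { fmt.Println("processing work queue") }, time.Second, stopCh)

	time.Sleep(3 * time.Second)
	close(stopCh) // wait.Until returns once the channel is closed
	time.Sleep(100 * time.Millisecond)
}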

View file

@@ -76,18 +76,6 @@ type PolicyController struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister listerv1.NamespaceLister
// pListerSynced returns true if the cluster policy store has been synced at least once
pListerSynced cache.InformerSynced
// npListerSynced returns true if the namespace policy store has been synced at least once
npListerSynced cache.InformerSynced
// nsListerSynced returns true if the namespace store has been synced at least once
nsListerSynced cache.InformerSynced
// urListerSynced returns true if the update request store has been synced at least once
urListerSynced cache.InformerSynced
// Resource manager, manages the mapping for already processed resource
rm resourceManager
@@ -155,12 +143,6 @@ func NewPolicyController(
pc.nsLister = namespaces.Lister()
pc.urLister = urInformer.Lister()
pc.pListerSynced = pInformer.Informer().HasSynced
pc.npListerSynced = npInformer.Informer().HasSynced
pc.nsListerSynced = namespaces.Informer().HasSynced
pc.urListerSynced = urInformer.Informer().HasSynced
// resource manager
// rebuild after 300 seconds/ 5 mins
pc.rm = NewResourceManager(30)
@@ -422,11 +404,6 @@ func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, stopCh <-c
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.npListerSynced, pc.nsListerSynced, pc.urListerSynced) {
logger.Info("failed to sync informer cache")
return
}
pc.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
UpdateFunc: pc.updatePolicy,

View file

@@ -20,8 +20,6 @@ import (
// This cache is only used in the admission webhook to fast retrieve
// policies based on types (Mutate/ValidateEnforce/Generate/imageVerify).
type Controller struct {
pSynched cache.InformerSynced
nspSynched cache.InformerSynced
Cache Interface
log logr.Logger
cpolLister kyvernolister.ClusterPolicyLister
@@ -55,8 +53,6 @@ func NewPolicyCacheController(
DeleteFunc: pc.deleteNsPolicy,
})
pc.pSynched = pInformer.Informer().HasSynced
pc.nspSynched = nspInformer.Informer().HasSynced
pc.cpolLister = pInformer.Lister()
pc.polLister = nspInformer.Lister()
pc.pCounter = -1
@@ -110,11 +106,6 @@ func (c *Controller) CheckPolicySync(stopCh <-chan struct{}) {
logger := c.log
logger.Info("starting")
if !cache.WaitForCacheSync(stopCh, c.pSynched, c.nspSynched) {
logger.Error(nil, "Failed to sync informer cache")
os.Exit(1)
}
policies := []kyverno.PolicyInterface{}
polList, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything())
if err != nil {

View file

@@ -58,20 +58,11 @@ type ReportGenerator struct {
reportReqInformer requestinformer.ReportChangeRequestInformer
clusterReportReqInformer requestinformer.ClusterReportChangeRequestInformer
reportLister policyreport.PolicyReportLister
reportSynced cache.InformerSynced
clusterReportLister policyreport.ClusterPolicyReportLister
clusterReportSynced cache.InformerSynced
reportChangeRequestLister requestlister.ReportChangeRequestLister
reportReqSynced cache.InformerSynced
reportLister policyreport.PolicyReportLister
clusterReportLister policyreport.ClusterPolicyReportLister
reportChangeRequestLister requestlister.ReportChangeRequestLister
clusterReportChangeRequestLister requestlister.ClusterReportChangeRequestLister
clusterReportReqSynced cache.InformerSynced
nsLister listerv1.NamespaceLister
nsListerSynced cache.InformerSynced
nsLister listerv1.NamespaceLister
queue workqueue.RateLimitingInterface
@@ -106,15 +97,10 @@ func NewReportGenerator(
}
gen.clusterReportLister = clusterReportInformer.Lister()
gen.clusterReportSynced = clusterReportInformer.Informer().HasSynced
gen.reportLister = reportInformer.Lister()
gen.reportSynced = reportInformer.Informer().HasSynced
gen.clusterReportChangeRequestLister = clusterReportReqInformer.Lister()
gen.clusterReportReqSynced = clusterReportReqInformer.Informer().HasSynced
gen.reportChangeRequestLister = reportReqInformer.Lister()
gen.reportReqSynced = reportReqInformer.Informer().HasSynced
gen.nsLister = namespace.Lister()
gen.nsListerSynced = namespace.Informer().HasSynced
return gen, nil
}
@@ -241,10 +227,6 @@ func (g *ReportGenerator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, g.reportReqSynced, g.clusterReportReqSynced, g.reportSynced, g.clusterReportSynced, g.nsListerSynced) {
logger.Info("failed to sync informer cache")
}
g.reportReqInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: g.addReportChangeRequest,

View file

@@ -19,7 +19,6 @@ import (
"github.com/kyverno/kyverno/pkg/engine/response"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@@ -40,18 +39,6 @@ type Generator struct {
// polLister can list/get namespace policy from the shared informer's store
polLister kyvernolister.PolicyLister
// returns true if the cluster report request store has been synced at least once
reportReqSynced cache.InformerSynced
// returns true if the namespaced report request store has been synced at at least once
clusterReportReqSynced cache.InformerSynced
// cpolListerSynced returns true if the cluster policy store has been synced at least once
cpolListerSynced cache.InformerSynced
// polListerSynced returns true if the namespace policy store has been synced at least once
polListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
dataStore *dataStore
@@ -71,13 +58,9 @@ func NewReportChangeRequestGenerator(client *policyreportclient.Clientset,
gen := Generator{
dclient: dclient,
clusterReportChangeRequestLister: clusterReportReqInformer.Lister(),
clusterReportReqSynced: clusterReportReqInformer.Informer().HasSynced,
reportChangeRequestLister: reportReqInformer.Lister(),
reportReqSynced: reportReqInformer.Informer().HasSynced,
cpolLister: cpolInformer.Lister(),
cpolListerSynced: cpolInformer.Informer().HasSynced,
polLister: polInformer.Lister(),
polListerSynced: polInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
requestCreator: newChangeRequestCreator(client, 3*time.Second, log.WithName("requestCreator")),
@@ -178,10 +161,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, gen.reportReqSynced, gen.clusterReportReqSynced, gen.cpolListerSynced, gen.polListerSynced) {
logger.Info("failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}

View file

@@ -116,11 +116,6 @@ func (m *certManager) GetTLSPemPair() (*tls.PemPair, error) {
}
func (m *certManager) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, m.secretInformer.Informer().HasSynced) {
m.log.Info("failed to sync informer cache")
return
}
m.secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.addSecretFunc,
UpdateFunc: m.updateSecretFunc,

View file

@@ -51,18 +51,10 @@ type webhookConfigManager struct {
// npLister can list/get namespace policy from the shared informer's store
npLister kyvernolister.PolicyLister
// pListerSynced returns true if the cluster policy store has been synced at least once
pListerSynced cache.InformerSynced
// npListerSynced returns true if the namespace policy store has been synced at least once
npListerSynced cache.InformerSynced
mutateInformer adminformers.MutatingWebhookConfigurationInformer
validateInformer adminformers.ValidatingWebhookConfigurationInformer
mutateLister admlisters.MutatingWebhookConfigurationLister
validateLister admlisters.ValidatingWebhookConfigurationLister
mutateInformerSynced cache.InformerSynced
validateInformerSynced cache.InformerSynced
mutateInformer adminformers.MutatingWebhookConfigurationInformer
validateInformer adminformers.ValidatingWebhookConfigurationInformer
mutateLister admlisters.MutatingWebhookConfigurationLister
validateLister admlisters.ValidatingWebhookConfigurationLister
queue workqueue.RateLimitingInterface
@@ -114,14 +106,10 @@ func newWebhookConfigManager(
m.pLister = pInformer.Lister()
m.npLister = npInformer.Lister()
m.pListerSynced = pInformer.Informer().HasSynced
m.npListerSynced = npInformer.Informer().HasSynced
m.mutateInformer = mwcInformer
m.mutateLister = mwcInformer.Lister()
m.mutateInformerSynced = mwcInformer.Informer().HasSynced
m.validateInformer = vwcInformer
m.validateLister = vwcInformer.Lister()
m.validateInformerSynced = vwcInformer.Informer().HasSynced
return m
}
@@ -296,11 +284,6 @@ func (m *webhookConfigManager) start() {
m.log.Info("starting")
defer m.log.Info("shutting down")
if !cache.WaitForCacheSync(m.stopCh, m.pListerSynced, m.npListerSynced, m.mutateInformerSynced, m.validateInformerSynced) {
m.log.Info("failed to sync informer cache")
return
}
m.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.addClusterPolicy,
UpdateFunc: m.updateClusterPolicy,

View file

@@ -27,7 +27,6 @@ import (
admlisters "k8s.io/client-go/listers/admissionregistration/v1"
listers "k8s.io/client-go/listers/apps/v1"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const (
@@ -51,11 +50,6 @@ type Register struct {
vwcLister admlisters.ValidatingWebhookConfigurationLister
kDeplLister listers.DeploymentLister
// sync
mwcListerSynced cache.InformerSynced
vwcListerSynced cache.InformerSynced
kDeplListerSynced cache.InformerSynced
serverIP string // when running outside a cluster
timeoutSeconds int32
log logr.Logger
@@ -93,9 +87,6 @@ func NewRegister(
mwcLister: mwcInformer.Lister(),
vwcLister: vwcInformer.Lister(),
kDeplLister: kDeplInformer.Lister(),
mwcListerSynced: mwcInformer.Informer().HasSynced,
vwcListerSynced: vwcInformer.Informer().HasSynced,
kDeplListerSynced: kDeplInformer.Informer().HasSynced,
serverIP: serverIP,
timeoutSeconds: webhookTimeout,
log: log.WithName("Register"),
@@ -157,12 +148,6 @@ func (wrc *Register) Register() error {
return nil
}
func (wrc *Register) Start() {
if !cache.WaitForCacheSync(wrc.stopCh, wrc.mwcListerSynced, wrc.vwcListerSynced, wrc.kDeplListerSynced) {
wrc.log.Info("failed to sync kyverno deployment informer cache")
}
}
// Check returns an error if any of the webhooks are not configured
func (wrc *Register) Check() error {
if _, err := wrc.mwcLister.Get(wrc.getVerifyWebhookMutatingWebhookName()); err != nil {

View file

@@ -35,7 +35,6 @@ import (
rbacinformer "k8s.io/client-go/informers/rbac/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
rbaclister "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
)
// WebhookServer contains configured TLS server with MutationWebhook.
@@ -47,12 +46,6 @@ type WebhookServer struct {
// urLister can list/get update requests from the shared informer's store
urLister urlister.UpdateRequestNamespaceLister
// urSynced returns true if the Update Request store has been synced at least once
urSynced cache.InformerSynced
// returns true if the cluster policy store has synced atleast
pSynced cache.InformerSynced
// list/get role binding resource
rbLister rbaclister.RoleBindingLister
@@ -62,21 +55,9 @@ type WebhookServer struct {
// list/get role binding resource
crLister rbaclister.ClusterRoleLister
// return true if role bining store has synced atleast once
rbSynced cache.InformerSynced
// return true if role store has synced atleast once
rSynced cache.InformerSynced
// list/get cluster role binding resource
crbLister rbaclister.ClusterRoleBindingLister
// return true if cluster role binding store has synced atleast once
crbSynced cache.InformerSynced
// return true if cluster role store has synced atleast once
crSynced cache.InformerSynced
// generate events
eventGen event.Interface
@@ -103,9 +84,6 @@ type WebhookServer struct {
nsLister listerv1.NamespaceLister
// nsListerSynced returns true if the namespace store has been synced at least once
nsListerSynced cache.InformerSynced
auditHandler AuditHandler
log logr.Logger
@@ -157,18 +135,11 @@ func NewWebhookServer(
client: client,
kyvernoClient: kyvernoClient,
urLister: urInformer.Lister().UpdateRequests(config.KyvernoNamespace),
urSynced: urInformer.Informer().HasSynced,
pSynced: pInformer.Informer().HasSynced,
rbLister: rbInformer.Lister(),
rbSynced: rbInformer.Informer().HasSynced,
rLister: rInformer.Lister(),
rSynced: rInformer.Informer().HasSynced,
nsLister: namespace.Lister(),
nsListerSynced: namespace.Informer().HasSynced,
crbLister: crbInformer.Lister(),
crLister: crInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
crSynced: crInformer.Informer().HasSynced,
eventGen: eventGen,
pCache: pCache,
webhookRegister: webhookRegistrationClient,
@@ -255,9 +226,6 @@ func (ws *WebhookServer) buildPolicyContext(request *admissionv1.AdmissionReques
// RunAsync TLS server in separate thread and returns control immediately
func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, ws.urSynced, ws.pSynced, ws.rbSynced, ws.crbSynced, ws.rSynced, ws.crSynced) {
ws.log.Info("failed to sync informer cache")
}
go func() {
ws.log.V(3).Info("started serving requests", "addr", ws.server.Addr)
if err := ws.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed {

View file

@@ -37,7 +37,6 @@ type Generator struct {
log logr.Logger
urLister urkyvernolister.UpdateRequestNamespaceLister
urSynced cache.InformerSynced
}
// NewGenerator returns a new instance of UpdateRequest resource generator
@@ -47,7 +46,6 @@ func NewGenerator(client *kyvernoclient.Clientset, urInformer urkyvernoinformer.
stopCh: stopCh,
log: log,
urLister: urInformer.Lister().UpdateRequests(config.KyvernoNamespace),
urSynced: urInformer.Informer().HasSynced,
}
return gen
}
@@ -75,11 +73,6 @@ func (g *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.V(4).Info("shutting down")
}()
if !cache.WaitForCacheSync(stopCh, g.urSynced) {
logger.Info("failed to sync informer cache")
return
}
<-g.stopCh
}

View file

@@ -24,7 +24,6 @@ import (
rbacinformer "k8s.io/client-go/informers/rbac/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
rbaclister "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@@ -49,12 +48,9 @@ type auditHandler struct {
eventGen event.Interface
prGenerator policyreport.GeneratorInterface
rbLister rbaclister.RoleBindingLister
rbSynced cache.InformerSynced
crbLister rbaclister.ClusterRoleBindingLister
crbSynced cache.InformerSynced
nsLister listerv1.NamespaceLister
nsListerSynced cache.InformerSynced
rbLister rbaclister.RoleBindingLister
crbLister rbaclister.ClusterRoleBindingLister
nsLister listerv1.NamespaceLister
log logr.Logger
configHandler config.Interface
@@ -74,20 +70,17 @@ func NewValidateAuditHandler(pCache policycache.Interface,
promConfig *metrics.PromConfig) AuditHandler {
return &auditHandler{
pCache: pCache,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
eventGen: eventGen,
rbLister: rbInformer.Lister(),
rbSynced: rbInformer.Informer().HasSynced,
crbLister: crbInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
nsLister: namespaces.Lister(),
nsListerSynced: namespaces.Informer().HasSynced,
log: log,
prGenerator: prGenerator,
configHandler: dynamicConfig,
client: client,
promConfig: promConfig,
pCache: pCache,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
eventGen: eventGen,
rbLister: rbInformer.Lister(),
crbLister: crbInformer.Lister(),
nsLister: namespaces.Lister(),
log: log,
prGenerator: prGenerator,
configHandler: dynamicConfig,
client: client,
promConfig: promConfig,
}
}
@@ -104,10 +97,6 @@ func (h *auditHandler) Run(workers int, stopCh <-chan struct{}) {
h.log.V(4).Info("shutting down")
}()
if !cache.WaitForCacheSync(stopCh, h.rbSynced, h.crbSynced) {
h.log.Info("failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(h.runWorker, time.Second, stopCh)
}