mirror of https://github.com/kyverno/kyverno.git

fix: cancel context for proper shutdown in reports-controller (#10415)

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Charles-Edouard Brétéché authored on 2024-06-10 11:11:07 +02:00, committed by GitHub
parent 954245ae78
commit a0932cf734
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
4 changed files with 811 additions and 803 deletions
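The change is the same in all four entrypoints: `var wg sync.WaitGroup` is declared outside an immediately-invoked closure that contains the setup, controller startup, and leader election, so the deferred shutdown (`sdown()`, which cancels the signal context) runs when the closure returns, and only then does the final `wg.Wait()` wait for the controllers to drain. Below is a minimal, self-contained Go sketch of that shutdown ordering; it is an illustration only, assuming a made-up `runController` helper rather than Kyverno's actual controller APIs.

```go
// Minimal sketch (not Kyverno's actual code) of the shutdown ordering this
// commit introduces: the bootstrap runs inside an immediately-invoked closure,
// its deferred cancel fires when the closure returns, and only then does the
// outer wg.Wait() wait for the controllers to drain.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runController stands in for a controller's Run method: it loops until the
// context is cancelled and signals completion through the shared WaitGroup.
func runController(ctx context.Context, name string, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println(name, "stopped")
				return
			case <-time.After(50 * time.Millisecond):
				// simulated reconcile work
			}
		}
	}()
}

func main() {
	var wg sync.WaitGroup // declared outside the closure, as in the patch
	func() {
		// setup: the deferred cancel plays the role of the deferred sdown()
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		runController(ctx, "event-controller", &wg)
		runController(ctx, "globalcontext-controller", &wg)
		// le.Run(ctx) would block here until the signal context is done
		time.Sleep(200 * time.Millisecond)
	}() // closure returns, deferred cancel runs, controllers observe ctx.Done()
	// wait for everything to shut down and exit
	wg.Wait()
}
```

The key ordering is that the deferred cancel runs before, not after, the final wait, so the waited-on controllers actually receive the cancellation and the wait can return.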


@@ -121,140 +121,142 @@ func main() {
)
// parse flags
internal.ParseFlags(appConfig)
// setup
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-background-controller", false)
defer sdown()
var err error
bgscanInterval := time.Hour
val := os.Getenv("BACKGROUND_SCAN_INTERVAL")
if val != "" {
if bgscanInterval, err = time.ParseDuration(val); err != nil {
setup.Logger.Error(err, "failed to set the background scan interval")
var wg sync.WaitGroup
func() {
// setup
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-background-controller", false)
defer sdown()
var err error
bgscanInterval := time.Hour
val := os.Getenv("BACKGROUND_SCAN_INTERVAL")
if val != "" {
if bgscanInterval, err = time.ParseDuration(val); err != nil {
setup.Logger.Error(err, "failed to set the background scan interval")
os.Exit(1)
}
}
setup.Logger.V(2).Info("setting the background scan interval", "value", bgscanInterval.String())
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
}
setup.Logger.V(2).Info("setting the background scan interval", "value", bgscanInterval.String())
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
// informer factories
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
var wg sync.WaitGroup
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.KyvernoClient,
gcstore,
// informer factories
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
) // this controller only subscribe to events, nothing is returned...
policymetricscontroller.NewController(
setup.MetricsManager,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
&wg,
)
engine := internal.NewEngine(
signalCtx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-background-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
// create leader controllers
leaderControllers, err := createrLeaderControllers(
engine,
genWorkers,
kubeInformer,
kyvernoInformer,
setup.KyvernoClient,
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.Configuration,
setup.MetricsManager,
setup.KyvernoClient,
gcstore,
eventGenerator,
setup.Jp,
bgscanInterval,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
os.Exit(1)
}
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, logger, kyvernoInformer, kubeInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(signalCtx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
gceController.Run(signalCtx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(signalCtx, setup.Logger, &wg)
}
// start leader election
le.Run(signalCtx)
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
) // this controller only subscribe to events, nothing is returned...
policymetricscontroller.NewController(
setup.MetricsManager,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
&wg,
)
engine := internal.NewEngine(
signalCtx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-background-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
// create leader controllers
leaderControllers, err := createrLeaderControllers(
engine,
genWorkers,
kubeInformer,
kyvernoInformer,
setup.KyvernoClient,
setup.KyvernoDynamicClient,
setup.Configuration,
setup.MetricsManager,
eventGenerator,
setup.Jp,
bgscanInterval,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
os.Exit(1)
}
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, logger, kyvernoInformer, kubeInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(signalCtx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
gceController.Run(signalCtx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(signalCtx, setup.Logger, &wg)
}
// start leader election
le.Run(signalCtx)
}()
// wait for everything to shut down and exit
wg.Wait()
}


@@ -109,269 +109,271 @@ func main() {
)
// parse flags
internal.ParseFlags(appConfig)
// setup
ctx, setup, sdown := internal.Setup(appConfig, "kyverno-cleanup-controller", false)
defer sdown()
if caSecretName == "" {
setup.Logger.Error(errors.New("exiting... caSecretName is a required flag"), "exiting... caSecretName is a required flag")
os.Exit(1)
}
if tlsSecretName == "" {
setup.Logger.Error(errors.New("exiting... tlsSecretName is a required flag"), "exiting... tlsSecretName is a required flag")
os.Exit(1)
}
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
// certificates informers
caSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), caSecretName, resyncPeriod)
tlsSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), tlsSecretName, resyncPeriod)
if !informers.StartInformersAndWaitForCacheSync(ctx, setup.Logger, caSecret, tlsSecret) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
checker := checker.NewSelfChecker(setup.KubeClient.AuthorizationV1().SelfSubjectAccessReviews())
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
var wg sync.WaitGroup
// listers
nsLister := kubeInformer.Core().V1().Namespaces().Lister()
// log policy changes
genericloggingcontroller.NewController(
setup.Logger.WithName("cleanup-policy"),
"CleanupPolicy",
kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(),
genericloggingcontroller.CheckGeneration,
)
genericloggingcontroller.NewController(
setup.Logger.WithName("cluster-cleanup-policy"),
"ClusterCleanupPolicy",
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
genericloggingcontroller.CheckGeneration,
)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.KyvernoClient,
gcstore,
func() {
// setup
ctx, setup, sdown := internal.Setup(appConfig, "kyverno-cleanup-controller", false)
defer sdown()
if caSecretName == "" {
setup.Logger.Error(errors.New("exiting... caSecretName is a required flag"), "exiting... caSecretName is a required flag")
os.Exit(1)
}
if tlsSecretName == "" {
setup.Logger.Error(errors.New("exiting... tlsSecretName is a required flag"), "exiting... tlsSecretName is a required flag")
os.Exit(1)
}
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
// certificates informers
caSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), caSecretName, resyncPeriod)
tlsSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), tlsSecretName, resyncPeriod)
if !informers.StartInformersAndWaitForCacheSync(ctx, setup.Logger, caSecret, tlsSecret) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
checker := checker.NewSelfChecker(setup.KubeClient.AuthorizationV1().SelfSubjectAccessReviews())
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
// listers
nsLister := kubeInformer.Core().V1().Namespaces().Lister()
// log policy changes
genericloggingcontroller.NewController(
setup.Logger.WithName("cleanup-policy"),
"CleanupPolicy",
kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(),
genericloggingcontroller.CheckGeneration,
)
genericloggingcontroller.NewController(
setup.Logger.WithName("cluster-cleanup-policy"),
"ClusterCleanupPolicy",
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
genericloggingcontroller.CheckGeneration,
)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kubeInformer, kyvernoInformer) {
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-cleanup-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.KyvernoClient,
gcstore,
eventGenerator,
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kubeInformer, kyvernoInformer) {
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-cleanup-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
cmResolver := internal.NewConfigMapResolver(ctx, setup.Logger, setup.KubeClient, resyncPeriod)
cmResolver := internal.NewConfigMapResolver(ctx, setup.Logger, setup.KubeClient, resyncPeriod)
// controllers
renewer := tls.NewCertRenewer(
setup.KubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
tls.CertRenewalInterval,
tls.CAValidityDuration,
tls.TLSValidityDuration,
renewBefore,
serverIP,
config.KyvernoServiceName(),
config.DnsNames(config.KyvernoServiceName(), config.KyvernoNamespace()),
config.KyvernoNamespace(),
caSecretName,
tlsSecretName,
)
certController := internal.NewController(
certmanager.ControllerName,
certmanager.NewController(
caSecret,
tlsSecret,
renewer,
// controllers
renewer := tls.NewCertRenewer(
setup.KubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
tls.CertRenewalInterval,
tls.CAValidityDuration,
tls.TLSValidityDuration,
renewBefore,
serverIP,
config.KyvernoServiceName(),
config.DnsNames(config.KyvernoServiceName(), config.KyvernoNamespace()),
config.KyvernoNamespace(),
caSecretName,
tlsSecretName,
config.KyvernoNamespace(),
),
certmanager.Workers,
)
policyValidatingWebhookController := internal.NewController(
policyWebhookControllerName,
genericwebhookcontroller.NewController(
)
certController := internal.NewController(
certmanager.ControllerName,
certmanager.NewController(
caSecret,
tlsSecret,
renewer,
caSecretName,
tlsSecretName,
config.KyvernoNamespace(),
),
certmanager.Workers,
)
policyValidatingWebhookController := internal.NewController(
policyWebhookControllerName,
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
caSecret,
config.CleanupValidatingWebhookConfigurationName,
config.CleanupValidatingWebhookServicePath,
serverIP,
int32(servicePort),
int32(webhookServerPort),
nil,
[]admissionregistrationv1.RuleWithOperations{
{
Rule: admissionregistrationv1.Rule{
APIGroups: []string{"kyverno.io"},
APIVersions: []string{"v2alpha1"},
Resources: []string{
"cleanuppolicies/*",
"clustercleanuppolicies/*",
genericwebhookcontroller.NewController(
policyWebhookControllerName,
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
caSecret,
config.CleanupValidatingWebhookConfigurationName,
config.CleanupValidatingWebhookServicePath,
serverIP,
int32(servicePort),
int32(webhookServerPort),
nil,
[]admissionregistrationv1.RuleWithOperations{
{
Rule: admissionregistrationv1.Rule{
APIGroups: []string{"kyverno.io"},
APIVersions: []string{"v2alpha1"},
Resources: []string{
"cleanuppolicies/*",
"clustercleanuppolicies/*",
},
},
Operations: []admissionregistrationv1.OperationType{
admissionregistrationv1.Create,
admissionregistrationv1.Update,
},
},
Operations: []admissionregistrationv1.OperationType{
admissionregistrationv1.Create,
admissionregistrationv1.Update,
},
},
},
genericwebhookcontroller.Fail,
genericwebhookcontroller.None,
setup.Configuration,
caSecretName,
),
webhookWorkers,
)
ttlWebhookController := internal.NewController(
ttlWebhookControllerName,
genericwebhookcontroller.NewController(
genericwebhookcontroller.Fail,
genericwebhookcontroller.None,
setup.Configuration,
caSecretName,
),
webhookWorkers,
)
ttlWebhookController := internal.NewController(
ttlWebhookControllerName,
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
caSecret,
config.TtlValidatingWebhookConfigurationName,
config.TtlValidatingWebhookServicePath,
serverIP,
int32(servicePort),
int32(webhookServerPort),
&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
genericwebhookcontroller.NewController(
ttlWebhookControllerName,
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
caSecret,
config.TtlValidatingWebhookConfigurationName,
config.TtlValidatingWebhookServicePath,
serverIP,
int32(servicePort),
int32(webhookServerPort),
&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: kyverno.LabelCleanupTtl,
Operator: metav1.LabelSelectorOpExists,
},
},
},
[]admissionregistrationv1.RuleWithOperations{
{
Key: kyverno.LabelCleanupTtl,
Operator: metav1.LabelSelectorOpExists,
Rule: admissionregistrationv1.Rule{
APIGroups: []string{"*"},
APIVersions: []string{"*"},
Resources: []string{"*"},
},
Operations: []admissionregistrationv1.OperationType{
admissionregistrationv1.Create,
admissionregistrationv1.Update,
},
},
},
},
[]admissionregistrationv1.RuleWithOperations{
{
Rule: admissionregistrationv1.Rule{
APIGroups: []string{"*"},
APIVersions: []string{"*"},
Resources: []string{"*"},
},
Operations: []admissionregistrationv1.OperationType{
admissionregistrationv1.Create,
admissionregistrationv1.Update,
},
},
},
genericwebhookcontroller.Ignore,
genericwebhookcontroller.None,
setup.Configuration,
caSecretName,
),
webhookWorkers,
)
cleanupController := internal.NewController(
cleanup.ControllerName,
cleanup.NewController(
setup.KyvernoDynamicClient,
setup.KyvernoClient,
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(),
nsLister,
setup.Configuration,
cmResolver,
setup.Jp,
eventGenerator,
gcstore,
),
cleanup.Workers,
)
ttlManagerController := internal.NewController(
ttlcontroller.ControllerName,
ttlcontroller.NewManager(
setup.MetadataClient,
setup.KubeClient.Discovery(),
checker,
interval,
),
ttlcontroller.Workers,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, logger, kyvernoInformer, kubeInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
certController.Run(ctx, logger, &wg)
policyValidatingWebhookController.Run(ctx, logger, &wg)
ttlWebhookController.Run(ctx, logger, &wg)
cleanupController.Run(ctx, logger, &wg)
ttlManagerController.Run(ctx, logger, &wg)
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// create handlers
policyHandlers := policyhandlers.New(setup.KyvernoDynamicClient)
resourceHandlers := resourcehandlers.New(checker)
// create server
server := NewServer(
func() ([]byte, []byte, error) {
secret, err := tlsSecret.Lister().Secrets(config.KyvernoNamespace()).Get(tlsSecretName)
if err != nil {
return nil, nil, err
}
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
},
policyHandlers.Validate,
resourceHandlers.Validate,
setup.MetricsManager,
webhooks.DebugModeOptions{
DumpPayload: dumpPayload,
},
probes{},
setup.Configuration,
)
// start server
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
gceController.Run(ctx, setup.Logger, &wg)
// start leader election
le.Run(ctx)
genericwebhookcontroller.Ignore,
genericwebhookcontroller.None,
setup.Configuration,
caSecretName,
),
webhookWorkers,
)
cleanupController := internal.NewController(
cleanup.ControllerName,
cleanup.NewController(
setup.KyvernoDynamicClient,
setup.KyvernoClient,
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(),
nsLister,
setup.Configuration,
cmResolver,
setup.Jp,
eventGenerator,
gcstore,
),
cleanup.Workers,
)
ttlManagerController := internal.NewController(
ttlcontroller.ControllerName,
ttlcontroller.NewManager(
setup.MetadataClient,
setup.KubeClient.Discovery(),
checker,
interval,
),
ttlcontroller.Workers,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, logger, kyvernoInformer, kubeInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
certController.Run(ctx, logger, &wg)
policyValidatingWebhookController.Run(ctx, logger, &wg)
ttlWebhookController.Run(ctx, logger, &wg)
cleanupController.Run(ctx, logger, &wg)
ttlManagerController.Run(ctx, logger, &wg)
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// create handlers
policyHandlers := policyhandlers.New(setup.KyvernoDynamicClient)
resourceHandlers := resourcehandlers.New(checker)
// create server
server := NewServer(
func() ([]byte, []byte, error) {
secret, err := tlsSecret.Lister().Secrets(config.KyvernoNamespace()).Get(tlsSecretName)
if err != nil {
return nil, nil, err
}
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
},
policyHandlers.Validate,
resourceHandlers.Validate,
setup.MetricsManager,
webhooks.DebugModeOptions{
DumpPayload: dumpPayload,
},
probes{},
setup.Configuration,
)
// start server
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
gceController.Run(ctx, setup.Logger, &wg)
// start leader election
le.Run(ctx)
}()
// wait for everything to shut down and exit
wg.Wait()
}


@@ -294,295 +294,297 @@ func main() {
)
// parse flags
internal.ParseFlags(appConfig)
// setup
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-admission-controller", false)
defer sdown()
if caSecretName == "" {
setup.Logger.Error(errors.New("exiting... caSecretName is a required flag"), "exiting... caSecretName is a required flag")
os.Exit(1)
}
if tlsSecretName == "" {
setup.Logger.Error(errors.New("exiting... tlsSecretName is a required flag"), "exiting... tlsSecretName is a required flag")
os.Exit(1)
}
// check if validating admission policies are registered in the API server
generateValidatingAdmissionPolicy := toggle.FromContext(context.TODO()).GenerateValidatingAdmissionPolicy()
if generateValidatingAdmissionPolicy {
registered, err := validatingadmissionpolicy.IsValidatingAdmissionPolicyRegistered(setup.KubeClient)
if !registered {
setup.Logger.Error(err, "ValidatingAdmissionPolicies isn't supported in the API server")
os.Exit(1)
}
}
caSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), caSecretName, resyncPeriod)
tlsSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), tlsSecretName, resyncPeriod)
if !informers.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, caSecret, tlsSecret) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// show version
showWarnings(signalCtx, setup.Logger)
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
// check we can run
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
var wg sync.WaitGroup
certRenewer := tls.NewCertRenewer(
setup.KubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
tls.CertRenewalInterval,
tls.CAValidityDuration,
tls.TLSValidityDuration,
renewBefore,
serverIP,
config.KyvernoServiceName(),
config.DnsNames(config.KyvernoServiceName(), config.KyvernoNamespace()),
config.KyvernoNamespace(),
caSecretName,
tlsSecretName,
)
policyCache := policycache.NewCache()
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.KyvernoClient,
gcstore,
eventGenerator,
maxAPICallResponseLength,
true,
),
globalcontextcontroller.Workers,
)
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// this controller only subscribe to events, nothing is returned...
policymetricscontroller.NewController(
setup.MetricsManager,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
&wg,
)
// log policy changes
genericloggingcontroller.NewController(
setup.Logger.WithName("policy"),
"Policy",
kyvernoInformer.Kyverno().V1().Policies(),
genericloggingcontroller.CheckGeneration,
)
genericloggingcontroller.NewController(
setup.Logger.WithName("cluster-policy"),
"ClusterPolicy",
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
genericloggingcontroller.CheckGeneration,
)
runtime := runtimeutils.NewRuntime(
setup.Logger.WithName("runtime-checks"),
serverIP,
kubeKyvernoInformer.Apps().V1().Deployments(),
certRenewer,
)
// engine
engine := internal.NewEngine(
signalCtx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// create non leader controllers
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
kyvernoInformer,
setup.KyvernoDynamicClient,
policyCache,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// bootstrap non leader controllers
if nonLeaderBootstrap != nil {
if err := nonLeaderBootstrap(signalCtx); err != nil {
setup.Logger.Error(err, "failed to bootstrap non leader controllers")
func() {
// setup
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-admission-controller", false)
defer sdown()
if caSecretName == "" {
setup.Logger.Error(errors.New("exiting... caSecretName is a required flag"), "exiting... caSecretName is a required flag")
os.Exit(1)
}
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
// create leader controllers
leaderControllers, warmup, err := createrLeaderControllers(
generateValidatingAdmissionPolicy,
admissionReports,
serverIP,
webhookTimeout,
autoUpdateWebhooks,
kubeInformer,
kubeKyvernoInformer,
kyvernoInformer,
caSecret,
tlsSecret,
setup.KubeClient,
setup.KyvernoClient,
if tlsSecretName == "" {
setup.Logger.Error(errors.New("exiting... tlsSecretName is a required flag"), "exiting... tlsSecretName is a required flag")
os.Exit(1)
}
// check if validating admission policies are registered in the API server
generateValidatingAdmissionPolicy := toggle.FromContext(context.TODO()).GenerateValidatingAdmissionPolicy()
if generateValidatingAdmissionPolicy {
registered, err := validatingadmissionpolicy.IsValidatingAdmissionPolicyRegistered(setup.KubeClient)
if !registered {
setup.Logger.Error(err, "ValidatingAdmissionPolicies isn't supported in the API server")
os.Exit(1)
}
}
caSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), caSecretName, resyncPeriod)
tlsSecret := informers.NewSecretInformer(setup.KubeClient, config.KyvernoNamespace(), tlsSecretName, resyncPeriod)
if !informers.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, caSecret, tlsSecret) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// show version
showWarnings(signalCtx, setup.Logger)
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
// check we can run
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
certRenewer := tls.NewCertRenewer(
setup.KubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
tls.CertRenewalInterval,
tls.CAValidityDuration,
tls.TLSValidityDuration,
renewBefore,
serverIP,
config.KyvernoServiceName(),
config.DnsNames(config.KyvernoServiceName(), config.KyvernoNamespace()),
config.KyvernoNamespace(),
caSecretName,
tlsSecretName,
)
policyCache := policycache.NewCache()
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
certRenewer,
runtime,
int32(servicePort),
int32(webhookServerPort),
setup.Configuration,
setup.KyvernoClient,
gcstore,
eventGenerator,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
maxAPICallResponseLength,
true,
),
globalcontextcontroller.Workers,
)
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// this controller only subscribe to events, nothing is returned...
policymetricscontroller.NewController(
setup.MetricsManager,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
&wg,
)
// log policy changes
genericloggingcontroller.NewController(
setup.Logger.WithName("policy"),
"Policy",
kyvernoInformer.Kyverno().V1().Policies(),
genericloggingcontroller.CheckGeneration,
)
genericloggingcontroller.NewController(
setup.Logger.WithName("cluster-policy"),
"ClusterPolicy",
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
genericloggingcontroller.CheckGeneration,
)
runtime := runtimeutils.NewRuntime(
setup.Logger.WithName("runtime-checks"),
serverIP,
kubeKyvernoInformer.Apps().V1().Deployments(),
certRenewer,
)
// engine
engine := internal.NewEngine(
signalCtx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// create non leader controllers
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
kyvernoInformer,
setup.KyvernoDynamicClient,
policyCache,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// bootstrap non leader controllers
if nonLeaderBootstrap != nil {
if err := nonLeaderBootstrap(signalCtx); err != nil {
setup.Logger.Error(err, "failed to bootstrap non leader controllers")
os.Exit(1)
}
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
if warmup != nil {
if err := warmup(ctx); err != nil {
logger.Error(err, "failed to run warmup")
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
// create leader controllers
leaderControllers, warmup, err := createrLeaderControllers(
generateValidatingAdmissionPolicy,
admissionReports,
serverIP,
webhookTimeout,
autoUpdateWebhooks,
kubeInformer,
kubeKyvernoInformer,
kyvernoInformer,
caSecret,
tlsSecret,
setup.KubeClient,
setup.KyvernoClient,
setup.KyvernoDynamicClient,
certRenewer,
runtime,
int32(servicePort),
int32(webhookServerPort),
setup.Configuration,
eventGenerator,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
os.Exit(1)
}
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(signalCtx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// create webhooks server
urgen := webhookgenerate.NewGenerator(
setup.KyvernoClient,
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
)
policyHandlers := webhookspolicy.NewHandlers(
setup.KyvernoDynamicClient,
setup.KyvernoClient,
backgroundServiceAccountName,
)
resourceHandlers := webhooksresource.NewHandlers(
engine,
setup.KyvernoDynamicClient,
setup.KyvernoClient,
setup.Configuration,
setup.MetricsManager,
policyCache,
kubeInformer.Core().V1().Namespaces().Lister(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
urgen,
eventGenerator,
admissionReports,
backgroundServiceAccountName,
setup.Jp,
maxAuditWorkers,
maxAuditCapacity,
)
exceptionHandlers := webhooksexception.NewHandlers(exception.ValidationOptions{
Enabled: internal.PolicyExceptionEnabled(),
Namespace: internal.ExceptionNamespace(),
})
globalContextHandlers := webhooksglobalcontext.NewHandlers(globalcontext.ValidationOptions{
Enabled: internal.PolicyExceptionEnabled(),
})
server := webhooks.NewServer(
signalCtx,
policyHandlers,
resourceHandlers,
exceptionHandlers,
globalContextHandlers,
setup.Configuration,
setup.MetricsManager,
webhooks.DebugModeOptions{
DumpPayload: dumpPayload,
},
func() ([]byte, []byte, error) {
secret, err := tlsSecret.Lister().Secrets(config.KyvernoNamespace()).Get(tlsSecretName)
if err != nil {
return nil, nil, err
}
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
},
setup.KubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations(),
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
setup.KubeClient.CoordinationV1().Leases(config.KyvernoNamespace()),
runtime,
kubeInformer.Rbac().V1().RoleBindings().Lister(),
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
setup.KyvernoDynamicClient.Discovery(),
int32(webhookServerPort),
)
// start informers and wait for cache sync
// we need to call start again because we potentially registered new informers
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start webhooks server
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
gceController.Run(signalCtx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(signalCtx, setup.Logger, &wg)
}
for _, controller := range nonLeaderControllers {
controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg)
}
// start leader election
le.Run(signalCtx)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(signalCtx, logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
if warmup != nil {
if err := warmup(ctx); err != nil {
logger.Error(err, "failed to run warmup")
os.Exit(1)
}
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(signalCtx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// create webhooks server
urgen := webhookgenerate.NewGenerator(
setup.KyvernoClient,
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
)
policyHandlers := webhookspolicy.NewHandlers(
setup.KyvernoDynamicClient,
setup.KyvernoClient,
backgroundServiceAccountName,
)
resourceHandlers := webhooksresource.NewHandlers(
engine,
setup.KyvernoDynamicClient,
setup.KyvernoClient,
setup.Configuration,
setup.MetricsManager,
policyCache,
kubeInformer.Core().V1().Namespaces().Lister(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
urgen,
eventGenerator,
admissionReports,
backgroundServiceAccountName,
setup.Jp,
maxAuditWorkers,
maxAuditCapacity,
)
exceptionHandlers := webhooksexception.NewHandlers(exception.ValidationOptions{
Enabled: internal.PolicyExceptionEnabled(),
Namespace: internal.ExceptionNamespace(),
})
globalContextHandlers := webhooksglobalcontext.NewHandlers(globalcontext.ValidationOptions{
Enabled: internal.PolicyExceptionEnabled(),
})
server := webhooks.NewServer(
signalCtx,
policyHandlers,
resourceHandlers,
exceptionHandlers,
globalContextHandlers,
setup.Configuration,
setup.MetricsManager,
webhooks.DebugModeOptions{
DumpPayload: dumpPayload,
},
func() ([]byte, []byte, error) {
secret, err := tlsSecret.Lister().Secrets(config.KyvernoNamespace()).Get(tlsSecretName)
if err != nil {
return nil, nil, err
}
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
},
setup.KubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations(),
setup.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
setup.KubeClient.CoordinationV1().Leases(config.KyvernoNamespace()),
runtime,
kubeInformer.Rbac().V1().RoleBindings().Lister(),
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
setup.KyvernoDynamicClient.Discovery(),
int32(webhookServerPort),
)
// start informers and wait for cache sync
// we need to call start again because we potentially registered new informers
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start webhooks server
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
gceController.Run(signalCtx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(signalCtx, setup.Logger, &wg)
}
for _, controller := range nonLeaderControllers {
controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg)
}
// start leader election
le.Run(signalCtx)
}()
// wait for everything to shut down and exit
wg.Wait()
}


@@ -240,156 +240,158 @@ func main() {
internal.WithDefaultQps(300),
internal.WithDefaultBurst(300),
)
// setup
ctx, setup, sdown := internal.Setup(appConfig, "kyverno-reports-controller", skipResourceFilters)
defer sdown()
// show warnings
if reportsChunkSize != 0 {
logger := setup.Logger.WithName("wanings")
logger.Info("Warning: reportsChunkSize is deprecated and will be removed in 1.13.")
}
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
setup.Logger.Info("background scan interval", "duration", backgroundScanInterval.String())
// check if validating admission policies are registered in the API server
if validatingAdmissionPolicyReports {
registered, err := validatingadmissionpolicy.IsValidatingAdmissionPolicyRegistered(setup.KubeClient)
if !registered {
setup.Logger.Error(err, "ValidatingAdmissionPolicies isn't supported in the API server")
var wg sync.WaitGroup
func() {
// setup
ctx, setup, sdown := internal.Setup(appConfig, "kyverno-reports-controller", skipResourceFilters)
defer sdown()
// show warnings
if reportsChunkSize != 0 {
logger := setup.Logger.WithName("wanings")
logger.Info("Warning: reportsChunkSize is deprecated and will be removed in 1.13.")
}
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
if err := sanityChecks(setup.ApiServerClient); err != nil {
setup.Logger.Error(err, "sanity checks failed")
os.Exit(1)
}
}
// informer factories
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
var wg sync.WaitGroup
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.KyvernoClient,
gcstore,
setup.Logger.Info("background scan interval", "duration", backgroundScanInterval.String())
// check if validating admission policies are registered in the API server
if validatingAdmissionPolicyReports {
registered, err := validatingadmissionpolicy.IsValidatingAdmissionPolicyRegistered(setup.KubeClient)
if !registered {
setup.Logger.Error(err, "ValidatingAdmissionPolicies isn't supported in the API server")
os.Exit(1)
}
}
// informer factories
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
polexCache, polexController := internal.NewExceptionSelector(setup.Logger, kyvernoInformer)
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
)
// engine
engine := internal.NewEngine(
ctx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-reports-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
metadataInformer := metadatainformers.NewSharedInformerFactory(setup.MetadataClient, 15*time.Minute)
// create leader controllers
leaderControllers, warmup, err := createrLeaderControllers(
engine,
backgroundScan,
admissionReports,
aggregateReports,
policyReports,
validatingAdmissionPolicyReports,
aggregationWorkers,
backgroundScanWorkers,
kubeInformer,
kyvernoInformer,
metadataInformer,
setup.KyvernoClient,
event.Workers,
)
gcstore := store.New()
gceController := internal.NewController(
globalcontextcontroller.ControllerName,
globalcontextcontroller.NewController(
kyvernoInformer.Kyverno().V2alpha1().GlobalContextEntries(),
setup.KyvernoDynamicClient,
setup.Configuration,
setup.Jp,
setup.KyvernoClient,
gcstore,
eventGenerator,
backgroundScanInterval,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
os.Exit(1)
}
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
internal.StartInformers(ctx, metadataInformer)
if !internal.CheckCacheSync(logger, metadataInformer.WaitForCacheSync(ctx.Done())) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
if err := warmup(ctx); err != nil {
logger.Error(err, "failed to run warmup")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(ctx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
gceController.Run(ctx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(ctx, setup.Logger, &wg)
}
// start leader election
le.Run(ctx)
maxAPICallResponseLength,
false,
),
globalcontextcontroller.Workers,
)
// engine
engine := internal.NewEngine(
ctx,
setup.Logger,
setup.Configuration,
setup.MetricsConfiguration,
setup.Jp,
setup.KyvernoDynamicClient,
setup.RegistryClient,
setup.ImageVerifyCacheClient,
setup.KubeClient,
setup.KyvernoClient,
setup.RegistrySecretLister,
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
polexCache,
gcstore,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kyvernoInformer) {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
"kyverno-reports-controller",
config.KyvernoNamespace(),
setup.LeaderElectionClient,
config.KyvernoPodName(),
internal.LeaderElectionRetryPeriod(),
func(ctx context.Context) {
logger := setup.Logger.WithName("leader")
// create leader factories
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
metadataInformer := metadatainformers.NewSharedInformerFactory(setup.MetadataClient, 15*time.Minute)
// create leader controllers
leaderControllers, warmup, err := createrLeaderControllers(
engine,
backgroundScan,
admissionReports,
aggregateReports,
policyReports,
validatingAdmissionPolicyReports,
aggregationWorkers,
backgroundScanWorkers,
kubeInformer,
kyvernoInformer,
metadataInformer,
setup.KyvernoClient,
setup.KyvernoDynamicClient,
setup.Configuration,
setup.Jp,
eventGenerator,
backgroundScanInterval,
)
if err != nil {
logger.Error(err, "failed to create leader controllers")
os.Exit(1)
}
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, logger, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
internal.StartInformers(ctx, metadataInformer)
if !internal.CheckCacheSync(logger, metadataInformer.WaitForCacheSync(ctx.Done())) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
if err := warmup(ctx); err != nil {
logger.Error(err, "failed to run warmup")
os.Exit(1)
}
// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(ctx, logger.WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
gceController.Run(ctx, setup.Logger, &wg)
if polexController != nil {
polexController.Run(ctx, setup.Logger, &wg)
}
// start leader election
le.Run(ctx)
}()
// wait for everything to shut down and exit
wg.Wait()
}