1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-15 17:51:20 +00:00

refactor: add controller helper to internal package (#5506)

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-11-30 12:13:57 +01:00 committed by GitHub
parent 288c9091ec
commit 9b1860a4e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 45 additions and 73 deletions

View file

@ -1,33 +0,0 @@
package main
import (
"context"
"sync"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/controllers/cleanup"
)
type controller struct {
name string
controller cleanup.Controller
workers int
}
func newController(name string, c cleanup.Controller, w int) controller {
return controller{
name: name,
controller: c,
workers: w,
}
}
func (c controller) run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) {
wg.Add(1)
go func(logger logr.Logger) {
logger.Info("starting controller", "workers", c.workers)
defer logger.Info("controller stopped")
defer wg.Done()
c.controller.Run(ctx, c.workers)
}(logger.WithValues("name", c.name))
}

View file

@ -48,16 +48,16 @@ func main() {
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod) kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace())) kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod) kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
cleanupController := cleanup.NewController(
kubeClient,
kyvernoInformer.Kyverno().V1alpha1().ClusterCleanupPolicies(),
kyvernoInformer.Kyverno().V1alpha1().CleanupPolicies(),
kubeInformer.Batch().V1().CronJobs(),
)
// controllers // controllers
controller := newController(cleanup.ControllerName, *cleanupController, cleanup.Workers) controller := internal.NewController(
policyHandlers := NewHandlers( cleanup.ControllerName,
dClient, cleanup.NewController(
kubeClient,
kyvernoInformer.Kyverno().V1alpha1().ClusterCleanupPolicies(),
kyvernoInformer.Kyverno().V1alpha1().CleanupPolicies(),
kubeInformer.Batch().V1().CronJobs(),
),
cleanup.Workers,
) )
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister() secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
// start informers and wait for cache sync // start informers and wait for cache sync
@ -65,9 +65,9 @@ func main() {
os.Exit(1) os.Exit(1)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
controller.run(ctx, logger.WithName("cleanup-controller"), &wg) controller.Run(ctx, logger.WithName("cleanup-controller"), &wg)
server := NewServer( server := NewServer(
policyHandlers, NewHandlers(dClient),
func() ([]byte, []byte, error) { func() ([]byte, []byte, error) {
secret, err := secretLister.Secrets(config.KyvernoNamespace()).Get("cleanup-controller-tls") secret, err := secretLister.Secrets(config.KyvernoNamespace()).Get("cleanup-controller-tls")
if err != nil { if err != nil {

View file

@ -1,4 +1,4 @@
package main package internal
import ( import (
"context" "context"
@ -8,13 +8,17 @@ import (
"github.com/kyverno/kyverno/pkg/controllers" "github.com/kyverno/kyverno/pkg/controllers"
) )
type Controller interface {
Run(context.Context, logr.Logger, *sync.WaitGroup)
}
type controller struct { type controller struct {
name string name string
controller controllers.Controller controller controllers.Controller
workers int workers int
} }
func newController(name string, c controllers.Controller, w int) controller { func NewController(name string, c controllers.Controller, w int) Controller {
return controller{ return controller{
name: name, name: name,
controller: c, controller: c,
@ -22,7 +26,7 @@ func newController(name string, c controllers.Controller, w int) controller {
} }
} }
func (c controller) run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) { func (c controller) Run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) {
wg.Add(1) wg.Add(1)
go func(logger logr.Logger) { go func(logger logr.Logger) {
logger.Info("starting controller", "workers", c.workers) logger.Info("starting controller", "workers", c.workers)

View file

@ -218,7 +218,7 @@ func createNonLeaderControllers(
policyCache policycache.Cache, policyCache policycache.Cache,
eventGenerator event.Interface, eventGenerator event.Interface,
manager openapi.Manager, manager openapi.Manager,
) ([]controller, func() error) { ) ([]internal.Controller, func() error) {
policyCacheController := policycachecontroller.NewController( policyCacheController := policycachecontroller.NewController(
policyCache, policyCache,
kyvernoInformer.Kyverno().V1().ClusterPolicies(), kyvernoInformer.Kyverno().V1().ClusterPolicies(),
@ -243,11 +243,11 @@ func createNonLeaderControllers(
eventGenerator, eventGenerator,
configuration, configuration,
) )
return []controller{ return []internal.Controller{
newController(policycachecontroller.ControllerName, policyCacheController, policycachecontroller.Workers), internal.NewController(policycachecontroller.ControllerName, policyCacheController, policycachecontroller.Workers),
newController(openapicontroller.ControllerName, openApiController, openapicontroller.Workers), internal.NewController(openapicontroller.ControllerName, openApiController, openapicontroller.Workers),
newController(configcontroller.ControllerName, configurationController, configcontroller.Workers), internal.NewController(configcontroller.ControllerName, configurationController, configcontroller.Workers),
newController("update-request-controller", updateRequestController, genWorkers), internal.NewController("update-request-controller", updateRequestController, genWorkers),
}, },
func() error { func() error {
return policyCacheController.WarmUp() return policyCacheController.WarmUp()
@ -262,8 +262,8 @@ func createReportControllers(
metadataFactory metadatainformers.SharedInformerFactory, metadataFactory metadatainformers.SharedInformerFactory,
kubeInformer kubeinformers.SharedInformerFactory, kubeInformer kubeinformers.SharedInformerFactory,
kyvernoInformer kyvernoinformer.SharedInformerFactory, kyvernoInformer kyvernoinformer.SharedInformerFactory,
) ([]controller, func(context.Context) error) { ) ([]internal.Controller, func(context.Context) error) {
var ctrls []controller var ctrls []internal.Controller
var warmups []func(context.Context) error var warmups []func(context.Context) error
kyvernoV1 := kyvernoInformer.Kyverno().V1() kyvernoV1 := kyvernoInformer.Kyverno().V1()
if backgroundScan || admissionReports { if backgroundScan || admissionReports {
@ -275,12 +275,12 @@ func createReportControllers(
warmups = append(warmups, func(ctx context.Context) error { warmups = append(warmups, func(ctx context.Context) error {
return resourceReportController.Warmup(ctx) return resourceReportController.Warmup(ctx)
}) })
ctrls = append(ctrls, newController( ctrls = append(ctrls, internal.NewController(
resourcereportcontroller.ControllerName, resourcereportcontroller.ControllerName,
resourceReportController, resourceReportController,
resourcereportcontroller.Workers, resourcereportcontroller.Workers,
)) ))
ctrls = append(ctrls, newController( ctrls = append(ctrls, internal.NewController(
aggregatereportcontroller.ControllerName, aggregatereportcontroller.ControllerName,
aggregatereportcontroller.NewController( aggregatereportcontroller.NewController(
kyvernoClient, kyvernoClient,
@ -293,7 +293,7 @@ func createReportControllers(
aggregatereportcontroller.Workers, aggregatereportcontroller.Workers,
)) ))
if admissionReports { if admissionReports {
ctrls = append(ctrls, newController( ctrls = append(ctrls, internal.NewController(
admissionreportcontroller.ControllerName, admissionreportcontroller.ControllerName,
admissionreportcontroller.NewController( admissionreportcontroller.NewController(
kyvernoClient, kyvernoClient,
@ -304,7 +304,7 @@ func createReportControllers(
)) ))
} }
if backgroundScan { if backgroundScan {
ctrls = append(ctrls, newController( ctrls = append(ctrls, internal.NewController(
backgroundscancontroller.ControllerName, backgroundscancontroller.ControllerName,
backgroundscancontroller.NewController( backgroundscancontroller.NewController(
client, client,
@ -342,7 +342,7 @@ func createrLeaderControllers(
eventGenerator event.Interface, eventGenerator event.Interface,
certRenewer tls.CertRenewer, certRenewer tls.CertRenewer,
runtime runtimeutils.Runtime, runtime runtimeutils.Runtime,
) ([]controller, func(context.Context) error, error) { ) ([]internal.Controller, func(context.Context) error, error) {
policyCtrl, err := policy.NewPolicyController( policyCtrl, err := policy.NewPolicyController(
kyvernoClient, kyvernoClient,
dynamicClient, dynamicClient,
@ -393,10 +393,10 @@ func createrLeaderControllers(
kyvernoInformer, kyvernoInformer,
) )
return append( return append(
[]controller{ []internal.Controller{
newController("policy-controller", policyCtrl, 2), internal.NewController("policy-controller", policyCtrl, 2),
newController(certmanager.ControllerName, certManager, certmanager.Workers), internal.NewController(certmanager.ControllerName, certManager, certmanager.Workers),
newController(webhookcontroller.ControllerName, webhookController, webhookcontroller.Workers), internal.NewController(webhookcontroller.ControllerName, webhookController, webhookcontroller.Workers),
}, },
reportControllers..., reportControllers...,
), ),
@ -591,7 +591,7 @@ func main() {
// start leader controllers // start leader controllers
var wg sync.WaitGroup var wg sync.WaitGroup
for _, controller := range leaderControllers { for _, controller := range leaderControllers {
controller.run(signalCtx, logger.WithName("controllers"), &wg) controller.Run(signalCtx, logger.WithName("controllers"), &wg)
} }
// wait all controllers shut down // wait all controllers shut down
wg.Wait() wg.Wait()
@ -605,7 +605,7 @@ func main() {
// start non leader controllers // start non leader controllers
var wg sync.WaitGroup var wg sync.WaitGroup
for _, controller := range nonLeaderControllers { for _, controller := range nonLeaderControllers {
controller.run(signalCtx, logger.WithName("controllers"), &wg) controller.Run(signalCtx, logger.WithName("controllers"), &wg)
} }
// start leader election // start leader election
go func() { go func() {

View file

@ -9,6 +9,7 @@ import (
kyvernov1alpha1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1" kyvernov1alpha1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1"
kyvernov1alpha1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha1" kyvernov1alpha1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha1"
"github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller" controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -19,7 +20,7 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
type Controller struct { type controller struct {
// clients // clients
client kubernetes.Interface client kubernetes.Interface
@ -43,8 +44,8 @@ func NewController(
cpolInformer kyvernov1alpha1informers.ClusterCleanupPolicyInformer, cpolInformer kyvernov1alpha1informers.ClusterCleanupPolicyInformer,
polInformer kyvernov1alpha1informers.CleanupPolicyInformer, polInformer kyvernov1alpha1informers.CleanupPolicyInformer,
cjInformer batchv1informers.CronJobInformer, cjInformer batchv1informers.CronJobInformer,
) *Controller { ) controllers.Controller {
c := &Controller{ c := &controller{
client: client, client: client,
cpolLister: cpolInformer.Lister(), cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(), polLister: polInformer.Lister(),
@ -141,11 +142,11 @@ func NewController(
return c return c
} }
func (c *Controller) Run(ctx context.Context, workers int) { func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, logger.V(3), ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile) controllerutils.Run(ctx, logger.V(3), ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
} }
func (c *Controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupPolicyInterface, error) { func (c *controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupPolicyInterface, error) {
if namespace == "" { if namespace == "" {
cpolicy, err := c.cpolLister.Get(name) cpolicy, err := c.cpolLister.Get(name)
if err != nil { if err != nil {
@ -161,7 +162,7 @@ func (c *Controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupP
} }
} }
func (c *Controller) getCronjob(namespace, name string) (*batchv1.CronJob, error) { func (c *controller) getCronjob(namespace, name string) (*batchv1.CronJob, error) {
cj, err := c.cjLister.CronJobs(namespace).Get(name) cj, err := c.cjLister.CronJobs(namespace).Get(name)
if err != nil { if err != nil {
return nil, err return nil, err
@ -169,7 +170,7 @@ func (c *Controller) getCronjob(namespace, name string) (*batchv1.CronJob, error
return cj, nil return cj, nil
} }
func (c *Controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
policy, err := c.getPolicy(namespace, name) policy, err := c.getPolicy(namespace, name)
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {