mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-13 19:28:55 +00:00
fix: add workers to the controller interface (#4776)
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com> Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
parent
081330d564
commit
25cf8d6c1e
10 changed files with 86 additions and 44 deletions
16
cmd/kyverno/controller.go
Normal file
16
cmd/kyverno/controller.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/kyverno/kyverno/pkg/controllers"
|
||||
)
|
||||
|
||||
type controller struct {
|
||||
controller controllers.Controller
|
||||
workers int
|
||||
}
|
||||
|
||||
func (c *controller) run(ctx context.Context) {
|
||||
c.controller.Run(ctx, c.workers)
|
||||
}
|
|
@ -21,7 +21,6 @@ import (
|
|||
kyvernoclient "github.com/kyverno/kyverno/pkg/clients/wrappers"
|
||||
"github.com/kyverno/kyverno/pkg/common"
|
||||
"github.com/kyverno/kyverno/pkg/config"
|
||||
controllers "github.com/kyverno/kyverno/pkg/controllers"
|
||||
"github.com/kyverno/kyverno/pkg/controllers/certmanager"
|
||||
configcontroller "github.com/kyverno/kyverno/pkg/controllers/config"
|
||||
policycachecontroller "github.com/kyverno/kyverno/pkg/controllers/policycache"
|
||||
|
@ -461,7 +460,7 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
webhookCfg.UpdateWebhookChan <- true
|
||||
go certManager.Run(signalCtx)
|
||||
go certManager.Run(signalCtx, certmanager.Workers)
|
||||
go policyCtrl.Run(2, stopCh)
|
||||
|
||||
reportControllers := setupReportControllers(
|
||||
|
@ -478,7 +477,7 @@ func main() {
|
|||
metadataInformer.WaitForCacheSync(stopCh)
|
||||
|
||||
for _, controller := range reportControllers {
|
||||
go controller.Run(signalCtx)
|
||||
go controller.run(signalCtx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -513,10 +512,10 @@ func main() {
|
|||
|
||||
// init events handlers
|
||||
// start Kyverno controllers
|
||||
go policyCacheController.Run(signalCtx)
|
||||
go policyCacheController.Run(signalCtx, policycachecontroller.Workers)
|
||||
go urc.Run(genWorkers, stopCh)
|
||||
go le.Run(signalCtx)
|
||||
go configurationController.Run(signalCtx)
|
||||
go configurationController.Run(signalCtx, configcontroller.Workers)
|
||||
go eventGenerator.Run(3, stopCh)
|
||||
if !debug {
|
||||
go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh)
|
||||
|
@ -556,8 +555,8 @@ func setupReportControllers(
|
|||
metadataFactory metadatainformers.SharedInformerFactory,
|
||||
kubeInformer kubeinformers.SharedInformerFactory,
|
||||
kyvernoInformer kyvernoinformer.SharedInformerFactory,
|
||||
) []controllers.Controller {
|
||||
var ctrls []controllers.Controller
|
||||
) []controller {
|
||||
var ctrls []controller
|
||||
kyvernoV1 := kyvernoInformer.Kyverno().V1()
|
||||
if backgroundScan || admissionReports {
|
||||
resourceReportController := resourcereportcontroller.NewController(
|
||||
|
@ -565,30 +564,39 @@ func setupReportControllers(
|
|||
kyvernoV1.Policies(),
|
||||
kyvernoV1.ClusterPolicies(),
|
||||
)
|
||||
ctrls = append(ctrls, resourceReportController)
|
||||
ctrls = append(ctrls, aggregatereportcontroller.NewController(
|
||||
kyvernoClient,
|
||||
metadataFactory,
|
||||
resourceReportController,
|
||||
reportsChunkSize,
|
||||
))
|
||||
if admissionReports {
|
||||
ctrls = append(ctrls, admissionreportcontroller.NewController(
|
||||
ctrls = append(ctrls, controller{resourceReportController, resourcereportcontroller.Workers})
|
||||
ctrls = append(ctrls, controller{
|
||||
aggregatereportcontroller.NewController(
|
||||
kyvernoClient,
|
||||
metadataFactory,
|
||||
resourceReportController,
|
||||
))
|
||||
reportsChunkSize,
|
||||
),
|
||||
aggregatereportcontroller.Workers,
|
||||
})
|
||||
if admissionReports {
|
||||
ctrls = append(ctrls, controller{
|
||||
admissionreportcontroller.NewController(
|
||||
kyvernoClient,
|
||||
metadataFactory,
|
||||
resourceReportController,
|
||||
),
|
||||
admissionreportcontroller.Workers,
|
||||
})
|
||||
}
|
||||
if backgroundScan {
|
||||
ctrls = append(ctrls, backgroundscancontroller.NewController(
|
||||
client,
|
||||
kyvernoClient,
|
||||
metadataFactory,
|
||||
kyvernoV1.Policies(),
|
||||
kyvernoV1.ClusterPolicies(),
|
||||
kubeInformer.Core().V1().Namespaces(),
|
||||
resourceReportController,
|
||||
))
|
||||
ctrls = append(ctrls, controller{
|
||||
backgroundscancontroller.NewController(
|
||||
client,
|
||||
kyvernoClient,
|
||||
metadataFactory,
|
||||
kyvernoV1.Policies(),
|
||||
kyvernoV1.ClusterPolicies(),
|
||||
kubeInformer.Core().V1().Namespaces(),
|
||||
resourceReportController,
|
||||
),
|
||||
backgroundscancontroller.Workers,
|
||||
})
|
||||
}
|
||||
}
|
||||
return ctrls
|
||||
|
|
|
@ -16,6 +16,9 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// Workers is the number of workers for this controller
|
||||
const Workers = 1
|
||||
|
||||
type Controller interface {
|
||||
controllers.Controller
|
||||
// GetTLSPemPair gets the existing TLSPemPair from the secret
|
||||
|
@ -46,7 +49,7 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer *t
|
|||
return manager, nil
|
||||
}
|
||||
|
||||
func (m *controller) Run(ctx context.Context) {
|
||||
func (m *controller) Run(ctx context.Context, workers int) {
|
||||
logger.Info("start managing certificate")
|
||||
certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval)
|
||||
defer certsRenewalTicker.Stop()
|
||||
|
|
|
@ -15,8 +15,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 3
|
||||
maxRetries = 10
|
||||
workers = 3
|
||||
)
|
||||
|
||||
type controller struct {
|
||||
|
@ -44,7 +45,7 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
|
|||
return &c
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.configmapSynced)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,5 +4,5 @@ import "context"
|
|||
|
||||
type Controller interface {
|
||||
// Run starts the controller
|
||||
Run(context.Context)
|
||||
Run(context.Context, int)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
|
||||
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
|
||||
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
|
||||
"github.com/kyverno/kyverno/pkg/controllers"
|
||||
pcache "github.com/kyverno/kyverno/pkg/policycache"
|
||||
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
@ -17,10 +18,16 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 3
|
||||
maxRetries = 10
|
||||
workers = 3
|
||||
)
|
||||
|
||||
type Controller interface {
|
||||
controllers.Controller
|
||||
WarmUp() error
|
||||
}
|
||||
|
||||
type controller struct {
|
||||
cache pcache.Cache
|
||||
|
||||
|
@ -37,7 +44,7 @@ type controller struct {
|
|||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterPolicyInformer, polInformer kyvernov1informers.PolicyInformer) *controller {
|
||||
func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterPolicyInformer, polInformer kyvernov1informers.PolicyInformer) Controller {
|
||||
c := controller{
|
||||
cache: pcache,
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
|
@ -80,7 +87,7 @@ func (c *controller) WarmUp() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.cpolSynced, c.polSynced)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/go-logr/logr"
|
||||
kyvernov1alpha2 "github.com/kyverno/kyverno/api/kyverno/v1alpha2"
|
||||
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
|
||||
"github.com/kyverno/kyverno/pkg/controllers"
|
||||
"github.com/kyverno/kyverno/pkg/controllers/report/resource"
|
||||
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
|
||||
reportutils "github.com/kyverno/kyverno/pkg/utils/report"
|
||||
|
@ -21,8 +22,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 2
|
||||
maxRetries = 10
|
||||
workers = 2
|
||||
)
|
||||
|
||||
type controller struct {
|
||||
|
@ -46,7 +48,7 @@ func NewController(
|
|||
client versioned.Interface,
|
||||
metadataFactory metadatainformers.SharedInformerFactory,
|
||||
metadataCache resource.MetadataCache,
|
||||
) *controller {
|
||||
) controllers.Controller {
|
||||
admrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("admissionreports"))
|
||||
cadmrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("clusteradmissionreports"))
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName)
|
||||
|
@ -62,7 +64,7 @@ func NewController(
|
|||
return &c
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) {
|
||||
selector, err := reportutils.SelectorResourceUidEquals(uid)
|
||||
if err != nil {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
kyvernov1alpha2 "github.com/kyverno/kyverno/api/kyverno/v1alpha2"
|
||||
policyreportv1alpha2 "github.com/kyverno/kyverno/api/policyreport/v1alpha2"
|
||||
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
|
||||
"github.com/kyverno/kyverno/pkg/controllers"
|
||||
"github.com/kyverno/kyverno/pkg/controllers/report/resource"
|
||||
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
|
||||
reportutils "github.com/kyverno/kyverno/pkg/utils/report"
|
||||
|
@ -26,8 +27,9 @@ import (
|
|||
// TODO: policy hash
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 1
|
||||
maxRetries = 10
|
||||
workers = 1
|
||||
)
|
||||
|
||||
type controller struct {
|
||||
|
@ -58,7 +60,7 @@ func NewController(
|
|||
metadataFactory metadatainformers.SharedInformerFactory,
|
||||
metadataCache resource.MetadataCache,
|
||||
chunkSize int,
|
||||
) *controller {
|
||||
) controllers.Controller {
|
||||
admrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("admissionreports"))
|
||||
cadmrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("clusteradmissionreports"))
|
||||
bgscanrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("backgroundscanreports"))
|
||||
|
@ -81,7 +83,7 @@ func NewController(
|
|||
return &c
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
|
||||
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
|
||||
"github.com/kyverno/kyverno/pkg/clients/dclient"
|
||||
"github.com/kyverno/kyverno/pkg/controllers"
|
||||
"github.com/kyverno/kyverno/pkg/controllers/report/resource"
|
||||
"github.com/kyverno/kyverno/pkg/controllers/report/utils"
|
||||
"github.com/kyverno/kyverno/pkg/engine/response"
|
||||
|
@ -30,8 +31,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 2
|
||||
maxRetries = 10
|
||||
workers = 2
|
||||
)
|
||||
|
||||
type controller struct {
|
||||
|
@ -63,7 +65,7 @@ func NewController(
|
|||
cpolInformer kyvernov1informers.ClusterPolicyInformer,
|
||||
nsInformer corev1informers.NamespaceInformer,
|
||||
metadataCache resource.MetadataCache,
|
||||
) *controller {
|
||||
) controllers.Controller {
|
||||
bgscanr := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("backgroundscanreports"))
|
||||
cbgscanr := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("clusterbackgroundscanreports"))
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName)
|
||||
|
@ -85,7 +87,7 @@ func NewController(
|
|||
return &c
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, resource resource.Resource) {
|
||||
selector, err := reportutils.SelectorResourceUidEquals(uid)
|
||||
if err != nil {
|
||||
|
|
|
@ -23,8 +23,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Workers is the number of workers for this controller
|
||||
Workers = 1
|
||||
maxRetries = 5
|
||||
workers = 1
|
||||
)
|
||||
|
||||
type Resource struct {
|
||||
|
@ -84,7 +85,7 @@ func NewController(
|
|||
return &c
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context) {
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue