mirror of
https://github.com/kyverno/kyverno.git
synced 2024-12-14 11:57:48 +00:00
chore: change controller rated limiting queue (#11509)
Signed-off-by: Fleezesd <1253576349@qq.com> Co-authored-by: shuting <shuting@nirmata.com>
This commit is contained in:
parent
5106d5227b
commit
6b87d70b39
15 changed files with 88 additions and 40 deletions
|
@ -83,14 +83,17 @@ func NewController(
|
|||
) Controller {
|
||||
urLister := urInformer.Lister().UpdateRequests(config.KyvernoNamespace())
|
||||
c := controller{
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
engine: engine,
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
polLister: polInformer.Lister(),
|
||||
urLister: urLister,
|
||||
nsLister: namespaceInformer.Lister(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), "background"),
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
engine: engine,
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
polLister: polInformer.Lister(),
|
||||
urLister: urLister,
|
||||
nsLister: namespaceInformer.Lister(),
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: "background"},
|
||||
),
|
||||
eventGen: eventGen,
|
||||
configuration: configuration,
|
||||
jp: jp,
|
||||
|
|
|
@ -49,7 +49,10 @@ func NewController(
|
|||
tlsSecretName string,
|
||||
namespace string,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
caEnqueue, _, _ := controllerutils.AddDefaultEventHandlers(logger, caInformer.Informer(), queue)
|
||||
tlsEnqueue, _, _ := controllerutils.AddDefaultEventHandlers(logger, tlsInformer.Informer(), queue)
|
||||
c := controller{
|
||||
|
|
|
@ -82,7 +82,10 @@ func NewController(
|
|||
eventGen event.Interface,
|
||||
gctxStore loaders.Store,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
keyFunc := controllerutils.MetaNamespaceKeyT[kyvernov2.CleanupPolicyInterface]
|
||||
baseEnqueueFunc := controllerutils.LogError(logger, controllerutils.Parse(keyFunc, controllerutils.Queue(queue)))
|
||||
enqueueFunc := func(logger logr.Logger, operation, kind string) controllerutils.EnqueueFuncT[kyvernov2.CleanupPolicyInterface] {
|
||||
|
|
|
@ -53,7 +53,10 @@ func NewController(
|
|||
polexInformer kyvernov2informers.PolicyExceptionInformer,
|
||||
namespace string,
|
||||
) *controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
if _, _, err := controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), queue); err != nil {
|
||||
logger.Error(err, "failed to register event handlers")
|
||||
}
|
||||
|
|
|
@ -58,10 +58,13 @@ func NewController(
|
|||
informer: informer.Informer(),
|
||||
lister: informer.Lister().ConfigMaps(namespace),
|
||||
controllerName: controllerName,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), controllerName),
|
||||
logger: logging.ControllerLogger(controllerName),
|
||||
name: name,
|
||||
callback: callback,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: controllerName},
|
||||
),
|
||||
logger: logging.ControllerLogger(controllerName),
|
||||
name: name,
|
||||
callback: callback,
|
||||
}
|
||||
if _, _, err := controllerutils.AddDefaultEventHandlers(c.logger, informer.Informer(), c.queue); err != nil {
|
||||
logging.Error(err, "failed to register event handlers")
|
||||
|
|
|
@ -95,7 +95,10 @@ func NewController(
|
|||
webhookCleanupSetup func(context.Context, logr.Logger) error,
|
||||
postWebhookCleanup func(context.Context, logr.Logger) error,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), controllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: controllerName},
|
||||
)
|
||||
c := controller{
|
||||
vwcClient: vwcClient,
|
||||
vwcLister: vwcInformer.Lister(),
|
||||
|
|
|
@ -56,7 +56,10 @@ func NewController(
|
|||
maxResponseLength int64,
|
||||
shouldUpdateStatus bool,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
c := &controller{
|
||||
gceLister: gceInformer.Lister(),
|
||||
queue: queue,
|
||||
|
|
|
@ -50,8 +50,11 @@ func NewController(client dclient.Interface, pcache pcache.Cache, cpolInformer k
|
|||
cache: pcache,
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
polLister: polInformer.Lister(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName),
|
||||
client: client,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
),
|
||||
client: client,
|
||||
}
|
||||
if _, _, err := controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue); err != nil {
|
||||
logger.Error(err, "failed to register event handlers")
|
||||
|
|
|
@ -83,8 +83,14 @@ func NewController(
|
|||
cpolLister: cpolInformer.Lister(),
|
||||
ephrLister: ephrInformer.Lister(),
|
||||
cephrLister: cephrInformer.Lister(),
|
||||
frontQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName),
|
||||
backQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName),
|
||||
frontQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
),
|
||||
backQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
),
|
||||
}
|
||||
if _, _, err := controllerutils.AddDelayedDefaultEventHandlers(logger, ephrInformer.Informer(), c.frontQueue, enqueueDelay); err != nil {
|
||||
logger.Error(err, "failed to register event handlers")
|
||||
|
|
|
@ -104,7 +104,10 @@ func NewController(
|
|||
) controllers.Controller {
|
||||
ephrInformer := metadataFactory.ForResource(reportsv1.SchemeGroupVersion.WithResource("ephemeralreports"))
|
||||
cephrInformer := metadataFactory.ForResource(reportsv1.SchemeGroupVersion.WithResource("clusterephemeralreports"))
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
c := controller{
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
|
|
|
@ -96,10 +96,13 @@ func NewController(
|
|||
vapInformer admissionregistrationv1beta1informers.ValidatingAdmissionPolicyInformer,
|
||||
) Controller {
|
||||
c := controller{
|
||||
client: client,
|
||||
polLister: polInformer.Lister(),
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName),
|
||||
client: client,
|
||||
polLister: polInformer.Lister(),
|
||||
cpolLister: cpolInformer.Lister(),
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
),
|
||||
dynamicWatchers: map[schema.GroupVersionResource]*watcher{},
|
||||
}
|
||||
if vapInformer != nil {
|
||||
|
|
|
@ -70,7 +70,10 @@ func NewController(
|
|||
eventGen event.Interface,
|
||||
checker checker.AuthChecker,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
c := &controller{
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
|
|
|
@ -154,7 +154,10 @@ func NewController(
|
|||
webhookCleanupSetup func(context.Context, logr.Logger) error,
|
||||
postWebhookCleanup func(context.Context, logr.Logger) error,
|
||||
) controllers.Controller {
|
||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName)
|
||||
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
)
|
||||
c := controller{
|
||||
discoveryClient: discoveryClient,
|
||||
mwcClient: mwcClient,
|
||||
|
|
|
@ -61,10 +61,13 @@ func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, ma
|
|||
logger.Error(err, "failed to register metric kyverno_events_dropped")
|
||||
}
|
||||
return &controller{
|
||||
logger: logger,
|
||||
eventsClient: eventsClient,
|
||||
omitEvents: sets.New(omitEvents...),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), ControllerName),
|
||||
logger: logger,
|
||||
eventsClient: eventsClient,
|
||||
omitEvents: sets.New(omitEvents...),
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
|
||||
),
|
||||
clock: clock,
|
||||
hostname: hostname,
|
||||
droppedEventsCounter: droppedEventsCounter,
|
||||
|
|
|
@ -122,14 +122,17 @@ func NewPolicyController(
|
|||
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||
|
||||
pc := policyController{
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
engine: engine,
|
||||
pInformer: pInformer,
|
||||
npInformer: npInformer,
|
||||
eventGen: eventGen,
|
||||
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, "policy_controller"),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), "policy"),
|
||||
client: client,
|
||||
kyvernoClient: kyvernoClient,
|
||||
engine: engine,
|
||||
pInformer: pInformer,
|
||||
npInformer: npInformer,
|
||||
eventGen: eventGen,
|
||||
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, "policy_controller"),
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[any](),
|
||||
workqueue.TypedRateLimitingQueueConfig[any]{Name: "policy"},
|
||||
),
|
||||
configuration: configuration,
|
||||
reconcilePeriod: reconcilePeriod,
|
||||
metricsConfig: metricsConfig,
|
||||
|
|
Loading…
Reference in a new issue