1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00

fix: re-use the maxQueuedEvents (#10024)

* fix: re-use the maxQueuedEvents

Signed-off-by: Mariam Fahmy <mariam.fahmy@nirmata.com>

* fix: use the apierrors.IsNotFound instead of checking a specfic error msg

Signed-off-by: Mariam Fahmy <mariam.fahmy@nirmata.com>

---------

Signed-off-by: Mariam Fahmy <mariam.fahmy@nirmata.com>
Co-authored-by: shuting <shuting@nirmata.com>
This commit is contained in:
Mariam Fahmy 2024-04-10 09:41:22 +02:00 committed by GitHub
parent 87dffbe5be
commit 39da5bd927
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 14 additions and 3 deletions

View file

@ -148,6 +148,7 @@ func main() {
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(

View file

@ -154,6 +154,7 @@ func main() {
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
)
eventController := internal.NewController(
event.ControllerName,

View file

@ -354,6 +354,7 @@ func main() {
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
gcstore := store.New()

View file

@ -266,6 +266,7 @@ func main() {
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
maxQueuedEvents,
strings.Split(omitEvents, ",")...,
)
eventController := internal.NewController(

View file

@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/otel/metric"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -44,10 +45,11 @@ type controller struct {
clock clock.Clock
hostname string
droppedEventsCounter metric.Int64Counter
maxQueuedEvents int
}
// NewEventGenerator to generate a new event controller
func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, omitEvents ...string) *controller {
func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, maxQueuedEvents int, omitEvents ...string) *controller {
clock := clock.RealClock{}
hostname, _ := os.Hostname()
meter := otel.GetMeterProvider().Meter(metrics.MeterName)
@ -66,6 +68,7 @@ func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, om
clock: clock,
hostname: hostname,
droppedEventsCounter: droppedEventsCounter,
maxQueuedEvents: maxQueuedEvents,
}
}
@ -73,6 +76,10 @@ func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, om
func (gen *controller) Add(infos ...Info) {
logger := gen.logger
logger.V(3).Info("generating events", "count", len(infos))
if gen.maxQueuedEvents == 0 || gen.queue.Len() > gen.maxQueuedEvents {
logger.V(2).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", gen.maxQueuedEvents, "current size", gen.queue.Len())
return
}
for _, info := range infos {
// don't create event for resources with generateName as the name is not generated yet
if info.Regarding.Name == "" {
@ -119,7 +126,7 @@ func (gen *controller) processNextWorkItem(ctx context.Context) bool {
return true
}
_, err := gen.eventsClient.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
logger.Error(err, "failed to create event", "key", key)
gen.queue.AddRateLimited(key)

View file

@ -27,7 +27,7 @@ func TestEventGenerator(t *testing.T) {
logger := logr.Discard()
eventsClient := clientset.EventsV1()
eventGenerator := NewEventGenerator(eventsClient, logger)
eventGenerator := NewEventGenerator(eventsClient, logger, 1000)
go eventGenerator.Run(ctx, Workers)
time.Sleep(1 * time.Second)